- 浏览: 667849 次
- 性别:
- 来自: 长沙
文章分类
- 全部博客 (364)
- quick start (57)
- bboss aop (43)
- bboss mvc (48)
- bboss persistent (96)
- bboss taglib (30)
- bboss event (10)
- bbossgroups (52)
- bboss (32)
- bboss会话共享 (17)
- bboss rpc (7)
- bboss 国际化 (5)
- bboss 序列化 (9)
- bboss cxf webservice (8)
- bboss hessian (3)
- bboss 安全认证SSO (15)
- bboss 工作流 (6)
- 平台 (18)
- bboss quartz (3)
- 杂谈 (5)
- 大数据 (1)
- bboss elastic (24)
- bboss http (1)
- bboss kafka (1)
- Elasticsearch Scroll和Slice Scroll查询API使用案例 (1)
最新评论
-
qianhao123:
...
采用gradle构建和发布bboss方法介绍 -
qianhao123:
[img][/img]
采用gradle构建和发布bboss方法介绍 -
yin_bp:
欢迎大家参与working
高性能elasticsearch ORM开发库使用介绍 -
qq641879434:
万分感谢
bboss 持久层sql xml配置文件编写和加载方法介绍 -
yin_bp:
qq641879434 写道怎么设置配置文件 可以查看执行的S ...
bboss 持久层sql xml配置文件编写和加载方法介绍
The best elasticsearch highlevel java rest api-----bboss
基于bboss持久层和bboss elasticsearch客户端实现数据库数据导入es案例分享(支持各种数据库和各种es版本)
通过bboss,可以非常方便地将数据库表数据导入到es中:
1.案例对应的源码
批量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/TestDB2ESImport.java
定时增量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java
2.在工程中导入jdbc es maven坐标
本文从mysql数据库表td_cms_document导入数据到es中,除了导入上述maven坐标,还需要额外导入mysql驱动坐标:
3.配置es地址
新建application.properties文件,内容为:
elasticsearch.rest.hostNames=10.21.20.168:9200
## 集群地址用逗号分隔
#elasticsearch.rest.hostNames=10.180.211.27:9200,10.180.211.28:9200,10.180.211.29:9200
4.编写简单的导入代码
同步批量导入
说明:从数据库检索数据放入批处理列表,到达batchsize就提交一次作业,最多threadcount个工作线程并行处理作业,如果线程都在忙,没有空闲的工作线程,那么作业就会放到队列里面排队,如果队列也满了,则会阻塞等待释放的队列位置,每等待100次打印一次等待次数的日志。
batchsize,queue,threadcount的配置要结合服务器的内存和cpu配置来设置,设置大了容易内存溢出,设置小了影响处理速度,所以要权衡考虑。
导入的时候需要观察服务端的write线程池的状态,如果出现reject任务的情况,就需要调优elasticsearch配置参数:
thread_pool.bulk.queue_size: 1000 es线程等待队列长度
thread_pool.bulk.size: 10 线程数量,与cpu的核数对应
一个有字段属性映射的稍微复杂案例实现
定时增量导入
源码文件 https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java
5 开发交流
完整的demo工程
https://gitee.com/bbossgroups/eshelloword-booter
elasticsearch技术交流群:166471282
elasticsearch微信公众号:
bboss微信公众号:
基于bboss持久层和bboss elasticsearch客户端实现数据库数据导入es案例分享(支持各种数据库和各种es版本)
通过bboss,可以非常方便地将数据库表数据导入到es中:
- 支持逐条数据导入
- 批量数据导入
- 批量数据多线程并行导入
- 定时增量(串行/并行)数据导入
- 下面详细介绍本案例。
1.案例对应的源码
批量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/TestDB2ESImport.java
定时增量导入:https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java
2.在工程中导入jdbc es maven坐标
<dependency> <groupId>com.bbossgroups.plugins</groupId> <artifactId>bboss-elasticsearch-rest-jdbc</artifactId> <version>5.2.5</version> </dependency>
本文从mysql数据库表td_cms_document导入数据到es中,除了导入上述maven坐标,还需要额外导入mysql驱动坐标:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency>
3.配置es地址
新建application.properties文件,内容为:
elasticsearch.rest.hostNames=10.21.20.168:9200
## 集群地址用逗号分隔
#elasticsearch.rest.hostNames=10.180.211.27:9200,10.180.211.28:9200,10.180.211.29:9200
4.编写简单的导入代码
同步批量导入
public void testSimpleImportBuilder(){ ImportBuilder importBuilder = ImportBuilder.newInstance(); try { //清除测试表数据 ElasticSearchHelper.getRestClientUtil().dropIndice("dbclobdemo"); } catch (Exception e){ } //数据源相关配置,可选项,可以在外部启动数据源 importBuilder.setDbName("test") .setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true") //通过useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,useCursorFetch必须和jdbcFetchSize参数配合使用,否则不会生效 .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(false);//是否使用连接池 //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑 importBuilder.setSql("select * from td_cms_document"); /** * es相关配置 */ importBuilder .setIndex("dbclobdemo") //必填项 .setIndexType("dbclobdemo") //必填项 .setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setBatchSize(10000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(20000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.db2es(); } 可以直接运行上述代码,查看数据导入效果。 异步批量导入 public void testSimpleLogImportBuilderFromExternalDBConfig(){ ImportBuilder importBuilder = ImportBuilder.newInstance(); try { //清除测试表 ElasticSearchHelper.getRestClientUtil().dropIndice("dbdemo"); } catch (Exception e){ } //数据源相关配置,可选项,可以在外部启动数据源 importBuilder.setDbName("test") .setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true") .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(false);//是否使用连接池 //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑 importBuilder.setSql("select * from td_sm_log"); /** * es相关配置 */ importBuilder .setIndex("dbdemo") //必填项 .setIndexType("dbdemo") //必填项 .setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(10000);//设置数据库的查询fetchsize /** * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入 importBuilder.setQueue(100);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(200);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 importBuilder.setRefreshOption("refresh"); // 为了实时验证数据导入的效果,强制刷新数据,生产环境请设置为null或者不指定 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.db2es(); long count = ElasticSearchHelper.getRestClientUtil().countAll("dbdemo"); System.out.println("数据导入完毕后索引表dbdemo中的文档数量:"+count); }
说明:从数据库检索数据放入批处理列表,到达batchsize就提交一次作业,最多threadcount个工作线程并行处理作业,如果线程都在忙,没有空闲的工作线程,那么作业就会放到队列里面排队,如果队列也满了,则会阻塞等待释放的队列位置,每等待100次打印一次等待次数的日志。
batchsize,queue,threadcount的配置要结合服务器的内存和cpu配置来设置,设置大了容易内存溢出,设置小了影响处理速度,所以要权衡考虑。
导入的时候需要观察服务端的write线程池的状态,如果出现reject任务的情况,就需要调优elasticsearch配置参数:
thread_pool.bulk.queue_size: 1000 es线程等待队列长度
thread_pool.bulk.size: 10 线程数量,与cpu的核数对应
一个有字段属性映射的稍微复杂案例实现
public void testImportBuilder(){ ImportBuilder importBuilder = ImportBuilder.newInstance(); try { //清除测试表 ElasticSearchHelper.getRestClientUtil().dropIndice("dbclobdemo"); } catch (Exception e){ } //数据源相关配置,可选项,可以在外部启动数据源 importBuilder.setDbName("test") .setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true") .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(false);//是否使用连接池 //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑 importBuilder.setSql("select * from td_cms_document"); /** * es相关配置 */ importBuilder .setIndex("dbclobdemo") //必填项 .setIndexType("dbclobdemo") //必填项 .setRefreshOption(null)//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh"); .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setEsIdField("documentId")//可选项 .setEsParentIdField(null) //可选项,如果不指定,es自动为文档产生id .setRoutingValue(null) //可选项 importBuilder.setRoutingField(null); .setEsDocAsUpsert(true)//可选项 .setEsRetryOnConflict(3)//可选项 .setEsReturnSource(false)//可选项 .setEsVersionField(null)//可选项 .setEsVersionType(null)//可选项 .setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") //可选项,默认日期格式 .setLocale("zh_CN") //可选项,默认locale .setTimeZone("Etc/UTC") //可选项,默认时区 .setBatchSize(50) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(10000);//设置数据库的查询fetchsize /** * db-es mapping 表字段名称到es 文档字段的映射:比如document_id -> docId * 可以配置mapping,也可以不配置,默认基于java 驼峰规则进行db field-es field的映射和转换 */ importBuilder.addFieldMapping("document_id","docId") .addFieldMapping("docwtime","docwTime") .addIgnoreFieldMapping("channel_id");//添加忽略字段 /** * 为每条记录添加额外的字段和值 * 可以为基本数据类型,也可以是复杂的对象 */ importBuilder.addFieldValue("testF1","f1value"); importBuilder.addFieldValue("testInt",0); importBuilder.addFieldValue("testDate",new Date()); importBuilder.addFieldValue("testFormateDate","yyyy-MM-dd HH",new Date()); TestObject testObject = new TestObject(); testObject.setId("testid"); testObject.setName("jackson"); importBuilder.addFieldValue("testObject",testObject); /** * 重新设置es数据结构 */ importBuilder.setDataRefactor(new DataRefactor() { public void refactor(Context context) throws Exception { CustomObject customObject = new CustomObject(); customObject.setAuthor((String)context.getValue("author")); customObject.setTitle((String)context.getValue("title")); customObject.setSubtitle((String)context.getValue("subtitle")); context.addFieldValue("docInfo",customObject);//如果还需要构建更多的内部对象,可以继续构建 //上述三个属性已经放置到docInfo中,如果无需再放置到索引文档中,可以忽略掉这些属性 context.addIgnoreFieldMapping("author"); context.addIgnoreFieldMapping("title"); context.addIgnoreFieldMapping("subtitle"); } }); /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.db2es(); }
定时增量导入
源码文件 https://gitee.com/bbossgroups/eshelloword-booter/blob/master/src/test/java/org/bboss/elasticsearchtest/db2es/ScheduleImportTaskTest.java
public void testSimpleLogImportBuilderFromExternalDBConfig(){ ImportBuilder importBuilder = ImportBuilder.newInstance(); //增量定时任务不要删表,但是可以通过删表来做初始化操作 // try { // //清除测试表,导入的时候回重建表,测试的时候加上为了看测试效果,实际线上环境不要删表 // ElasticSearchHelper.getRestClientUtil().dropIndice("dbdemo"); // } // catch (Exception e){ // // } //数据源相关配置,可选项,可以在外部启动数据源 importBuilder.setDbName("test") .setDbDriver("com.mysql.jdbc.Driver") //数据库驱动程序,必须导入相关数据库的驱动jar包 .setDbUrl("jdbc:mysql://localhost:3306/bboss?useCursorFetch=true") .setDbUser("root") .setDbPassword("123456") .setValidateSQL("select 1") .setUsePool(true);//是否使用连接池 //指定导入数据的sql语句,必填项,可以设置自己的提取逻辑,设置增量变量log_id importBuilder.setSql("select * from td_sm_log where log_id > #[log_id]"); /** * es相关配置 */ importBuilder .setIndex("dbdemo") //必填项 .setIndexType("dbdemo") //必填项 // .setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新 .setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId .setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理 .setJdbcFetchSize(10000);//设置数据库的查询fetchsize importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明 // .setScheduleDate(date) //指定任务开始执行时间:日期 .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次 // importBuilder.setNumberLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 // importBuilder.setNumberLastValueColumn("log_id");//手动指定日期增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段 importBuilder.setFromFirst(true);//任务重启时,重新开始采集数据,适合于每次全量导入数据的情况,如果是全量导入,可以先删除原来的索引数据 importBuilder.setLastValueStorePath("testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点 // importBuilder.setLastValueStoreTableName("logs");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型 // 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型 // importBuilder. /** * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池 */ importBuilder.setParallel(true);//设置为多线程并行批量导入 importBuilder.setQueue(10);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量 importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行 importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回 importBuilder.setEsIdField("log_id");//设置文档主键,不设置,则自动产生文档id importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO // importBuilder.setDiscardBulkResponse(true);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度 /** * 执行数据库表数据导入es操作 */ DataStream dataStream = importBuilder.builder(); dataStream.db2es();//执行导入操作 System.out.println(); }
5 开发交流
完整的demo工程
https://gitee.com/bbossgroups/eshelloword-booter
elasticsearch技术交流群:166471282
elasticsearch微信公众号:
bboss微信公众号:
发表评论
-
一组获取Elasticsearch 索引表所有文档API使用案例
2018-11-18 16:02 3001The best elasticsearch highle ... -
Elasticsearch Scroll和Slice Scroll查询API使用案例
2018-09-16 18:49 3830Elasticsearch Scroll和Slice Scro ... -
Spring Boot整合ElasticSearch单/多集群案例
2018-07-07 20:12 9761Spring Boot整合ElasticSearch单个集群和 ... -
ElasticSearch DSL Script使用案例分享
2018-06-28 23:52 6168the best elasticsearch highleve ... -
Elasticsearch 6.3.0 SQL功能使用案例分享
2018-06-25 19:12 3270The best elasticsearch highleve ... -
数据库数据导入Elasticsearch案例分享
2018-06-21 22:56 433The best elasticsearch highleve ... -
ElasticSearch From-Size分页案例
2018-06-14 00:17 3375ElasticSearch From-Size分页案例 1. ... -
ElasticSearch客户端注解使用介绍
2018-05-30 00:19 2362The best elasticsearch highleve ... -
基于自定义配置文件初始化ElasticSearch客户端方法介绍
2018-05-24 18:56 1479基于自定义配置文件初始化ElasticSearch客户端方法介 ... -
Elasticsearch关键词高亮检索案例分享
2018-05-10 22:18 62091.准备工作 参考文档《集成Elasticsearch Res ... -
判断ElasticSearch索引Indice和索引类型是否存在
2018-05-05 23:54 8488The best elasticsearch highleve ... -
快速集成Elasticsearch Restful API案例
2018-04-26 14:27 3163The best elasticsearch highleve ... -
Elasticsearch source filter检索案例
2018-04-24 13:00 2212摘要: the best elasticsearch high ... -
Elasticsearch search after分页检索案例
2018-04-21 10:36 3040Elasticsearch search after分页检索案 ... -
Elasticsearch Delete/UpdateByQuery案例
2018-04-16 11:09 7313Elasticsearch Delete/UpdateByQu ... -
Elasticsearch返回父子数据关联查询案例
2018-04-13 12:36 4665在《Elasticsearch 父子关 ... -
Elasticsearch Sliced Scroll分页检索案例分享
2018-04-02 18:28 3734Elasticsearch Sliced Scroll分页检索 ... -
Elasticsearch地理位置维护及检索案例分享
2018-03-31 21:36 1683Elasticsearch地理位置信息维护及检索案例分享 1 ... -
Elasticsearch Scroll分页检索案例分享
2018-03-28 20:40 4052Elasticsearch Scroll分页检索案例分享 1 ... -
Elasticsearch Mget、GetDocSource、索引部分更新案例分享
2018-03-25 08:55 12131.前期准备 参考文档《高性能elasticsearch OR ...
相关推荐
Linux环境下使用sqlplus工具将oracle中的数据导入到elasticsearch中。只需要在es_bulk_tool.properties配置sql即可实现数据的批量导入。在elasticsearch6中测试通过。shell脚本需要使用sqlplus。
该工具支持将arcgis的shp数据导入到ES中,支持多面数据或带洞的面数据
资源主要能用于使用logstash将oracle数据导入到es中,logstash的版本为5.6.1
ElasticSearch数据导入 文件导入 单个索引导入 文件按格式采用Json
本地简单kettle抽MySQL数据到ES中 案例.zip本地简单kettle抽MySQL数据到ES中 案例.zip 本地简单kettle抽MySQL数据到ES中 案例.zip本地简单kettle抽MySQL数据到ES中 案例.zip 本地简单kettle抽MySQL数据到ES中 案例....
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
可以将mysql 等数据库的数据导入到 elasticsearch 中 https://github.com/jprante/elasticsearch-jdbc
ElasticSearch数据导出 elasticsearch单文档数据导出 支持自定义查询 导出数据Json文件
支持定时任务导入mysql数据到elasticsearch 支持SQL语句查询elasticsearch,提供dubbo服务接口 定时任务支持quartz与lts方式
DataTable 快速导入数据库——百万条数据只需几秒
ES数据库,eelasticsearch-sql-2.2.0.1. nosql数据库
同步mysql数据到elasticsearch的工具,功能丰富,用法简单,配置灵活,扩展性强;
弹性的 一种将数据从excel文件移动到elasticsearch的工具。 用法:java -jar excelastic.jar data.xls -es:host localhost -es:index test -es:type mytype
真实开发环境的es设计开发文档,可以参考试试,版本比较老了
sqldb2es-从SQL DB到ElasticSearch导入工具 自动将数据从源SQL数据库同步到目标ElasticSearch存储库。 sqldb2es是一个Java应用程序,它从JDBC数据源(表格数据)中获取数据,以供以结构化(JSon)数据格式进行索引...
ElasticSearch官方测试数据
elasticsearch的一个开源的辅助工具,可以批量,定时将数据库数据导入elasticsearch。
Webmagic爬取数据导入到数据库与Elasticsearch5,详细介绍请参考:http://blog.csdn.net/u011781521/article/details/77866642
ES 官方示例数据
arcgis elasticSearch es 矢量数据导入插件 数据建模 mapping indexsetting 字段映射 索引建模支持geoshape、shape,text索引支持keyword/ngram/edgeNgram/ikSmart,快速导入,兼容multipolygon,带洞,多面,使用时...