Excellent! You could open an improvement issue for this.

Best Regards
jinhai...@gmail.com

> On March 25, 2020, at 1:40 PM, zhisheng <zhisheng2...@gmail.com> wrote:
>
> Hi, Leonard Xu
>
> The official ES DDL currently does not support specifying a username and password for the ES cluster. I have extended it at my company to add this feature. Does the community need it? How should I contribute it?
>
> The result is shown here: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
>
> Best Wishes!
>
> zhisheng
>
> Leonard Xu <xbjt...@gmail.com> wrote on Tue, March 24, 2020 at 5:53 PM:
>
>> Hi, 出发
>> It looks like you are missing the dependency for the es connector [1], so only the built-in filesystem connector can be found. The built-in filesystem connector currently supports only the csv format, which is why you see this error.
>> Adding the missing dependencies to your project should fix it. If you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-json</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> Best,
>> Leonard
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>>
>>> On March 23, 2020, at 23:30, 出发 <573693...@qq.com> wrote:
>>>
>>> The source code is as follows:
>>> CREATE TABLE buy_cnt_per_hour (
>>>     hour_of_day BIGINT,
>>>     buy_cnt BIGINT
>>> ) WITH (
>>>     'connector.type' = 'elasticsearch',
>>>     'connector.version' = '6',
>>>     'connector.hosts' = 'http://localhost:9200',
>>>     'connector.index' = 'buy_cnt_per_hour',
>>>     'connector.document-type' = 'user_behavior',
>>>     'connector.bulk-flush.max-actions' = '1',
>>>     'format.type' = 'json',
>>>     'update-mode' = 'append'
>>> )
>>>
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>>
>>> public class ESTest {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         // 2. Set up the execution environment
>>>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>>>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>         streamEnv.setParallelism(1);
>>>         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
>>>             + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
>>>             + " 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
>>>             + " 'connector.document-type' = 'user_behavior',"
>>>             + " 'connector.bulk-flush.max-actions' = '1',\n" + " 'format.type' = 'json',"
>>>             + " 'update-mode' = 'append' )";
>>>         tableEnv.sqlUpdate(sinkDDL);
>>>         Table table = tableEnv.sqlQuery("select * from test_es ");
>>>         tableEnv.toRetractStream(table, Row.class).print();
>>>         streamEnv.execute("");
>>>     }
>>>
>>> }
>>>
>>> The specific error:
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'elasticsearch'
>>> 'format.type' expects 'csv', but is 'json'
>>>
>>> The following properties are requested:
>>> connector.bulk-flush.max-actions=1
>>> connector.document-type=user_behavior
>>> connector.hosts=http://localhost:9200
>>> connector.index=buy_cnt_per_hour
>>> connector.type=elasticsearch
>>> connector.version=6
>>> format.type=json
>>> schema.0.data-type=BIGINT
>>> schema.0.name=hour_of_day
>>> schema.1.data-type=BIGINT
>>> schema.1.name=buy_cnt
>>> update-mode=append
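
For readers wondering how the "Mismatched properties" part of the error in the thread comes about, here is a rough sketch. It is a simplified illustration only, not Flink's actual factory discovery code: the class `FactoryMatchSketch` and the method `mismatches` are hypothetical names, and the "required" map for the CSV factory is assumed from the error message in the thread. The idea is that each candidate factory declares the property values it requires, and these are compared against the properties taken from the DDL.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of property matching; NOT Flink's real implementation.
public class FactoryMatchSketch {

    // Returns one message per required property whose requested value differs or is missing.
    public static List<String> mismatches(Map<String, String> required,
                                          Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = requested.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                String shown = (actual == null) ? "<missing>" : actual;
                result.add("'" + e.getKey() + "' expects '" + e.getValue()
                        + "', but is '" + shown + "'");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Properties as they appear in the DDL from the thread (subset).
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "elasticsearch");
        requested.put("connector.version", "6");
        requested.put("format.type", "json");
        requested.put("update-mode", "append");

        // Assumed requirements of the only factory found on the classpath (the CSV one).
        Map<String, String> csvFactory = new LinkedHashMap<>();
        csvFactory.put("connector.type", "filesystem");
        csvFactory.put("format.type", "csv");

        for (String line : mismatches(csvFactory, requested)) {
            System.out.println(line);
        }
    }
}
```

With only the CSV factory available, both of its required properties fail to match, which lines up with the two mismatches reported in the error. Once the Elasticsearch connector and JSON format jars are on the classpath, a factory whose requirements match the DDL can be found instead.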