👍, zhisheng 我觉得支持ES鉴权在生产中是蛮有用的功能,nice to have, 如jinhai所说,可以先提个improvement的issue,在社区里讨论下(具体参数名,这些参数应该是可选的),讨论一致后开PR就可以了。
Best, Leonard > 在 2020年3月25日,13:51,jinhai wang <jinhai...@gmail.com> 写道: > > 优秀!可以提个improve issue > > > Best Regards > > jinhai...@gmail.com > >> 2020年3月25日 下午1:40,zhisheng <zhisheng2...@gmail.com> 写道: >> >> hi,Leonar Xu >> >> 官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢? >> >> 效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png >> >> Best Wishes! >> >> zhisheng >> >> Leonard Xu <xbjt...@gmail.com> 于2020年3月24日周二 下午5:53写道: >> >>> Hi, 出发 >>> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem >>> connector只支持csv format,所以会有这个错误。 >>> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。 >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-json</artifactId> >>> <version>${flink.version}</version> >>> </dependency> >>> >>> Best, >>> Leonard >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector >>> < >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector >>>> >>> >>> >>>> 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道: >>>> >>>> >>>> 源码如下: >>>> CREATE TABLE buy_cnt_per_hour ( >>>> hour_of_day BIGINT, >>>> buy_cnt BIGINT >>>> ) WITH ( >>>> 'connector.type' = 'elasticsearch', >>>> 'connector.version' = '6', >>>> 'connector.hosts' = 'http://localhost:9200', >>>> 'connector.index' = 'buy_cnt_per_hour', >>>> 'connector.document-type' = 'user_behavior', >>>> 'connector.bulk-flush.max-actions' = '1', >>>> 'format.type' = 'json', >>>> 'update-mode' = 'append' >>>> ) >>>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>>> import org.apache.flink.table.api.EnvironmentSettings; >>>> import org.apache.flink.table.api.Table; >>>> import org.apache.flink.table.api.java.StreamTableEnvironment; >>>> import org.apache.flink.types.Row; >>>> >>>> public class ESTest { >>>> >>>> public static void main(String[] args) throws Exception { >>>> >>>> //2、设置运行环境 >>>> StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> EnvironmentSettings settings = >>> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); >>>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, settings); >>>> streamEnv.setParallelism(1); >>>> String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, >>> buy_cnt BIGINT " >>>> + ") WITH ( 'connector.type' = 'elasticsearch', >>> 'connector.version' = '6'," >>>> + " 'connector.hosts' = 'http://localhost:9200', >>> 'connector.index' = 'buy_cnt_per_hour'," >>>> + " 'connector.document-type' = 'user_behavior'," >>>> + " 'connector.bulk-flush.max-actions' = '1',\n" + " >>> 'format.type' = 'json'," >>>> + " 'update-mode' = 'append' )"; >>>> tableEnv.sqlUpdate(sinkDDL); >>>> Table table = tableEnv.sqlQuery("select * from test_es "); >>>> tableEnv.toRetractStream(table, Row.class).print(); >>>> streamEnv.execute(""); >>>> } >>>> >>>> } >>>> 具体error >>>> The matching candidates: >>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>> Mismatched properties: >>>> 'connector.type' expects 'filesystem', but is 'elasticsearch' >>>> 'format.type' expects 'csv', but is 'json' >>>> >>>> The following properties are requested: >>>> connector.bulk-flush.max-actions=1 >>>> connector.document-type=user_behavior >>>> connector.hosts=http://localhost:9200 >>>> connector.index=buy_cnt_per_hour >>>> connector.type=elasticsearch >>>> connector.version=6 >>>> format.type=json >>>> schema.0.data-type=BIGINT >>>> schema.0.name=hour_of_day >>>> schema.1.data-type=BIGINT >>>> schema.1.name=buy_cnt >>>> update-mode=append >>> >>> >