👍, zhisheng
    我觉得支持ES鉴权在生产中是蛮有用的功能,nice to have, 
如jinhai所说,可以先提个improvement的issue,在社区里讨论下(具体参数名,这些参数应该是可选的),讨论一致后开PR就可以了。

Best,
Leonard
  


> 在 2020年3月25日,13:51,jinhai wang <jinhai...@gmail.com> 写道:
> 
> 优秀!可以提个improve issue
> 
> 
> Best Regards
> 
> jinhai...@gmail.com
> 
>> 2020年3月25日 下午1:40,zhisheng <zhisheng2...@gmail.com> 写道:
>> 
>> hi,Leonar Xu
>> 
>> 官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢?
>> 
>> 效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
>> 
>> Best Wishes!
>> 
>> zhisheng
>> 
>> Leonard Xu <xbjt...@gmail.com> 于2020年3月24日周二 下午5:53写道:
>> 
>>> Hi, 出发
>>> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem
>>> connector只支持csv format,所以会有这个错误。
>>> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
>>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-json</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> 
>>> Best,
>>> Leonard
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>>> <
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>>>> 
>>> 
>>> 
>>>> 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
>>>> 
>>>> 
>>>> 源码如下:
>>>> CREATE TABLE buy_cnt_per_hour (
>>>>   hour_of_day BIGINT,
>>>>   buy_cnt BIGINT
>>>> ) WITH (
>>>>   'connector.type' = 'elasticsearch',
>>>>   'connector.version' = '6',
>>>>   'connector.hosts' = 'http://localhost:9200',
>>>>   'connector.index' = 'buy_cnt_per_hour',
>>>>   'connector.document-type' = 'user_behavior',
>>>>   'connector.bulk-flush.max-actions' = '1',
>>>>   'format.type' = 'json',
>>>>   'update-mode' = 'append'
>>>> )
>>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.types.Row;
>>>> 
>>>> public class ESTest {
>>>> 
>>>>   public static void main(String[] args) throws Exception {
>>>> 
>>>>       //2、设置运行环境
>>>>       StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>       EnvironmentSettings settings =
>>> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>>>>       StreamTableEnvironment tableEnv =
>>> StreamTableEnvironment.create(streamEnv, settings);
>>>>       streamEnv.setParallelism(1);
>>>>       String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,
>>> buy_cnt BIGINT "
>>>>               + ") WITH ( 'connector.type' = 'elasticsearch',
>>> 'connector.version' = '6',"
>>>>               + "    'connector.hosts' = 'http://localhost:9200',
>>> 'connector.index' = 'buy_cnt_per_hour',"
>>>>               + "    'connector.document-type' = 'user_behavior',"
>>>>               + "    'connector.bulk-flush.max-actions' = '1',\n" + "
>>> 'format.type' = 'json',"
>>>>               + "    'update-mode' = 'append' )";
>>>>       tableEnv.sqlUpdate(sinkDDL);
>>>>       Table table = tableEnv.sqlQuery("select * from test_es ");
>>>>       tableEnv.toRetractStream(table, Row.class).print();
>>>>       streamEnv.execute("");
>>>>   }
>>>> 
>>>> }
>>>> 具体error
>>>> The matching candidates:
>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>> Mismatched properties:
>>>> 'connector.type' expects 'filesystem', but is 'elasticsearch'
>>>> 'format.type' expects 'csv', but is 'json'
>>>> 
>>>> The following properties are requested:
>>>> connector.bulk-flush.max-actions=1
>>>> connector.document-type=user_behavior
>>>> connector.hosts=http://localhost:9200
>>>> connector.index=buy_cnt_per_hour
>>>> connector.type=elasticsearch
>>>> connector.version=6
>>>> format.type=json
>>>> schema.0.data-type=BIGINT
>>>> schema.0.name=hour_of_day
>>>> schema.1.data-type=BIGINT
>>>> schema.1.name=buy_cnt
>>>> update-mode=append
>>> 
>>> 
> 

回复