Hi Leonard Xu,

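The official ES DDL currently does not support specifying a username and password for the ES cluster. I have extended it at my company to add this feature. Does the community need this feature, and if so, how should I contribute it?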

Here is a screenshot of the result: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
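
For readers who cannot open the screenshot, here is a minimal Java sketch of how such an extended DDL could be assembled. The property keys `connector.username` and `connector.password`, the `EsSinkDdl` helper, and its method names are assumptions made up for this illustration; they are not options of the official connector, and the keys used in the actual extension may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper that assembles an Elasticsearch sink DDL string. */
final class EsSinkDdl {

    // Hypothetical property keys: NOT part of the official connector.
    static final String USERNAME_KEY = "connector.username";
    static final String PASSWORD_KEY = "connector.password";

    static String build(String table, String hosts, String index,
                        String user, String password) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.type", "elasticsearch");
        props.put("connector.version", "6");
        props.put("connector.hosts", hosts);
        props.put("connector.index", index);
        props.put("connector.document-type", "user_behavior");
        props.put(USERNAME_KEY, user);
        props.put(PASSWORD_KEY, password);
        props.put("format.type", "json");
        props.put("update-mode", "append");

        String with = props.entrySet().stream()
                .map(e -> "    '" + e.getKey() + "' = '" + escape(e.getValue()) + "'")
                .collect(Collectors.joining(",\n"));

        return "CREATE TABLE " + table + " (\n"
                + "    hour_of_day BIGINT,\n"
                + "    buy_cnt BIGINT\n"
                + ") WITH (\n" + with + "\n)";
    }

    // Double any single quote so the value remains a valid SQL string literal.
    static String escape(String value) {
        return value.replace("'", "''");
    }
}
```

The resulting string would be passed to `tableEnv.sqlUpdate(...)` in the same way as the DDL in the quoted program below. Keeping the credentials as ordinary `WITH` properties follows the style of the existing `connector.*` options.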

Best Wishes!

zhisheng

Leonard Xu <xbjt...@gmail.com> wrote on Tue, Mar 24, 2020 at 5:53 PM:

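> Hi 出发,
> It looks like you are missing the dependency on the ES connector [1], so only the built-in
> filesystem connector can be found. The built-in filesystem connector currently supports only the csv format, which is why you get this error.
> Adding the missing dependencies to your project should fix it. If you use the SQL CLI, you also need to put the dependency jars in Flink's lib directory.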
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-json</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> Best,
> Leonard
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
>
> > On Mar 23, 2020, at 23:30, 出发 <573693...@qq.com> wrote:
> >
> >
> > The source code is as follows:
> > CREATE TABLE buy_cnt_per_hour (
> >     hour_of_day BIGINT,
> >     buy_cnt BIGINT
> > ) WITH (
> >     'connector.type' = 'elasticsearch',
> >     'connector.version' = '6',
> >     'connector.hosts' = 'http://localhost:9200',
> >     'connector.index' = 'buy_cnt_per_hour',
> >     'connector.document-type' = 'user_behavior',
> >     'connector.bulk-flush.max-actions' = '1',
> >     'format.type' = 'json',
> >     'update-mode' = 'append'
> > )
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > public class ESTest {
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         // 2. Set up the execution environment
> >         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> >         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> >         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
> >         streamEnv.setParallelism(1);
> >         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
> >                 + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
> >                 + "    'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
> >                 + "    'connector.document-type' = 'user_behavior',"
> >                 + "    'connector.bulk-flush.max-actions' = '1',\n" + "    'format.type' = 'json',"
> >                 + "    'update-mode' = 'append' )";
> >         tableEnv.sqlUpdate(sinkDDL);
> >         Table table = tableEnv.sqlQuery("select * from test_es ");
> >         tableEnv.toRetractStream(table, Row.class).print();
> >         streamEnv.execute("");
> >     }
> >
> > }
> > The specific error:
> > The matching candidates:
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'elasticsearch'
> > 'format.type' expects 'csv', but is 'json'
> >
> > The following properties are requested:
> > connector.bulk-flush.max-actions=1
> > connector.document-type=user_behavior
> > connector.hosts=http://localhost:9200
> > connector.index=buy_cnt_per_hour
> > connector.type=elasticsearch
> > connector.version=6
> > format.type=json
> > schema.0.data-type=BIGINT
> > schema.0.name=hour_of_day
> > schema.1.data-type=BIGINT
> > schema.1.name=buy_cnt
> > update-mode=append
>
>
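
The diagnosis quoted above (a connector jar missing from the classpath) can be checked before the job is run. Below is a minimal sketch, assuming table factories are registered through Java service-provider files named `META-INF/services/org.apache.flink.table.factories.TableFactory`. The `FactoryCheck` class and its methods are hypothetical helpers written for this example, not Flink API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Locale;

/** Hypothetical diagnostic: lists table factories visible on the classpath. */
final class FactoryCheck {

    // Assumed location of the service-provider registration files.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    // Reads every registration file the class loader can see and collects class names.
    static List<String> registeredFactories(ClassLoader cl) throws IOException {
        List<String> names = new ArrayList<>();
        Enumeration<URL> files = cl.getResources(SERVICE_FILE);
        while (files.hasMoreElements()) {
            URL file = files.nextElement();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    String name = stripComment(line);
                    if (!name.isEmpty()) {
                        names.add(name);
                    }
                }
            }
        }
        return names;
    }

    // Service files allow '#' comments; drop them and surrounding whitespace.
    static String stripComment(String line) {
        int hash = line.indexOf('#');
        return (hash >= 0 ? line.substring(0, hash) : line).trim();
    }

    // Case-insensitive check whether any registered class name mentions the keyword.
    static boolean hasFactory(List<String> names, String keyword) {
        String k = keyword.toLowerCase(Locale.ROOT);
        return names.stream().anyMatch(n -> n.toLowerCase(Locale.ROOT).contains(k));
    }

    public static void main(String[] args) throws IOException {
        List<String> names =
                registeredFactories(Thread.currentThread().getContextClassLoader());
        names.forEach(System.out::println);
        System.out.println("elasticsearch factory present: "
                + hasFactory(names, "elasticsearch"));
    }
}
```

If this is run with the same classpath as the job and no class name containing `elasticsearch` is printed, the connector jar is probably missing, which would be consistent with the error listing only `CsvAppendTableSourceFactory` as a candidate.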
