Hi, as far as I know, Flink 1.10 does not officially provide an Elasticsearch 5.x SQL connector.
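If upgrading the cluster is an option, the sink can instead target a version that 1.10 does ship a SQL connector for. A minimal sketch of the sink registration, assuming an Elasticsearch 6 cluster and flink-sql-connector-elasticsearch6_2.11-1.10.1.jar on the classpath:

    st_env.connect(
        Elasticsearch()
        .version("6")   # must match both the connector jar and the cluster major version
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT())) \
        .with_format(Json().derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")

The factory lookup only succeeds when the declared connector.version matches a factory jar that is actually on the classpath.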
Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu <dian0511...@gmail.com> wrote:

> Could you post the complete exception?
>
> On 16 Jun 2020, at 15:45, jack <wslyk...@163.com> wrote:
>
> > I have already changed the version in the connect descriptor to 5 locally, and the error below occurred:
> >
> > st_env.connect(
> >     Elasticsearch()
> >     .version("5")
> >     .host("localhost", 9200, "http")
> >     .index("taxiid-cnts")
> >     .document_type('taxiidcnt')
> >     .key_delimiter("$")) \
> >
> > On 2020-06-16 15:38:28, "Dian Fu" <dian0511...@gmail.com> wrote:
> >
> > > I guess it's because the ES version specified in the job is `6`, however,
> > > the jar used is `5`.
> > >
> > > On 16 Jun 2020, at 13:47, jack <wslyk...@163.com> wrote:
> > >
> > > > I am running a PyFlink example that writes to Elasticsearch. My ES version is 5.4.1 and PyFlink is 1.10.1; the connector jar I use is
> > > > flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I also downloaded the Kafka and JSON connector jars, and the Kafka connection tested fine.
> > > > Connecting to ES fails with "findAndCreateTableSink failed".
> > > > Could the ES connector jar be the cause? If anyone has run into a similar problem, please advise. Thanks.
> > > >
> > > > Caused by: Could not find a suitable factory for
> > > > 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
> > > > Reason: Required context properties mismatch
> > > >
> > > > from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> > > > from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
> > > > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch
> > > >
> > > >
> > > > def area_cnts():
> > > >     s_env = StreamExecutionEnvironment.get_execution_environment()
> > > >     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> > > >     s_env.set_parallelism(1)
> > > >
> > > >     # use blink table planner
> > > >     st_env = StreamTableEnvironment \
> > > >         .create(s_env, environment_settings=EnvironmentSettings
> > > >                 .new_instance()
> > > >                 .in_streaming_mode()
> > > >                 .use_blink_planner().build())
> > > >
> > > >     # register source and sink
> > > >     register_rides_source(st_env)
> > > >     register_cnt_sink(st_env)
> > > >
> > > >     # query
> > > >     st_env.from_path("source")\
> > > >         .group_by("taxiId")\
> > > >         .select("taxiId, count(1) as cnt")\
> > > >         .insert_into("sink")
> > > >
> > > >     # execute
> > > >     st_env.execute("6-write_with_elasticsearch")
> > > >
> > > >
> > > > def register_rides_source(st_env):
> > > >     st_env \
> > > >         .connect(  # declare the external system to connect to
> > > >             Kafka()
> > > >             .version("universal")
> > > >             .topic("Rides")
> > > >             .start_from_earliest()
> > > >             .property("zookeeper.connect", "zookeeper:2181")
> > > >             .property("bootstrap.servers", "kafka:9092")) \
> > > >         .with_format(  # declare a format for this system
> > > >             Json()
> > > >             .fail_on_missing_field(True)
> > > >             .schema(DataTypes.ROW([
> > > >                 DataTypes.FIELD("rideId", DataTypes.BIGINT()),
> > > >                 DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
> > > >                 DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
> > > >                 DataTypes.FIELD("lon", DataTypes.FLOAT()),
> > > >                 DataTypes.FIELD("lat", DataTypes.FLOAT()),
> > > >                 DataTypes.FIELD("psgCnt", DataTypes.INT()),
> > > >                 DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
> > > >         .with_schema(  # declare the schema of the table
> > > >             Schema()
> > > >             .field("rideId", DataTypes.BIGINT())
> > > >             .field("taxiId", DataTypes.BIGINT())
> > > >             .field("isStart", DataTypes.BOOLEAN())
> > > >             .field("lon", DataTypes.FLOAT())
> > > >             .field("lat", DataTypes.FLOAT())
> > > >             .field("psgCnt", DataTypes.INT())
> > > >             .field("rideTime", DataTypes.TIMESTAMP())
> > > >             .rowtime(
> > > >                 Rowtime()
> > > >                 .timestamps_from_field("eventTime")
> > > >                 .watermarks_periodic_bounded(60000))) \
> > > >         .in_append_mode() \
> > > >         .register_table_source("source")
> > > >
> > > >
> > > > def register_cnt_sink(st_env):
> > > >     st_env.connect(
> > > >         Elasticsearch()
> > > >         .version("6")
> > > >         .host("elasticsearch", 9200, "http")
> > > >         .index("taxiid-cnts")
> > > >         .document_type('taxiidcnt')
> > > >         .key_delimiter("$")) \
> > > >         .with_schema(
> > > >             Schema()
> > > >             .field("taxiId", DataTypes.BIGINT())
> > > >             .field("cnt", DataTypes.BIGINT())) \
> > > >         .with_format(
> > > >             Json()
> > > >             .derive_schema()) \
> > > >         .in_upsert_mode() \
> > > >         .register_table_sink("sink")
> > > >
> > > >
> > > > if __name__ == '__main__':
> > > >     area_cnts()