Problem connecting PyFlink to Elasticsearch 5.4

2020-06-15 Thread jack
I am running an example that connects PyFlink to Elasticsearch. My Elasticsearch version is 5.4.1 and PyFlink is 1.10.1. The connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar; I also downloaded the Kafka and JSON connector jars, and the Kafka connection tests fine.
When connecting to Elasticsearch, the job fails with "findAndCreateTableSink failed".
Could the Elasticsearch connector jar be the cause? If anyone has run into a similar problem, please share some guidance. Thanks.


Caused by: Could not find a suitable factory for
'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch


def area_cnts():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)

    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    # register source and sink
    register_rides_source(st_env)
    register_cnt_sink(st_env)

    # query
    st_env.from_path("source") \
        .group_by("taxiId") \
        .select("taxiId, count(1) as cnt") \
        .insert_into("sink")

    # execute
    st_env.execute("6-write_with_elasticsearch")


def register_rides_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("universal")
            .topic("Rides")
            .start_from_earliest()
            .property("zookeeper.connect", "zookeeper:2181")
            .property("bootstrap.servers", "kafka:9092")) \
        .with_format(  # declare a format for this system
            Json()
            .fail_on_missing_field(True)
            .schema(DataTypes.ROW([
                DataTypes.FIELD("rideId", DataTypes.BIGINT()),
                DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
                DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
                DataTypes.FIELD("lon", DataTypes.FLOAT()),
                DataTypes.FIELD("lat", DataTypes.FLOAT()),
                DataTypes.FIELD("psgCnt", DataTypes.INT()),
                DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("rideId", DataTypes.BIGINT())
            .field("taxiId", DataTypes.BIGINT())
            .field("isStart", DataTypes.BOOLEAN())
            .field("lon", DataTypes.FLOAT())
            .field("lat", DataTypes.FLOAT())
            .field("psgCnt", DataTypes.INT())
            .field("rideTime", DataTypes.TIMESTAMP())
            .rowtime(
                Rowtime()
                .timestamps_from_field("eventTime")
                .watermarks_periodic_bounded(6))) \
        .in_append_mode() \
        .register_table_source("source")


def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
        .version("6")
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT())) \
        .with_format(
            Json()
            .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    area_cnts()



Re: Problem connecting PyFlink to Elasticsearch 5.4

2020-06-16 Thread Dian Fu
I guess it's because the ES version specified in the job is `6`, while the jar used is for ES `5`.
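
The version set on the descriptor is matched against the factories shipped in the connector jars on the classpath. A minimal sketch of the pairing (assuming the standard flink-sql-connector-elasticsearch6_2.11-1.10.1.jar artifact):

from pyflink.table.descriptors import Elasticsearch

# A sink declared with .version("6") only matches the factory shipped in
# flink-sql-connector-elasticsearch6_2.11-1.10.1.jar; with only the
# elasticsearch5 jar on the classpath, no factory matches and the lookup
# fails with "Required context properties mismatch".
sink_descriptor = Elasticsearch() \
    .version("6") \
    .host("elasticsearch", 9200, "http") \
    .index("taxiid-cnts") \
    .document_type("taxiidcnt")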

> On Jun 16, 2020, at 1:47 PM, jack wrote:
> 
> I am running an example that connects PyFlink to Elasticsearch. My Elasticsearch version is 5.4.1 and PyFlink is 1.10.1. The connector jar I am using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar; I also downloaded the Kafka and JSON connector jars, and the Kafka connection tests fine.
> When connecting to Elasticsearch, the job fails with "findAndCreateTableSink failed".
> Could the Elasticsearch connector jar be the cause? If anyone has run into a similar problem, please share some guidance. Thanks.
> 
> Caused by: Could not find a suitable factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
> Reason: Required context properties mismatch
> 
> [snip]


Re: Re: Problem connecting PyFlink to Elasticsearch 5.4

2020-06-16 Thread jack
I have already changed the version to 5 locally; it now fails with the error below:
>> st_env.connect(
>>     Elasticsearch()
>>     .version("5")
>>     .host("localhost", 9200, "http")
>>     .index("taxiid-cnts")
>>     .document_type('taxiidcnt')
>>     .key_delimiter("$")) \

On 2020-06-16 15:38:28, "Dian Fu" wrote:
>I guess it's because the ES version specified in the job is `6`, while the jar used is for ES `5`.
>
>> On Jun 16, 2020, at 1:47 PM, jack wrote:
>> [snip]


Re: Problem connecting PyFlink to Elasticsearch 5.4

2020-06-16 Thread Dian Fu
Could you post the complete exception?

> On Jun 16, 2020, at 3:45 PM, jack wrote:
> 
> I have already changed the version to 5 locally; it now fails with the error below:
> >> st_env.connect(
> >>     Elasticsearch()
> >>     .version("5")
> >>     .host("localhost", 9200, "http")
> >>     .index("taxiid-cnts")
> >>     .document_type('taxiidcnt')
> >>     .key_delimiter("$")) \
> 
> [snip]



Re: Problem connecting PyFlink to Elasticsearch 5.4

2020-06-16 Thread Jark Wu
Hi,

As far as I know, Flink 1.10 does not officially ship an Elasticsearch 5.x SQL connector.
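
If upgrading the cluster is an option, the sink from your example should work once the descriptor version, the connector jar, and the cluster all agree. A minimal sketch, assuming an Elasticsearch 6.x cluster and flink-sql-connector-elasticsearch6_2.11-1.10.1.jar on the classpath:

from pyflink.table import DataTypes
from pyflink.table.descriptors import Elasticsearch, Json, Schema

def register_cnt_sink(st_env):
    # "6" must match both the connector jar and the running cluster;
    # the rest of the sink definition is unchanged from the example above.
    st_env.connect(
        Elasticsearch()
        .version("6")
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type("taxiidcnt")
        .key_delimiter("$")) \
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT())) \
        .with_format(Json().derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")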

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu wrote:

> Could you post the complete exception?
>
> On Jun 16, 2020, at 3:45 PM, jack wrote:
>
> I have already changed the version to 5 locally; it now fails with the error below:
>
> >> st_env.connect(
> >>     Elasticsearch()
> >>     .version("5")
> >>     .host("localhost", 9200, "http")
> >>     .index("taxiid-cnts")
> >>     .document_type('taxiidcnt')
> >>     .key_delimiter("$")) \
>
> [snip]