Hi, Manas

You need to define the schema. You can refer to the following example:

t_env.connect(
    Kafka()
    .version('0.11')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", "localhost:2181")
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    lon: {"
            "      type: 'number'"
            "    },"
            "    rideTime: {"
            "      type: 'string',"
            "      format: 'date-time'"
            "    }"
            "  }"
            "}"
        )
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
    ).register_table_source(INPUT_TABLE)
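Note that register_table_sink will hit the same validation error, so the sink descriptor also needs a with_schema(...) call. A minimal sketch of the sink half follows, assuming the output table carries the same two fields; derive_schema() and in_append_mode() are suggestions of mine here, not something from your original code:

# Sketch only: assumes the sink mirrors the source's two fields.
# derive_schema() lets the Json format reuse the table schema, so the
# json_schema string does not have to be repeated; in_append_mode()
# declares the streaming update mode for the Kafka sink.
t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
) \
    .with_format(
        Json()
        .derive_schema()
    ) \
    .with_schema(
        Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
    ) \
    .in_append_mode() \
    .register_table_sink(OUTPUT_TABLE)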
Best,
Xingbo

Manas Kale <manaskal...@gmail.com> wrote on Thu, Jul 2, 2020 at 7:59 PM:

> Hi,
> I'm trying to get a simple consumer/producer running using the following
> code, referred from the provided links:
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
>
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> INPUT_TOPIC = 'xyz'
> INPUT_TABLE = 'raw_message'
> PROD_ZOOKEEPER = '...'
> PROD_KAFKA = '...'
>
> OUTPUT_TOPIC = 'summary_output'
> OUTPUT_TABLE = 'feature_summary'
> LOCAL_ZOOKEEPER = 'localhost:2181'
> LOCAL_KAFKA = 'localhost:9092'
>
> t_env.connect(
>     Kafka()
>     .version('universal')
>     .topic(INPUT_TOPIC)
>     .property("bootstrap.servers", PROD_KAFKA)
>     .start_from_latest()
> ) \
>     .with_format(
>         Json()
>         .json_schema(
>             "{"
>             "  type: 'object',"
>             "  properties: {"
>             "    lon: {"
>             "      type: 'number'"
>             "    },"
>             "    rideTime: {"
>             "      type: 'string',"
>             "      format: 'date-time'"
>             "    }"
>             "  }"
>             "}"
>         )
>     ).register_table_source(INPUT_TABLE)
>
> t_env.connect(
>     Kafka()
>     .version('universal')
>     .topic(OUTPUT_TOPIC)
>     .property("bootstrap.servers", LOCAL_KAFKA)
>     .start_from_latest()
> ) \
>     .with_format(
>         Json()
>         .json_schema(
>             "{"
>             "  type: 'object',"
>             "  properties: {"
>             "    lon: {"
>             "      type: 'number'"
>             "    },"
>             "    rideTime: {"
>             "      type: 'string',"
>             "      format: 'date-time'"
>             "    }"
>             "  }"
>             "}"
>         )
>     ).register_table_sink(OUTPUT_TABLE)
>
> t_env.from_path(INPUT_TABLE) \
>     .insert_into(OUTPUT_TABLE)
>
> t_env.execute('IU pyflink job')
>
> *However, I am getting the following exception:*
>
> Traceback (most recent call last):
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>     at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>     ... 13 more
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
>     ).register_table_source(INPUT_TABLE)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
>     self._j_connect_table_descriptor.registerTableSource(name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>
> The relevant part seems to be *Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.*
>
> This is probably a basic error, but I can't figure out what's wrong with
> the schema. Is the schema not properly declared? Is some field missing?
>
> FWIW I have included the JSON and Kafka connector JARs in the required
> location.
>
> Regards,
> Manas
>
> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi Xingbo,
>> Thank you for the information, it certainly helps!
>>
>> Regards,
>> Manas
>>
>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned,
>>> so the method described in the link won't work.
>>> You can instead use the more convenient DDL[1] or descriptors[2] to read
>>> Kafka data. You can also refer to the common questions about PyFlink[3].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <manaskal...@gmail.com> wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>>
>>>> Hi,
>>>> I want to consume from and write to Kafka using Flink's Python API.
>>>>
>>>> The only way I found to do this was through this question on SO
>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>,
>>>> where the user essentially copies the Flink Kafka connector JARs into
>>>> the Flink runtime's lib/ directory.
>>>>
>>>> - Is this the recommended way to do this? If not, what is?
>>>> - Is there any official documentation for using Kafka with PyFlink?
>>>>   Is this officially supported?
>>>> - How does the method described in the link work? Does the Flink
>>>>   runtime load and expose all JARs in lib/ to the Python script? Can I
>>>>   write custom operators in Java and use them through Python?
>>>>
>>>> Thanks,
>>>> Manas