I also tried doing this by using a user-defined function:

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter",
                        udf(DataConverter(),
                            input_types=[DataTypes.STRING()],
                            result_type=DataTypes.ROW([
                                DataTypes.FIELD("feature1", DataTypes.STRING())
                            ])))

# "data" below is the field "data" from the previous mail
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)

I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it?
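For concreteness, the shape of the thing I'm after (a sketch on my part, assuming pyflink.table.types.Row exists in this version and that ROW result types are supported for Python UDFs; the result_type would then declare both fields):

from pyflink.table.types import Row  # assumed location of the Row class

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # positional values would have to line up with the FIELDs declared
        # in the ROW(...) result_type, i.e. feature1, feature2, ...
        return Row(str(data['0001']), str(data['0002']))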
Thanks!

On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <manaskal...@gmail.com> wrote:

> Hi Xingbo,
> Thanks for the reply. I didn't know that a table schema also needs to be
> declared after the connector, but I understand now.
> I have another question: how do I write the parsing schema for a field
> that is itself a valid JSON string? For example:
>
> {
>     "monitorId": 865,
>     "deviceId": "94:54:93:49:96:13",
>     "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
>     "state": 2,
>     "time": 1593687809180
> }
>
> The field "data" is a string of valid JSON with string:number pairs. I'm
> currently trying a JSON schema object and DataTypes.ROW(), but am getting
> deserialization errors.
>
> .with_format(
>     Json()
>     .json_schema(
>         """
>         {
>             "type": "object",
>             "properties": {
>                 "monitorId": {
>                     "type": "string"
>                 },
>                 "deviceId": {
>                     "type": "string"
>                 },
>                 "data": {
>                     "type": "object"
>                 },
>                 "state": {
>                     "type": "integer"
>                 },
>                 "time": {
>                     "type": "string"
>                 }
>             }
>         }
>         """
>     )
> ) \
> .with_schema(
>     Schema()
>     .field("monitorId", DataTypes.STRING())
>     .field("deviceId", DataTypes.STRING())
>     .field("data", DataTypes.ROW())
> )
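> As a fallback, the workaround I'm considering (a sketch on my part, assuming
> the nested JSON can be left un-parsed by declaring "data" as a plain string
> in both the format and the table schema, then parsed later in Python; other
> fields elided):
>
> .with_format(
>     Json()
>     .json_schema('{"type": "object", "properties": {"data": {"type": "string"}}}')
> ) \
> .with_schema(
>     Schema()
>     .field("data", DataTypes.STRING())
> )
>
> # later, e.g. inside a UDF:
> parsed = json.loads(data_field)  # gives a dict like {'0001': 105.0, '0002': 1.21, ...}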
> Regards,
>
> Manas
>
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi, Manas
>> You need to define the schema. You can refer to the following example:
>>
>> t_env.connect(
>>     Kafka()
>>     .version('0.11')
>>     .topic(INPUT_TOPIC)
>>     .property("bootstrap.servers", PROD_KAFKA)
>>     .property("zookeeper.connect", "localhost:2181")
>>     .start_from_latest()
>> ) \
>>     .with_format(
>>         Json()
>>         .json_schema(
>>             "{"
>>             "  type: 'object',"
>>             "  properties: {"
>>             "    lon: {"
>>             "      type: 'number'"
>>             "    },"
>>             "    rideTime: {"
>>             "      type: 'string',"
>>             "      format: 'date-time'"
>>             "    }"
>>             "  }"
>>             "}"
>>         )
>>     ) \
>>     .with_schema(  # declare the schema of the table
>>         Schema()
>>         .field("lon", DataTypes.DECIMAL(20, 10))
>>         .field("rideTime", DataTypes.TIMESTAMP(6))
>>     ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> Manas Kale <manaskal...@gmail.com> 于2020年7月2日周四 下午7:59写道:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code, referred from the provided links:
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TOPIC = 'xyz'
>>> INPUT_TABLE = 'raw_message'
>>> PROD_ZOOKEEPER = '...'
>>> PROD_KAFKA = '...'
>>>
>>> OUTPUT_TOPIC = 'summary_output'
>>> OUTPUT_TABLE = 'feature_summary'
>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>> LOCAL_KAFKA = 'localhost:9092'
>>>
>>> t_env.connect(
>>>     Kafka()
>>>     .version('universal')
>>>     .topic(INPUT_TOPIC)
>>>     .property("bootstrap.servers", PROD_KAFKA)
>>>     .start_from_latest()
>>> ) \
>>>     .with_format(
>>>         Json()
>>>         .json_schema(
>>>             "{"
>>>             "  type: 'object',"
>>>             "  properties: {"
>>>             "    lon: {"
>>>             "      type: 'number'"
>>>             "    },"
>>>             "    rideTime: {"
>>>             "      type: 'string',"
>>>             "      format: 'date-time'"
>>>             "    }"
>>>             "  }"
>>>             "}"
>>>         )
>>>     ).register_table_source(INPUT_TABLE)
>>>
>>> t_env.connect(
>>>     Kafka()
>>>     .version('universal')
>>>     .topic(OUTPUT_TOPIC)
>>>     .property("bootstrap.servers", LOCAL_KAFKA)
>>>     .start_from_latest()
>>> ) \
>>>     .with_format(
>>>         Json()
>>>         .json_schema(
>>>             "{"
>>>             "  type: 'object',"
>>>             "  properties: {"
>>>             "    lon: {"
>>>             "      type: 'number'"
>>>             "    },"
>>>             "    rideTime: {"
>>>             "      type: 'string',"
>>>             "      format: 'date-time'"
>>>             "    }"
>>>             "  }"
>>>             "}"
>>>         )
>>>     ).register_table_sink(OUTPUT_TABLE)
>>>
>>> t_env.from_path(INPUT_TABLE) \
>>>     .insert_into(OUTPUT_TABLE)
>>>
>>> t_env.execute('IU pyflink job')
>>>
>>> However, I am getting the following exception:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>     return f(*a, **kw)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>     format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
>>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>>>     at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>>     ... 13 more
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
>>>     ).register_table_source(INPUT_TABLE)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
>>>     self._j_connect_table_descriptor.registerTableSource(name)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>     answer, self.gateway_client, self.target_id, self.name)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
>>>     raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>
>>> The relevant part seems to be:
>>>
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
>>>
>>> This is probably a basic error, but I can't figure out how to tell what's
>>> wrong with the schema. Is the schema not properly declared? Is some field
>>> missing?
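>>> Do I need to add something like the following after .with_format(...) (a
>>> guess on my part; the field types here are illustrative)?
>>>
>>> .with_schema(  # declare the schema of the table
>>>     Schema()
>>>     .field("lon", DataTypes.DECIMAL(20, 10))
>>>     .field("rideTime", DataTypes.TIMESTAMP(6))
>>> ).register_table_source(INPUT_TABLE)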
>>> FWIW I have included the JSON and Kafka connector JARs in the required
>>> location.
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <manaskal...@gmail.com> wrote:
>>>
>>>> Hi Xingbo,
>>>> Thank you for the information, it certainly helps!
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned,
>>>>> so the method described in the link won't work. You can instead use the
>>>>> more convenient DDL[1] or descriptor[2] APIs to read Kafka data. Besides,
>>>>> you can refer to the common questions about PyFlink[3].
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> Manas Kale <manaskal...@gmail.com> 于2020年6月29日周一 下午8:10写道:
>>>>>
>>>>>> Hi,
>>>>>> I want to consume from and write to Kafka from Flink's Python API.
>>>>>>
>>>>>> The only way I found to do this was through this question on SO
>>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>,
>>>>>> where the user essentially copies FlinkKafka connector JARs into the
>>>>>> Flink runtime's lib/ directory.
>>>>>>
>>>>>> - Is this the recommended method to do this? If not, what is?
>>>>>> - Is there any official documentation for using Kafka with PyFlink?
>>>>>>   Is this officially supported?
>>>>>> - How does the method described in the link work? Does the Flink
>>>>>>   runtime load and expose all JARs in /lib to the Python script? Can
>>>>>>   I write custom operators in Java and use those through Python?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas
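>>>>>> For context, the kind of alternative I was hoping for (a sketch, assuming
>>>>>> some PyFlink versions honor the pipeline.jars configuration; the option
>>>>>> name and the paths are my guesses, not something I have verified):
>>>>>>
>>>>>> # point the job at the connector JARs instead of copying them into lib/
>>>>>> t_config.get_configuration().set_string(
>>>>>>     "pipeline.jars",
>>>>>>     "file:///path/to/flink-sql-connector-kafka.jar;file:///path/to/flink-json.jar")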