Re: Unable to deserialize Avro data using Pyflink

2021-05-21 Thread Zerah J
ted in PyFlink currently. Is there a way to convert these GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, like the existing AvroRowDeserializationSchema returns? Could you please suggest how to do this or any othe

Re: Unable to deserialize Avro data using Pyflink

2021-05-20 Thread Zerah J
"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently. Is there a way to convert this Generic Records returned by Con

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
.org.apache.avro.Schema.Parser > value_schema = JSchemaParser().parse(value_schema_str) > ``` > > The reason is that ```value_schema = avro.schema.parse(<schema str here>) ``` will create a Python object instead of Java object. > > Regards, > Dian > > 2021年5月19日 下午5:23,Z
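The suggestion quoted above is to build a *Java* `org.apache.avro.Schema` object through PyFlink's py4j gateway instead of a Python `avro.schema` object. A minimal sketch of that idea follows; the schema string mirrors the record schema quoted elsewhere in the thread, and `build_java_avro_schema` is a hypothetical helper name:

```python
import json

# Avro record schema matching the one quoted in the thread
# (field names "name" and "id" taken from the snippet above).
value_schema_str = json.dumps({
    "type": "record",
    "name": "Message",
    "fields": [
        {"name": "name", "type": "string", "doc": "String Value"},
        {"name": "id", "type": "string", "doc": "String Value"},
    ],
})


def build_java_avro_schema(schema_str):
    """Parse an Avro schema string into a Java org.apache.avro.Schema
    via PyFlink's py4j gateway. avro.schema.parse(...) would instead
    return a Python object, which the Java deserializer cannot use."""
    # Imported lazily so this sketch can be read without a running JVM.
    from pyflink.java_gateway import get_gateway

    JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
    return JSchemaParser().parse(schema_str)


# Inside a PyFlink job you would then call:
# value_schema = build_java_avro_schema(value_schema_str)
```

The gateway call is only sketched (not invoked at import time) because it requires a PyFlink JVM to be available.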

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
= "http://host:port" How can I create a Java Schema object from this schema string and pass it from a Python method? Regards, Zerah On Wed, May 19, 2021 at 1:57 PM Dian Fu wrote: > Hi Zerah, > > What’s the type of value_schema? It should be a Java object of type > Schema. From the e

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
plementation and so it will be very easy to implement. You > could take a look at AvroRowDeserializationSchema [1] as an example. > > Regards, > Dian > > [1] > https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303 > > > 2021年5月17日 下午5:3
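The `AvroRowDeserializationSchema` referenced in [1] can be used directly for plain Avro records (note it does not handle the Confluent wire format with its schema-registry header, which is the gap the thread is about). A minimal sketch, with placeholder topic and broker names:

```python
def make_avro_kafka_source(avro_schema_str,
                           topic="input-topic",
                           bootstrap="host:9092"):
    """Sketch of a Kafka source that deserializes plain (non-Confluent)
    Avro records into Flink Rows. Topic and broker values are
    hypothetical placeholders, not from the thread."""
    # Lazy imports so the sketch loads without a PyFlink installation.
    from pyflink.common.serialization import AvroRowDeserializationSchema
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    deserializer = AvroRowDeserializationSchema(
        avro_schema_string=avro_schema_str)
    return FlinkKafkaConsumer(
        topics=topic,
        deserialization_schema=deserializer,
        properties={"bootstrap.servers": bootstrap})
```

For Confluent-framed records, the Java `ConfluentRegistryAvroDeserializationSchema` would be needed instead, which (as the thread notes) was not exposed in PyFlink at the time.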

Unable to deserialize Avro data using Pyflink

2021-05-17 Thread Zerah J
Hi, I have the following use case: 1. Read streaming data from a Kafka topic using the Flink Python API 2. Apply transformations on the data stream 3. Write back to different Kafka topics based on the incoming data Input data is coming from a Confluent Avro producer. By using the existing