Hi Dian,

Thanks for your suggestion.
I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:

    class MyAvroRowDeserializationSchema(DeserializationSchema):

        def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
            JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
                .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
            j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
            super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)

FlinkKafkaConsumer is now invoked as below, using MyAvroRowDeserializationSchema:

    value_schema = avro.schema.parse(<reader schema goes here>)
    schema_url = "http://host:port"
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)

    kafka_source = FlinkKafkaConsumer(
        topics='my_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})

I'm getting the below error:

    Traceback (most recent call last):
      File "flinkKafkaAvro.py", line 70, in datastream_api_demo
        deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
      File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
      File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
        args_command, temp_args = self._build_args(*args)
      File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
        [get_command_part(arg, self.pool) for arg in new_args])
      File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
        [get_command_part(arg, self.pool) for arg in new_args])
      File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
        command_part = REFERENCE_TYPE + parameter._get_object_id()
    AttributeError: 'RecordSchema' object has no attribute '_get_object_id'

Please suggest how this method should be called. The schema used here is an Avro schema.

Regards,
Zerah

On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Zerah,
>
> I guess you could provide a Python implementation for
> ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper
> for the Java implementation, so it will be very easy to implement. You
> could take a look at AvroRowDeserializationSchema [1] as an example.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>
> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
> >
> > Hi,
> >
> > I have the below use case:
> >
> > 1. Read streaming data from a Kafka topic using the Flink Python API
> > 2. Apply transformations on the data stream
> > 3. Write back to different Kafka topics based on the incoming data
> >
> > Input data is coming from a Confluent Avro producer. By using the existing
> pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
> deserialize the data.
> >
> > Please suggest how to process the data, as
> ConfluentRegistryAvroDeserializationSchema is not available in the Python
> API.
> >
> > Regards,
> > Zerah