Hi Dian,

Thanks for your suggestion.

I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
method from Python, but the call fails with the error shown further down.
Please check the code snippet below:

class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)

        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


FlinkKafkaConsumer is now invoked as follows, using
MyAvroRowDeserializationSchema:

value_schema = avro.schema.parse(<reader schema goes here>)
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})

I'm getting the following error:

Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
    j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
    args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'



Please suggest how this method should be called. The schema passed here is
an Avro schema object returned by avro.schema.parse in Python.
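
One possible reading of the AttributeError: py4j can only send Java object
references, strings, and primitives across the gateway, so the Python
RecordSchema cannot be passed to forGeneric directly. A minimal, untested
sketch of a workaround follows. It assumes that str() of the Python avro
schema yields its JSON text, that the flink-avro-confluent-registry jar and
its dependencies are on the job's classpath, and that the schema can be
re-parsed on the JVM side with org.apache.avro.Schema.Parser. The names
schema_to_json and make_confluent_avro_deserialization_schema are
hypothetical helpers, not part of PyFlink.

```python
import json


def schema_to_json(schema) -> str:
    """Return the schema as JSON text (hypothetical helper).

    Accepts either a JSON string or an object whose str() is the JSON
    form of the schema (assumed to hold for the Python avro library).
    """
    text = schema if isinstance(schema, str) else str(schema)
    json.loads(text)  # raises ValueError if the text is not valid JSON
    return text


def make_confluent_avro_deserialization_schema(schema, url: str):
    """Wrap the Java ConfluentRegistryAvroDeserializationSchema (sketch)."""
    # Imports are deferred so schema_to_json can be used without PyFlink.
    from pyflink.common.serialization import DeserializationSchema
    from pyflink.java_gateway import get_gateway

    jvm = get_gateway().jvm
    # Build a Java org.apache.avro.Schema, so py4j passes an object reference
    # instead of a Python object it cannot serialize.
    j_schema = jvm.org.apache.avro.Schema.Parser().parse(schema_to_json(schema))
    j_deserialization_schema = jvm.org.apache.flink.formats.avro.registry \
        .confluent.ConfluentRegistryAvroDeserializationSchema \
        .forGeneric(j_schema, url)
    return DeserializationSchema(j_deserialization_schema)
```

With this, the consumer would receive
make_confluent_avro_deserialization_schema(value_schema, schema_url) in place
of MyAvroRowDeserializationSchema(...). Note that forGeneric produces Avro
GenericRecord objects on the Java side, so further conversion may still be
needed before the records can be used in Python.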

Regards,
Zerah

On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Zerah,
>
> I guess you could provide a Python implementation for
> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
> for the Java implementation, so it will be very easy to implement. You
> could take a look at AvroRowDeserializationSchema [1] as an example.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>
> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
> >
> > Hi,
> >
> > I have the following use case:
> >
> > 1. Read streaming data from Kafka topic using Flink Python API
> > 2. Apply transformations on the data stream
> > 3. Write back to different kafka topics based on the incoming data
> >
> > Input data is coming from Confluent Avro Producer. By using the existing
> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
> > deserialize the data.
> >
> > Please help to process the data, as
> > ConfluentRegistryAvroDeserializationSchema is not available in the Python
> > API.
> >
> > Regards,
> > Zerah
>
>
