DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema are still not supported in the
Python DataStream API.

I just took a further look at the Java implementations of
DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema:
their result type is RowData instead of Row, so they cannot easily be
supported directly in the Python DataStream API. However, PyFlink supports
conversion between the Table API & DataStream API[1]. Could you first
create a Table which consumes the data from Kafka and then convert it into
a DataStream?
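
For example, something along these lines should work (an untested sketch;
the topic name, bootstrap servers, schema registry URL and the table schema
are placeholders that you'd need to adjust):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# The Kafka SQL connector and avro-confluent format JARs need to be on the
# classpath, e.g. via the pipeline.jars configuration.
t_env.execute_sql("""
    CREATE TABLE orders (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'test-group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081'
    )
""")

table = t_env.from_path("orders")
# Debezium emits a changelog (inserts, updates and deletes), so convert the
# table into a changelog stream rather than an insert-only data stream.
ds = t_env.to_changelog_stream(table)
ds.print()
env.execute()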

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:

> Yes, we should support them.
>
> For now, if you want to use them, you could implement them in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated, as it's simply a thin wrapper around the
> Java implementation.
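>
> Roughly, the wrapper pattern looks like this (just a sketch to show the
> shape; MyRowDeserializationSchema is a made-up name and I instantiate the
> AvroRowDeserializationSchema string constructor only for illustration, so
> substitute the Java schema you actually want to expose):
>
> from pyflink.common.serialization import DeserializationSchema
> from pyflink.java_gateway import get_gateway
>
>
> class MyRowDeserializationSchema(DeserializationSchema):
>     """Thin Python wrapper that only instantiates the Java schema via py4j."""
>
>     def __init__(self, avro_schema_string: str):
>         gateway = get_gateway()
>         # Substitute the fully qualified name of the Java schema to wrap.
>         JSchema = gateway.jvm.org.apache.flink.formats.avro.AvroRowDeserializationSchema
>         j_deserialization_schema = JSchema(avro_schema_string)
>         super(MyRowDeserializationSchema, self).__init__(j_deserialization_schema)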
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com>
> wrote:
>
>> Thanks Dian! I really appreciate this.
>>
>> However, I have another question related to this. Does the DataStream
>> API in PyFlink support DebeziumAvroRowDeserializationSchema and
>> DebeziumJsonRowDeserializationSchema, either in the current version or in
>> a future update? From the documentation, they don't seem to be supported
>> yet.
>>
>> Best,
>> Quynh
>>
>>
>>
>>
>> *From: *Dian Fu <dian0511...@gmail.com>
>> *Sent: *Friday, April 22, 2022 9:36 PM
>> *To: *lan tran <indigoblue7...@gmail.com>
>> *Cc: *user@flink.apache.org
>> *Subject: *Re: AvroRowDeserializationSchema
>>
>>
>>
>> Hi Quynh,
>>
>> I have added an example of how to use AvroRowDeserializationSchema in the
>> Python DataStream API in [1]. Please take a look and see if it helps~
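>>
>> The gist of that example is roughly the following (a simplified sketch,
>> not the exact file contents; the schema, topic and Kafka properties are
>> placeholders, and the flink-sql-avro and Kafka connector JARs need to be
>> available, e.g. added via env.add_jars):
>>
>> from pyflink.common.serialization import AvroRowDeserializationSchema
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.connectors import FlinkKafkaConsumer
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>>
>> # Avro schema of the records in the topic, passed as a JSON string.
>> avro_schema_string = """
>> {
>>   "type": "record",
>>   "name": "User",
>>   "fields": [
>>     {"name": "name", "type": "string"},
>>     {"name": "age", "type": "int"}
>>   ]
>> }
>> """
>>
>> deserialization_schema = AvroRowDeserializationSchema(
>>     avro_schema_string=avro_schema_string)
>>
>> kafka_consumer = FlinkKafkaConsumer(
>>     topics='test_avro_topic',
>>     deserialization_schema=deserialization_schema,
>>     properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test'})
>>
>> env.add_source(kafka_consumer).print()
>> env.execute()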
>>
>> Regards,
>> Dian
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>>
>>
>>
>> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>> Hi Quynh,
>>
>> Could you show some sample code on how you use it?
>>
>> Regards,
>> Dian
>>
>>
>>
>> On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com>
>> wrote:
>>
>> I wonder if this is a bug, but when I use *AvroRowDeserializationSchema*
>> in PyFlink, the following error still occurs:
>>
>> py4j.protocol.Py4JError: An error occurred while calling
>> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>>
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
>> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
>> org.apache.avro.Schema$RecordSchema]) does not exist
>>
>> Could you please help check? Thanks.
>> Best,
>> Quynh
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *lan tran <indigoblue7...@gmail.com>
>> *Sent: *Thursday, April 21, 2022 1:43 PM
>> *To: *user@flink.apache.org
>> *Subject: *AvroRowDeserializationSchema
>>
>>
>>
>> Hi team,
>>
>> I want to use AvroRowDeserializationSchema when consuming data from
>> Kafka; however, from the documentation I did not understand what
>> avro_schema_string and record_class are. It would be great if you could
>> give me an example of this (I only have the Java example, but I am doing
>> it in PyFlink).
>>
>> As I understand it, is avro_schema_string the schema_registry_url? Does it
>> support
>> 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
>> in the Table API?
>>
>> Best,
>> Quynh.
>>
>>
>>
>>
>>
>>
>>
>>
>
