RE: Additional metadata available for Kafka serdes

2024-03-20 Thread David Radley
Hi Balint,
Excellent. I have just put up a discussion thread on the dev list for a new
FLIP to add this; please review, give feedback, and +1 if this is what you are
looking for.
At the moment I have not included the topic name. I wonder where we would use
the topic name during deserialization or serialization. The only place I can
see it being used is during serialization, where we register a (potentially
new) schema, but this may not be desirable as the schema could be a nested
schema.
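
For context, the main use I can think of is registry subject naming, where
some clients derive the subject from the topic; a tiny illustrative sketch
(the register call is hypothetical, not a specific client API):

    // Illustrative only: one common convention derives the registry subject
    // from the topic name when registering a (potentially new) schema.
    String subject = topicName + "-value";        // needs the topic name
    // registryClient.register(subject, schema);  // hypothetical client call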
WDYT?
Kind regards, David.


Re: Additional metadata available for Kafka serdes

2024-03-14 Thread Balint Bene
Hi David!

I think passing the headers as a map (as opposed to a
ConsumerRecord/ProducerRecord) is a great idea that should work. That way the
core Flink package doesn't take on Kafka dependencies; it seems like they're
meant to be decoupled anyway. The one bonus of using the Record objects is
that they also provide the topic name, which is part of the signature (though
usually unused) for Kafka serdes. Do you think it's worthwhile to also include
the topic name in the signature along with the map?
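
To make that concrete, here is a purely illustrative sketch of the two shapes
(not proposed signatures; Kafka's own Deserializer passes the topic alongside
the headers, which is where the question comes from):

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;

    // Illustrative interface name only - not a proposed flink-core change.
    public interface HeaderAwareDeserializationSchema<T> extends Serializable {

        // Existing style: raw bytes in, value out.
        T deserialize(byte[] message) throws IOException;

        // Option A: headers as a plain map keeps flink-core free of Kafka types.
        default T deserialize(byte[] message, Map<String, byte[]> headers)
                throws IOException {
            return deserialize(message);
        }

        // Option B: also carry the topic name, mirroring Kafka's own
        // Deserializer#deserialize(String topic, Headers headers, byte[] data).
        default T deserialize(String topic, Map<String, byte[]> headers, byte[] message)
                throws IOException {
            return deserialize(message, headers);
        }
    }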

Happy to test things out and provide feedback. I'm not working on an Apicurio
format myself, but the use case is very similar.

Thanks,
Balint


Re: Additional metadata available for Kafka serdes

2024-03-14 Thread David Radley
Hi,
I am currently prototyping an Avro Apicurio format that I hope to raise as a
FLIP very soon (hopefully by early next week). In my prototyping, I am passing
the Kafka header content through as a map to the DeserializationSchema, and I
have extended the SerializationSchema to pass back headers. I am using new
default methods in the interfaces so as to be backwards compatible. I have the
deserialise working and the serialise is close.
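
For illustration, this is roughly the shape I am prototyping for the
serialization side (the names below are placeholders, not the final interface
changes):

    import java.io.Serializable;
    import java.util.Map;

    // Placeholder name - a sketch of the new default method only.
    public interface HeaderAwareSerializationSchema<T> extends Serializable {

        // Existing style: value in, raw bytes out.
        byte[] serialize(T element);

        // New default method: a header-aware format can populate the map with
        // headers (for example a schema ID) for the connector to attach to the
        // outgoing record; existing formats ignore it and keep working as-is.
        default byte[] serialize(T element, Map<String, byte[]> headersToAdd) {
            return serialize(element);
        }
    }

The deserialization side is symmetric: a default deserialize(byte[] message,
Map<String, byte[]> headers) falls back to the existing deserialize(byte[]
message), which is what keeps the change backwards compatible.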

We did consider trying to use the Apicurio deser libraries but this is tricky 
due to the way the code is split.

Let me know what you think – I hope this approach will meet your needs,
Kind regards, David.


Additional metadata available for Kafka serdes

2024-03-12 Thread Balint Bene
Hello! Looking to get some guidance for a problem around the Flink formats
used for Kafka.

Flink currently uses common serdes interfaces across all formats. However,
some data formats used in Kafka require headers for serdes. It's the same
problem for serialization and deserialization, so I'll just use
DynamicKafkaDeserializationSchema
<https://github.com/Shopify/shopify-flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L130>
as an example. It has access to the Kafka record headers, but it can't pass
them to the DeserializationSchema
<https://github.com/apache/flink/blob/94b55d1ae61257f21c7bb511660e7497f269abc7/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java#L81>
implemented by the format, since the interface is generic.
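
To make the gap concrete, here is a simplified excerpt (not the exact code) of
what the Kafka-specific wrapper sees versus what the format interface can
accept:

    // Kafka-specific wrapper (simplified): the record headers are available here...
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
            throws IOException {
        Headers headers = record.headers();  // schema metadata can live in here
        // ...but the generic format interface only takes the raw bytes, so the
        // headers cannot be handed to the format:
        valueDeserialization.deserialize(record.value(), collector);
    }

    // flink-core interface implemented by formats - no parameter for headers:
    T deserialize(byte[] message) throws IOException;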

If it were possible to pass the headers, then open-source formats such as
Apicurio could be supported. Unlike the Confluent formats, which store the
metadata (schema ID) prepended to the serialized bytes in the key and value,
the Apicurio formats store their metadata in the record headers.
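
As a rough illustration of the difference (the Confluent wire format prepends
a magic byte and a 4-byte schema ID to the payload; the Apicurio header name
below is illustrative, the exact name depends on the serde configuration):

    // Confluent-style: the schema ID travels inside the value bytes, so the
    // plain deserialize(byte[]) call can still recover it.
    ByteBuffer buf = ByteBuffer.wrap(valueBytes);
    byte magicByte = buf.get();    // magic byte (0)
    int schemaId = buf.getInt();   // 4-byte schema ID, big-endian
    // remaining bytes in buf: the Avro payload

    // Apicurio-style (header mode): the schema ID travels in a record header,
    // which today never reaches the format. Header name is illustrative.
    Header idHeader = record.headers().lastHeader("apicurio.value.globalId");
    long globalId = ByteBuffer.wrap(idHeader.value()).getLong();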

I have bandwidth to work on this, but it would be great to have direction
from the community. I have a simple working prototype that's able to load a
custom version of the format with a modified interface that can accept the
headers (I just pass in the entire Apache Kafka ConsumerRecord
<https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html>
/ProducerRecord
<https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>
for simplicity). The issue I foresee is that the class loader
<https://github.com/apache/flink/blob/94b55d1ae61257f21c7bb511660e7497f269abc7/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java>
exists in the Flink repo along with the interfaces for the formats, but these
changes are specific to Kafka. This solution could require migrating the
formats to the flink-connector-kafka repo, which is a decent amount of work.

Feedback is appreciated!
Thanks
Balint