Hi Kevin,
I have agreed a design with Chesnay and Danny. I am implementing a prototype, 
to prove it works,  then will update the Flip text with the new design. Initial 
testing is showing it working.

Here is a quick history so you can understand our current thinking.

  1.  Initially we passed maps for header information from the kafka connector 
to Flink for deserialization. Similar for serialize. This was not great, 
because maps are not ideal and it was a big change as it needed core Flink 
interface changes
  2.  We then moved the Avro Apicurio format to the Kafka connector and looked 
to discover a new record based de/serialization interface. So we could pass 
down the record (containing the headers) rather than the payload. This did not 
work, because there is a dependence on the Avro connector that is not  aware of 
the new interface.
  3.  We considered using Thread local storage to pass the headers, we did not 
like this as there was a risk of memory leaks if we did not manage the thread 
well, also the contract is hidden.
  4.  We then came up with the current design that augments the deserialization 
in the Kafka connector in a new discovered record based deserialization, it 
then takes the headers out in the schema coder, leaving the message as it was. 
Similar for serialization.


One piece I need to work out the details of, is how to work when there are 2 
implementations that can be discovered, probably using an augmented format name 
as a factory identifier,

I hope to put up a new design in the Flip by the end of next week, for wider 
review,
    Kind regards, David.


From: Kevin Lam <kevin....@shopify.com.INVALID>
Date: Monday, 8 July 2024 at 21:16
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
Hi David,

Any updates on the Kafka Message Header support? I am also interested in
supporting headers with the Flink SQL Formats:
https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6

On Fri, Jun 14, 2024 at 6:10 AM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi everyone,
> I have talked with Chesnay and Danny offline. Danny and I were not very
> happy with the passing Maps around, and were looking for a neater design.
> Chesnay suggested that we could move the new format to the Kafka connector,
> then pass the Kafka record down to the deserialize logic so it can make use
> of the headers during deserialization and serialisation.
>
> I think this is a neat idea. This would mean:
> - the Kafka connector code would need to be updated to pass down the Kafka
> record
> - there would be the Avro Apicurio format and SQL in the kafka repository.
> We feel it is unlikely to want to use the Apicurio registry with files, as
> the Avro format could be used.
>
> Unfortunately I have found that this as not so straight forward to
> implement as the Avro Apicurio format uses the Avro format, which is tied
> to the DeserializationSchema. We were hoping to have a new decoding
> implementation that would pass down the Kafka record rather than the
> payload. This does not appear possible without a Avro format change.
>
>
> Inspired by this idea, I notice that
> KafkaValueOnlyRecordDeserializerWrapper<T> extends
> KafkaValueOnlyDeserializerWrapper
>
> Does
>
> deserializer.deserialize(record.topic(),record.value())
>
>
>
> I am investigating If I can add a factory/reflection to provide an
> alternative
> Implementation that will pass the record based (the kafka record is not
> serializable so I will pick what we need and deserialize) as a byte array.
>
> I would need to do this 4 times (value ,key for deserialisation and
> serialisation. To do this I would need to convert the record into a byte
> array, so it fits into the existing interface (DeserializationSchema).  I
> think this could be a way through, to avoid using maps and avoid changing
> the existing Avro format and avoid change any core Flink interfaces.
>
> I am going to prototype this idea. WDYT?
>
> My thanks go to Chesnay and Danny for their support and insight around
> this Flip,
>    Kind regards, David.
>
>
>
>
>
>
> From: David Radley <david_rad...@uk.ibm.com>
> Date: Wednesday, 29 May 2024 at 11:39
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hi Danny,
> Thank you for your feedback on this.
>
> I agree that using maps has pros and cons. The maps are flexible, but do
> require the sender and receiver to know what is in the map.
>
> When you say “That sounds like it would fit in better, I assume we cannot
> just take that approach?” The motivation behind this Flip is to support the
> headers which is the usual way that Apicurio runs. We will support the
> “schema id in the payload” as well.
>
> I agree with you when you say “ I am not 100% happy with the solution but I
> cannot offer a better option.” – this is a pragmatic way we have found to
> solve this issue. I am open to any suggestions to improve this as well.
>
> If we are going with the maps design (which is the best we have at the
> moment) ; it would be good to have the Flink core changes in base Flink
> version 2.0 as this would mean we do not need to use reflection in a Flink
> Kafka version 2 connector to work out if the runtime Flink has the new
> methods.
>
> At this stage we only have one committer (yourself) backing this. Do you
> know of other 2 committers who would support this Flip?
>
>      Kind regards, David.
>
>
>
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Friday, 24 May 2024 at 19:32
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hello,
>
> > I am curious what you mean by abused.
>
> I just meant we will end up adding more and more fields to this map over
> time, and it may be hard to undo.
>
> > For Apicurio it can be sent at the start of the payload like Confluent
> Avro does. Confluent Avro have a magic byte followed by 4 bytes of schema
> id, at the start of the payload. Apicurio clients and SerDe libraries can
> be configured to not put the schema id in the headers in which case there
> is a magic byte followed by an 8 byte schema at the start of the payload.
> In the deserialization case, we would not need to look at the headers –
> though the encoding is also in the headers.
>
> That sounds like it would fit in better, I assume we cannot just take that
> approach?
>
> Thanks for the discussion. I am not 100% happy with the solution but I
> cannot offer a better option. I would be interested to hear if others have
> any suggestions. Playing devil's advocate against myself, we pass maps
> around to configure connectors so it is not too far away from that.
>
> Thanks,
> Danny
>
>
> On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
> > Hi Danny,
> > No worries, thanks for replying. I have working prototype code that is
> > being reviewed. It needs some cleaning up and more complete testing
> before
> > it is ready, but will give you the general idea [1][2] to help to assess
> > this approach.
> >
> >
> > I am curious what you mean by abused. I guess the approaches are between
> > generic map, mechanism vs a more particular more granular things being
> > passed that might be used by another connector.
> >
> > Your first question:
> > “how would this work if the schema ID is not in the Kafka headers, as
> > hinted to in the FLIP "usually the global ID in a Kafka header"?
> >
> > For Apicurio it can be sent at the start of the payload like Confluent
> > Avro does. Confluent Avro have a magic byte followed by 4 bytes of schema
> > id, at the start of the payload. Apicurio clients and SerDe libraries can
> > be configured to not put the schema id in the headers in which case there
> > is a magic byte followed by an 8 byte schema at the start of the payload.
> > In the deserialization case, we would not need to look at the headers –
> > though the encoding is also in the headers.
> >
> > Your second question:
> > “I am wondering if there are any other instances where the source would
> be
> > aware of the schema ID and pass it through in this way?
> > ”
> > The examples I can think of are:
> > - Avro can send the complete schema in a header, this is not recommended
> > but in theory fits the need for a message payload to require something
> else
> > to get the structure.
> > - I see [2] that Apicurio Protobuf uses headers.
> > - it might be that other message queuing projects like Rabbit MQ would
> > need this to be able to support Apicurio Avro & protobuf.
> >
> > Kind regards, David,
> >
> >
> >
> >
> > [1] https://github.com/apache/flink/pull/24715
> > [2] https://github.com/apache/flink-connector-kafka/pull/99
> > [3]
> >
> https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > From: Danny Cranmer <dannycran...@apache.org>
> > Date: Friday, 24 May 2024 at 12:22
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> > Hello,
> >
> > Apologies, I am on vacation and have limited access to email.
> >
> > I can see the logic here and why you ended up where you did. I can also
> see
> > there are other useful metadata fields that we might want to pass
> through,
> > which might result in this Map being abused (Kafka Topic, Kinesis Shard,
> > etc).
> >
> > I have a follow up question, how would this work if the schema ID is not
> in
> > the Kafka headers, as hinted to in the FLIP "usually the global ID in a
> > Kafka header"? I am wondering if there are any other instances where the
> > source would be aware of the schema ID and pass it through in this way?
> >
> > Thanks,
> > Danny
> >
> >
> >
> > On Wed, May 22, 2024 at 3:43 PM David Radley <david_rad...@uk.ibm.com>
> > wrote:
> >
> > > Hi Danny,
> > > Did you have a chance you have a look at my responses to your
> feedback? I
> > > am hoping to keep the momentum going on this one,   kind regards,
> David.
> > >
> > >
> > > From: David Radley <david_rad...@uk.ibm.com>
> > > Date: Tuesday, 14 May 2024 at 17:21
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
> > > Hi Danny,
> > >
> > > Thank you very much for the feedback and your support. I have copied
> your
> > > feedback from the VOTE thread to this discussion thread, so we can
> > continue
> > > our discussions off the VOTE thread.
> > >
> > >
> > >
> > > Your feedback:
> > >
> > > Thanks for Driving this David. I am +1 for adding support for the new
> > >
> > > format, however have some questions/suggestions on the details.
> > >
> > >
> > >
> > > 1. Passing around Map<String, Object> additionalInputProperties feels a
> > bit
> > >
> > > dirty. It looks like this is mainly for the Kafka connector. This
> > connector
> > >
> > > already has a de/serialization schema extension to access record
> > >
> > > headers, KafkaRecordDeserializationSchema [1], can we use this instead?
> > >
> > > 2. Can you elaborate why we need to change the SchemaCoder interface?
> > Again
> > >
> > > I am not a fan of adding these Map parameters
> > >
> > > 3. I assume this integration will go into the core Flink repo under
> > >
> > > flink-formats [2], and not be a separate repository like the
> connectors?
> > >
> > >
> > >
> > > My response:
> > >
> > > Addressing 1. and 2.
> > >
> > > I agree that sending maps around is a bit dirty. If we can see a better
> > > way that would be great. I was looking for a way to pass this kafka
> > header
> > > information in a non-Kafka way - the most obvious way I could think was
> > as
> > > a map. Here are the main considerations I saw, if I have missed
> anything
> > or
> > > could improve something I would be grateful for any further feedback.
> > >
> > >
> > >
> > >   *   I see KafkaRecordDeserializationSchema is a Kafka interface that
> > > works at the Kafka record level (so includes the headers). We need a
> > > mechanism to send over the headers from the Kafka record to Flink
> > >   *   Flink core is not aware of Kafka headers, and I did not want to
> add
> > > a Kafka dependancy to core flink.
> > >   *   The formats are stateless so it did not appear to be in fitting
> > with
> > > the Flink architecture to pass through header information to stash in
> > state
> > > in the format waiting for the deserialise to be subsequently called to
> > pick
> > > up the header information.
> > >   *   We could have used Thread local storage to stash the header
> > content,
> > > but this would be extra state to manage; and this would seem like an
> > > obtrusive change.
> > >   *   The SchemaCoder deserialise is where Confluent Avro gets the
> schema
> > > id from the payload, so it can lookup the schema. In line with this
> > > approach it made sense to extend the deserialise so it had the header
> > > contents so the Apicurio Avro format could lookup the schema.
> > >   *   I did not want to have Apicurio specific logic in the Kafka
> > > connector, if we did we could pull out the appropriate headers and only
> > > send over the schema ids.
> > >   *   For deserialise, the schema id we are interested in is the one in
> > > the Kafka headers on the message and is for the writer schema (an Avro
> > > format concept) currently used by the confluent-avro format in
> > deserialize.
> > >   *   For serialise the schema ids need to be obtained from apicurio
> then
> > > passed through to Kafka.
> > >   *   For serialise there is existing logic around handling the
> metadata
> > > which includes passing the headers. But the presence of the metadata
> > would
> > > imply we have a metadata column. Maybe a change to the metadata
> mechanism
> > > may have allowed to use to pass the headers, but not create a metadata
> > > column; instead I pass through the additional headers in a map to be
> > > appended.
> > >
> > >
> > >
> > > 3.
> > >
> > > Yes this integration will go into the core Flink repo under
> > >
> > > flink-formats and sit next to the confluent-avro format. The Avro
> format
> > > has the concept of a Registry and drives the confluent-avro format. The
> > > Apicurio Avro format will use the same approach.
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > >
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Reply via email to