Hi everyone,
I have talked with Chesnay and Danny offline. Danny and I were not very happy 
with the passing Maps around, and were looking for a neater design. Chesnay 
suggested that we could move the new format to the Kafka connector, then pass 
the Kafka record down to the deserialize logic so it can make use of the 
headers during deserialization and serialisation.

I think this is a neat idea. This would mean:
- the Kafka connector code would need to be updated to pass down the Kafka 
record
- there would be the Avro Apicurio format and SQL in the kafka repository. We 
feel it is unlikely to want to use the Apicurio registry with files, as the 
Avro format could be used.

Unfortunately I have found that this as not so straight forward to implement as 
the Avro Apicurio format uses the Avro format, which is tied to the 
DeserializationSchema. We were hoping to have a new decoding implementation 
that would pass down the Kafka record rather than the payload. This does not 
appear possible without a Avro format change.


Inspired by this idea, I notice that KafkaValueOnlyRecordDeserializerWrapper<T> 
extends KafkaValueOnlyDeserializerWrapper

Does

deserializer.deserialize(record.topic(),record.value())



I am investigating If I can add a factory/reflection to provide an alternative
Implementation that will pass the record based (the kafka record is not 
serializable so I will pick what we need and deserialize) as a byte array.

I would need to do this 4 times (value ,key for deserialisation and 
serialisation. To do this I would need to convert the record into a byte array, 
so it fits into the existing interface (DeserializationSchema).  I think this 
could be a way through, to avoid using maps and avoid changing the existing 
Avro format and avoid change any core Flink interfaces.

I am going to prototype this idea. WDYT?

My thanks go to Chesnay and Danny for their support and insight around this 
Flip,
   Kind regards, David.






From: David Radley <david_rad...@uk.ibm.com>
Date: Wednesday, 29 May 2024 at 11:39
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] RE: [DISCUSS] FLIP-XXX Apicurio-avro format
Hi Danny,
Thank you for your feedback on this.

I agree that using maps has pros and cons. The maps are flexible, but do 
require the sender and receiver to know what is in the map.

When you say “That sounds like it would fit in better, I assume we cannot just 
take that approach?” The motivation behind this Flip is to support the headers 
which is the usual way that Apicurio runs. We will support the “schema id in 
the payload” as well.

I agree with you when you say “ I am not 100% happy with the solution but I
cannot offer a better option.” – this is a pragmatic way we have found to solve 
this issue. I am open to any suggestions to improve this as well.

If we are going with the maps design (which is the best we have at the moment) 
; it would be good to have the Flink core changes in base Flink version 2.0 as 
this would mean we do not need to use reflection in a Flink Kafka version 2 
connector to work out if the runtime Flink has the new methods.

At this stage we only have one committer (yourself) backing this. Do you know 
of other 2 committers who would support this Flip?

     Kind regards, David.



From: Danny Cranmer <dannycran...@apache.org>
Date: Friday, 24 May 2024 at 19:32
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
Hello,

> I am curious what you mean by abused.

I just meant we will end up adding more and more fields to this map over
time, and it may be hard to undo.

> For Apicurio it can be sent at the start of the payload like Confluent
Avro does. Confluent Avro have a magic byte followed by 4 bytes of schema
id, at the start of the payload. Apicurio clients and SerDe libraries can
be configured to not put the schema id in the headers in which case there
is a magic byte followed by an 8 byte schema at the start of the payload.
In the deserialization case, we would not need to look at the headers –
though the encoding is also in the headers.

That sounds like it would fit in better, I assume we cannot just take that
approach?

Thanks for the discussion. I am not 100% happy with the solution but I
cannot offer a better option. I would be interested to hear if others have
any suggestions. Playing devil's advocate against myself, we pass maps
around to configure connectors so it is not too far away from that.

Thanks,
Danny


On Fri, May 24, 2024 at 2:23 PM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi Danny,
> No worries, thanks for replying. I have working prototype code that is
> being reviewed. It needs some cleaning up and more complete testing before
> it is ready, but will give you the general idea [1][2] to help to assess
> this approach.
>
>
> I am curious what you mean by abused. I guess the approaches are between
> generic map, mechanism vs a more particular more granular things being
> passed that might be used by another connector.
>
> Your first question:
> “how would this work if the schema ID is not in the Kafka headers, as
> hinted to in the FLIP "usually the global ID in a Kafka header"?
>
> For Apicurio it can be sent at the start of the payload like Confluent
> Avro does. Confluent Avro have a magic byte followed by 4 bytes of schema
> id, at the start of the payload. Apicurio clients and SerDe libraries can
> be configured to not put the schema id in the headers in which case there
> is a magic byte followed by an 8 byte schema at the start of the payload.
> In the deserialization case, we would not need to look at the headers –
> though the encoding is also in the headers.
>
> Your second question:
> “I am wondering if there are any other instances where the source would be
> aware of the schema ID and pass it through in this way?
> ”
> The examples I can think of are:
> - Avro can send the complete schema in a header, this is not recommended
> but in theory fits the need for a message payload to require something else
> to get the structure.
> - I see [2] that Apicurio Protobuf uses headers.
> - it might be that other message queuing projects like Rabbit MQ would
> need this to be able to support Apicurio Avro & protobuf.
>
> Kind regards, David,
>
>
>
>
> [1] https://github.com/apache/flink/pull/24715
> [2] https://github.com/apache/flink-connector-kafka/pull/99
> [3]
> https://www.apicur.io/registry/docs/apicurio-registry/2.5.x/getting-started/assembly-configuring-kafka-client-serdes.html#registry-serdes-types-json_registry
>
>
>
>
>
>
>
>
>
>
>
>
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Friday, 24 May 2024 at 12:22
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hello,
>
> Apologies, I am on vacation and have limited access to email.
>
> I can see the logic here and why you ended up where you did. I can also see
> there are other useful metadata fields that we might want to pass through,
> which might result in this Map being abused (Kafka Topic, Kinesis Shard,
> etc).
>
> I have a follow up question, how would this work if the schema ID is not in
> the Kafka headers, as hinted to in the FLIP "usually the global ID in a
> Kafka header"? I am wondering if there are any other instances where the
> source would be aware of the schema ID and pass it through in this way?
>
> Thanks,
> Danny
>
>
>
> On Wed, May 22, 2024 at 3:43 PM David Radley <david_rad...@uk.ibm.com>
> wrote:
>
> > Hi Danny,
> > Did you have a chance you have a look at my responses to your feedback? I
> > am hoping to keep the momentum going on this one,   kind regards, David.
> >
> >
> > From: David Radley <david_rad...@uk.ibm.com>
> > Date: Tuesday, 14 May 2024 at 17:21
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] [DISCUSS] FLIP-XXX Apicurio-avro format
> > Hi Danny,
> >
> > Thank you very much for the feedback and your support. I have copied your
> > feedback from the VOTE thread to this discussion thread, so we can
> continue
> > our discussions off the VOTE thread.
> >
> >
> >
> > Your feedback:
> >
> > Thanks for Driving this David. I am +1 for adding support for the new
> >
> > format, however have some questions/suggestions on the details.
> >
> >
> >
> > 1. Passing around Map<String, Object> additionalInputProperties feels a
> bit
> >
> > dirty. It looks like this is mainly for the Kafka connector. This
> connector
> >
> > already has a de/serialization schema extension to access record
> >
> > headers, KafkaRecordDeserializationSchema [1], can we use this instead?
> >
> > 2. Can you elaborate why we need to change the SchemaCoder interface?
> Again
> >
> > I am not a fan of adding these Map parameters
> >
> > 3. I assume this integration will go into the core Flink repo under
> >
> > flink-formats [2], and not be a separate repository like the connectors?
> >
> >
> >
> > My response:
> >
> > Addressing 1. and 2.
> >
> > I agree that sending maps around is a bit dirty. If we can see a better
> > way that would be great. I was looking for a way to pass this kafka
> header
> > information in a non-Kafka way - the most obvious way I could think was
> as
> > a map. Here are the main considerations I saw, if I have missed anything
> or
> > could improve something I would be grateful for any further feedback.
> >
> >
> >
> >   *   I see KafkaRecordDeserializationSchema is a Kafka interface that
> > works at the Kafka record level (so includes the headers). We need a
> > mechanism to send over the headers from the Kafka record to Flink
> >   *   Flink core is not aware of Kafka headers, and I did not want to add
> > a Kafka dependancy to core flink.
> >   *   The formats are stateless so it did not appear to be in fitting
> with
> > the Flink architecture to pass through header information to stash in
> state
> > in the format waiting for the deserialise to be subsequently called to
> pick
> > up the header information.
> >   *   We could have used Thread local storage to stash the header
> content,
> > but this would be extra state to manage; and this would seem like an
> > obtrusive change.
> >   *   The SchemaCoder deserialise is where Confluent Avro gets the schema
> > id from the payload, so it can lookup the schema. In line with this
> > approach it made sense to extend the deserialise so it had the header
> > contents so the Apicurio Avro format could lookup the schema.
> >   *   I did not want to have Apicurio specific logic in the Kafka
> > connector, if we did we could pull out the appropriate headers and only
> > send over the schema ids.
> >   *   For deserialise, the schema id we are interested in is the one in
> > the Kafka headers on the message and is for the writer schema (an Avro
> > format concept) currently used by the confluent-avro format in
> deserialize.
> >   *   For serialise the schema ids need to be obtained from apicurio then
> > passed through to Kafka.
> >   *   For serialise there is existing logic around handling the metadata
> > which includes passing the headers. But the presence of the metadata
> would
> > imply we have a metadata column. Maybe a change to the metadata mechanism
> > may have allowed to use to pass the headers, but not create a metadata
> > column; instead I pass through the additional headers in a map to be
> > appended.
> >
> >
> >
> > 3.
> >
> > Yes this integration will go into the core Flink repo under
> >
> > flink-formats and sit next to the confluent-avro format. The Avro format
> > has the concept of a Registry and drives the confluent-avro format. The
> > Apicurio Avro format will use the same approach.
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Reply via email to