Hi Jeyhun,
Thanks for your feedback.

For outbound messages, the message includes the global ID. We register the
schema and match on the artifact id. If the schema then evolved, adding a
new version, the global ID would still be unique, so the same version would
still be targeted. If you wanted to change the Flink table definition in line
with a higher version, you could do this: the artifact id would need to match
for it to use the same schema, and a higher artifact version would need to be
provided. I notice that Apicurio has compatibility rules that you can
configure. I suppose that if we attempt to create an artifact that breaks
these rules, the schema registration will fail and the associated operation
(e.g. an insert) should fail too. I have not tried this.


For inbound messages, the global id in the header targets one version of the
schema. I can create different messages on the topic built with different
schema versions, and I can create different tables in Flink. As long as the
reader and writer schemas are compatible as per the code at
https://github.com/apache/flink/blob/779459168c46b7b4c600ef52f99a5435f81b9048/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L109
this should work.
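
To make the global ID handling concrete, here is a minimal sketch of how the
ID could be written and read for the three placements discussed in this
thread (HEADER, LEGACY as an eight-byte long, CONFLUENT as a four-byte int).
It is not the prototype code and does not use the Apicurio SerDes libraries.
The class name, the header name, the magic byte value of 0 and the encoding of
the header value as an eight-byte long are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Map;

/** Hypothetical sketch only; not the FLIP implementation or the Apicurio SerDes API. */
final class GlobalIdPlacementSketch {

    /** Where the global ID travels, following the three options named in this thread. */
    enum Placement { HEADER, LEGACY, CONFLUENT }

    // Assumed values, chosen for illustration.
    static final String GLOBAL_ID_HEADER = "apicurio.value.globalId";
    static final byte MAGIC_BYTE = 0x0;

    /** Outbound: returns the message bytes; for HEADER the ID is added to the headers map. */
    static byte[] write(Placement placement, long globalId, byte[] avroBody,
                        Map<String, byte[]> headers) {
        switch (placement) {
            case HEADER:
                // Assumption: the header value is the ID as an eight-byte long.
                headers.put(GLOBAL_ID_HEADER,
                        ByteBuffer.allocate(Long.BYTES).putLong(globalId).array());
                return avroBody.clone();
            case LEGACY:
                // Magic byte, then the ID as an eight-byte long, then the Avro body.
                return ByteBuffer.allocate(1 + Long.BYTES + avroBody.length)
                        .put(MAGIC_BYTE).putLong(globalId).put(avroBody).array();
            case CONFLUENT:
                // Magic byte, then the ID as a four-byte int, then the Avro body.
                return ByteBuffer.allocate(1 + Integer.BYTES + avroBody.length)
                        .put(MAGIC_BYTE).putInt(Math.toIntExact(globalId)).put(avroBody).array();
            default:
                throw new IllegalArgumentException("Unknown placement: " + placement);
        }
    }

    /** Inbound: reads the global ID from the one configured placement (no search order). */
    static long readGlobalId(Placement placement, byte[] message, Map<String, byte[]> headers) {
        if (placement == Placement.HEADER) {
            byte[] raw = headers.get(GLOBAL_ID_HEADER);
            if (raw == null || raw.length != Long.BYTES) {
                throw new IllegalStateException("Global ID header missing or malformed");
            }
            return ByteBuffer.wrap(raw).getLong();
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.remaining() < 1 || buf.get() != MAGIC_BYTE) {
            throw new IllegalStateException("Message does not start with the magic byte");
        }
        return placement == Placement.LEGACY ? buf.getLong() : buf.getInt();
    }
}
```

Because the reader takes the placement as configuration rather than trying
each location in turn, a message written with one placement and read with
another fails fast instead of silently resolving the wrong schema version.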

Does this address your question?
    Kind regards, David.


From: Jeyhun Karimov <je.kari...@gmail.com>
Date: Thursday, 21 March 2024 at 21:06
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
Hi David,

Thanks for the FLIP. +1 for it.
I have a minor comment.

Can you please elaborate more on mechanisms in place to ensure data
consistency and integrity, particularly in the event of schema conflicts?
Since each message includes a schema ID for inbound and outbound messages,
can you elaborate more on message consistency in the context of schema
evolution?

Regards,
Jeyhun





On Wed, Mar 20, 2024 at 4:34 PM David Radley <david...@apache.org> wrote:

> Thank you very much for your feedback Mark. I have made the changes in the
> latest google document. On reflection I agree with you that the
> globalIdPlacement format configuration should apply to the deserialization
> as well, so it is declarative. I am also going to have a new configuration
> option to work with content IDs as well as global IDs. In line with the
> deser Apicurio IdHandler and headerHandlers.
>
>  kind regards, David.
>
>
> On 2024/03/20 15:18:37 Mark Nuttall wrote:
> > +1 to this
> >
> > A few small comments:
> >
> > Currently, if users have Avro schemas in an Apicurio Registry (an open
> source Apache 2 licensed schema registry), then the natural way to work
> with those Avro flows is to use the schemas in the Apicurio Repository.
> > 'those Avro flows' ... this is the first reference to flows.
> >
> > The new format will use the global Id to look up the Avro schema that
> the message was written during deserialization.
> > I get the point, phrasing is awkward. Probably you're more interested in
> content than word polish at this point though.
> >
> > The Avro Schema Registry (apicurio-avro) format
> > The Confluent format is called avro-confluent; this should be
> avro-apicurio
> >
> > How to create tables with Apicurio-avro format
> > s/Apicurio-avro/avro-apicurio/g
> >
> > HEADER – globalId is put in the header
> > LEGACY– global Id is put in the message as a long
> > CONFLUENT - globalId is put in the message as an int.
> > Please could we specify 'four-byte int' and 'eight-byte long' ?
> >
> > For a Kafka source the globalId will be looked for in this order:
> > -     In the header
> > -     After a magic byte as an int
> > -     After a magic byte as a long.
> > but apicurio-avro.globalid-placement has a default value of HEADER : why
> do we have a search order as well? Isn't apicurio-avro.globalid-placement
> enough? Don't the two mechanisms conflict?
> >
> > In addition to the types listed there, Flink supports reading/writing
> nullable types. Flink maps nullable types to Avro union(something, null),
> where something is the Avro type converted from Flink type.
> > Is that definitely the right way round? I know we've had multiple
> conversations about how unions work with Flink
> >
> >  This is because the writer schema is expanded, but this could not
> complete if there are circularities.
> > I understand your meaning but the sentence is awkward.
> >
> > The registered schema will be created or if it exists be updated.
> > same again
> >
> > At some stage the lowest Flink level supported by the Kafka connector
> will contain the additionalProperties methods in code flink.
> > wording
> >
> > There existing Kafka deserialization for the writer schema passes down
> the message body to be deserialised.
> > wording
> >
> > @Override
> > public void deserialize(ConsumerRecord<byte[], byte[]> message,
> Collector<T> out)
> >       throws IOException {
> >       Map<String, Object> additionalPropertiesMap =  new HashMap<>();
> >       for (Header header : message.additionalProperties()) {
> >       headersMap.put(header.key(), header.value());
> >       }
> >       deserializationSchema.deserialize(message.value(), headersMap,
> out);
> > }
> > This fails to compile at headersMap.
> >
> > The input stream and additionalProperties will be sent so the Apicurio
> SchemaCoder which will try getting the globalId from the headers, then 4
> bytes from the payload then 8 bytes from the payload.
> > I'm still stuck on apicurio-avro.globalid-placement having a default
> value of HEADER . Should we try all three, or fail if this config param has
> a wrong value?
> >
> > Other considerations
> > The implementation does not use the Apicurio deser libraries,
> > Please can we refer to them as SerDes; this is the term used within the
> documentation that you link to
> >
> >
> > On 2024/03/20 10:09:08 David Radley wrote:
> > > Hi,
> > > As per the FLIP process I would like to raise a FLIP, but do not have
> authority, so have created a google doc for the Flip to introduce a new
> Apicurio Avro format. The document is
> https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
> > >
> > > I have prototyped a lot of the content to prove that this approach is
> feasible. I look forward to the discussion,
> > >       Kind regards, David.
> > >
> > >
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > >
> >
>

