+1 to this

A few small comments: 

Currently, if users have Avro schemas in an Apicurio Registry (an open source 
Apache 2 licensed schema registry), then the natural way to work with those 
Avro flows is to use the schemas in the Apicurio Repository.
'those Avro flows': this is the first reference to flows; it isn't clear what a flow is here.

The new format will use the global Id to look up the Avro schema that the 
message was written during deserialization.
I get the point, but the phrasing is awkward ('the schema that the message was 
written' seems to be missing a 'with'). Probably you're more interested in 
content than word polish at this point, though.

The Avro Schema Registry (apicurio-avro) format
The Confluent format is called avro-confluent; this should be avro-apicurio.

How to create tables with Apicurio-avro format
s/Apicurio-avro/avro-apicurio/g

HEADER – globalId is put in the header
LEGACY– global Id is put in the message as a long
CONFLUENT - globalId is put in the message as an int.
Please could we specify 'four-byte int' and 'eight-byte long'?
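Agreed it helps to be precise about the widths. For my own understanding, this is how I picture the two payload layouts; the class and method names are mine and purely illustrative, not from the FLIP:

```java
import java.nio.ByteBuffer;

// Illustrative only: my guess at the wire layouts being described.
// (HEADER placement carries the id in a Kafka header instead, so it has
// no payload prefix at all.)
public class GlobalIdLayouts {
    static final byte MAGIC = 0x0;

    // CONFLUENT-style: magic byte followed by a four-byte int globalId.
    static byte[] withIntId(int globalId, byte[] avroBody) {
        return ByteBuffer.allocate(1 + 4 + avroBody.length)
                .put(MAGIC).putInt(globalId).put(avroBody).array();
    }

    // LEGACY-style: magic byte followed by an eight-byte long globalId.
    static byte[] withLongId(long globalId, byte[] avroBody) {
        return ByteBuffer.allocate(1 + 8 + avroBody.length)
                .put(MAGIC).putLong(globalId).put(avroBody).array();
    }
}
```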

For a Kafka source the globalId will be looked for in this order:
-       In the header
-       After a magic byte as an int
-       After a magic byte as a long.
but apicurio-avro.globalid-placement has a default value of HEADER: why do we 
have a search order as well? Isn't apicurio-avro.globalid-placement enough? 
Don't the two mechanisms conflict?
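To illustrate why I'm asking, here is the search order as I read it, sketched out; the header key name and everything else here are my guesses, not from the FLIP:

```java
import java.nio.ByteBuffer;
import java.util.Map;

// A sketch of the proposed three-step lookup. All names are invented.
public class GlobalIdLookup {
    static final String HEADER_KEY = "apicurio.value.globalId"; // guessed key
    static final byte MAGIC = 0x0;

    static long findGlobalId(Map<String, byte[]> headers, byte[] payload) {
        // 1. In the header.
        byte[] h = headers.get(HEADER_KEY);
        if (h != null && h.length == 8) {
            return ByteBuffer.wrap(h).getLong();
        }
        // 2. After a magic byte, as a four-byte int. Note that any message
        // long enough to hold an eight-byte id also passes this check.
        if (payload.length >= 5 && payload[0] == MAGIC) {
            return ByteBuffer.wrap(payload, 1, 4).getInt();
        }
        // 3. After a magic byte, as an eight-byte long. Unreachable as
        // written, because step 2 already matched.
        if (payload.length >= 9 && payload[0] == MAGIC) {
            return ByteBuffer.wrap(payload, 1, 8).getLong();
        }
        throw new IllegalArgumentException("no globalId found");
    }
}
```

As written, step 3 can never be reached, because a payload carrying an eight-byte long also satisfies the four-byte check in step 2; that ambiguity is the conflict I mean.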

In addition to the types listed there, Flink supports reading/writing nullable 
types. Flink maps nullable types to Avro union(something, null), where 
something is the Avro type converted from Flink type.
Is that definitely the right way round? I know we've had multiple conversations 
about how unions work with Flink.
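To check we mean the same thing: a hand-written example (mine, not generated by Flink) of what a nullable Flink STRING field would look like if the doc's ordering is right, i.e. converted type first, null second:

```json
{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "maybeName", "type": ["string", "null"] }
  ]
}
```

If Flink actually emits the union the other way round, the doc should say so explicitly.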

 This is because the writer schema is expanded, but this could not complete if 
there are circularities.
I understand your meaning, but the sentence is awkward. Maybe: 'This is because 
the writer schema is expanded, which cannot complete if the schema contains 
circular references.'

The registered schema will be created or if it exists be updated.
Same again. Maybe: 'The schema will be registered if it does not already exist, 
or updated if it does.'

At some stage the lowest Flink level supported by the Kafka connector will 
contain the additionalProperties methods in code flink.
Wording. Maybe: 'At some stage, the lowest Flink version supported by the Kafka 
connector will include the additionalProperties methods.'

There existing Kafka deserialization for the writer schema passes down the 
message body to be deserialised.
Wording ('There existing' reads oddly). Maybe: 'The existing Kafka 
deserialization passes the message body down to be deserialized with the writer 
schema.'

@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws IOException {
    Map<String, Object> additionalPropertiesMap = new HashMap<>();
    for (Header header : message.additionalProperties()) {
        headersMap.put(header.key(), header.value());
    }
    deserializationSchema.deserialize(message.value(), headersMap, out);
}
This fails to compile: the map is declared as additionalPropertiesMap but then 
referenced as headersMap.
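For what it's worth, renaming consistently should fix it. A sketch of what I assume was intended, keeping the FLIP's proposed additionalProperties() accessor (which I'm taking on trust, since it isn't in today's ConsumerRecord API):

```java
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws IOException {
    // One name used consistently for the declaration and both uses.
    Map<String, Object> additionalPropertiesMap = new HashMap<>();
    for (Header header : message.additionalProperties()) {
        additionalPropertiesMap.put(header.key(), header.value());
    }
    deserializationSchema.deserialize(message.value(), additionalPropertiesMap, out);
}
```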

The input stream and additionalProperties will be sent so the Apicurio 
SchemaCoder which will try getting the globalId from the headers, then 4 bytes 
from the payload then 8 bytes from the payload.
I'm still stuck on apicurio-avro.globalid-placement having a default value of 
HEADER. Should we try all three placements regardless of the setting, or fail 
if this config param has a wrong value?

Other considerations
The implementation does not use the Apicurio deser libraries,
Please can we refer to them as SerDes; this is the term used within the 
documentation that you link to.


On 2024/03/20 10:09:08 David Radley wrote:
> Hi,
> As per the FLIP process I would like to raise a FLIP, but do not have 
> authority, so have created a google doc for the Flip to introduce a new 
> Apicurio Avro format. The document is 
> https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
> 
> I have prototyped a lot of the content to prove that this approach is 
> feasible. I look forward to the discussion,
>       Kind regards, David.
> 
> 
> 
