+1 to this. A few small comments:
> Currently, if users have Avro schemas in an Apicurio Registry (an open
> source Apache 2 licensed schema registry), then the natural way to work
> with those Avro flows is to use the schemas in the Apicurio Repository.

"those Avro flows" ... this is the first reference to flows.

> The new format will use the global Id to look up the Avro schema that the
> message was written during deserialization.

I get the point, but the phrasing is awkward. Probably you're more interested in content than word polish at this point though.

> The Avro Schema Registry (apicurio-avro) format

The Confluent format is called avro-confluent; this should be avro-apicurio.

> How to create tables with Apicurio-avro format

s/Apicurio-avro/avro-apicurio/g

> HEADER – globalId is put in the header
> LEGACY – globalId is put in the message as a long
> CONFLUENT – globalId is put in the message as an int.

Please could we specify "four-byte int" and "eight-byte long"?

> For a Kafka source the globalId will be looked for in this order:
> - In the header
> - After a magic byte as an int
> - After a magic byte as a long.

But apicurio-avro.globalid-placement has a default value of HEADER: why do we have a search order as well? Isn't apicurio-avro.globalid-placement enough? Don't the two mechanisms conflict?

> In addition to the types listed there, Flink supports reading/writing
> nullable types. Flink maps nullable types to Avro union(something, null),
> where something is the Avro type converted from Flink type.

Is that definitely the right way round? I know we've had multiple conversations about how unions work with Flink.

> This is because the writer schema is expanded, but this could not complete
> if there are circularities.

I understand your meaning but the sentence is awkward.

> The registered schema will be created or if it exists be updated.

Same again.

> At some stage the lowest Flink level supported by the Kafka connector will
> contain the additionalProperties methods in code flink.
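On the "four-byte int" / "eight-byte long" point, a quick sketch of the difference I'd like spelled out between the LEGACY and CONFLUENT payload encodings. This is only an illustration under my reading of the FLIP; the class and method names are mine, not from the proposal:

```java
import java.nio.ByteBuffer;

// Sketch only: the two payload placements differ in the width of the id
// that follows the magic byte. Names are illustrative, not from the FLIP.
public class GlobalIdSketch {

    private static final byte MAGIC_BYTE = 0x0;

    // LEGACY: magic byte followed by the globalId as an eight-byte long.
    static long readLegacyGlobalId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("missing magic byte");
        }
        return buf.getLong(); // reads eight bytes
    }

    // CONFLUENT: magic byte followed by the globalId as a four-byte int.
    static long readConfluentGlobalId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("missing magic byte");
        }
        return buf.getInt(); // reads four bytes
    }
}
```

Being explicit about the widths would also make it obvious why the lookup order has to try the int and long decodings separately.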
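On the nullable-types sentence, to make my question concrete: the Avro spec requires a union field's default value to match the first branch of the union, so a field with a null default has to put null first, i.e. union(null, something) rather than union(something, null). A hand-written example (mine, not from the FLIP):

```
{"name": "nickname", "type": ["null", "string"], "default": null}
```

If the FLIP really does emit ["string", "null"], it would be worth saying why, or whether defaults are simply never generated.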
Wording.

> There existing Kafka deserialization for the writer schema passes down the
> message body to be deserialised.

Wording.

> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws IOException {
>     Map<String, Object> additionalPropertiesMap = new HashMap<>();
>     for (Header header : message.additionalProperties()) {
>         headersMap.put(header.key(), header.value());
>     }
>     deserializationSchema.deserialize(message.value(), headersMap, out);
> }

This fails to compile at headersMap.

> The input stream and additionalProperties will be sent so the Apicurio
> SchemaCoder which will try getting the globalId from the headers, then 4
> bytes from the payload then 8 bytes from the payload.

I'm still stuck on apicurio-avro.globalid-placement having a default value of HEADER. Should we try all three, or fail if this config param has a wrong value?

Other considerations

> The implementation does not use the Apicurio deser libraries,

Please can we refer to them as SerDes; this is the term used within the documentation that you link to.

On 2024/03/20 10:09:08 David Radley wrote:
> Hi,
> As per the FLIP process I would like to raise a FLIP, but do not have
> authority, so have created a google doc for the Flip to introduce a new
> Apicurio Avro format. The document is
> https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
>
> I have prototyped a lot of the content to prove that this approach is
> feasible. I look forward to the discussion,
> Kind regards, David.
>
>
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
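PS: for what it's worth, here is how I read the intended fix to the snippet that fails to compile: the method declares additionalPropertiesMap but then writes to an undeclared headersMap, so one of the two names has to win. The stub interfaces below stand in for the Kafka/Flink classes purely so the fix is visible in isolation; additionalProperties() is the FLIP's proposed accessor, everything else (and the elided throws clauses) is mine:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the compile fix, with stub types standing in for the real
// Kafka/Flink classes. Checked exceptions are elided for brevity.
public class DeserializeFix<T> {

    interface Header { String key(); byte[] value(); }
    interface ConsumerRecord { byte[] value(); Iterable<Header> additionalProperties(); }
    interface Collector<T> { void collect(T record); }
    interface DeserializationSchema<T> {
        void deserialize(byte[] message, Map<String, Object> additionalProperties, Collector<T> out);
    }

    private final DeserializationSchema<T> deserializationSchema;

    public DeserializeFix(DeserializationSchema<T> schema) {
        this.deserializationSchema = schema;
    }

    public void deserialize(ConsumerRecord message, Collector<T> out) {
        Map<String, Object> additionalPropertiesMap = new HashMap<>();
        for (Header header : message.additionalProperties()) {
            // was: headersMap.put(...) -- headersMap is never declared
            additionalPropertiesMap.put(header.key(), header.value());
        }
        deserializationSchema.deserialize(message.value(), additionalPropertiesMap, out);
    }
}
```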