+1 to this
A few small comments:
Currently, if users have Avro schemas in an Apicurio Registry (an open source
Apache 2 licensed schema registry), then the natural way to work with those
Avro flows is to use the schemas in the Apicurio Repository.
'those Avro flows' ... this is the first reference to flows.
The new format will use the global Id to look up the Avro schema that the
message was written during deserialization.
I get the point, but the phrasing is awkward — 'the schema that the message was
written with', perhaps? You're probably more interested in content than word
polish at this point, though.
The Avro Schema Registry (apicurio-avro) format
The Confluent format is called avro-confluent; this should be avro-apicurio
How to create tables with Apicurio-avro format
s/Apicurio-avro/avro-apicurio/g
HEADER – globalId is put in the header
LEGACY – globalId is put in the message as a long
CONFLUENT – globalId is put in the message as an int.
Please could we specify 'four-byte int' and 'eight-byte long' ?
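Agreed that the widths matter; to make them concrete, here is how I picture the two in-payload layouts (the 0x0 magic-byte value and big-endian byte order are assumptions on my part, not from the doc):

```java
import java.nio.ByteBuffer;

public class GlobalIdLayouts {
    static final byte MAGIC = 0x0; // assumed magic-byte value

    // CONFLUENT placement: magic byte, then the globalId as a four-byte int.
    static byte[] confluentPrefix(int globalId) {
        return ByteBuffer.allocate(1 + Integer.BYTES).put(MAGIC).putInt(globalId).array();
    }

    // LEGACY placement: magic byte, then the globalId as an eight-byte long.
    static byte[] legacyPrefix(long globalId) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(MAGIC).putLong(globalId).array();
    }
}
```

so the reader can see the five-byte versus nine-byte prefix difference at a glance.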
For a Kafka source the globalId will be looked for in this order:
- In the header
- After a magic byte as an int
- After a magic byte as a long.
but apicurio-avro.globalid-placement has a default value of HEADER: why do we
have a search order as well? Isn't apicurio-avro.globalid-placement enough?
Don't the two mechanisms conflict?
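To show what I mean: if the placement config drives the read, there is exactly one decode path and no search. A sketch (the Placement enum, the "globalId" header name, and the 0x0 magic-byte value are placeholders, not the FLIP's actual constants):

```java
import java.nio.ByteBuffer;
import java.util.Map;

public class GlobalIdResolver {
    enum Placement { HEADER, CONFLUENT, LEGACY } // mirrors apicurio-avro.globalid-placement

    static final byte MAGIC = 0x0;               // assumed magic-byte value
    static final String HEADER_KEY = "globalId"; // placeholder header name

    // One configured placement, one decode path: no fallback search needed.
    static long resolve(Placement placement, Map<String, byte[]> headers, byte[] payload) {
        return switch (placement) {
            case HEADER -> ByteBuffer.wrap(headers.get(HEADER_KEY)).getLong(); // assumes an 8-byte header value
            case CONFLUENT -> readAfterMagic(payload, false); // four-byte int
            case LEGACY -> readAfterMagic(payload, true);     // eight-byte long
        };
    }

    private static long readAfterMagic(byte[] payload, boolean asLong) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC) {
            throw new IllegalArgumentException("payload does not start with the magic byte");
        }
        return asLong ? buf.getLong() : buf.getInt();
    }
}
```

Note also that the quoted search order can't really disambiguate the int and long cases by inspection: after the magic byte the id is immediately followed by Avro data, so both reads "succeed", which is why I'd expect the config value to decide.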
In addition to the types listed there, Flink supports reading/writing nullable
types. Flink maps nullable types to Avro union(something, null), where
something is the Avro type converted from Flink type.
Is that definitely the right way round? I know we've had multiple conversations
about how unions work with Flink.
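To make the question concrete: for a nullable Flink STRING I would expect the union below, if the union(something, null) wording is right (this is just string-building for illustration, not the converter's actual code):

```java
public class NullableUnionExample {
    // Renders the Avro union JSON for a nullable primitive, with the
    // non-null branch first, as "union(something, null)" implies.
    static String nullableUnion(String avroType) {
        return "[\"" + avroType + "\",\"null\"]";
    }
}
```

So a nullable STRING becomes ["string","null"]; if the branches are actually the other way round, the doc should say union(null, something) instead.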
This is because the writer schema is expanded, but this could not complete if
there are circularities.
I understand your meaning, but the sentence is awkward — maybe 'This is because
the writer schema is expanded, which cannot complete if the schema contains
circular references'?
The registered schema will be created or if it exists be updated.
Same again — perhaps 'The schema will be registered, creating it if new and
updating it if it already exists'?
At some stage the lowest Flink level supported by the Kafka connector will
contain the additionalProperties methods in code flink.
Wording — this sentence doesn't parse for me; 'in code flink' in particular.
There existing Kafka deserialization for the writer schema passes down the
message body to be deserialised.
Wording — 'There existing' should presumably be 'The existing'.
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws IOException {
    Map<String, Object> additionalPropertiesMap = new HashMap<>();
    for (Header header : message.additionalProperties()) {
        headersMap.put(header.key(), header.value());
    }
    deserializationSchema.deserialize(message.value(), headersMap, out);
}
This fails to compile: the map is declared as additionalPropertiesMap but then
referenced as headersMap.
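For what it's worth, a shape that does compile, with the map named consistently (the nested types below are stubs standing in for the Kafka and Flink classes, so treat this as a sketch of the shape only):

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeserializeSketch {
    // Stubs standing in for the real Kafka/Flink types in the snippet above.
    record Header(String key, byte[] value) {}
    interface Collector<T> { void collect(T record); }
    interface HeadersDeserializationSchema<T> {
        void deserialize(byte[] message, Map<String, Object> headers, Collector<T> out) throws IOException;
    }

    static <T> void deserialize(byte[] value, Iterable<Header> headers,
                                HeadersDeserializationSchema<T> deserializationSchema,
                                Collector<T> out) throws IOException {
        Map<String, Object> headersMap = new HashMap<>(); // one name, used throughout
        for (Header header : headers) {
            headersMap.put(header.key(), header.value());
        }
        deserializationSchema.deserialize(value, headersMap, out);
    }
}
```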
The input stream and additionalProperties will be sent so the Apicurio
SchemaCoder which will try getting the globalId from the headers, then 4 bytes
from the payload then 8 bytes from the payload.
I'm still stuck on apicurio-avro.globalid-placement having a default value of
HEADER. Should we try all three, or fail if this config param has a wrong
value?
Other considerations
The implementation does not use the Apicurio deser libraries,
Please can we refer to them as SerDes; this is the term used within the
documentation that you link to
On 2024/03/20 10:09:08 David Radley wrote:
> Hi,
> As per the FLIP process I would like to raise a FLIP, but do not have
> authority, so have created a google doc for the Flip to introduce a new
> Apicurio Avro format. The document is
> https://docs.google.com/document/d/14LWZPVFQ7F9mryJPdKXb4l32n7B0iWYkcOdEd1xTC7w/edit?usp=sharing
>
> I have prototyped a lot of the content to prove that this approach is
> feasible. I look forward to the discussion,
> Kind regards, David.
>
>
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>