TLDR:
We currently require a connection to a Confluent Schema Registry to be able to 
work with Confluent Avro data. With a small modification to the Avro formatter, 
I think we could also offer the ability to process this type of data without 
requiring access to the schema registry.

What would people think of such an enhancement?

-----

When working with Avro data, there are two formats available to us: avro and 
avro-confluent.

avro
Data it supports: Avro records
Approach: You specify a table schema and it derives an appropriate Avro schema 
from this.

avro-confluent
Data it supports: Confluent’s variant[1] of the Avro encoding
Approach: You provide connection details (URL, credentials, 
keystore/truststore, schema lookup strategy, etc.) for retrieving an 
appropriate schema from the Confluent Schema Registry.

What this means is that if you have Confluent Avro data[2] that you want to use 
in Flink, you currently have to use the avro-confluent format, which in turn 
means you need to provide Flink with access to your Schema Registry.

I think there will be times when you may not want, or may not be able, to 
provide Flink with direct access to a Schema Registry. In such cases, it would 
be useful to support the same behaviour that the avro format does (i.e. allow 
you to explicitly specify a table schema).

This could be achieved with a very minor modification to the avro formatter.

For reading records, we could add an option to the formatter to indicate that 
the records will be Confluent Avro. If that option is set, the formatter just 
needs to skip the first bytes containing the schema ID/version (it can then use 
the remaining bytes with a regular Avro decoder as it does today – the existing 
implementation would be essentially unchanged).
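
Purely to illustrate the idea (this isn't proposed code; the class and method 
names are made up, and it assumes the standard Confluent framing of a magic 
byte followed by a 4-byte schema ID), reading could look something like this 
with the plain Avro API:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;

    public class ConfluentFramedAvroReader {
        // Standard Confluent framing: 1 magic byte + 4-byte schema ID.
        private static final int PREFIX_LENGTH = 5;

        public static GenericRecord read(byte[] message, Schema tableSchema) throws IOException {
            if (message.length < PREFIX_LENGTH || message[0] != 0) {
                throw new IOException("Not a Confluent-framed Avro record");
            }
            // Skip the prefix and hand the remaining bytes to a regular Avro
            // decoder, using the schema derived from the table definition.
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(tableSchema);
            return reader.read(null, DecoderFactory.get().binaryDecoder(
                    message, PREFIX_LENGTH, message.length - PREFIX_LENGTH, null));
        }
    }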

For writing records, something similar would work: an option to the formatter 
to indicate that records should be written as Confluent Avro. We would also 
need a way to specify what ID value to use for the first bytes [3]. (After 
that, the record can be encoded with a regular Avro encoder as it is today – 
the rest of the implementation would be unchanged.)
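
Again, just as an illustrative sketch (made-up names, the same framing 
assumption, and a fixed schema ID supplied by the caller):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class ConfluentFramedAvroWriter {
        public static byte[] write(GenericRecord record, Schema schema, int schemaId) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // Prepend the Confluent framing: magic byte, then the schema ID
            // as a big-endian int.
            out.write(0);
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
            // The payload itself is plain Avro binary, just as the avro
            // format writes today.
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }

The only Confluent-specific part is the few bytes of prefix; everything after 
it is the same binary encoding the avro format already produces.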


-----
[1] – This is the same as regular Avro, except that the payload is prefixed 
with extra bytes identifying which schema to use, so that an appropriate 
schema can be retrieved from a schema registry.
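(For reference, my understanding of that framing is: byte 0 is a magic byte of 
zero, bytes 1-4 are the schema ID as a big-endian int, and everything after 
that is the regular Avro binary payload.)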

[2] – Records that were serialized by 
io.confluent.kafka.serializers.KafkaAvroSerializer and could be read by 
io.confluent.kafka.serializers.KafkaAvroDeserializer.

[3] – Either by making the ID a fixed option for that formatter, or by 
allowing it to be specified from something in the record.
