TLDR: We currently require a connection to a Confluent Schema Registry to be able to work with Confluent Avro data. With a small modification to the Avro formatter, I think we could also offer the ability to process this type of data without requiring access to the schema registry.
What would people think of such an enhancement? ----- When working with Avro data, there are two formats available to us: avro and avro-confluent. avro Data it supports: Avro records Approach: You specify a table schema and it derives an appropriate Avro schema from this. avro-confluent Data it supports: Confluent’s variant[1] of the Avro encoding Approach: You provide connection details (URL, credentials, keystore/truststore, schema lookup strategy, etc.) for retrieving an appropriate schema from the Confluent Schema Registry. What this means is if you have Confluent Avro data[2] that you want to use in Flink, you currently have to use the avro-confluent format, and that means you need to provide Flink with access to your Schema Registry. I think there will be times where you may not want, or may not be able, to provide Flink with direct access to a Schema Registry. In such cases, it would be useful to support the same behaviour that the avro format does (i.e. allow you to explicitly specify a table schema) This could be achieved with a very minor modification to the avro formatter. For reading records, we could add an option to the formatter to highlight when records will be Confluent Avro. If that option is set, we just need the formatter to skip the first bytes with the schema ID/version (it can then use the remaining bytes with a regular Avro decoder as it does today – the existing implementation would be essentially unchanged). For writing records, something similar would work. An option to the formatter to highlight when to write records using Confluent Avro. We would need a way to specify what ID value to use for the first bytes [3]. (After that, the record can be encoded with a regular Avro encoder as it does today – the rest of the implementation would be unchanged). ----- [1] – This is the same as regular Avro, but prefixing the payload with extra bytes that identify which schema to use, to allow an appropriate schema to be retrieved from a schema registry. [2] – Records that were serialized by io.confluent.kafka.serializers.KafkaAvroSerializer and could be read by io.confluent.kafka.serializers.KafkaAvroDeserializer. [3] – Either by making them fixed options for that formatter, or by allowing it to be specified from something in the record. Unless otherwise stated above: IBM United Kingdom Limited Registered in England and Wales with number 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU