Hi Kevin,
Some thoughts on this.
I suggested an Apicurio registry format on the dev list and was advised to
raise a FLIP for it; I suggest the same would apply here (or the alternative
to FLIPs if you cannot raise one). I am prototyping an Avro Apicurio format
prior to raising the FLIP, and I notice that readSchema in the SchemaCoder
only takes a byte array, but I need to pass down the Kafka headers (which is
where the Apicurio globalId identifying the schema lives).
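
To illustrate the kind of change I mean, here is a minimal sketch of a
readSchema variant that also receives the Kafka headers. The interface name,
the overload and the Map<String, Object> representation of the headers are all
assumptions of mine, not the existing Flink API:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.Schema;

    // Hypothetical extension of the SchemaCoder idea, purely illustrative.
    public interface SchemaCoderWithHeaders {

        // Existing style of call: the schema is derived from the payload alone.
        Schema readSchema(byte[] payload) throws IOException;

        // Proposed variant: the Kafka headers (e.g. carrying the Apicurio
        // globalId) are also made available so the schema can be resolved
        // from them.
        default Schema readSchema(byte[] payload, Map<String, Object> headers)
                throws IOException {
            // Fall back to payload-only behaviour for coders that do not need
            // the headers.
            return readSchema(payload);
        }
    }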

I assume:

  *   For the Confluent Protobuf format you would extend the Protobuf format
to drive some Schema Registry logic for Protobuf (similar to the way Avro
does it), where the magic byte + schema id can be read and the schema looked
up in the Confluent Schema Registry.
  *   It would be good if any Protobuf format enhancements for schema
registries pass down the Kafka headers (I am thinking of a Map<String, Object>
for Avro) as well as the message payload, so that the Apicurio registry could
work with this.
  *   It would make sense to have the Confluent schema lookup in common code,
which is part of the SchemaCoder readSchema logic.
  *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf
Schema object (a rough sketch follows below).
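
To make the last two bullets concrete, here is a rough sketch of what such a
ProtobufSchemaCoder readSchema could look like, assuming the Confluent wire
framing (magic byte followed by a 4-byte schema id) and Confluent's
SchemaRegistryClient and ProtobufSchema classes. The class itself does not
exist in Flink today and is only illustrative:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import io.confluent.kafka.schemaregistry.ParsedSchema;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    // Illustrative only: reads the Confluent framing and resolves the
    // Protobuf schema through the shared registry client.
    public class ProtobufSchemaCoder {

        private static final byte CONFLUENT_MAGIC_BYTE = 0x0;

        private final SchemaRegistryClient registryClient;

        public ProtobufSchemaCoder(SchemaRegistryClient registryClient) {
            this.registryClient = registryClient;
        }

        public ProtobufSchema readSchema(InputStream in) throws IOException {
            DataInputStream dataIn = new DataInputStream(in);
            if (dataIn.readByte() != CONFLUENT_MAGIC_BYTE) {
                throw new IOException("Unknown magic byte");
            }
            int schemaId = dataIn.readInt();
            try {
                ParsedSchema parsed = registryClient.getSchemaById(schemaId);
                return (ProtobufSchema) parsed;
            } catch (RestClientException e) {
                throw new IOException(
                        "Could not fetch schema " + schemaId + " from the registry", e);
            }
        }
    }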



I also wondered whether these Kafka-only formats should be moved to the Kafka
connector repo, or whether they might in the future be used outside Kafka,
e.g. Avro/Protobuf files in a database.

Kind regards, David.


From: Kevin Lam <kevin....@shopify.com.INVALID>
Date: Wednesday, 21 February 2024 at 18:51
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent 
Format
I would love to get some feedback from the community on this JIRA issue:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440

I am looking into creating a PR and would appreciate some review on the
approach.

In terms of design I think we can mirror the `debezium-avro-confluent` and
`avro-confluent` formats already available in Flink:

   1. `protobuf-confluent` format which uses DynamicMessage
   <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
   for encoding and decoding.
      - For encoding, the Flink RowType will be used to dynamically create a
      Protobuf schema and register it with the Confluent Schema Registry. The
      same schema will then be used to construct a DynamicMessage and
      serialize it.
      - For decoding, the schema will be fetched from the registry and
      DynamicMessage will be used to deserialize and convert the Protobuf
      object to a Flink RowData (a rough sketch follows after this list).
      - Note: here there is no external .proto file.
   2. `debezium-protobuf-confluent` format which unpacks the Debezium envelope
   and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, and
   DELETE events.
      - We may be able to refactor and reuse code from the existing
      DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema,
      since the (de)serialization logic is largely delegated and these schemas
      are mainly concerned with handling the Debezium envelope.
   3. Move the Confluent Schema Registry client code to a separate Maven
   module, flink-formats/flink-confluent-common, and extend it to support
   ProtobufSchemaProvider
   <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
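
For concreteness, a minimal sketch of the DynamicMessage-based decoding in
point 1 could look like the following. It assumes Confluent's
SchemaRegistryClient and ProtobufSchema classes, takes the bare Protobuf
payload (i.e. the Confluent framing, which for Protobuf also carries message
indexes, has already been stripped), and only hints at the RowData conversion:

    import com.google.protobuf.Descriptors;
    import com.google.protobuf.DynamicMessage;

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    // Illustrative only: decode a Protobuf payload without a compiled .proto
    // class, using the writer schema fetched from the registry.
    public class ProtobufConfluentDecodingSketch {

        public static DynamicMessage decode(
                SchemaRegistryClient registryClient, int schemaId, byte[] payload)
                throws Exception {
            // Resolve the writer schema that was registered in the Confluent
            // Schema Registry.
            ProtobufSchema schema =
                    (ProtobufSchema) registryClient.getSchemaById(schemaId);

            // Build a descriptor for the message type and parse generically.
            Descriptors.Descriptor descriptor = schema.toDescriptor();
            DynamicMessage message = DynamicMessage.parseFrom(descriptor, payload);

            // A real implementation would now walk the message fields and map
            // them to Flink's RowData according to the table's RowType.
            return message;
        }
    }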


Does anyone have any feedback or objections to this approach?

