Hi all,

+1 to support the format in Flink.

@Kevin: Do you already have an in-house implementation for this that you are
looking to upstream, or would you start from scratch?
I'm asking because my current employer, Confluent, has a Protobuf Schema
Registry implementation for Flink, and I could help drive contributing this
back to open source.
If you already have an implementation, let's decide which one to use :)


On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Kevin,
> Some thoughts on this.
> I suggested an Apicurio registry format on the dev list and was advised
> to raise a FLIP for it; I suggest the same would apply here (or the
> alternative to a FLIP if you cannot raise one). I am prototyping an Avro
> Apicurio format prior to raising the FLIP, and I notice that readSchema
> in the SchemaCoder only takes a byte array, but I need to pass down the
> Kafka headers (where the Apicurio globalId identifying the schema lives).
> I assume:
>   *   For the Confluent Protobuf format you would extend the Protobuf
> format to drive some Schema Registry logic for Protobuf (similar to the way
> Avro does it), where the magic byte + schema id can be obtained and the
> schema looked up using the Confluent Schema Registry.
>   *   It would be good if any Protobuf format enhancements for schema
> registries pass down the Kafka headers (I am thinking as a Map<String,
> Object> for Avro) as well as the message payload, so the Apicurio registry
> could work with this.
>   *   It would make sense to have the Confluent schema lookup in common
> code, which is part of the SchemaCoder readSchema logic.
>   *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf
> Schema object.
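
To make the "magic byte + schema id" point concrete, here is a minimal sketch of reading that prefix from a payload. It assumes the usual Confluent framing of one zero magic byte followed by a 4-byte big-endian schema id. The class and method names are hypothetical, not Flink or Confluent APIs, and for Protobuf the Confluent serializer also writes message indexes after the id, which this sketch ignores.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration only; not part of Flink or the Confluent client. */
final class ConfluentWireHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema id

    /** Returns the schema id stored in the 5-byte prefix of a Confluent-framed payload. */
    static int readSchemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a Confluent header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt(); // the id that would be looked up in the registry
    }
}
```

An Apicurio-style lookup could not use this alone, since the id would come from the Kafka headers rather than the payload, which is why passing the headers down matters.
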
> I also wondered whether these Kafka-only formats should be moved to the
> Kafka connector repo, or whether they might in the future be used outside
> Kafka – e.g. Avro/Protobuf files in a database.
>    Kind regards, David.
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Wednesday, 21 February 2024 at 18:51
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> I would love to get some feedback from the community on this JIRA issue:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> I am looking into creating a PR and would appreciate some review on the
> approach.
> In terms of design I think we can mirror the `debezium-avro-confluent` and
> `avro-confluent` formats already available in Flink:
>    1. `protobuf-confluent` format which uses DynamicMessage
>    <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
>    for encoding and decoding.
>       - For encoding, the Flink RowType will be used to dynamically create a
>       Protobuf Schema and register it with the Confluent Schema Registry. It
>       will use the same schema to construct a DynamicMessage and serialize it.
>       - For decoding, the schema will be fetched from the registry and
>       DynamicMessage will be used to deserialize and convert the Protobuf
>       object to a Flink RowData.
>       - Note: here there is no external .proto file
>    2. `debezium-protobuf-confluent` format which unpacks the Debezium Envelope
>    and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE
>    events.
>       - We may be able to refactor and reuse code from the existing
>       DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema,
>       since the deser logic is largely delegated elsewhere and these Schemas
>       are mainly concerned with handling the Debezium envelope.
>    3. Move the Confluent Schema Registry Client code to a separate maven
>    module, flink-formats/flink-confluent-common, and extend it to support
>    ProtobufSchemaProvider
>    <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
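
As a rough illustration of the envelope handling in item 2, the sketch below maps a Debezium `op` code plus its `before`/`after` images to the change kinds listed above. It assumes the standard Debezium op codes ("c", "r", "u", "d"). Rows are modelled as plain maps rather than Flink RowData, and every class and method name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of Debezium envelope unpacking; not Flink's actual classes. */
final class DebeziumEnvelopeSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One emitted change: the kind of change plus the row image it carries. */
    static final class Change {
        final Kind kind;
        final Map<String, Object> row;

        Change(Kind kind, Map<String, Object> row) {
            this.kind = kind;
            this.row = row;
        }
    }

    /** Turns one envelope (op, before, after) into zero or more change events. */
    static List<Change> unpack(String op, Map<String, Object> before, Map<String, Object> after) {
        List<Change> out = new ArrayList<>();
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                out.add(new Change(Kind.INSERT, after));
                break;
            case "u": // update emits both the old and the new row image
                out.add(new Change(Kind.UPDATE_BEFORE, before));
                out.add(new Change(Kind.UPDATE_AFTER, after));
                break;
            case "d": // delete carries only the old row image
                out.add(new Change(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
        return out;
    }
}
```

None of this logic depends on whether the payload was Avro or Protobuf, which is the argument for sharing it between the two Debezium formats.
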
> Does anyone have any feedback or objections to this approach?
> Unless otherwise stated above:
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
