Hey Kevin,

Thanks a lot. Then let's contribute the Confluent implementation to
apache/flink. We can't start immediately because of a team event next
week, but we will begin working on this within the next two weeks.
It probably makes sense for us to open a pull request with what we
already have, so that you can start reviewing and maybe also contributing
to the PR.
I hope this timeline works for you!

Let's also decide if we need a FLIP once the code is public.
We will look into the field ids.


On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid>
wrote:

> Hey Robert,
>
> Thanks for your response. I have a partial implementation, just for the
> decoding portion.
>
> The code I have is pretty rough and doesn't do any of the refactors I
> mentioned, but the decoder logic does pull the schema from the schema
> registry and use that to deserialize the DynamicMessage before converting
> it to RowData using a DynamicMessageToRowDataConverter class. For the other
> aspects, I would need to start from scratch for the encoder.
>
> Would be very happy to see you drive the contribution back to open source
> from Confluent, or collaborate on this.
>
> Another topic I had is Protobuf's field ids. Ideally in Flink it would be
> nice if we are idiomatic about not renumbering them in incompatible ways,
> similar to what's discussed on the Schema Registry issue here:
> https://github.com/confluentinc/schema-registry/issues/2551
>
>
> On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
> > Hi all,
> >
> > +1 to support the format in Flink.
> >
> > @Kevin: Do you already have an implementation for this in-house that
> > you are looking to upstream, or would you start from scratch?
> > I'm asking because my current employer, Confluent, has a Protobuf
> > Schema Registry implementation for Flink, and I could help drive
> > contributing this back to open source.
> > If you already have an implementation, let's decide which one to use :)
> >
> > Best,
> > Robert
> >
> > On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com>
> > wrote:
> >
> > > Hi Kevin,
> > > Some thoughts on this.
> > > I suggested an Apicurio registry format on the dev list and was
> > > advised to raise a FLIP for it. I suggest the same would apply here
> > > (or the alternative to FLIPs if you cannot raise one). I am prototyping
> > > an Avro Apicurio format prior to raising the FLIP, and notice that
> > > readSchema in the SchemaCoder only takes a byte array, but I need to
> > > pass down the Kafka headers (where the Apicurio globalId identifying
> > > the schema lives).
> > >
> > > I assume:
> > >
> > >   *   For the Confluent Protobuf format, you would extend the Protobuf
> > >       format to drive some Schema Registry logic for Protobuf (similar
> > >       to the way Avro does it), where the magic byte + schema id can be
> > >       obtained and the schema looked up using the Confluent Schema
> > >       Registry.
> > >   *   It would be good if any Protobuf format enhancements for schema
> > >       registries pass down the Kafka headers (I am thinking as a
> > >       Map<String, Object> for Avro) as well as the message payload, so
> > >       the Apicurio registry could work with this.
> > >   *   It would make sense to have the Confluent schema lookup in common
> > >       code, which is part of the SchemaCoder readSchema logic.
> > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> > >       Protobuf Schema object.
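
To make the "magic byte followed by schema id" part of the first bullet concrete, here is a minimal sketch in plain Java. It assumes the Confluent wire format header of one magic byte (0) followed by a 4-byte big-endian schema id. The class and method names are hypothetical and not part of Flink or the Confluent client. For Protobuf, the Confluent format also places message indexes after the schema id; that part is deliberately left out here.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not an existing Flink or Confluent class.
final class ConfluentWireHeader {
    static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4 bytes of schema id.
    static final int HEADER_LENGTH = 5;

    /** Validates the magic byte and returns the schema id from bytes 1..4. */
    static int readSchemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException(
                    "Message too short for magic byte + schema id");
        }
        // ByteBuffer is big-endian by default, which matches the header layout.
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header with schema id 42, followed by three payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println(readSchemaId(message));
    }
}
```

The returned id would then be the key for the registry lookup; how Kafka headers get passed alongside the payload is a separate question that this sketch does not address.
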
> > >
> > >
> > >
> > > I also wondered whether these Kafka-only formats should be moved to
> > > the Kafka connector repo, or whether they might in the future be used
> > > outside Kafka – e.g. Avro/Protobuf files in a database.
> > >    Kind regards, David.
> > >
> > >
> > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > Date: Wednesday, 21 February 2024 at 18:51
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > Confluent Format
> > > I would love to get some feedback from the community on this JIRA
> > > issue:
> > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > >
> > > I am looking into creating a PR and would appreciate some review on the
> > > approach.
> > >
> > > In terms of design I think we can mirror the `debezium-avro-confluent`
> > > and `avro-confluent` formats already available in Flink:
> > >
> > >    1. `protobuf-confluent` format which uses DynamicMessage
> > >    <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
> > >    for encoding and decoding.
> > >       - For encoding, the Flink RowType will be used to dynamically
> > >       create a Protobuf Schema and register it with the Confluent
> > >       Schema Registry. The same schema will be used to construct a
> > >       DynamicMessage and serialize it.
> > >       - For decoding, the schema will be fetched from the registry and
> > >       used with DynamicMessage to deserialize and convert the Protobuf
> > >       object to a Flink RowData.
> > >       - Note: here there is no external .proto file.
> > >    2. `debezium-protobuf-confluent` format which unpacks the Debezium
> > >    Envelope and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER,
> > >    INSERT, DELETE events.
> > >       - We may be able to refactor and reuse code from the existing
> > >       DebeziumAvroDeserializationSchema +
> > >       DebeziumAvroSerializationSchema, since the deser logic is largely
> > >       delegated and these Schemas are mainly concerned with handling
> > >       the Debezium envelope.
> > >    3. Move the Confluent Schema Registry Client code to a separate
> > >    Maven module, flink-formats/flink-confluent-common, and extend it to
> > >    support ProtobufSchemaProvider
> > >    <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
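
As an illustration of the envelope handling in item 2, the sketch below maps a Debezium `op` code plus the `before`/`after` images to change events. It assumes the usual Debezium op codes ("c" create, "r" snapshot read, "u" update, "d" delete) and that creates and reads are treated as inserts. `ChangeKind`, `Change` and `unpack` are hypothetical names used only for this example; a real format would emit Flink RowData with a RowKind instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the types here are stand-ins, not Flink classes.
final class DebeziumEnvelopeSketch {
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One emitted change event: the kind of change and the row image. */
    static final class Change {
        final ChangeKind kind;
        final Object row;

        Change(ChangeKind kind, Object row) {
            this.kind = kind;
            this.row = row;
        }
    }

    /** Turns one envelope into zero or more change events. */
    static List<Change> unpack(String op, Object before, Object after) {
        if (op == null) {
            throw new IllegalArgumentException("Envelope has no op field");
        }
        List<Change> out = new ArrayList<>();
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                out.add(new Change(ChangeKind.INSERT, after));
                break;
            case "u": // update: emit the old image, then the new one
                out.add(new Change(ChangeKind.UPDATE_BEFORE, before));
                out.add(new Change(ChangeKind.UPDATE_AFTER, after));
                break;
            case "d": // delete: only the old image is meaningful
                out.add(new Change(ChangeKind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
        return out;
    }
}
```

Because this logic does not depend on whether the payload was Avro or Protobuf, it is the kind of code that could plausibly be shared between the existing Avro schemas and a Protobuf variant.
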
> > >
> > >
> > > Does anyone have any feedback or objections to this approach?
> > >
> >
>
