Hi Kevin,

Thanks for starting the discussion on this.
I will be working on contributing this feature back (This was developed by
Dawid Wysakowicz and others at Confluent).

I have opened a (very initial) draft PR here
https://github.com/apache/flink/pull/24482 with our current implementation.
Thanks for the feedback on the PR, I haven’t gotten around to
re-organizing/refactoring the classes yet, but it would be inline with some
of your comments.

On the overall approach there are some slight variations from the initial
proposal.
Our implementation relies on an explicit schema-id being passed through the
config. This could help in cases where one Flink type could potentially map
to multiple proto types.
We could make the schema-Id optional and fall back to deriving it from the
rowType (during serialization) if not present?

The message index handling is still TBD. I am thinking of replicating logic
in AbstractKafkaProtobufSerializer.java
<https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157>
 (|Deserializer).
Please let me know if this makes sense / or in case you have any other
feedback.

Thanks
Anupam

On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam <kevin....@shopify.com.invalid>
wrote:

> Hey Robert,
>
> Awesome thanks, that timeline works for me. Sounds good re: deciding on
> FLIP once we have the PR, and thanks for looking into the field ids.
>
> Looking forward to it!
>
> On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <metrob...@gmail.com>
> wrote:
>
> > Hey Kevin,
> >
> > Thanks a lot. Then let's contribute the Confluent implementation to
> > apache/flink. We can't start working on this immediately because of a
> team
> > event next week, but within the next two weeks, we will start working on
> > this.
> > It probably makes sense for us to open a pull request of what we have
> > already, so that you can start reviewing and maybe also contributing to
> the
> > PR.
> > I hope this timeline works for you!
> >
> > Let's also decide if we need a FLIP once the code is public.
> > We will look into the field ids.
> >
> >
> > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid
> >
> > wrote:
> >
> > > Hey Robert,
> > >
> > > Thanks for your response. I have a partial implementation, just for the
> > > decoding portion.
> > >
> > > The code I have is pretty rough and doesn't do any of the refactors I
> > > mentioned, but the decoder logic does pull the schema from the schema
> > > registry and use that to deserialize the DynamicMessage before
> converting
> > > it to RowData using a DynamicMessageToRowDataConverter class. For the
> > other
> > > aspects, I would need to start from scratch for the encoder.
> > >
> > > Would be very happy to see you drive the contribution back to open
> source
> > > from Confluent, or collaborate on this.
> > >
> > > Another topic I had is Protobuf's field ids. Ideally in Flink it would
> be
> > > nice if we are idiomatic about not renumbering them in incompatible
> ways,
> > > similar to what's discussed on the Schema Registry issue here:
> > > https://github.com/confluentinc/schema-registry/issues/2551
> > >
> > >
> > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > +1 to support the format in Flink.
> > > >
> > > > @Kevin: Do you already have an implementation for this inhouse that
> you
> > > are
> > > > looking to upstream, or would you start from scratch?
> > > > I'm asking because my current employer, Confluent, has a Protobuf
> > Schema
> > > > registry implementation for Flink, and I could help drive
> contributing
> > > this
> > > > back to open source.
> > > > If you already have an implementation, let's decide which one to use
> :)
> > > >
> > > > Best,
> > > > Robert
> > > >
> > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> david_rad...@uk.ibm.com>
> > > > wrote:
> > > >
> > > > > Hi Kevin,
> > > > > Some thoughts on this.
> > > > > I suggested an Apicurio registry format in the dev list, and was
> > > advised
> > > > > to raise a FLIP for this, I suggest the same would apply here (or
> the
> > > > > alternative to FLIPs if you cannot raise one). I am prototyping an
> > Avro
> > > > > Apicurio format, prior to raising the Flip,  and notice that the
> > > > readSchema
> > > > > in the SchemaCoder only takes a byte array ,but I need to pass down
> > the
> > > > > Kafka headers (where the Apicurio globalId identifying the schema
> > > lives).
> > > > >
> > > > > I assume:
> > > > >
> > > > >   *   for the confluent Protobuf format you would extend the
> Protobuf
> > > > > format to drive some Schema Registry logic for Protobuf (similar to
> > the
> > > > way
> > > > > Avro does it) where the magic byte _ schema id can be obtained and
> > the
> > > > > schema looked up using the Confluent Schema registry.
> > > > >   *   It would be good if any protobuf format enhancements for
> Schema
> > > > > registries pass down the Kafka headers (I am thinking as a
> > Map<String,
> > > > > Object> for Avro) as well as the message payload so Apicurio
> registry
> > > > could
> > > > > work with this.
> > > > >   *   It would make sense to have the Confluent schema lookup in
> > common
> > > > > code, which is part of the SchemaCoder readSchema  logic.
> > > > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> > > Protobuf
> > > > > Schema object.
> > > > >
> > > > >
> > > > >
> > > > > I also wondered whether these Kafka only formats should be moved to
> > the
> > > > > Kafka connector repo, or whether they might in the future be used
> > > outside
> > > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > > >    Kind regards, David.
> > > > >
> > > > >
> > > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > > > Confluent Format
> > > > > I would love to get some feedback from the community on this JIRA
> > > issue:
> > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > >
> > > > > I am looking into creating a PR and would appreciate some review on
> > the
> > > > > approach.
> > > > >
> > > > > In terms of design I think we can mirror the
> > `debezium-avro-confluent`
> > > > and
> > > > > `avro-confluent` formats already available in Flink:
> > > > >
> > > > >    1. `protobuf-confluent` format which uses DynamicMessage
> > > > >    <
> > > > >
> > > >
> > >
> >
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> > > > > >
> > > > >    for encoding and decoding.
> > > > >       - For encoding the Flink RowType will be used to dynamically
> > > > create a
> > > > >       Protobuf Schema and register it with the Confluent Schema
> > > > > Registry. It will
> > > > >       use the same schema to construct a DynamicMessage and
> serialize
> > > it.
> > > > >       - For decoding, the schema will be fetched from the registry
> > and
> > > > use
> > > > >       DynamicMessage to deserialize and convert the Protobuf object
> > to
> > > a
> > > > > Flink
> > > > >       RowData.
> > > > >       - Note: here there is no external .proto file
> > > > >    2. `debezium-avro-confluent` format which unpacks the Debezium
> > > > Envelope
> > > > >    and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER,
> INSERT,
> > > > DELETE
> > > > >    events.
> > > > >       - We may be able to refactor and reuse code from the existing
> > > > >       DebeziumAvroDeserializationSchema +
> > > DebeziumAvroSerializationSchema
> > > > > since
> > > > >       the deser logic is largely delegated to and these Schemas are
> > > > > concerned
> > > > >       with the handling the Debezium envelope.
> > > > >    3. Move the Confluent Schema Registry Client code to a separate
> > > maven
> > > > >    module, flink-formats/flink-confluent-common, and extend it to
> > > support
> > > > >    ProtobufSchemaProvider
> > > > >    <
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
> > > > > >
> > > > >    .
> > > > >
> > > > >
> > > > > Does anyone have any feedback or objections to this approach?
> > > > >
> > > > > Unless otherwise stated above:
> > > > >
> > > > > IBM United Kingdom Limited
> > > > > Registered in England and Wales with number 741598
> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> > 3AU
> > > > >
> > > >
> > >
> >
>

Reply via email to