Hi Kevin,

Thanks, these are some great points.
Just to clarify, I do agree that the subject should be an option (as in
the case of RegistryAvroFormatFactory).
We could fall back to the subject and auto-register schemas if a
schema-id is not provided explicitly.
In general, I think it would be good to be more explicit about the
schemas used (
https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
).
This would also help prevent us from overriding the ids in incompatible
ways.
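
Concretely, the fallback I have in mind would look something like this
(a minimal sketch; the option handling and names are illustrative, not
the actual PR code):

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
    import javax.annotation.Nullable;

    // Sketch: resolve the schema id the serializer should use.
    private int resolveSchemaId(
            @Nullable Integer explicitSchemaId,
            String subject,
            ProtobufSchema derivedSchema,
            SchemaRegistryClient client) throws Exception {
        if (explicitSchemaId != null) {
            // Explicit schema-id: use the pre-registered schema as-is.
            return explicitSchemaId;
        }
        // Fallback: auto-register the schema derived from the Flink row
        // type under the subject (mirrors RegistryAvroFormatFactory).
        return client.register(subject, derivedSchema);
    }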

Under the current implementation of FlinkToProtoSchemaConverter we
might end up overwriting the field ids.
If we are able to locate a prior schema, the approach you outlined
makes a lot of sense.
Let me explore this a bit further and get back on feasibility.
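
To make that concrete, the evolution step I want to validate would look
roughly like the following (a hypothetical helper on top of the
protobuf descriptor APIs; not what FlinkToProtoSchemaConverter does
today):

    import com.google.protobuf.DescriptorProtos.DescriptorProto;
    import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.Descriptors.FieldDescriptor;

    // Sketch: keep field numbers stable across versions by reusing the
    // numbers of same-named fields from the prior registered schema and
    // numbering brand-new fields past the prior maximum.
    static DescriptorProto alignFieldNumbers(DescriptorProto fresh, Descriptor prior) {
        DescriptorProto.Builder builder = fresh.toBuilder();
        int next = prior.getFields().stream()
                .mapToInt(FieldDescriptor::getNumber)
                .max().orElse(0) + 1;
        for (int i = 0; i < builder.getFieldCount(); i++) {
            FieldDescriptorProto.Builder field = builder.getFieldBuilder(i);
            FieldDescriptor existing = prior.findFieldByName(field.getName());
            field.setNumber(existing != null ? existing.getNumber() : next++);
        }
        return builder.build();
    }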

Thanks again!
- Anupam

On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam <kevin....@shopify.com.invalid>
wrote:

> Hi Anupam,
>
> Thanks again for your work on contributing this feature back.
>
> Sounds good re: the refactoring/re-organizing.
>
> Regarding the schema-id, in my opinion this should NOT be a configuration
> option on the format. We should be able to deterministically map the Flink
> type to the ProtoSchema and register that with the Schema Registry.
>
> I think it can make sense to provide the `subject` as a parameter, so that
> the serialization format can look up existing schemas.
>
> This would be used in something related that I mentioned above:
>
> > Another topic I had is Protobuf's field ids. Ideally in Flink it would be
> > nice if we are idiomatic about not renumbering them in incompatible ways,
> > similar to what's discussed on the Schema Registry issue here:
> > https://github.com/confluentinc/schema-registry/issues/2551
>
>
> When we construct the ProtobufSchema from the Flink LogicalType, we
> shouldn't renumber the field ids in an incompatible way, so one approach
> would be to use the subject to look up the most recent version, and use
> that to evolve the field ids correctly.
>
>
> On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <anupam.aggar...@gmail.com>
> wrote:
>
> > Hi Kevin,
> >
> > Thanks for starting the discussion on this.
> > I will be working on contributing this feature back (This was
> > developed by Dawid Wysakowicz and others at Confluent).
> >
> > I have opened a (very initial) draft PR here
> > https://github.com/apache/flink/pull/24482 with our current
> > implementation.
> > Thanks for the feedback on the PR, I haven’t gotten around to
> > re-organizing/refactoring the classes yet, but it would be in line
> > with some of your comments.
> >
> > On the overall approach there are some slight variations from the initial
> > proposal.
> > Our implementation relies on an explicit schema-id being passed
> > through the config. This could help in cases where one Flink type
> > could potentially map to multiple proto types.
> > We could make the schema-id optional and fall back to deriving it
> > from the rowType (during serialization) if not present?
> >
> > The message index handling is still TBD. I am thinking of replicating
> > logic in AbstractKafkaProtobufSerializer.java
> > <https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157>
> > (|Deserializer).
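> >
> > Roughly, the framing to replicate looks like this (a sketch of the
> > Confluent wire format; the exact varint handling should follow
> > MessageIndexes in the Confluent serializer, and writeVarint below is
> > a stand-in helper):
> >
> >     // magic byte, 4-byte schema id, message indexes, then the payload
> >     ByteArrayOutputStream out = new ByteArrayOutputStream();
> >     out.write(0x0);                                  // magic byte
> >     out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
> >     if (messageIndexes.size() == 1 && messageIndexes.get(0) == 0) {
> >         out.write(0x0);                              // optimized default [0]
> >     } else {
> >         writeVarint(out, messageIndexes.size());     // count first
> >         for (int index : messageIndexes) {
> >             writeVarint(out, index);                 // path to message type
> >         }
> >     }
> >     out.write(dynamicMessage.toByteArray());
> >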
> > Please let me know if this makes sense, or in case you have any
> > other feedback.
> >
> > Thanks
> > Anupam
> >
> > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam <kevin....@shopify.com.invalid>
> > wrote:
> >
> > > Hey Robert,
> > >
> > > Awesome thanks, that timeline works for me. Sounds good re: deciding on
> > > FLIP once we have the PR, and thanks for looking into the field ids.
> > >
> > > Looking forward to it!
> > >
> > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <metrob...@gmail.com>
> > > wrote:
> > >
> > > > Hey Kevin,
> > > >
> > > > Thanks a lot. Then let's contribute the Confluent implementation
> > > > to apache/flink. We can't start working on this immediately
> > > > because of a team event next week, but within the next two weeks,
> > > > we will start working on this.
> > > > It probably makes sense for us to open a pull request of what we
> > > > have already, so that you can start reviewing and maybe also
> > > > contributing to the PR.
> > > > I hope this timeline works for you!
> > > >
> > > > Let's also decide if we need a FLIP once the code is public.
> > > > We will look into the field ids.
> > > >
> > > >
> > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam <kevin....@shopify.com.invalid>
> > > > wrote:
> > > >
> > > > > Hey Robert,
> > > > >
> > > > > Thanks for your response. I have a partial implementation, just
> > > > > for the decoding portion.
> > > > >
> > > > > The code I have is pretty rough and doesn't do any of the
> > > > > refactors I mentioned, but the decoder logic does pull the schema
> > > > > from the schema registry and use that to deserialize the
> > > > > DynamicMessage before converting it to RowData using a
> > > > > DynamicMessageToRowDataConverter class. For the other aspects, I
> > > > > would need to start from scratch for the encoder.
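> > > > >
> > > > > Roughly, the decode path is (simplified; names approximate):
> > > > >
> > > > >     ParsedSchema parsed = client.getSchemaById(schemaId);
> > > > >     Descriptor descriptor = ((ProtobufSchema) parsed).toDescriptor();
> > > > >     DynamicMessage message = DynamicMessage.parseFrom(descriptor, payload);
> > > > >     RowData row = dynamicMessageToRowDataConverter.convert(message);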
> > > > >
> > > > > Would be very happy to see you drive the contribution back to
> > > > > open source from Confluent, or collaborate on this.
> > > > >
> > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
> > > > > would be nice if we are idiomatic about not renumbering them in
> > > > > incompatible ways, similar to what's discussed on the Schema
> > > > > Registry issue here:
> > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > >
> > > > >
> > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <rmetz...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > +1 to support the format in Flink.
> > > > > >
> > > > > > @Kevin: Do you already have an implementation for this in-house
> > > > > > that you are looking to upstream, or would you start from
> > > > > > scratch?
> > > > > > I'm asking because my current employer, Confluent, has a
> > > > > > Protobuf Schema Registry implementation for Flink, and I could
> > > > > > help drive contributing this back to open source.
> > > > > > If you already have an implementation, let's decide which one
> > > > > > to use :)
> > > > > >
> > > > > > Best,
> > > > > > Robert
> > > > > >
> > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <david_rad...@uk.ibm.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Kevin,
> > > > > > > Some thoughts on this.
> > > > > > > I suggested an Apicurio registry format in the dev list, and
> > > > > > > was advised to raise a FLIP for this; I suggest the same would
> > > > > > > apply here (or the alternative to FLIPs if you cannot raise
> > > > > > > one). I am prototyping an Avro Apicurio format, prior to
> > > > > > > raising the FLIP, and notice that the readSchema in the
> > > > > > > SchemaCoder only takes a byte array, but I need to pass down
> > > > > > > the Kafka headers (where the Apicurio globalId identifying the
> > > > > > > schema lives).
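> > > > > > >
> > > > > > > Something along these lines is what I mean (purely
> > > > > > > illustrative signature):
> > > > > > >
> > > > > > >     // headers-aware variant of readSchema, so Apicurio can
> > > > > > >     // find the globalId in a Kafka header; Confluent-style
> > > > > > >     // coders could simply ignore the headers
> > > > > > >     Schema readSchema(InputStream in, Map<String, Object> headers)
> > > > > > >             throws IOException;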
> > > > > > >
> > > > > > > I assume:
> > > > > > >
> > > > > > >   *   For the Confluent Protobuf format you would extend the
> > > > > > > Protobuf format to drive some Schema Registry logic for
> > > > > > > Protobuf (similar to the way Avro does it), where the magic
> > > > > > > byte + schema id can be obtained and the schema looked up
> > > > > > > using the Confluent Schema Registry (sketched after these
> > > > > > > bullets).
> > > > > > >   *   It would be good if any Protobuf format enhancements for
> > > > > > > Schema Registries pass down the Kafka headers (I am thinking
> > > > > > > as a Map<String, Object> for Avro) as well as the message
> > > > > > > payload, so Apicurio Registry could work with this.
> > > > > > >   *   It would make sense to have the Confluent schema lookup
> > > > > > > in common code, which is part of the SchemaCoder readSchema
> > > > > > > logic.
> > > > > > >   *   I assume the ProtobufSchemaCoder readSchema would return
> > > > > > > a Protobuf Schema object.
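> > > > > > >
> > > > > > > For the first bullet, reading the standard Confluent framing
> > > > > > > would look roughly like this (sketch):
> > > > > > >
> > > > > > >     ByteBuffer buffer = ByteBuffer.wrap(payload);
> > > > > > >     if (buffer.get() != 0x0) {
> > > > > > >         throw new IOException("Unknown magic byte!");
> > > > > > >     }
> > > > > > >     int schemaId = buffer.getInt(); // 4-byte schema id
> > > > > > >     ParsedSchema schema = client.getSchemaById(schemaId);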
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I also wondered whether these Kafka-only formats should be
> > > > > > > moved to the Kafka connector repo, or whether they might in
> > > > > > > the future be used outside Kafka – e.g. Avro/Protobuf files in
> > > > > > > a database.
> > > > > > > Kind regards, David.
> > > > > > >
> > > > > > >
> > > > > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium
> > > > > > > Protobuf Confluent Format
> > > > > > > I would love to get some feedback from the community on this
> > > > > > > JIRA issue:
> > > > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > > > >
> > > > > > > I am looking into creating a PR and would appreciate some
> > > > > > > review on the approach.
> > > > > > >
> > > > > > > In terms of design I think we can mirror the
> > > > > > > `debezium-avro-confluent` and `avro-confluent` formats already
> > > > > > > available in Flink:
> > > > > > >
> > > > > > >    1. `protobuf-confluent` format which uses DynamicMessage
> > > > > > >    <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
> > > > > > >    for encoding and decoding.
> > > > > > >       - For encoding, the Flink RowType will be used to
> > > > > > >       dynamically create a Protobuf Schema and register it with
> > > > > > >       the Confluent Schema Registry. It will use the same
> > > > > > >       schema to construct a DynamicMessage and serialize it
> > > > > > >       (see the sketch after this list).
> > > > > > >       - For decoding, the schema will be fetched from the
> > > > > > >       registry and used with DynamicMessage to deserialize and
> > > > > > >       convert the Protobuf object to a Flink RowData.
> > > > > > >       - Note: here there is no external .proto file
> > > > > > >    2. `debezium-protobuf-confluent` format which unpacks the
> > > > > > >    Debezium Envelope and collects the appropriate
> > > > > > >    UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE events.
> > > > > > >       - We may be able to refactor and reuse code from the
> > > > > > >       existing DebeziumAvroDeserializationSchema +
> > > > > > >       DebeziumAvroSerializationSchema, since the deser logic is
> > > > > > >       largely delegated and these Schemas are concerned with
> > > > > > >       handling the Debezium envelope.
> > > > > > >    3. Move the Confluent Schema Registry Client code to a
> > > > > > >    separate maven module, flink-formats/flink-confluent-common,
> > > > > > >    and extend it to support ProtobufSchemaProvider
> > > > > > >    <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>.
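> > > > > > >
> > > > > > > As a sketch, the encode path in item 1 might look like this
> > > > > > > (the row-type-to-schema converter is hypothetical):
> > > > > > >
> > > > > > >     ProtobufSchema schema = convertRowType(rowType); // hypothetical
> > > > > > >     int schemaId = client.register(subject, schema);
> > > > > > >     DynamicMessage.Builder builder =
> > > > > > >             DynamicMessage.newBuilder(schema.toDescriptor());
> > > > > > >     // ... populate builder fields from the RowData ...
> > > > > > >     byte[] payload = builder.build().toByteArray();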
> > > > > > >
> > > > > > >
> > > > > > > Does anyone have any feedback or objections to this approach?
> > > > > > >
> > > > > > > Unless otherwise stated above:
> > > > > > >
> > > > > > > IBM United Kingdom Limited
> > > > > > > Registered in England and Wales with number 741598
> > > > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
