Hi Anupam,
Thanks for your response. I was wondering around the schema id and had some 
thoughts:

I assume that for Confluent Avro, specifying the schema is not normally done, 
but could be useful to force a particular shape.

If you specify a schema id in the format configuration:
- for deserialization : does this mean the schema id in the payload has to 
match it. If so we lose the ability to have multiple versions of the schema on 
a topic. For me schemaId makes less sense for deserialization as the existing 
mechanism used by Avro / confluent avro formats is working well.

- I can see it makes sense for the serialization where there is an existing 
schema in the registry you want to target.

I suggest the schemaId be called something like schemaIdForSink or 
schemaIdForSerilization; to prevent confusion with the deserialization case. We 
could have the schema as you suggest so we are compatible with the confluent 
avro format.


WDYT?
    Kind regards, David.


From: Anupam Aggarwal <anupam.aggar...@gmail.com>
Date: Saturday, 13 April 2024 at 16:08
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf 
Confluent Format
Hi David,

Thank you for the suggestion.
IIUC, you are proposing using an explicit schema string, instead of the
schemaID.
This makes sense, as it would make the behavior consistent with Avro,
although a bit more verbose from a config standpoint.

If we go via the schema string route, the user would have to ensure that
the input schema string corresponds to an existing schemaID.
This however, might end up registering a new id (based on
https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
).

How about adding both the options (explicit schema string/ schemaID).
If a schema string is specified we register a new schemaID, if the user
specifies an explicit schemaID we just use it directly?

Thanks
Anupam

On Wed, Apr 10, 2024 at 2:27 PM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi,
> I notice in the draft pr that there is a schema id in the format config. I
> was wondering why? In the confluent avro and existing debezium formats,
> there is no schema id in the config, but there is the ability to specify a
> complete schema. In the protobuf format there is no schema id.
>
> I assume the schema id would be used during serialize in the case there is
> already an existing registered schema and you have its id. I see in the
> docs
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> there is a serialize example where 2 schemas are registered.
>
> I would suggest aiming to copy what the confluent DeSer libraries do
> rather than having a schema id hard coded in the config.
>
> WDYT?
>     Kind regards, David.
>
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Tuesday, 26 March 2024 at 20:06
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> Thanks Anupam! Looking forward to it.
>
> On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <anupam.aggar...@gmail.com
> >
> wrote:
>
> > Hi Kevin,
> >
> > Thanks, these are some great points.
> > Just to clarify, I do agree that the subject should be an option (like in
> > the case of RegistryAvroFormatFactory).
> > We could fallback to subject and auto-register schemas, if schema-Id not
> > provided explicitly.
> > In general, I think it would be good to be more explicit about the
> schemas
> > used (
> >
> >
> https://docs.confluent.io/platform/curren/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > <
> >
> https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > >
> > ).
> > This would also help prevent us from overriding the ids in incompatible
> > ways.
> >
> > Under the current implementation of FlinkToProtoSchemaConverter we might
> > end up overwriting the field-Ids.
> > If we are able to locate a prior schema, the approach you outlined makes
> a
> > lot of sense.
> > Let me explore this a bit further and get back(in terms of feasibility).
> >
> > Thanks again!
> > - Anupam
> >
> > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam <kevin....@shopify.com.invalid
> >
> > wrote:
> >
> > > Hi Anupam,
> > >
> > > Thanks again for your work on contributing this feature back.
> > >
> > > Sounds good re: the refactoring/re-organizing.
> > >
> > > Regarding the schema-id, in my opinion this should NOT be a
> configuration
> > > option on the format. We should be able to deterministically map the
> > Flink
> > > type to the ProtoSchema and register that with the Schema Registry.
> > >
> > > I think it can make sense to provide the `subject` as a parameter, so
> > that
> > > the serialization format can look up existing schemas.
> > >
> > > This would be used in something I mentioned something related above
> > >
> > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
> would
> > be
> > > > nice if we are idiomatic about not renumbering them in incompatible
> > ways,
> > > > similar to what's discussed on the Schema Registry issue here:
> > > > https://github.com/confluentinc/schema-registry/issues/2551
> > >
> > >
> > > When we construct the ProtobufSchema from the Flink LogicalType, we
> > > shouldn't renumber the field ids in an incompatible way, so one
> approach
> > > would be to use the subject to look up the most recent version, and use
> > > that to evolve the field ids correctly.
> > >
> > >
> > > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <
> > anupam.aggar...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Kevin,
> > > >
> > > > Thanks for starting the discussion on this.
> > > > I will be working on contributing this feature back (This was
> developed
> > > by
> > > > Dawid Wysakowicz and others at Confluent).
> > > >
> > > > I have opened a (very initial) draft PR here
> > > > https://github.com/apache/flink/pull/24482  with our current
> > > > implementation.
> > > > Thanks for the feedback on the PR, I haven’t gotten around to
> > > > re-organizing/refactoring the classes yet, but it would be inline
> with
> > > some
> > > > of your comments.
> > > >
> > > > On the overall approach there are some slight variations from the
> > initial
> > > > proposal.
> > > > Our implementation relies on an explicit schema-id being passed
> through
> > > the
> > > > config. This could help in cases where one Flink type could
> potentially
> > > map
> > > > to multiple proto types.
> > > > We could make the schema-Id optional and fall back to deriving it
> from
> > > the
> > > > rowType (during serialization) if not present?
> > > >
> > > > The message index handling is still TBD. I am thinking of replicating
> > > logic
> > > > in AbstractKafkaProtobufSerializer.java
> > > > <
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157
> > > > >
> > > >  (|Deserializer).
> > > > Please let me know if this makes sense / or in case you have any
> other
> > > > feedback.
> > > >
> > > > Thanks
> > > > Anupam
> > > >
> > > > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam
> > <kevin....@shopify.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hey Robert,
> > > > >
> > > > > Awesome thanks, that timeline works for me. Sounds good re:
> deciding
> > on
> > > > > FLIP once we have the PR, and thanks for looking into the field
> ids.
> > > > >
> > > > > Looking forward to it!
> > > > >
> > > > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger <
> metrob...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey Kevin,
> > > > > >
> > > > > > Thanks a lot. Then let's contribute the Confluent implementation
> to
> > > > > > apache/flink. We can't start working on this immediately because
> > of a
> > > > > team
> > > > > > event next week, but within the next two weeks, we will start
> > working
> > > > on
> > > > > > this.
> > > > > > It probably makes sense for us to open a pull request of what we
> > have
> > > > > > already, so that you can start reviewing and maybe also
> > contributing
> > > to
> > > > > the
> > > > > > PR.
> > > > > > I hope this timeline works for you!
> > > > > >
> > > > > > Let's also decide if we need a FLIP once the code is public.
> > > > > > We will look into the field ids.
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam
> > > > <kevin....@shopify.com.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Robert,
> > > > > > >
> > > > > > > Thanks for your response. I have a partial implementation, just
> > for
> > > > the
> > > > > > > decoding portion.
> > > > > > >
> > > > > > > The code I have is pretty rough and doesn't do any of the
> > > refactors I
> > > > > > > mentioned, but the decoder logic does pull the schema from the
> > > schema
> > > > > > > registry and use that to deserialize the DynamicMessage before
> > > > > converting
> > > > > > > it to RowData using a DynamicMessageToRowDataConverter class.
> For
> > > the
> > > > > > other
> > > > > > > aspects, I would need to start from scratch for the encoder.
> > > > > > >
> > > > > > > Would be very happy to see you drive the contribution back to
> > open
> > > > > source
> > > > > > > from Confluent, or collaborate on this.
> > > > > > >
> > > > > > > Another topic I had is Protobuf's field ids. Ideally in Flink
> it
> > > > would
> > > > > be
> > > > > > > nice if we are idiomatic about not renumbering them in
> > incompatible
> > > > > ways,
> > > > > > > similar to what's discussed on the Schema Registry issue here:
> > > > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <
> > > rmetz...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > +1 to support the format in Flink.
> > > > > > > >
> > > > > > > > @Kevin: Do you already have an implementation for this
> inhouse
> > > that
> > > > > you
> > > > > > > are
> > > > > > > > looking to upstream, or would you start from scratch?
> > > > > > > > I'm asking because my current employer, Confluent, has a
> > Protobuf
> > > > > > Schema
> > > > > > > > registry implementation for Flink, and I could help drive
> > > > > contributing
> > > > > > > this
> > > > > > > > back to open source.
> > > > > > > > If you already have an implementation, let's decide which one
> > to
> > > > use
> > > > > :)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Robert
> > > > > > > >
> > > > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> > > > > david_rad...@uk.ibm.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Kevin,
> > > > > > > > > Some thoughts on this.
> > > > > > > > > I suggested an Apicurio registry format in the dev list,
> and
> > > was
> > > > > > > advised
> > > > > > > > > to raise a FLIP for this, I suggest the same would apply
> here
> > > (or
> > > > > the
> > > > > > > > > alternative to FLIPs if you cannot raise one). I am
> > prototyping
> > > > an
> > > > > > Avro
> > > > > > > > > Apicurio format, prior to raising the Flip,  and notice
> that
> > > the
> > > > > > > > readSchema
> > > > > > > > > in the SchemaCoder only takes a byte array ,but I need to
> > pass
> > > > down
> > > > > > the
> > > > > > > > > Kafka headers (where the Apicurio globalId identifying the
> > > schema
> > > > > > > lives).
> > > > > > > > >
> > > > > > > > > I assume:
> > > > > > > > >
> > > > > > > > >   *   for the confluent Protobuf format you would extend
> the
> > > > > Protobuf
> > > > > > > > > format to drive some Schema Registry logic for Protobuf
> > > (similar
> > > > to
> > > > > > the
> > > > > > > > way
> > > > > > > > > Avro does it) where the magic byte _ schema id can be
> > obtained
> > > > and
> > > > > > the
> > > > > > > > > schema looked up using the Confluent Schema registry.
> > > > > > > > >   *   It would be good if any protobuf format enhancements
> > for
> > > > > Schema
> > > > > > > > > registries pass down the Kafka headers (I am thinking as a
> > > > > > Map<String,
> > > > > > > > > Object> for Avro) as well as the message payload so
> Apicurio
> > > > > registry
> > > > > > > > could
> > > > > > > > > work with this.
> > > > > > > > >   *   It would make sense to have the Confluent schema
> lookup
> > > in
> > > > > > common
> > > > > > > > > code, which is part of the SchemaCoder readSchema  logic.
> > > > > > > > >   *   I assume the ProtobufSchemaCoder readSchema would
> > return
> > > a
> > > > > > > Protobuf
> > > > > > > > > Schema object.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I also wondered whether these Kafka only formats should be
> > > moved
> > > > to
> > > > > > the
> > > > > > > > > Kafka connector repo, or whether they might in the future
> be
> > > used
> > > > > > > outside
> > > > > > > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > > > > > > >    Kind regards, David.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > From: Kevin Lam <kevin....@shopify.com.INVALID>
> > > > > > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium
> > > > Protobuf
> > > > > > > > > Confluent Format
> > > > > > > > > I would love to get some feedback from the community on
> this
> > > JIRA
> > > > > > > issue:
> > > > > > > > >
> > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > > > > > >
> > > > > > > > > I am looking into creating a PR and would appreciate some
> > > review
> > > > on
> > > > > > the
> > > > > > > > > approach.
> > > > > > > > >
> > > > > > > > > In terms of design I think we can mirror the
> > > > > > `debezium-avro-confluent`
> > > > > > > > and
> > > > > > > > > `avro-confluent` formats already available in Flink:
> > > > > > > > >
> > > > > > > > >    1. `protobuf-confluent` format which uses DynamicMessage
> > > > > > > > >    <
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> > > > > > > > > >
> > > > > > > > >    for encoding and decoding.
> > > > > > > > >       - For encoding the Flink RowType will be used to
> > > > dynamically
> > > > > > > > create a
> > > > > > > > >       Protobuf Schema and register it with the Confluent
> > Schema
> > > > > > > > > Registry. It will
> > > > > > > > >       use the same schema to construct a DynamicMessage and
> > > > > serialize
> > > > > > > it.
> > > > > > > > >       - For decoding, the schema will be fetched from the
> > > > registry
> > > > > > and
> > > > > > > > use
> > > > > > > > >       DynamicMessage to deserialize and convert the
> Protobuf
> > > > object
> > > > > > to
> > > > > > > a
> > > > > > > > > Flink
> > > > > > > > >       RowData.
> > > > > > > > >       - Note: here there is no external .proto file
> > > > > > > > >    2. `debezium-avro-confluent` format which unpacks the
> > > Debezium
> > > > > > > > Envelope
> > > > > > > > >    and collects the appropriate UPDATE_BEFORE,
> UPDATE_AFTER,
> > > > > INSERT,
> > > > > > > > DELETE
> > > > > > > > >    events.
> > > > > > > > >       - We may be able to refactor and reuse code from the
> > > > existing
> > > > > > > > >       DebeziumAvroDeserializationSchema +
> > > > > > > DebeziumAvroSerializationSchema
> > > > > > > > > since
> > > > > > > > >       the deser logic is largely delegated to and these
> > Schemas
> > > > are
> > > > > > > > > concerned
> > > > > > > > >       with the handling the Debezium envelope.
> > > > > > > > >    3. Move the Confluent Schema Registry Client code to a
> > > > separate
> > > > > > > maven
> > > > > > > > >    module, flink-formats/flink-confluent-common, and extend
> > it
> > > to
> > > > > > > support
> > > > > > > > >    ProtobufSchemaProvider
> > > > > > > > >    <
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
> > > > > > > > > >
> > > > > > > > >    .
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Does anyone have any feedback or objections to this
> approach?
> > > > > > > > >
> > > > > > > > > Unless otherwise stated above:
> > > > > > > > >
> > > > > > > > > IBM United Kingdom Limited
> > > > > > > > > Registered in England and Wales with number 741598
> > > > > > > > > Registered office: PO Box 41, North Harbour, Portsmouth,
> > Hants.
> > > > PO6
> > > > > > 3AU
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Reply via email to