Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-04-18 Thread Anupam Aggarwal
Thanks David.
That's a great idea. For deserialization, the external schema Id will be
used to obtain a dynamic message, so in a way it has to be in line with the
writer schema.
We could limit it to serialization and rename it according to your
suggestion.
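
For illustration, a serialization-only option could look roughly like the
following (the key name extends David's suggestion and is only a sketch, not
the final design):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    // Hypothetical option key; consulted only on the serialization path.
    public static final ConfigOption<Integer> SCHEMA_ID_FOR_SERIALIZATION =
            ConfigOptions.key("schema-id.for-serialization")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "Optional Confluent Schema Registry id of an existing schema"
                                    + " to target during serialization. Ignored for"
                                    + " deserialization, which always uses the writer"
                                    + " schema id embedded in the payload.");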

Thanks
Anupam
On Tue, Apr 16, 2024 at 3:38 PM David Radley 
wrote:

> Hi Anupam,
> Thanks for your response. I was wondering about the schema id and had
> some thoughts:
>
> I assume that for Confluent Avro, specifying the schema is not normally
> done, but could be useful to force a particular shape.
>
> If you specify a schema id in the format configuration:
> - for deserialization: does this mean the schema id in the payload has to
> match it? If so, we lose the ability to have multiple versions of the schema
> on a topic. For me, schemaId makes less sense for deserialization, as the
> existing mechanism used by the Avro / Confluent Avro formats is working well.
>
> - I can see it makes sense for the serialization where there is an
> existing schema in the registry you want to target.
>
> I suggest the schemaId be called something like schemaIdForSink or
> schemaIdForSerialization, to prevent confusion with the deserialization
> case. We could have the schema as you suggest so we are compatible with the
> confluent avro format.
>
>
> WDYT?
> Kind regards, David.
>
>
> From: Anupam Aggarwal 
> Date: Saturday, 13 April 2024 at 16:08
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> Hi David,
>
> Thank you for the suggestion.
> IIUC, you are proposing using an explicit schema string, instead of the
> schemaID.
> This makes sense, as it would make the behavior consistent with Avro,
> although a bit more verbose from a config standpoint.
>
> If we go via the schema string route, the user would have to ensure that
> the input schema string corresponds to an existing schemaID.
> This, however, might end up registering a new id (based on
>
> https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
> ).
>
> How about adding both options (explicit schema string / schemaID)?
> If a schema string is specified, we register a new schemaID; if the user
> specifies an explicit schemaID, we just use it directly.
>
> Thanks
> Anupam
>
> On Wed, Apr 10, 2024 at 2:27 PM David Radley 
> wrote:
>
> > Hi,
> > I notice in the draft pr that there is a schema id in the format config.
> I
> > was wondering why? In the confluent avro and existing debezium formats,
> > there is no schema id in the config, but there is the ability to specify
> a
> > complete schema. In the protobuf format there is no schema id.
> >
> > I assume the schema id would be used during serialize in the case there
> is
> > already an existing registered schema and you have its id. I see in the
> > docs
> >
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> > there is a serialize example where 2 schemas are registered.
> >
> > I would suggest aiming to copy what the confluent DeSer libraries do
> > rather than having a schema id hard coded in the config.
> >
> > WDYT?
> > Kind regards, David.
> >
> > From: Kevin Lam 
> > Date: Tuesday, 26 March 2024 at 20:06
> > To: dev@flink.apache.org 
> > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > Confluent Format
> > Thanks Anupam! Looking forward to it.
> >
> > On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <
> anupam.aggar...@gmail.com
> > >
> > wrote:
> >
> > > Hi Kevin,
> > >
> > > Thanks, these are some great points.
> > > Just to clarify, I do agree that the subject should be an option (like
> in
> > > the case of RegistryAvroFormatFactory).
> > > We could fall back to subject and auto-register schemas, if schema-Id
> not
> > > provided explicitly.
> > > In general, I think it would be good to be more explicit about the
> > schemas
> > > used (
> > > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > > ).
> > > This would also help prevent us from overriding the ids in incompatible
> > > ways.
> > >
> > > Under the current implementation of FlinkToProtoSchemaConverter we
> might

RE: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-04-16 Thread David Radley
Hi Anupam,
Thanks for your response. I was wondering about the schema id and had some
thoughts:

I assume that for Confluent Avro, specifying the schema is not normally done, 
but could be useful to force a particular shape.

If you specify a schema id in the format configuration:
- for deserialization: does this mean the schema id in the payload has to
match it? If so, we lose the ability to have multiple versions of the schema on
a topic. For me, schemaId makes less sense for deserialization, as the existing
mechanism used by the Avro / Confluent Avro formats is working well.

- I can see it makes sense for the serialization where there is an existing 
schema in the registry you want to target.

I suggest the schemaId be called something like schemaIdForSink or
schemaIdForSerialization, to prevent confusion with the deserialization case. We
could have the schema as you suggest so we are compatible with the confluent 
avro format.


WDYT?
Kind regards, David.


From: Anupam Aggarwal 
Date: Saturday, 13 April 2024 at 16:08
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf 
Confluent Format
Hi David,

Thank you for the suggestion.
IIUC, you are proposing using an explicit schema string, instead of the
schemaID.
This makes sense, as it would make the behavior consistent with Avro,
although a bit more verbose from a config standpoint.

If we go via the schema string route, the user would have to ensure that
the input schema string corresponds to an existing schemaID.
This, however, might end up registering a new id (based on
https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
).

How about adding both options (explicit schema string / schemaID)?
If a schema string is specified, we register a new schemaID; if the user
specifies an explicit schemaID, we just use it directly.

Thanks
Anupam

On Wed, Apr 10, 2024 at 2:27 PM David Radley 
wrote:

> Hi,
> I notice in the draft pr that there is a schema id in the format config. I
> was wondering why? In the confluent avro and existing debezium formats,
> there is no schema id in the config, but there is the ability to specify a
> complete schema. In the protobuf format there is no schema id.
>
> I assume the schema id would be used during serialize in the case there is
> already an existing registered schema and you have its id. I see in the
> docs
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> there is a serialize example where 2 schemas are registered.
>
> I would suggest aiming to copy what the confluent DeSer libraries do
> rather than having a schema id hard coded in the config.
>
> WDYT?
> Kind regards, David.
>
> From: Kevin Lam 
> Date: Tuesday, 26 March 2024 at 20:06
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> Thanks Anupam! Looking forward to it.
>
> On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal  >
> wrote:
>
> > Hi Kevin,
> >
> > Thanks, these are some great points.
> > Just to clarify, I do agree that the subject should be an option (like in
> > the case of RegistryAvroFormatFactory).
> > We could fall back to subject and auto-register schemas, if schema-Id not
> > provided explicitly.
> > In general, I think it would be good to be more explicit about the
> schemas
> > used (
> > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > ).
> > This would also help prevent us from overriding the ids in incompatible
> > ways.
> >
> > Under the current implementation of FlinkToProtoSchemaConverter we might
> > end up overwriting the field-Ids.
> > If we are able to locate a prior schema, the approach you outlined makes
> a
> > lot of sense.
> > Let me explore this a bit further and get back(in terms of feasibility).
> >
> > Thanks again!
> > - Anupam
> >
> > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam  >
> > wrote:
> >
> > > Hi Anupam,
> > >
> > > Thanks again for your work on contributing this feature back.
> > >
> > > Sounds good re: the refactoring/re-organizing.
> > >
> > > Regarding the schema-id, in my opinion this should NOT be a
> configuration
> > > option on the format. We should be able to deterministically map the
> > Flink
> > > type to the ProtoSchema and register that with the Schema Registry.
> > >
> > > I 

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-04-13 Thread Anupam Aggarwal
Hi David,

Thank you for the suggestion.
IIUC, you are proposing using an explicit schema string, instead of the
schemaID.
This makes sense, as it would make the behavior consistent with Avro,
although a bit more verbose from a config standpoint.

If we go via the schema string route, the user would have to ensure that
the input schema string corresponds to an existing schemaID.
This, however, might end up registering a new id (based on
https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
).

How about adding both options (explicit schema string / schemaID)?
If a schema string is specified, we register a new schemaID; if the user
specifies an explicit schemaID, we just use it directly.
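
As a rough sketch of that precedence, assuming Confluent's SchemaRegistryClient
(the helper name is illustrative, not part of the draft PR):

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
    import javax.annotation.Nullable;

    // An explicit schemaID wins; otherwise the schema string is registered,
    // which may create a new id/version under the subject.
    static int resolveWriterSchemaId(
            SchemaRegistryClient client,
            String subject,
            @Nullable Integer explicitSchemaId,
            @Nullable String schemaString) throws Exception {
        if (explicitSchemaId != null) {
            return explicitSchemaId;
        }
        return client.register(subject, new ProtobufSchema(schemaString));
    }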

Thanks
Anupam

On Wed, Apr 10, 2024 at 2:27 PM David Radley 
wrote:

> Hi,
> I notice in the draft pr that there is a schema id in the format config. I
> was wondering why? In the confluent avro and existing debezium formats,
> there is no schema id in the config, but there is the ability to specify a
> complete schema. In the protobuf format there is no schema id.
>
> I assume the schema id would be used during serialize in the case there is
> already an existing registered schema and you have its id. I see in the
> docs
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> there is a serialize example where 2 schemas are registered.
>
> I would suggest aiming to copy what the confluent DeSer libraries do
> rather than having a schema id hard coded in the config.
>
> WDYT?
> Kind regards, David.
>
> From: Kevin Lam 
> Date: Tuesday, 26 March 2024 at 20:06
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> Thanks Anupam! Looking forward to it.
>
> On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal  >
> wrote:
>
> > Hi Kevin,
> >
> > Thanks, these are some great points.
> > Just to clarify, I do agree that the subject should be an option (like in
> > the case of RegistryAvroFormatFactory).
> > We could fall back to subject and auto-register schemas, if schema-Id not
> > provided explicitly.
> > In general, I think it would be good to be more explicit about the
> schemas
> > used (
> > https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > ).
> > This would also help prevent us from overriding the ids in incompatible
> > ways.
> >
> > Under the current implementation of FlinkToProtoSchemaConverter we might
> > end up overwriting the field-Ids.
> > If we are able to locate a prior schema, the approach you outlined makes
> a
> > lot of sense.
> > Let me explore this a bit further and get back(in terms of feasibility).
> >
> > Thanks again!
> > - Anupam
> >
> > On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam  >
> > wrote:
> >
> > > Hi Anupam,
> > >
> > > Thanks again for your work on contributing this feature back.
> > >
> > > Sounds good re: the refactoring/re-organizing.
> > >
> > > Regarding the schema-id, in my opinion this should NOT be a
> configuration
> > > option on the format. We should be able to deterministically map the
> > Flink
> > > type to the ProtoSchema and register that with the Schema Registry.
> > >
> > > I think it can make sense to provide the `subject` as a parameter, so
> > that
> > > the serialization format can look up existing schemas.
> > >
> > > This would be used in something I mentioned something related above
> > >
> > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
> would
> > be
> > > > nice if we are idiomatic about not renumbering them in incompatible
> > ways,
> > > > similar to what's discussed on the Schema Registry issue here:
> > > > https://github.com/confluentinc/schema-registry/issues/2551
> > >
> > >
> > > When we construct the ProtobufSchema from the Flink LogicalType, we
> > > shouldn't renumber the field ids in an incompatible way, so one
> approach
> > > would be to use the subject to look up the most recent version, and use
> > > that to evolve the field ids correctly.
> > >
> > >
> > > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <
> > anupam.aggar...@gmail.com
> > > >
> > > wrote:

RE: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-04-10 Thread David Radley
Hi,
I notice in the draft PR that there is a schema id in the format config. I was
wondering why? In the confluent avro and existing debezium formats, there is
no schema id in the config, but there is the ability to specify a complete 
schema. In the protobuf format there is no schema id.

I assume the schema id would be used during serialization in the case where
there is already an existing registered schema and you have its id. I see in the docs
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
there is a serialization example where 2 schemas are registered.

I would suggest aiming to copy what the confluent DeSer libraries do rather 
than having a schema id hard coded in the config.

WDYT?
Kind regards, David.

From: Kevin Lam 
Date: Tuesday, 26 March 2024 at 20:06
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf 
Confluent Format
Thanks Anupam! Looking forward to it.

On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal 
wrote:

> Hi Kevin,
>
> Thanks, these are some great points.
> Just to clarify, I do agree that the subject should be an option (like in
> the case of RegistryAvroFormatFactory).
> We could fall back to subject and auto-register schemas, if schema-Id not
> provided explicitly.
> In general, I think it would be good to be more explicit about the schemas
> used (
> https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> ).
> This would also help prevent us from overriding the ids in incompatible
> ways.
>
> Under the current implementation of FlinkToProtoSchemaConverter we might
> end up overwriting the field-Ids.
> If we are able to locate a prior schema, the approach you outlined makes a
> lot of sense.
> Let me explore this a bit further and get back(in terms of feasibility).
>
> Thanks again!
> - Anupam
>
> On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam 
> wrote:
>
> > Hi Anupam,
> >
> > Thanks again for your work on contributing this feature back.
> >
> > Sounds good re: the refactoring/re-organizing.
> >
> > Regarding the schema-id, in my opinion this should NOT be a configuration
> > option on the format. We should be able to deterministically map the
> Flink
> > type to the ProtoSchema and register that with the Schema Registry.
> >
> > I think it can make sense to provide the `subject` as a parameter, so
> that
> > the serialization format can look up existing schemas.
> >
> > This would be used in something I mentioned something related above
> >
> > > Another topic I had is Protobuf's field ids. Ideally in Flink it would
> be
> > > nice if we are idiomatic about not renumbering them in incompatible
> ways,
> > > similar to what's discussed on the Schema Registry issue here:
> > > https://github.com/confluentinc/schema-registry/issues/2551
> >
> >
> > When we construct the ProtobufSchema from the Flink LogicalType, we
> > shouldn't renumber the field ids in an incompatible way, so one approach
> > would be to use the subject to look up the most recent version, and use
> > that to evolve the field ids correctly.
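
A possible shape for that lookup, assuming Confluent's SchemaRegistryClient (the
id-preserving merge against the derived schema is the hard part and is not shown):

    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    // Fetch the subject's latest registered schema so field ids can be kept
    // stable when deriving a new schema from the Flink type.
    static ProtobufSchema latestRegisteredSchema(
            SchemaRegistryClient client, String subject) throws Exception {
        SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
        return new ProtobufSchema(latest.getSchema());
    }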
> >
> >
> > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <
> anupam.aggar...@gmail.com
> > >
> > wrote:
> >
> > > Hi Kevin,
> > >
> > > Thanks for starting the discussion on this.
> > > I will be working on contributing this feature back (This was developed
> > by
> > > Dawid Wysakowicz and others at Confluent).
> > >
> > > I have opened a (very initial) draft PR here
> > > https://github.com/apache/flink/pull/24482  with our current
> > > implementation.
> > > Thanks for the feedback on the PR, I haven’t gotten around to
> > > re-organizing/refactoring the classes yet, but it would be inline with
> > some
> > > of your comments.
> > >
> > > On the overall approach there are some slight variations from the
> initial
> > > proposal.
> > > Our implementation relies on an explicit schema-id being passed through
> > the
> > > config. This could help in cases where one Flink type could potentially
> > map
> > > to multiple proto types.
> > > We could make the schema-Id optional and fall back to deriving it from
> > the
> > > rowType (during serialization) if not present?
> > >
> > > The message index handling is still TBD. I am thinking of replica

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-27 Thread Anupam Aggarwal
> > > > > registry implementation for Flink, and I could help drive
> > > > > contributing
> > > > > > > this
> > > > > > > > back to open source.
> > > > > > > > If you already have an implementation, let's decide which one
> > to
> > > > use
> > > > > :)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Robert
> > > > > > > >
> > > > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> > > > > david_rad...@uk.ibm.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Kevin,
> > > > > > > > > Some thoughts on this.
> > > > > > > > > I suggested an Apicurio registry format in the dev list,
> and
> > > was
> > > > > > > advised
> > > > > > > > > to raise a FLIP for this, I suggest the same would apply
> here
> > > (or
> > > > > the
> > > > > > > > > alternative to FLIPs if you cannot raise one). I am
> > prototyping
> > > > an
> > > > > > Avro
> > > > > > > > > Apicurio format, prior to raising the Flip,  and notice
> that
> > > the
> > > > > > > > readSchema
> > > > > > > > > in the SchemaCoder only takes a byte array ,but I need to
> > pass
> > > > down
> > > > > > the
> > > > > > > > > Kafka headers (where the Apicurio globalId identifying the
> > > schema
> > > > > > > lives).
> > > > > > > > >
> > > > > > > > > I assume:
> > > > > > > > >
> > > > > > > > >   *   for the confluent Protobuf format you would extend
> the
> > > > > Protobuf
> > > > > > > > > format to drive some Schema Registry logic for Protobuf
> > > (similar
> > > > to
> > > > > > the
> > > > > > > > way
> > > > > > > > > Avro does it) where the magic byte + schema id can be
> > obtained
> > > > and
> > > > > > the
> > > > > > > > > schema looked up using the Confluent Schema registry.
> > > > > > > > >   *   It would be good if any protobuf format enhancements
> > for
> > > > > Schema
> > > > > > > > > registries pass down the Kafka headers (I am thinking as a
> > > > > > > > > Map<String, Object> for Avro) as well as the message payload so
> > > > > > > > > Apicurio registry could work with this.
> > > > > > > > >   *   It would make sense to have the Confluent schema
> lookup
> > > in
> > > > > > common
> > > > > > > > > code, which is part of the SchemaCoder readSchema  logic.
> > > > > > > > >   *   I assume the ProtobufSchemaCoder readSchema would
> > return
> > > a
> > > > > > > Protobuf
> > > > > > > > > Schema object.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I also wondered whether these Kafka only formats should be
> > > moved
> > > > to
> > > > > > the
> > > > > > > > > Kafka connector repo, or whether they might in the future
> be
> > > used
> > > > > > > outside
> > > > > > > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > > > > > > >Kind regards, David.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > From: Kevin Lam 
> > > > > > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > > > > > To: dev@flink.apache.org 
> > > > > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium
> > > > Protobuf
> > > > > > > > > Confluent Format
> > > > > > > > > I would love to get some feedback from the community on
> this
> > > JIRA
> > > > > > > issue:
> > > > > > > > >
> > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > > > > > >
> > > > > > > > > I am looking into creating a PR and would appreciate some
> > > review
> > > > on
> > > > > > the
> > > > > > > > > approach.
> > > > > > > > >
> > > > > > > > > In terms of design I think we can mirror the
> > > > > > `debezium-avro-confluent`
> > > > > > > > and
> > > > > > > > > `avro-confluent` formats already available in Flink:
> > > > > > > > >
> > > > > > > > >1. `protobuf-confluent` format which uses DynamicMessage
> > > > > > > > ><
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> > > > > > > > > >
> > > > > > > > >for encoding and decoding.
> > > > > > > > >   - For encoding the Flink RowType will be used to
> > > > dynamically
> > > > > > > > create a
> > > > > > > > >   Protobuf Schema and register it with the Confluent
> > Schema
> > > > > > > > > Registry. It will
> > > > > > > > >   use the same schema to construct a DynamicMessage and
> > > > > serialize
> > > > > > > it.
> > > > > > > > >   - For decoding, the schema will be fetched from the
> > > > registry
> > > > > > and
> > > > > > > > use
> > > > > > > > >   DynamicMessage to deserialize and convert the
> Protobuf
> > > > object
> > > > > > to
> > > > > > > a
> > > > > > > > > Flink
> > > > > > > > >   RowData.
> > > > > > > > >   - Note: here there is no external .proto file
> > > > > > > > >2. `debezium-avro-confluent` format which unpacks the
> > > Debezium
> > > > > > > > Envelope
> > > > > > > > >and collects the appropriate UPDATE_BEFORE,
> UPDATE_AFTER,
> > > > > INSERT,
> > > > > > > > DELETE
> > > > > > > > >events.
> > > > > > > > >   - We may be able to refactor and reuse code from the
> > > > existing
> > > > > > > > >   DebeziumAvroDeserializationSchema +
> > > > > > > DebeziumAvroSerializationSchema
> > > > > > > > > since
> > > > > > > > >   the deser logic is largely delegated to and these
> > Schemas
> > > > are
> > > > > > > > > concerned
> > > > > > > > >   with the handling the Debezium envelope.
> > > > > > > > >3. Move the Confluent Schema Registry Client code to a
> > > > separate
> > > > > > > maven
> > > > > > > > >module, flink-formats/flink-confluent-common, and extend
> > it
> > > to
> > > > > > > support
> > > > > > > > >ProtobufSchemaProvider
> > > > > > > > ><
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
> > > > > > > > > >
> > > > > > > > >.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Does anyone have any feedback or objections to this
> approach?
> > > > > > > > >
> > > > > > > > > Unless otherwise stated above:
> > > > > > > > >
> > > > > > > > > IBM United Kingdom Limited
> > > > > > > > > Registered in England and Wales with number 741598
> > > > > > > > > Registered office: PO Box 41, North Harbour, Portsmouth,
> > Hants.
> > > > PO6
> > > > > > 3AU
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-26 Thread Kevin Lam
 > > Looking forward to it!
> > > >
> > > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger 
> > > > wrote:
> > > >
> > > > > Hey Kevin,
> > > > >
> > > > > Thanks a lot. Then let's contribute the Confluent implementation to
> > > > > apache/flink. We can't start working on this immediately because
> of a
> > > > team
> > > > > event next week, but within the next two weeks, we will start
> working
> > > on
> > > > > this.
> > > > > It probably makes sense for us to open a pull request of what we
> have
> > > > > already, so that you can start reviewing and maybe also
> contributing
> > to
> > > > the
> > > > > PR.
> > > > > I hope this timeline works for you!
> > > > >
> > > > > Let's also decide if we need a FLIP once the code is public.
> > > > > We will look into the field ids.
> > > > >
> > > > >
> > > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > Hey Robert,
> > > > > >
> > > > > > Thanks for your response. I have a partial implementation, just
> for
> > > the
> > > > > > decoding portion.
> > > > > >
> > > > > > The code I have is pretty rough and doesn't do any of the
> > refactors I
> > > > > > mentioned, but the decoder logic does pull the schema from the
> > schema
> > > > > > registry and use that to deserialize the DynamicMessage before
> > > > converting
> > > > > > it to RowData using a DynamicMessageToRowDataConverter class. For
> > the
> > > > > other
> > > > > > aspects, I would need to start from scratch for the encoder.
> > > > > >
> > > > > > Would be very happy to see you drive the contribution back to
> open
> > > > source
> > > > > > from Confluent, or collaborate on this.
> > > > > >
> > > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
> > > would
> > > > be
> > > > > > nice if we are idiomatic about not renumbering them in
> incompatible
> > > > ways,
> > > > > > similar to what's discussed on the Schema Registry issue here:
> > > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <
> > rmetz...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > +1 to support the format in Flink.
> > > > > > >
> > > > > > > @Kevin: Do you already have an implementation for this inhouse
> > that
> > > > you
> > > > > > are
> > > > > > > looking to upstream, or would you start from scratch?
> > > > > > > I'm asking because my current employer, Confluent, has a
> Protobuf
> > > > > Schema
> > > > > > > registry implementation for Flink, and I could help drive
> > > > contributing
> > > > > > this
> > > > > > > back to open source.
> > > > > > > If you already have an implementation, let's decide which one
> to
> > > use
> > > > :)
> > > > > > >
> > > > > > > Best,
> > > > > > > Robert
> > > > > > >
> > > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> > > > david_rad...@uk.ibm.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Kevin,
> > > > > > > > Some thoughts on this.
> > > > > > > > I suggested an Apicurio registry format in the dev list, and
> > was
> > > > > > advised
> > > > > > > > to raise a FLIP for this, I suggest the same would apply here
> > (or
> > > > the
> > > > > > > > alternative to FLIPs if you cannot raise one). I am
> prototyping
> > > an

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-13 Thread Anupam Aggarwal
> > > > the
> > > > PR.
> > > > I hope this timeline works for you!
> > > >
> > > > Let's also decide if we need a FLIP once the code is public.
> > > > We will look into the field ids.
> > > >
> > > >
> > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam
> >  > > >
> > > > wrote:
> > > >
> > > > > Hey Robert,
> > > > >
> > > > > Thanks for your response. I have a partial implementation, just for
> > the
> > > > > decoding portion.
> > > > >
> > > > > The code I have is pretty rough and doesn't do any of the
> refactors I
> > > > > mentioned, but the decoder logic does pull the schema from the
> schema
> > > > > registry and use that to deserialize the DynamicMessage before
> > > converting
> > > > > it to RowData using a DynamicMessageToRowDataConverter class. For
> the
> > > > other
> > > > > aspects, I would need to start from scratch for the encoder.
> > > > >
> > > > > Would be very happy to see you drive the contribution back to open
> > > source
> > > > > from Confluent, or collaborate on this.
> > > > >
> > > > > Another topic I had is Protobuf's field ids. Ideally in Flink it
> > would
> > > be
> > > > > nice if we are idiomatic about not renumbering them in incompatible
> > > ways,
> > > > > similar to what's discussed on the Schema Registry issue here:
> > > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > > >
> > > > >
> > > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger <
> rmetz...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > +1 to support the format in Flink.
> > > > > >
> > > > > > @Kevin: Do you already have an implementation for this inhouse
> that
> > > you
> > > > > are
> > > > > > looking to upstream, or would you start from scratch?
> > > > > > I'm asking because my current employer, Confluent, has a Protobuf
> > > > Schema
> > > > > > registry implementation for Flink, and I could help drive
> > > contributing
> > > > > this
> > > > > > back to open source.
> > > > > > If you already have an implementation, let's decide which one to
> > use
> > > :)
> > > > > >
> > > > > > Best,
> > > > > > Robert
> > > > > >
> > > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> > > david_rad...@uk.ibm.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Kevin,
> > > > > > > Some thoughts on this.
> > > > > > > I suggested an Apicurio registry format in the dev list, and
> was
> > > > > advised
> > > > > > > to raise a FLIP for this, I suggest the same would apply here
> (or
> > > the
> > > > > > > alternative to FLIPs if you cannot raise one). I am prototyping
> > an
> > > > Avro
> > > > > > > Apicurio format, prior to raising the Flip,  and notice that
> the
> > > > > > readSchema
> > > > > > > in the SchemaCoder only takes a byte array ,but I need to pass
> > down
> > > > the
> > > > > > > Kafka headers (where the Apicurio globalId identifying the
> schema
> > > > > lives).
> > > > > > >
> > > > > > > I assume:
> > > > > > >
> > > > > > >   *   for the confluent Protobuf format you would extend the
> > > Protobuf
> > > > > > > format to drive some Schema Registry logic for Protobuf
> (similar
> > to
> > > > the
> > > > > > way
> > > > > > > Avro does it) where the magic byte + schema id can be obtained
> > and
> > > > the
> > > > > > > schema looked up using the Confluent Schema registry.
> > > > > > >   *   It would be good if any protobuf format enhancements for
> > > Schema
> > > > > > > registries pass down the Kafka headers (I am thinking as a
> > >

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-12 Thread Kevin Lam
> > > > what's discussed on the Schema Registry issue here:
> > > > https://github.com/confluentinc/schema-registry/issues/2551
> > > >
> > > >
> > > > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger 
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > +1 to support the format in Flink.
> > > > >
> > > > > @Kevin: Do you already have an implementation for this inhouse that
> > you
> > > > are
> > > > > looking to upstream, or would you start from scratch?
> > > > > I'm asking because my current employer, Confluent, has a Protobuf
> > > Schema
> > > > > registry implementation for Flink, and I could help drive
> > contributing
> > > > this
> > > > > back to open source.
> > > > > If you already have an implementation, let's decide which one to
> use
> > :)
> > > > >
> > > > > Best,
> > > > > Robert
> > > > >
> > > > > On Thu, Feb 22, 2024 at 2:05 PM David Radley <
> > david_rad...@uk.ibm.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Kevin,
> > > > > > Some thoughts on this.
> > > > > > I suggested an Apicurio registry format in the dev list, and was
> > > > advised
> > > > > > to raise a FLIP for this, I suggest the same would apply here (or
> > the
> > > > > > alternative to FLIPs if you cannot raise one). I am prototyping
> an
> > > Avro
> > > > > > Apicurio format, prior to raising the Flip,  and notice that the
> > > > > readSchema
> > > > > > in the SchemaCoder only takes a byte array ,but I need to pass
> down
> > > the
> > > > > > Kafka headers (where the Apicurio globalId identifying the schema
> > > > lives).
> > > > > >
> > > > > > I assume:
> > > > > >
> > > > > >   *   for the confluent Protobuf format you would extend the
> > Protobuf
> > > > > > format to drive some Schema Registry logic for Protobuf (similar
> to
> > > the
> > > > > way
> > > > > > Avro does it) where the magic byte + schema id can be obtained
> and
> > > the
> > > > > > schema looked up using the Confluent Schema registry.
> > > > > >   *   It would be good if any protobuf format enhancements for
> > Schema
> > > > > > registries pass down the Kafka headers (I am thinking as a
> > > > > > Map<String, Object> for Avro) as well as the message payload so
> > > > > > Apicurio registry could work with this.
> > > > > >   *   It would make sense to have the Confluent schema lookup in
> > > common
> > > > > > code, which is part of the SchemaCoder readSchema  logic.
> > > > > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> > > > Protobuf
> > > > > > Schema object.
> > > > > >
> > > > > >
> > > > > >
> > > > > > I also wondered whether these Kafka only formats should be moved
> to
> > > the
> > > > > > Kafka connector repo, or whether they might in the future be used
> > > > outside
> > > > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > > > >Kind regards, David.
> > > > > >
> > > > > >
> > > > > > From: Kevin Lam 
> > > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > > To: dev@flink.apache.org 
> > > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium
> Protobuf
> > > > > > Confluent Format
> > > > > > I would love to get some feedback from the community on this JIRA
> > > > issue:
> > > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > > >
> > > > > > I am looking into creating a PR and would appreciate some review
> on
> > > the
> > > > > > approach.
> > > > > >
> > > > > > In terms of design I think we can mirror the
> > > `debezium-avro-confluent`
> > > > > and
> > > > > > `avro-confluent` formats already availab

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-12 Thread Anupam Aggarwal
format, prior to raising the Flip,  and notice that the
> > > > readSchema
> > > > > in the SchemaCoder only takes a byte array ,but I need to pass down
> > the
> > > > > Kafka headers (where the Apicurio globalId identifying the schema
> > > lives).
> > > > >
> > > > > I assume:
> > > > >
> > > > >   *   for the confluent Protobuf format you would extend the
> Protobuf
> > > > > format to drive some Schema Registry logic for Protobuf (similar to
> > the
> > > > way
> > > > > Avro does it) where the magic byte + schema id can be obtained and
> > the
> > > > > schema looked up using the Confluent Schema registry.
> > > > >   *   It would be good if any protobuf format enhancements for
> Schema
> > > > > registries pass down the Kafka headers (I am thinking as a
> > > > > Map<String, Object> for Avro) as well as the message payload so
> > > > > Apicurio registry could work with this.
> > > > >   *   It would make sense to have the Confluent schema lookup in
> > common
> > > > > code, which is part of the SchemaCoder readSchema  logic.
> > > > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> > > Protobuf
> > > > > Schema object.
> > > > >
> > > > >
> > > > >
> > > > > I also wondered whether these Kafka only formats should be moved to
> > the
> > > > > Kafka connector repo, or whether they might in the future be used
> > > outside
> > > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > > >Kind regards, David.
> > > > >
> > > > >
> > > > > From: Kevin Lam 
> > > > > Date: Wednesday, 21 February 2024 at 18:51
> > > > > To: dev@flink.apache.org 
> > > > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > > > Confluent Format
> > > > > I would love to get some feedback from the community on this JIRA
> > > issue:
> > > > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > > > >
> > > > > I am looking into creating a PR and would appreciate some review on
> > the
> > > > > approach.
> > > > >
> > > > > In terms of design I think we can mirror the
> > `debezium-avro-confluent`
> > > > and
> > > > > `avro-confluent` formats already available in Flink:
> > > > >
> > > > >1. `protobuf-confluent` format which uses DynamicMessage
> > > > ><
> > > > >
> > > >
> > >
> >
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> > > > > >
> > > > >for encoding and decoding.
> > > > >   - For encoding the Flink RowType will be used to dynamically
> > > > create a
> > > > >   Protobuf Schema and register it with the Confluent Schema
> > > > > Registry. It will
> > > > >   use the same schema to construct a DynamicMessage and
> serialize
> > > it.
> > > > >   - For decoding, the schema will be fetched from the registry
> > and
> > > > use
> > > > >   DynamicMessage to deserialize and convert the Protobuf object
> > to
> > > a
> > > > > Flink
> > > > >   RowData.
> > > > >   - Note: here there is no external .proto file
> > > > >2. `debezium-avro-confluent` format which unpacks the Debezium
> > > > Envelope
> > > > >and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER,
> INSERT,
> > > > DELETE
> > > > >events.
> > > > >   - We may be able to refactor and reuse code from the existing
> > > > >   DebeziumAvroDeserializationSchema +
> > > DebeziumAvroSerializationSchema
> > > > > since
> > > > >   the deser logic is largely delegated to and these Schemas are
> > > > > concerned
> > > > >   with the handling the Debezium envelope.
> > > > >3. Move the Confluent Schema Registry Client code to a separate
> > > maven
> > > > >module, flink-formats/flink-confluent-common, and extend it to
> > > support
> > > > >ProtobufSchemaProvider
> > > > ><
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
> > > > > >
> > > > >.
> > > > >
> > > > >
> > > > > Does anyone have any feedback or objections to this approach?
> > > > >
> > > > > Unless otherwise stated above:
> > > > >
> > > > > IBM United Kingdom Limited
> > > > > Registered in England and Wales with number 741598
> > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> > 3AU
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-29 Thread Kevin Lam
Hey Robert,

Awesome thanks, that timeline works for me. Sounds good re: deciding on
FLIP once we have the PR, and thanks for looking into the field ids.

Looking forward to it!

On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger  wrote:

> Hey Kevin,
>
> Thanks a lot. Then let's contribute the Confluent implementation to
> apache/flink. We can't start working on this immediately because of a team
> event next week, but within the next two weeks, we will start working on
> this.
> It probably makes sense for us to open a pull request of what we have
> already, so that you can start reviewing and maybe also contributing to the
> PR.
> I hope this timeline works for you!
>
> Let's also decide if we need a FLIP once the code is public.
> We will look into the field ids.
>
>
> On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam 
> wrote:
>
> > Hey Robert,
> >
> > Thanks for your response. I have a partial implementation, just for the
> > decoding portion.
> >
> > The code I have is pretty rough and doesn't do any of the refactors I
> > mentioned, but the decoder logic does pull the schema from the schema
> > registry and use that to deserialize the DynamicMessage before converting
> > it to RowData using a DynamicMessageToRowDataConverter class. For the
> other
> > aspects, I would need to start from scratch for the encoder.
> >
> > Would be very happy to see you drive the contribution back to open source
> > from Confluent, or collaborate on this.
> >
> > Another topic I had is Protobuf's field ids. Ideally in Flink it would be
> > nice if we are idiomatic about not renumbering them in incompatible ways,
> > similar to what's discussed on the Schema Registry issue here:
> > https://github.com/confluentinc/schema-registry/issues/2551
> >
> >
> > On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger 
> > wrote:
> >
> > > Hi all,
> > >
> > > +1 to support the format in Flink.
> > >
> > > @Kevin: Do you already have an implementation for this inhouse that you
> > are
> > > looking to upstream, or would you start from scratch?
> > > I'm asking because my current employer, Confluent, has a Protobuf
> Schema
> > > registry implementation for Flink, and I could help drive contributing
> > this
> > > back to open source.
> > > If you already have an implementation, let's decide which one to use :)
> > >
> > > Best,
> > > Robert
> > >
> > > On Thu, Feb 22, 2024 at 2:05 PM David Radley 
> > > wrote:
> > >
> > > > Hi Kevin,
> > > > Some thoughts on this.
> > > > I suggested an Apicurio registry format in the dev list, and was
> > advised
> > > > to raise a FLIP for this, I suggest the same would apply here (or the
> > > > alternative to FLIPs if you cannot raise one). I am prototyping an
> Avro
> > > > Apicurio format, prior to raising the Flip,  and notice that the
> > > readSchema
> > > > in the SchemaCoder only takes a byte array ,but I need to pass down
> the
> > > > Kafka headers (where the Apicurio globalId identifying the schema
> > lives).
> > > >
> > > > I assume:
> > > >
> > > >   *   for the confluent Protobuf format you would extend the Protobuf
> > > > format to drive some Schema Registry logic for Protobuf (similar to
> the
> > > way
> > > > Avro does it) where the magic byte + schema id can be obtained and
> the
> > > > schema looked up using the Confluent Schema registry.
> > > >   *   It would be good if any protobuf format enhancements for Schema
> > > > registries pass down the Kafka headers (I am thinking as a
> > > > Map<String, Object> for Avro) as well as the message payload so Apicurio
> > > > registry could work with this.
> > > >   *   It would make sense to have the Confluent schema lookup in
> common
> > > > code, which is part of the SchemaCoder readSchema  logic.
> > > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> > Protobuf
> > > > Schema object.
> > > >
> > > >
> > > >
> > > > I also wondered whether these Kafka only formats should be moved to
> the
> > > > Kafka connector repo, or whether they might in the future be used
> > outside
> > > > Kafka – e.g. Avro/Protobuf files in a database.
> > > >Kind regards, David.
> > > >
> > > >
> > > > From: Kevin Lam 
>

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-29 Thread Robert Metzger
Hey Kevin,

Thanks a lot. Then let's contribute the Confluent implementation to
apache/flink. We can't start working on this immediately because of a team
event next week, but within the next two weeks, we will start working on
this.
It probably makes sense for us to open a pull request of what we have
already, so that you can start reviewing and maybe also contributing to the
PR.
I hope this timeline works for you!

Let's also decide if we need a FLIP once the code is public.
We will look into the field ids.


On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam 
wrote:

> Hey Robert,
>
> Thanks for your response. I have a partial implementation, just for the
> decoding portion.
>
> The code I have is pretty rough and doesn't do any of the refactors I
> mentioned, but the decoder logic does pull the schema from the schema
> registry and use that to deserialize the DynamicMessage before converting
> it to RowData using a DynamicMessageToRowDataConverter class. For the other
> aspects, I would need to start from scratch for the encoder.
>
> Would be very happy to see you drive the contribution back to open source
> from Confluent, or collaborate on this.
>
> Another topic I had is Protobuf's field ids. Ideally in Flink it would be
> nice if we are idiomatic about not renumbering them in incompatible ways,
> similar to what's discussed on the Schema Registry issue here:
> https://github.com/confluentinc/schema-registry/issues/2551
>
>
> On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger 
> wrote:
>
> > Hi all,
> >
> > +1 to support the format in Flink.
> >
> > @Kevin: Do you already have an implementation for this inhouse that you
> are
> > looking to upstream, or would you start from scratch?
> > I'm asking because my current employer, Confluent, has a Protobuf Schema
> > registry implementation for Flink, and I could help drive contributing
> this
> > back to open source.
> > If you already have an implementation, let's decide which one to use :)
> >
> > Best,
> > Robert
> >
> > On Thu, Feb 22, 2024 at 2:05 PM David Radley 
> > wrote:
> >
> > > Hi Kevin,
> > > Some thoughts on this.
> > > I suggested an Apicurio registry format in the dev list, and was
> advised
> > > to raise a FLIP for this, I suggest the same would apply here (or the
> > > alternative to FLIPs if you cannot raise one). I am prototyping an Avro
> > > Apicurio format, prior to raising the Flip,  and notice that the
> > readSchema
> > > in the SchemaCoder only takes a byte array ,but I need to pass down the
> > > Kafka headers (where the Apicurio globalId identifying the schema
> lives).
> > >
> > > I assume:
> > >
> > >   *   for the confluent Protobuf format you would extend the Protobuf
> > > format to drive some Schema Registry logic for Protobuf (similar to the
> > way
> > > Avro does it) where the magic byte + schema id can be obtained and the
> > > schema looked up using the Confluent Schema registry.
> > >   *   It would be good if any protobuf format enhancements for Schema
> > > registries pass down the Kafka headers (I am thinking as a
> > > Map<String, Object> for Avro) as well as the message payload so Apicurio
> > > registry could work with this.
> > >   *   It would make sense to have the Confluent schema lookup in common
> > > code, which is part of the SchemaCoder readSchema  logic.
> > >   *   I assume the ProtobufSchemaCoder readSchema would return a
> Protobuf
> > > Schema object.
> > >
> > >
> > >
> > > I also wondered whether these Kafka only formats should be moved to the
> > > Kafka connector repo, or whether they might in the future be used
> outside
> > > Kafka – e.g. Avro/Protobuf files in a database.
> > >Kind regards, David.
> > >
> > >
> > > From: Kevin Lam 
> > > Date: Wednesday, 21 February 2024 at 18:51
> > > To: dev@flink.apache.org 
> > > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > Confluent Format
> > > I would love to get some feedback from the community on this JIRA
> issue:
> > > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> > >
> > > I am looking into creating a PR and would appreciate some review on the
> > > approach.
> > >
> > > In terms of design I think we can mirror the `debezium-avro-confluent`
> > and
> > > `avro-confluent` formats already available in Flink:
> > >
> > >1. `protobuf-confluent` format which uses DynamicM

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-27 Thread Kevin Lam
Hey Robert,

Thanks for your response. I have a partial implementation, just for the
decoding portion.

The code I have is pretty rough and doesn't do any of the refactors I
mentioned, but the decoder logic does pull the schema from the schema
registry and use that to deserialize the DynamicMessage before converting
it to RowData using a DynamicMessageToRowDataConverter class. For the other
aspects, I would need to start from scratch for the encoder.

Would be very happy to see you drive the contribution back to open source
from Confluent, or collaborate on this.

Another topic I had is Protobuf's field ids. Ideally in Flink it would be
nice if we are idiomatic about not renumbering them in incompatible ways,
similar to what's discussed on the Schema Registry issue here:
https://github.com/confluentinc/schema-registry/issues/2551


On Tue, Feb 27, 2024 at 5:51 AM Robert Metzger  wrote:

> Hi all,
>
> +1 to support the format in Flink.
>
> @Kevin: Do you already have an implementation for this inhouse that you are
> looking to upstream, or would you start from scratch?
> I'm asking because my current employer, Confluent, has a Protobuf Schema
> registry implementation for Flink, and I could help drive contributing this
> back to open source.
> If you already have an implementation, let's decide which one to use :)
>
> Best,
> Robert
>
> On Thu, Feb 22, 2024 at 2:05 PM David Radley 
> wrote:
>
> > Hi Kevin,
> > Some thoughts on this.
> > I suggested an Apicurio registry format in the dev list, and was advised
> > to raise a FLIP for this, I suggest the same would apply here (or the
> > alternative to FLIPs if you cannot raise one). I am prototyping an Avro
> > Apicurio format, prior to raising the Flip,  and notice that the
> readSchema
> > in the SchemaCoder only takes a byte array ,but I need to pass down the
> > Kafka headers (where the Apicurio globalId identifying the schema lives).
> >
> > I assume:
> >
> >   *   for the confluent Protobuf format you would extend the Protobuf
> > format to drive some Schema Registry logic for Protobuf (similar to the
> way
> > Avro does it) where the magic byte + schema id can be obtained and the
> > schema looked up using the Confluent Schema registry.
> >   *   It would be good if any protobuf format enhancements for Schema
> > registries pass down the Kafka headers (I am thinking as a
> > Map<String, Object> for Avro) as well as the message payload so Apicurio
> > registry could work with this.
> >   *   It would make sense to have the Confluent schema lookup in common
> > code, which is part of the SchemaCoder readSchema  logic.
> >   *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf
> > Schema object.
> >
> >
> >
> > I also wondered whether these Kafka only formats should be moved to the
> > Kafka connector repo, or whether they might in the future be used outside
> > Kafka – e.g. Avro/Protobuf files in a database.
> >Kind regards, David.
> >
> >
> > From: Kevin Lam 
> > Date: Wednesday, 21 February 2024 at 18:51
> > To: dev@flink.apache.org 
> > Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > Confluent Format
> > I would love to get some feedback from the community on this JIRA issue:
> > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
> >
> > I am looking into creating a PR and would appreciate some review on the
> > approach.
> >
> > In terms of design I think we can mirror the `debezium-avro-confluent`
> and
> > `avro-confluent` formats already available in Flink:
> >
> >1. `protobuf-confluent` format which uses DynamicMessage
> ><
> >
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> > >
> >for encoding and decoding.
> >   - For encoding the Flink RowType will be used to dynamically
> create a
> >   Protobuf Schema and register it with the Confluent Schema
> > Registry. It will
> >   use the same schema to construct a DynamicMessage and serialize it.
> >   - For decoding, the schema will be fetched from the registry and
> use
> >   DynamicMessage to deserialize and convert the Protobuf object to a
> > Flink
> >   RowData.
> >   - Note: here there is no external .proto file
> >2. `debezium-avro-confluent` format which unpacks the Debezium
> Envelope
> >and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT,
> DELETE
> >events.
> >   - We may be able to refactor and reuse code from the existing
> >   DebeziumAvroDeserializationSchema 

Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-27 Thread Robert Metzger
Hi all,

+1 to support the format in Flink.

@Kevin: Do you already have an implementation for this inhouse that you are
looking to upstream, or would you start from scratch?
I'm asking because my current employer, Confluent, has a Protobuf Schema
registry implementation for Flink, and I could help drive contributing this
back to open source.
If you already have an implementation, let's decide which one to use :)

Best,
Robert

On Thu, Feb 22, 2024 at 2:05 PM David Radley 
wrote:

> Hi Kevin,
> Some thoughts on this.
> I suggested an Apicurio registry format in the dev list, and was advised
> to raise a FLIP for this, I suggest the same would apply here (or the
> alternative to FLIPs if you cannot raise one). I am prototyping an Avro
> Apicurio format, prior to raising the Flip,  and notice that the readSchema
> in the SchemaCoder only takes a byte array ,but I need to pass down the
> Kafka headers (where the Apicurio globalId identifying the schema lives).
>
> I assume:
>
>   *   for the confluent Protobuf format you would extend the Protobuf
> format to drive some Schema Registry logic for Protobuf (similar to the way
> Avro does it) where the magic byte + schema id can be obtained and the
> schema looked up using the Confluent Schema registry.
>   *   It would be good if any protobuf format enhancements for Schema
> registries pass down the Kafka headers (I am thinking as a Map<String,
> Object> for Avro) as well as the message payload so Apicurio registry could
> work with this.
>   *   It would make sense to have the Confluent schema lookup in common
> code, which is part of the SchemaCoder readSchema  logic.
>   *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf
> Schema object.
>
>
>
> I also wondered whether these Kafka only formats should be moved to the
> Kafka connector repo, or whether they might in the future be used outside
> Kafka – e.g. Avro/Protobuf files in a database.
>Kind regards, David.
>
>
> From: Kevin Lam 
> Date: Wednesday, 21 February 2024 at 18:51
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf
> Confluent Format
> I would love to get some feedback from the community on this JIRA issue:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440
>
> I am looking into creating a PR and would appreciate some review on the
> approach.
>
> In terms of design I think we can mirror the `debezium-avro-confluent` and
> `avro-confluent` formats already available in Flink:
>
>1. `protobuf-confluent` format which uses DynamicMessage
><
> https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage
> >
>for encoding and decoding.
>   - For encoding the Flink RowType will be used to dynamically create a
>   Protobuf Schema and register it with the Confluent Schema
> Registry. It will
>   use the same schema to construct a DynamicMessage and serialize it.
>   - For decoding, the schema will be fetched from the registry and use
>   DynamicMessage to deserialize and convert the Protobuf object to a
> Flink
>   RowData.
>   - Note: here there is no external .proto file
>2. `debezium-avro-confluent` format which unpacks the Debezium Envelope
>and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE
>events.
>   - We may be able to refactor and reuse code from the existing
>   DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema
> since
>   the deser logic is largely delegated to and these Schemas are
> concerned
>   with the handling the Debezium envelope.
>3. Move the Confluent Schema Registry Client code to a separate maven
>module, flink-formats/flink-confluent-common, and extend it to support
>ProtobufSchemaProvider
><
> https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26
> >
>.
>
>
> Does anyone have any feedback or objections to this approach?
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-22 Thread David Radley
Hi Kevin,
Some thoughts on this.
I suggested an Apicurio registry format in the dev list, and was advised to
raise a FLIP for this; I suggest the same would apply here (or the alternative
to FLIPs if you cannot raise one). I am prototyping an Avro Apicurio format,
prior to raising the FLIP, and notice that the readSchema in the SchemaCoder
only takes a byte array, but I need to pass down the Kafka headers (where the
Apicurio globalId identifying the schema lives).
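
One possible shape for such an extension, purely as a sketch (the interface
name and the header map type are assumptions, not an agreed API):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Map;
    import org.apache.avro.Schema;

    // A readSchema variant that also receives the Kafka headers, so coders
    // whose schema id travels in a header (e.g. Apicurio's globalId) can
    // resolve the writer schema.
    public interface HeaderAwareSchemaCoder {
        Schema readSchema(InputStream in, Map<String, Object> headers) throws IOException;
    }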

I assume:

  *   for the confluent Protobuf format you would extend the Protobuf format to 
drive some Schema Registry logic for Protobuf (similar to the way Avro does it) 
where the magic byte + schema id can be obtained and the schema looked up using 
the Confluent Schema registry.
  *   It would be good if any protobuf format enhancements for Schema 
registries pass down the Kafka headers (I am thinking as a Map<String, Object>
for Avro) as well as the message payload so Apicurio registry could work with 
this.
  *   It would make sense to have the Confluent schema lookup in common code, 
which is part of the SchemaCoder readSchema  logic.
  *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf 
Schema object.
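
For reference, the magic byte + schema id framing used by the Confluent
serializers can be read in a few lines; a minimal sketch (the Protobuf message
indexes that follow the id are ignored here):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Confluent wire format: byte 0 is the magic byte (0), bytes 1-4 the
    // big-endian schema id; the payload follows.
    static int readConfluentSchemaId(byte[] wire) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        if (buf.get() != 0x0) {
            throw new IOException("Unknown magic byte");
        }
        return buf.getInt();
    }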



I also wondered whether these Kafka only formats should be moved to the Kafka 
connector repo, or whether they might in the future be used outside Kafka – 
e.g. Avro/Protobuf files in a database.
   Kind regards, David.


From: Kevin Lam 
Date: Wednesday, 21 February 2024 at 18:51
To: dev@flink.apache.org 
Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent 
Format
I would love to get some feedback from the community on this JIRA issue:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440

I am looking into creating a PR and would appreciate some review on the
approach.

In terms of design I think we can mirror the `debezium-avro-confluent` and
`avro-confluent` formats already available in Flink:

   1. `protobuf-confluent` format which uses DynamicMessage
   
<https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
   for encoding and decoding.
  - For encoding the Flink RowType will be used to dynamically create a
  Protobuf Schema and register it with the Confluent Schema
Registry. It will
  use the same schema to construct a DynamicMessage and serialize it.
  - For decoding, the schema will be fetched from the registry and use
  DynamicMessage to deserialize and convert the Protobuf object to a Flink
  RowData.
  - Note: here there is no external .proto file
   2. `debezium-avro-confluent` format which unpacks the Debezium Envelope
   and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE
   events.
  - We may be able to refactor and reuse code from the existing
  DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema since
  the deser logic is largely delegated to and these Schemas are concerned
  with the handling the Debezium envelope.
   3. Move the Confluent Schema Registry Client code to a separate maven
   module, flink-formats/flink-confluent-common, and extend it to support
   ProtobufSchemaProvider
   
<https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>
   .
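
A sketch of the envelope handling described in item 2 above (the op codes are
Debezium's; `before` and `after` are assumed to be already decoded to RowData):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;

    // Map Debezium's "op" field to the changelog rows Flink expects.
    static void emitChangelog(
            String op, RowData before, RowData after, Collector<RowData> out) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                after.setRowKind(RowKind.INSERT);
                out.collect(after);
                break;
            case "u": // update
                before.setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(before);
                after.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(after);
                break;
            case "d": // delete
                before.setRowKind(RowKind.DELETE);
                out.collect(before);
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }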


Does anyone have any feedback or objections to this approach?

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


[DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-21 Thread Kevin Lam
I would love to get some feedback from the community on this JIRA issue:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440

I am looking into creating a PR and would appreciate some review on the
approach.

In terms of design I think we can mirror the `debezium-avro-confluent` and
`avro-confluent` formats already available in Flink:

   1. `protobuf-confluent` format which uses DynamicMessage
   <https://protobuf.dev/reference/java/api-docs/com/google/protobuf/DynamicMessage>
   for encoding and decoding.
  - For encoding the Flink RowType will be used to dynamically create a
  Protobuf Schema and register it with the Confluent Schema
Registry. It will
  use the same schema to construct a DynamicMessage and serialize it.
  - For decoding, the schema will be fetched from the registry and use
  DynamicMessage to deserialize and convert the Protobuf object to a Flink
  RowData.
  - Note: here there is no external .proto file
   2. `debezium-avro-confluent` format which unpacks the Debezium Envelope
   and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE
   events.
  - We may be able to refactor and reuse code from the existing
  DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema since
  the deser logic is largely delegated to and these Schemas are concerned
  with the handling the Debezium envelope.
   3. Move the Confluent Schema Registry Client code to a separate maven
   module, flink-formats/flink-confluent-common, and extend it to support
   ProtobufSchemaProvider
   <https://github.com/confluentinc/schema-registry/blob/ca226f2e1e2091c67b372338221b57fdd435d9f2/protobuf-provider/src/main/java/io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider.java#L26>
   .
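
As a hedged sketch of the encode path in item 1 (`toProtoSchema` and
`toDynamicMessage` are hypothetical stand-ins for the two conversions described;
Confluent's Protobuf framing also writes message indexes, omitted here):

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import com.google.protobuf.DynamicMessage;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    // Derive a Protobuf schema from the Flink row type, register it, and
    // serialize a DynamicMessage with the magic byte + schema id prefix.
    static byte[] encode(RowData row, RowType rowType,
                         SchemaRegistryClient client, String subject) throws Exception {
        ProtobufSchema schema = toProtoSchema(rowType);          // hypothetical
        int schemaId = client.register(subject, schema);
        DynamicMessage message = toDynamicMessage(schema, row);  // hypothetical
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);                                            // magic byte
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        message.writeTo(out);
        return out.toByteArray();
    }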


Does anyone have any feedback or objections to this approach?
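
And the decode direction, again as a sketch (message-index handling is
simplified away; converting the DynamicMessage to RowData is the converter step
described in item 1):

    import java.nio.ByteBuffer;
    import com.google.protobuf.DynamicMessage;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    // Look up the writer schema by the id embedded in the payload, then parse
    // a DynamicMessage with its descriptor.
    static DynamicMessage decode(byte[] wire, SchemaRegistryClient client) throws Exception {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        int schemaId = buf.getInt();
        ProtobufSchema schema = (ProtobufSchema) client.getSchemaById(schemaId);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return DynamicMessage.parseFrom(schema.toDescriptor(), payload);
    }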