Hi Dawid,

Thanks for driving this. This is a blocker for supporting the Debezium CDC format
(FLIP-105), so a big +1 from my side.

Regarding emitting multiple records and checkpointing, I'm also in favor
of option #1: buffer all the records outside of the checkpoint lock.
I think in most use cases the buffered records will not take up more memory
than the byte[] they were deserialized from.
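A minimal sketch of option #1 in Java. The `Collector` interface below is a simplified stand-in for Flink's, and `BufferingCollector`, `BufferedEmitter` and the plain `Object` used as the checkpoint lock are hypothetical, not the actual Kafka consumer internals. Deserialization, which may emit 0..m records per message, runs without holding the lock; only handing the buffered records downstream and advancing the offset happen under it, so a checkpoint never observes a partially emitted message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified, hypothetical stand-in for Flink's Collector.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Collector that only buffers; it never touches the checkpoint lock.
final class BufferingCollector<T> implements Collector<T> {
    private final List<T> buffer = new ArrayList<>();

    @Override
    public void collect(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {}

    // Returns the buffered records of one message and resets the buffer.
    List<T> drain() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}

// Hypothetical fetch loop showing where the lock is (and is not) held.
final class BufferedEmitter<T> {
    private final Object checkpointLock = new Object();
    private final BufferingCollector<T> collector = new BufferingCollector<>();
    private long offset = -1L; // last fully emitted message; part of checkpointed state

    // `deserializer` may emit 0..m records per message into the collector.
    void process(byte[] message, long messageOffset,
                 BiConsumer<byte[], Collector<T>> deserializer,
                 Consumer<T> downstream) {
        // 1. Potentially expensive deserialization runs outside the lock.
        deserializer.accept(message, collector);
        List<T> records = collector.drain();
        // 2. Emitting and advancing the offset are atomic with respect to checkpoints.
        synchronized (checkpointLock) {
            for (T record : records) {
                downstream.accept(record);
            }
            offset = messageOffset;
        }
    }

    long currentOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }
}
```

The extra memory is bounded by the records produced from a single message, since the buffer is drained after every message.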

I have a minor suggestion on DeserializationSchema: could we give
`T deserialize(byte[] message)` a default implementation (maybe one that
throws an exception)? I think this will not break compatibility, and users
won't have to implement this deprecated method if they want to use the new
collector interface.
SinkFunction did the same thing: it introduced a new invoke method with a
Context parameter and gave the old invoke method an empty default
implementation.
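A sketch of this pattern, using a hypothetical `SketchDeserializationSchema` and a simplified `Collector` rather than Flink's real interfaces (checked exceptions are omitted for brevity). The old single-record method gets a default body that throws, and the new collector-based method defaults to delegating to it. Existing implementations compile unchanged, while new ones override only the collector variant.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for Flink's Collector.
interface Collector<T> {
    void collect(T record);
    void close();
}

interface SketchDeserializationSchema<T> {
    // Old single-record method: deprecated and no longer abstract.
    @Deprecated
    default T deserialize(byte[] message) {
        throw new UnsupportedOperationException(
            "Implement deserialize(byte[], Collector) instead.");
    }

    // New method: by default wraps the old one and skips null results.
    default void deserialize(byte[] message, Collector<T> out) {
        T record = deserialize(message);
        if (record != null) {
            out.collect(record);
        }
    }
}

// Existing implementation: untouched, still compiles and works.
final class LegacyUpperCase implements SketchDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8).toUpperCase();
    }
}

// New implementation: overrides only the collector variant.
final class SplitLines implements SketchDeserializationSchema<String> {
    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            out.collect(line);
        }
    }
}

// Collects records into a list, for demonstration.
final class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    @Override public void collect(T record) { records.add(record); }
    @Override public void close() {}
}
```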

Best,
Jark

On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:

> I would be in favor of buffering data outside of the checkpoint lock. In my
> experience, serialization is always the biggest performance killer in user
> code, and I have a hard time believing that, in practice, anyone is going to
> buffer so many records that it causes real memory concerns.
>
> To add to Timo's point,
>
> Statefun actually did that in its Kinesis ser/de interfaces [1, 2].
>
> Seth
>
> [1] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> [2] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>
>
> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Dawid,
> >
> > thanks for this FLIP. This solves a lot of issues with the current
> > design for both the Flink contributors and users. +1 for this.
> >
> > Some minor suggestions from my side:
> > - How about finding something shorter for `InitializationContext`? Maybe
> > just `OpenContext`?
> > - While introducing default methods for existing interfaces, shall we
> > also create contexts for those methods? I see the following method in
> > your FLIP and wonder if we can reduce the number of parameters while
> > introducing a new method:
> >
> > deserialize(
> >              byte[] recordValue,
> >              String partitionKey,
> >              String seqNum,
> >              long approxArrivalTimestamp,
> >              String stream,
> >              String shardId,
> >              Collector<T> out)
> >
> > to:
> >
> > deserialize(
> >              byte[] recordValue,
> >              Context c,
> >              Collector<T> out)
> >
> > What do you think?
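A sketch of the suggested signature, assuming a hypothetical `KinesisRecordContext` whose accessors mirror the parameters quoted above (this is not an existing Flink API, and `Collector` is again a simplified stand-in). Adding a new piece of metadata later would then only change the context type rather than every implementation's method signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for Flink's Collector.
interface Collector<T> {
    void collect(T record);
    void close();
}

final class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    @Override public void collect(T record) { records.add(record); }
    @Override public void close() {}
}

// Hypothetical context bundling the per-record Kinesis metadata.
interface KinesisRecordContext {
    String partitionKey();
    String seqNum();
    long approxArrivalTimestamp();
    String stream();
    String shardId();
}

final class SimpleContext implements KinesisRecordContext {
    private final String partitionKey;
    private final String seqNum;
    private final long approxArrivalTimestamp;
    private final String stream;
    private final String shardId;

    SimpleContext(String partitionKey, String seqNum, long approxArrivalTimestamp,
                  String stream, String shardId) {
        this.partitionKey = partitionKey;
        this.seqNum = seqNum;
        this.approxArrivalTimestamp = approxArrivalTimestamp;
        this.stream = stream;
        this.shardId = shardId;
    }

    @Override public String partitionKey() { return partitionKey; }
    @Override public String seqNum() { return seqNum; }
    @Override public long approxArrivalTimestamp() { return approxArrivalTimestamp; }
    @Override public String stream() { return stream; }
    @Override public String shardId() { return shardId; }
}

// Three parameters instead of seven.
interface KinesisDeserializer<T> {
    void deserialize(byte[] recordValue, KinesisRecordContext c, Collector<T> out);
}

// Example implementation: prefix each value with the shard it came from.
final class ShardTaggingDeserializer implements KinesisDeserializer<String> {
    @Override
    public void deserialize(byte[] recordValue, KinesisRecordContext c, Collector<String> out) {
        out.collect(c.shardId() + ":" + new String(recordValue, StandardCharsets.UTF_8));
    }
}
```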
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > > Hi devs,
> > >
> > > When working on improving the Table API/SQL connectors we faced a few
> > > shortcomings of the DeserializationSchema and SerializationSchema
> > > interfaces. Other users have also requested similar features in the
> > > past. The shortcomings I would like to address with this FLIP include:
> > >
> > >   * Emitting 0 to m records from the deserialization schema with
> > >     per-partition watermarks
> > >       o https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >       o differentiate a null value from no value
> > >       o support for the Debezium CDC format
> > >         (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
> > >
> > >   * A way to initialize the schema
> > >       o establish external connections
> > >       o generate code on startup
> > >       o no need for lazy initialization
> > >
> > >   * Access to metrics
> > >     [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
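To illustrate the three shortcomings together, a sketch assuming hypothetical `InitializationContext`, `Counter` and `ChangelogDeserializer` types and a made-up text format (`c|after`, `u|before|after`, `d|before`, empty for a tombstone); the FLIP's actual names and methods may differ. `open` runs once before any record, so registering a metric happens eagerly rather than lazily on the first record, and the collector lets one message yield zero, one or two records. The `+I`/`-U`/`+U`/`-D` labels are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for Flink's Collector.
interface Collector<T> {
    void collect(T record);
    void close();
}

final class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    @Override public void collect(T record) { records.add(record); }
    @Override public void close() {}
}

// Hypothetical minimal metric type.
final class Counter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical context handed to open(); exposes metric registration.
interface InitializationContext {
    Counter counter(String name);
}

final class ChangelogDeserializer {
    private Counter tombstones; // registered in open(), not lazily

    void open(InitializationContext ctx) {
        tombstones = ctx.counter("tombstones");
    }

    void deserialize(byte[] message, Collector<String> out) {
        if (message == null || message.length == 0) {
            tombstones.inc(); // no value: emit nothing
            return;
        }
        String[] parts = new String(message, StandardCharsets.UTF_8).split("\\|", -1);
        switch (parts[0]) {
            case "c": // insert: one record
                out.collect("+I " + parts[1]);
                break;
            case "u": // update: two records (before and after)
                out.collect("-U " + parts[1]);
                out.collect("+U " + parts[2]);
                break;
            case "d": // delete: one record
                out.collect("-D " + parts[1]);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + parts[0]);
        }
    }
}
```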
> > >
> > > One important aspect I would like to hear your opinion on is how to
> > > support the Collector interface in the Kafka source, assuming we agree
> > > to add the Collector to the DeserializationSchema at all.
> > >
> > > The FLIP can be found here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> > >
> > > Looking forward to your feedback.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> >
> >
>
