Hi Dawid,

Thanks for driving this. This is a blocker for supporting the Debezium CDC format (FLIP-105), so a big +1 from my side.

Regarding emitting multiple records and checkpointing, I'm also in favor of option #1: buffer all the records outside of the checkpoint lock. I think most use cases will not buffer more data than the byte[] it was deserialized from.

I have a minor suggestion on DeserializationSchema: could we have a default implementation (maybe one that throws an exception) for `T deserialize(byte[] message)`? I think this will not break compatibility, and users won't have to implement this deprecated method if they want to use the new collector interface. I think SinkFunction did the same thing: it introduced a new invoke method with a Context parameter and gave the old invoke method an empty implementation.

Best,
Jark

On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:

> I would be in favor of buffering data outside of the checkpoint lock. In my
> experience, serialization is always the biggest performance killer in user
> code and I have a hard time believing in practice that anyone is going to
> buffer so many records that it causes real memory concerns.
>
> To add to Timo's point,
>
> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
>
> Seth
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> [2]
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>
>
> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Dawid,
> >
> > thanks for this FLIP. This solves a lot of issues with the current
> > design for both the Flink contributors and users. +1 for this.
> >
> > Some minor suggestions from my side:
> > - How about finding something shorter for `InitializationContext`? Maybe
> > just `OpenContext`?
> > - While introducing default methods for existing interfaces, shall we
> > also create contexts for those methods? I see the following method in
> > your FLIP and wonder if we can reduce the number of parameters while
> > introducing a new method:
> >
> > deserialize(
> >     byte[] recordValue,
> >     String partitionKey,
> >     String seqNum,
> >     long approxArrivalTimestamp,
> >     String stream,
> >     String shardId,
> >     Collector<T> out)
> >
> > to:
> >
> > deserialize(
> >     byte[] recordValue,
> >     Context c,
> >     Collector<T> out)
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
> > On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > > Hi devs,
> > >
> > > When working on improving the Table API/SQL connectors we faced a few
> > > shortcomings of the DeserializationSchema and SerializationSchema
> > > interfaces. Similar features were also mentioned by other users in the
> > > past. The shortcomings I would like to address with the FLIP include:
> > >
> > > * Emitting 0 to m records from the deserialization schema with per
> > >   partition watermarks
> > >     o https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >     o differentiate null value from no value
> > >     o support for Debezium CDC format
> > >       (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
> > >
> > > * A way to initialize the schema
> > >     o establish external connections
> > >     o generate code on startup
> > >     o no need for lazy initialization
> > >
> > > * Access to metrics
> > >   [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
> > >
> > > One important aspect I would like to hear your opinion on is how to
> > > support the Collector interface in Kafka source. Of course if we agree
> > > to add the Collector to the DeserializationSchema.
> > >
> > > The FLIP can be found here:
> > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> > >
> > > Looking forward to your feedback.
> > >
> > > Best,
> > >
> > > Dawid
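
For reference, here is a minimal sketch of the two ideas from the reply at the top of this thread: a default implementation for the old `T deserialize(byte[] message)` method, and option #1 (buffer records outside of the checkpoint lock, then emit them under it). It is only an illustration under stated assumptions, not the FLIP's actual API: `Collector`, `Schema`, `CsvSchema`, `WholeLineSchema` and `fetchAndEmit` are hypothetical local stand-ins defined here so that the example compiles without Flink on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class CollectorSchemaSketch {

    /** Local stand-in for a collector; NOT Flink's actual class. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical schema shape mirroring the suggestion in this thread. */
    interface Schema<T> {
        /** Old single-record method: the default throws, so new implementations can skip it. */
        default T deserialize(byte[] message) {
            throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead.");
        }

        /** New method: the default delegates to the old one, so existing implementations keep working. */
        default void deserialize(byte[] message, Collector<T> out) {
            T record = deserialize(message);
            if (record != null) {
                out.collect(record);
            }
        }
    }

    /** New-style schema: emits zero or more records per message and never touches the old method. */
    static class CsvSchema implements Schema<String> {
        @Override
        public void deserialize(byte[] message, Collector<String> out) {
            String text = new String(message, StandardCharsets.UTF_8);
            for (String part : text.split(",")) {
                if (!part.isEmpty()) {
                    out.collect(part);
                }
            }
        }
    }

    /** Old-style schema: only overrides the single-record method. */
    static class WholeLineSchema implements Schema<String> {
        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    /** Option #1: deserialize into a buffer outside the lock, then emit the buffer while holding it. */
    static <T> void fetchAndEmit(Schema<T> schema, byte[] message, Object checkpointLock, List<T> downstream) {
        List<T> buffer = new ArrayList<>();
        schema.deserialize(message, buffer::add); // potentially slow user code runs without the lock
        synchronized (checkpointLock) {
            downstream.addAll(buffer);            // only the cheap hand-over happens under the lock
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        List<String> downstream = new ArrayList<>();
        fetchAndEmit(new CsvSchema(), "a,b,c".getBytes(StandardCharsets.UTF_8), lock, downstream);
        fetchAndEmit(new WholeLineSchema(), "x,y".getBytes(StandardCharsets.UTF_8), lock, downstream);
        System.out.println(downstream);
    }
}
```

With both methods given defaults, an existing schema that only overrides the old method still works through the delegating default, while a new schema overrides only the collector variant. The trade-off is that a schema overriding neither method compiles but fails at runtime.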