Hi Stephan,

I fully agree with what you said. Also as far as I can tell what was
suggested in the FLIP-124 does not contradict with what you are saying.
Let me clarify it a bit if it is not clear in the document.

Current implementations of Kafka and Kinesis do the deserialization
outside of the checkpoint lock in threads separate from the main
processing thread already. The approach described as option 1, which had
the most supporters is to keep that behavior. The way I would like to
support emitting multiple results in this setup is to let the
DeserializationSchema deserialize records into a list (via collector)
that will be emitted atomically all at once.

Currently the behavior can be modelled as:

T record = deserializationSchema.deserialize(...)
synchronized(checkpointLock) {
   sourceContext.collect(record)
   updateSourceState(...)
}

and I was suggesting to change it to:

Collector out = new Collector();
deserializationSchema.deserialize(..., out);
List<T> deserializedRecords = out.getRecords();
synchronized(checkpointLock) {
   for (T record: deserializedRecords) {
        sourceContext.collect(record)
   }
   updateSourceState(...)

}

I think that is aligned with your comment to Seth's comment that the
"batch" of records originating from a source record is atomically emitted.

Best,

Dawid



On 23/04/2020 14:55, Stephan Ewen wrote:
> Hi!
>
> Sorry for being a bit late to the party.
>
> One very important thing to consider for "serialization under
> checkpoint lock or not" is:
>   - If you do it under checkpoint lock, things are automatically
> correct. Checkpoint barriers go between original records that
> correspond to offsets in the source.
>   - If you deserialize outside the checkpoint lock, then you read a
> record from the source but only partially emit it. In that case you
> need to store the difference (not emitted part) in the checkpoint.
>
> ==> I would advise against trying to emit partial records, i.e. doing
> things outside the checkpoint lock. FLIP-27 will by default also not
> do partial emission of unnested events. Also, it is questionable
> whether optimizing this in the source makes sense when no other
> operator supports that (flatMap, etc.).
>
> Regarding Seth's comment about performance:
>   - For that it does probably makes not so much difference whether
> this is under lock or not, but more whether this can be pushed to
> another thread (source's I/O thread), so that it does not add load to
> the main task processing thread.
>
> ==> This means that the I/O thread deserialized that "batch" that it
> hands over.
> ==> Still, it is important that all records coming from one original
> source record are emitted atomically, otherwise we have the same issue
> as above.
>
> Best,
> Stephan
>
>
> On Tue, Apr 14, 2020 at 10:35 AM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>     Hi Xiaogang,
>
>     I very much agree with Jark's and Aljoscha's responses.
>
>
>     On 10/04/2020 17:35, Jark Wu wrote:
>     > Hi Xiaogang,
>     >
>     > I think this proposal doesn't conflict with your use case, you
>     can still
>     > chain a ProcessFunction after a source which emits raw data.
>     > But I'm not in favor of chaining ProcessFunction after source,
>     and we
>     > should avoid that, because:
>     >
>     > 1) For correctness, it is necessary to perform the watermark
>     generation as
>     > early as possible in order to be close to the actual data
>     >  generation within a source's data partition. This is also the
>     purpose of
>     > per-partition watermark and event-time alignment.
>     >  Many on going FLIPs (e.g. FLIP-27, FLIP-95) works a lot on this
>     effort.
>     > Deseriazing records and generating watermark in chained
>     >  ProcessFunction makes it difficult to do per-partition
>     watermark in the
>     > future.
>     > 2) In Flink SQL, a source should emit the deserialized row
>     instead of raw
>     > data. Otherwise, users have to define raw byte[] as the
>     >  single column of the defined table, and parse them in queries,
>     which is
>     > very inconvenient.
>     >
>     > Best,
>     > Jark
>     >
>     > On Fri, 10 Apr 2020 at 09:18, SHI Xiaogang
>     <shixiaoga...@gmail.com <mailto:shixiaoga...@gmail.com>> wrote:
>     >
>     >> Hi,
>     >>
>     >> I don't think the proposal is a good solution to the problems.
>     I am in
>     >> favour of using a ProcessFunction chained to the source/sink
>     function to
>     >> serialize/deserialize the records, instead of embedding
>     (de)serialization
>     >> schema in source/sink function.
>     >>
>     >> Message packing is heavily used in our production environment
>     to allow
>     >> compression and improve throughput. As buffered messages have to be
>     >> delivered when the time exceeds the limit, timers are also
>     required in our
>     >> cases. I think it's also a common need for other users.
>     >>
>     >> In the this proposal, with more components added into the
>     context, in the
>     >> end we will find the serialization/deserialization schema is
>     just another
>     >> wrapper of ProcessFunction.
>     >>
>     >> Regards,
>     >> Xiaogang
>     >>
>     >> Aljoscha Krettek <aljos...@apache.org
>     <mailto:aljos...@apache.org>> 于2020年4月7日周二 下午6:34写道:
>     >>
>     >>> On 07.04.20 08:45, Dawid Wysakowicz wrote:
>     >>>
>     >>>> @Jark I was aware of the implementation of SinkFunction, but
>     it was a
>     >>>> conscious choice to not do it that way.
>     >>>>
>     >>>> Personally I am against giving a default implementation to
>     both the new
>     >>>> and old methods. This results in an interface that by default
>     does
>     >>>> nothing or notifies the user only in the runtime, that he/she
>     has not
>     >>>> implemented a method of the interface, which does not sound
>     like a good
>     >>>> practice to me. Moreover I believe the method without a
>     Collector will
>     >>>> still be the preferred method by many users. Plus it communicates
>     >>>> explicitly what is the minimal functionality required by the
>     interface.
>     >>>> Nevertheless I am happy to hear other opinions.
>     >>> Dawid and I discussed this before. I did the extension of the
>     >>> SinkFunction but by now I think it's better to do it this way,
>     because
>     >>> otherwise you can implement the interface without implementing any
>     >> methods.
>     >>>> @all I also prefer the buffering approach. Let's wait a day
>     or two more
>     >>>> to see if others think differently.
>     >>> I'm also in favour of buffering outside the lock.
>     >>>
>     >>> Also, +1 to this FLIP.
>     >>>
>     >>> Best,
>     >>> Aljoscha
>     >>>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to