Thanks for publishing the FLIP!

On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote: 
>  1. How to sort/group keys? What representation of the key should we
>     use? Should we sort on the binary form or should we depend on
>     Comparators being available.

Initially, I suggested to Dawid (in private) that we do the sorting/grouping on 
the binary representation. I then changed my mind and thought we should use 
TypeComparator/Comparator because that's what the DataSet API uses. After 
talking to Stephan, I'm back to favoring the binary representation: it means we 
can eventually get rid of the TypeComparator interface, which is a bit 
complicated, and we don't need a meaningful order in our sort, only the 
grouping.

This comes with some problems, though: we need to ensure that the 
TypeSerializer of the type we're sorting is stable/deterministic. Beam has 
infrastructure for this in the form of Coder.verifyDeterministic() [1] which we 
don't have right now and should add if we go down this path.
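
To make the "grouping only" point concrete, here is a minimal sketch in plain 
Java (not Flink code). The names BinaryGrouping, Keyed and groupByBinaryKey are 
made up for illustration, and a UTF-8 encoder stands in for a TypeSerializer. 
It assumes Java 9+ for Arrays.compareUnsigned. The sort order of the bytes is 
arbitrary with respect to the logical key order, but equal keys end up next to 
each other, provided that equal keys always serialize to the same bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BinaryGrouping {

    /** Hypothetical helper: a record paired with the serialized form of its key. */
    private static final class Keyed<T> {
        final byte[] keyBytes;
        final T record;

        Keyed(byte[] keyBytes, T record) {
            this.keyBytes = keyBytes;
            this.record = record;
        }
    }

    /**
     * Groups records by sorting on the serialized key bytes only.
     * The keySerializer is an assumption standing in for a deterministic serializer.
     */
    public static <T> List<List<T>> groupByBinaryKey(
            List<T> records, Function<T, byte[]> keySerializer) {
        List<Keyed<T>> keyed = new ArrayList<>();
        for (T record : records) {
            keyed.add(new Keyed<>(keySerializer.apply(record), record));
        }

        // Lexicographic order on unsigned bytes: not a "good" logical order,
        // but a total one, so identical byte sequences become adjacent.
        keyed.sort((a, b) -> Arrays.compareUnsigned(a.keyBytes, b.keyBytes));

        // Start a new group whenever the serialized key changes.
        List<List<T>> groups = new ArrayList<>();
        byte[] currentKey = null;
        for (Keyed<T> k : keyed) {
            if (currentKey == null || !Arrays.equals(currentKey, k.keyBytes)) {
                groups.add(new ArrayList<>());
                currentKey = k.keyBytes;
            }
            groups.get(groups.size() - 1).add(k.record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("b", "a", "b", "c", "a");
        // Here each record is its own key.
        System.out.println(
                groupByBinaryKey(input, s -> s.getBytes(StandardCharsets.UTF_8)));
        // prints [[a, a], [b, b], [c]]
    }
}
```

If the serializer were not deterministic, two equal keys could produce 
different byte sequences and would be split into separate groups, which is why 
a check along the lines of Coder.verifyDeterministic() would be needed.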

>  2. Where in the stack should we apply the sorting (this rather a
>     discussion about internals)

Here, I'm gravitating towards the third option of implementing it in the layer 
of the StreamTask, which probably means implementing a custom InputProcessor. I 
think it's best to do it in this layer because we would not mix concerns of 
different layers as we would if we implemented this as a custom StreamOperator. 
I think this solution is also best when it comes to multi-input operators.

>  3. How should we deal with custom implementations of StreamOperators

I think the cleanest solution would be to go through the complete operator 
lifecycle for every key, because then the watermark would not oscillate between 
-Inf and +Inf and we would not break the semantic guarantee we have given to 
operators so far, namely that the watermark is strictly monotonically 
increasing. However, I don't think this solution is feasible because it would 
come with too much overhead. We should instead solve this problem via 
documentation, and maybe educate people not to query the current watermark or 
rely on it being monotonically increasing in operator implementations. That 
gives the framework more freedom in how user programs are executed.
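
As a rough illustration of the oscillation I mean, here is a small simulation 
in plain Java (not Flink code). The class and method names are hypothetical, 
and it assumes a scheme where one long-lived operator instance sees the 
watermark reset to -Inf at the start of each key and jump to +Inf at the end 
of it, modelled here as Long.MIN_VALUE and Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatermarkOscillation {

    /**
     * Returns the watermark values a single operator instance would observe
     * when the input is processed key by key (keys already sorted/grouped).
     */
    public static List<Long> observedWatermarks(List<String> keysInGroupedOrder) {
        List<Long> observed = new ArrayList<>();
        String currentKey = null;
        for (String key : keysInGroupedOrder) {
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    // End of the previous key: jump to +Inf so its timers fire.
                    observed.add(Long.MAX_VALUE);
                }
                // A new key starts without any event-time progress.
                observed.add(Long.MIN_VALUE);
                currentKey = key;
            }
        }
        if (currentKey != null) {
            observed.add(Long.MAX_VALUE); // end of the last key
        }
        return observed;
    }

    /** Counts how often the observed watermark goes backwards. */
    public static int countRegressions(List<Long> watermarks) {
        int regressions = 0;
        for (int i = 1; i < watermarks.size(); i++) {
            if (watermarks.get(i) < watermarks.get(i - 1)) {
                regressions++;
            }
        }
        return regressions;
    }

    public static void main(String[] args) {
        List<Long> observed = observedWatermarks(Arrays.asList("a", "a", "b", "c"));
        System.out.println(countRegressions(observed)); // prints 2
    }
}
```

An operator that caches the current watermark or assumes it never decreases 
would see one regression per key boundary in this model, which is the 
behaviour we would have to document.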

Aljoscha

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
