Yes, I think we can address the problem of indeterminacy in a separate FLIP, since we are already in that situation anyway.

Aljoscha

On 07.09.20 17:00, Dawid Wysakowicz wrote:
@Seth That's a very good point. I agree that RocksDB has the same
problem. I think we can use the same approach for the sorted shuffles
then. @Aljoscha I agree we should think about making it more resilient,
as I guess users might have problems already if they use keys with
non-deterministic binary representation. How do you feel about
addressing that separately purely to limit the scope of this FLIP?

@Aljoscha I tend to agree with you that the best place to actually put
the sorting would be in the InputProcessor(s). If there are no more
suggestions with respect to that issue, I'll put this proposal up for a vote.

@all Thank you for the feedback so far. I'd like to start a voting
thread on the proposal tomorrow, so I'd appreciate it if you comment
before then if you still have outstanding ideas.

Best,

Dawid

On 04/09/2020 17:13, Aljoscha Krettek wrote:
Seth is right, I was just about to write that as well. There is a
problem, though, because some of our TypeSerializers are not
deterministic even though we use them as if they were. Beam excludes
the FloatCoder, for example, and the AvroCoder in certain cases. I'm
pretty sure there is also weirdness going on in our KryoSerializer.

On 04.09.20 14:59, Seth Wiesman wrote:
There is already an implicit assumption that the TypeSerializer for keys is
stable/deterministic: RocksDB compares keys using their serialized byte
strings. I think this is a non-issue (or at least it's not changing the
status quo).
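
For concreteness, comparing serialized keys is just an unsigned
lexicographic byte comparison, roughly like the sketch below; the helper
is hypothetical, not actual RocksDB or Flink code.

    // Sketch only: unsigned lexicographic comparison of serialized keys,
    // analogous to what RocksDB's default bytewise comparator does.
    // (On Java 9+, java.util.Arrays.compareUnsigned(a, b) does the same.)
    static int compareSerializedKeys(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF); // compare as unsigned bytes
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length; // shorter key sorts first
    }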

On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org> wrote:

+1 for getting rid of the TypeComparator interface and relying on the
serialized representation for grouping.

Adding a new type to the DataStream API is quite difficult at the moment
because too many components are required: TypeInformation (which tries to
deal with logical fields for TypeComparators), TypeSerializer (incl. its
snapshot interfaces), and TypeComparator (with many methods and internals
such as normalized keys, etc.).

If necessary, we can later add simple comparison-related methods to the
TypeSerializer interface itself (like TypeSerializer.isDeterministic).
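
To sketch the idea (this method does not exist in Flink today, it is
purely hypothetical):

    // Hypothetical method on TypeSerializer, sketched for discussion only.
    // Returns true only if values that are equal always serialize to
    // identical bytes; that property is what makes binary grouping safe.
    public boolean isDeterministic() {
        return false; // safe default; deterministic serializers would opt in
    }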

Regards,
Timo


On 04.09.20 11:48, Aljoscha Krettek wrote:
Thanks for publishing the FLIP!

On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

    1. How to sort/group keys? What representation of the key should we
       use? Should we sort on the binary form or should we depend on
       Comparators being available?

Initially, I suggested to Dawid (in private) to do the sorting/grouping
using the binary representation. Then my opinion switched and I thought we
should use TypeComparator/Comparator because that's what the DataSet API
uses. After talking to Stephan, I'm again leaning towards the binary
representation, because it means we can eventually get rid of the
TypeComparator interface, which is a bit complicated, and because we don't
need any meaningful order in our sort; we only need the grouping.
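
To illustrate what grouping on the binary representation amounts to, here
is a rough sketch (the Record type and process() call are hypothetical
stand-ins, not Flink code): any total order over the key bytes works,
since it only has to make records with equal keys adjacent.

    import java.util.Arrays;
    import java.util.List;

    interface Record { byte[] serializedKey(); } // hypothetical stand-in

    class BinaryGrouping {
        // Sort by serialized key bytes; the order itself is meaningless,
        // it only has to make records with equal keys adjacent.
        static void groupBySerializedKey(List<Record> records) {
            records.sort((r1, r2) ->
                    Arrays.compareUnsigned(r1.serializedKey(), r2.serializedKey()));

            byte[] currentKey = null;
            for (Record r : records) {
                if (currentKey == null
                        || !Arrays.equals(currentKey, r.serializedKey())) {
                    currentKey = r.serializedKey(); // a new key group starts here
                }
                process(r); // hypothetical per-record processing
            }
        }

        static void process(Record r) {} // placeholder
    }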

This comes with some problems, though: we need to ensure that the
TypeSerializer of the type we're sorting is stable/deterministic. Beam has
infrastructure for this in the form of Coder.verifyDeterministic() [1],
which we don't have right now and should add if we go down this path.
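
A pre-flight check on our side could look roughly like this (purely a
sketch; it assumes a hypothetical isDeterministic() method on
TypeSerializer that Flink does not have today):

    // Hypothetical pre-flight check, modelled on Beam's
    // Coder.verifyDeterministic() [1]; not an existing Flink API.
    static void verifyDeterministic(TypeSerializer<?> keySerializer) {
        if (!keySerializer.isDeterministic()) { // hypothetical method
            throw new IllegalStateException(
                    keySerializer.getClass().getName()
                            + " has no stable binary representation; its keys"
                            + " cannot be sorted/grouped by their bytes.");
        }
    }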

    2. Where in the stack should we apply the sorting (this is rather a
       discussion about internals)?

Here, I'm gravitating towards the third option of implementing it in the
layer of the StreamTask, which probably means implementing a custom
InputProcessor. I think it's best to do it in this layer because we would
not mix concerns of different layers, as we would if we implemented this
as a custom StreamOperator. I think this solution is also best when it
comes to multi-input operators.
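
Very roughly, the idea would look something like this (hypothetical types,
not the actual StreamTask/InputProcessor interfaces; spilling to disk,
object reuse, etc. are all omitted):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Consumer;

    // Rough schematic only: buffer records, sort them by serialized key
    // once the input ends, then feed the operator. The operator itself
    // stays unchanged; all sorting concerns live in the input processing.
    final class SortingInputProcessor<T> {
        private final List<KeyedRecord<T>> buffer = new ArrayList<>();

        void processRecord(byte[] serializedKey, T record) {
            buffer.add(new KeyedRecord<>(serializedKey, record));
        }

        void endOfInput(Consumer<T> operatorInput) {
            buffer.sort((a, b) -> Arrays.compareUnsigned(a.key, b.key));
            for (KeyedRecord<T> r : buffer) {
                operatorInput.accept(r.record);
            }
        }

        private static final class KeyedRecord<T> {
            final byte[] key;
            final T record;
            KeyedRecord(byte[] key, T record) {
                this.key = key;
                this.record = record;
            }
        }
    }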

    3. How should we deal with custom implementations of StreamOperators?

I think the cleanest solution would be to go through the complete operator
lifecycle for every key, because then the watermark would not oscillate
between -Inf and +Inf and we would not break the semantic guarantee we
have given operators so far, namely that the watermark is strictly
monotonically increasing. However, I don't think this solution is feasible
because it would come with too much overhead. We should solve this problem
via documentation, and maybe educate people not to query the current
watermark or rely on it being monotonically increasing in operator
implementations, to allow the framework more freedom in how user programs
are executed.

Aljoscha

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
