I think Kurt's concerns/comments are very valid and we need to implement such things in the future. However, I also think that we need to get started somewhere, and what's proposed in this FLIP is a good starting point that we can build on. So we should not get paralyzed by thinking too far ahead into the future. Does that make sense?

Best,
Aljoscha

On 08.09.20 16:59, Dawid Wysakowicz wrote:
Ad. 1

Yes, you are right in principle.

Let me clarify my proposal a bit, though. The proposed sort-style
execution aims at a generic KeyedProcessFunction where all the
"aggregations" are actually performed in the user code. It tries to
improve the performance by removing the need to use RocksDB, e.g.:

     private static final class Summer<K>
             extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {

         ....

         @Override
         public void processElement(
                 Tuple2<K, Integer> value,
                 Context ctx,
                 Collector<Tuple2<K, Integer>> out) throws Exception {
             if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
                 ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                 timerRegistered.update(true);
             }
             Integer v = counter.value();
             Integer incomingValue = value.f1;
             if (v != null) {
                 v += incomingValue;
             } else {
                 v = incomingValue;
             }
             counter.update(v);
         }

         ....

    }
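
For completeness, the parts elided above would be roughly the usual state
declarations, e.g. (just a sketch, the descriptor names are made up):

         private transient ValueState<Integer> counter;
         private transient ValueState<Boolean> timerRegistered;

         @Override
         public void open(Configuration parameters) throws Exception {
             // Descriptor names here are made up for the example.
             counter = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("counter", Integer.class));
             timerRegistered = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("timerRegistered", Boolean.class));
         }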

Therefore I don't think the first part of your reply, about separating the
write and read workloads, applies here. We do not aim to create an API
competing with the Table API. We think operations such as joins or
analytical aggregations should be performed in the Table API.

As for the second part, I agree it would be nice to fall back to the
sorting approach only once a certain memory threshold in a State
Backend is exceeded. This has some problems though. We would need a way to
estimate the size of the occupied memory to tell when the threshold is
reached. That is not easily doable by default, e.g. in the
MemoryStateBackend, as we do not serialize the values in that state
backend by default. We would have to add that, but it would add the
overhead of serialization.

This proposal aims at the cases where we have a large state that will
not fit into memory and where, without the change, users are forced to use
RocksDB. If the state fits in memory, I agree it will be better to do
hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I
think it is important to give users the choice to use one or the other
approach. We might discuss which approach should be the default for
RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a
user-configured state backend, or sorting-based with a single-key-at-a-time
backend? Moreover, we could think about whether we should let users choose
the sort vs. hash "state backend" per operator. Would that suffice?
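
To make the choice concrete, with the API proposed in FLIP-134 selecting the
batch mode could look roughly like this (the enum/method names are the ones
proposed there; the per-operator sort/hash switch itself does not exist yet
and is purely hypothetical):

     StreamExecutionEnvironment env =
             StreamExecutionEnvironment.getExecutionEnvironment();
     // Run the bounded program in batch mode as proposed in FLIP-134; whether
     // keyed operators then use the sort-based or the hash-based approach is
     // exactly the open question above.
     env.setRuntimeMode(RuntimeExecutionMode.BATCH);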

Ad. 2

I still think we can just use the first X bytes of the serialized form
as the normalized key and fall back to comparing the full keys on clashes.
That is because we are actually not interested in a logical order; we
care only about the "grouping" aspect of the sorting. Therefore I think
it's enough to compare only a part of the full key as the normalized key.
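
Roughly what I have in mind, as a sketch only (the prefix length and helper
names are illustrative):

     // Compare only the first PREFIX_LENGTH bytes of the serialized keys
     // (PREFIX_LENGTH is an arbitrary constant here); only on a clash do we
     // fall back to comparing the remaining bytes of the full keys.
     static int compareSerializedKeys(byte[] key1, byte[] key2) {
         int prefixLen = Math.min(PREFIX_LENGTH, Math.min(key1.length, key2.length));
         for (int i = 0; i < prefixLen; i++) {
             int cmp = (key1[i] & 0xFF) - (key2[i] & 0xFF);
             if (cmp != 0) {
                 return cmp;
             }
         }
         // Clash on the prefix: compare the rest of the keys byte by byte.
         int fullLen = Math.min(key1.length, key2.length);
         for (int i = prefixLen; i < fullLen; i++) {
             int cmp = (key1[i] & 0xFF) - (key2[i] & 0xFF);
             if (cmp != 0) {
                 return cmp;
             }
         }
         return key1.length - key2.length;
     }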

Thanks again for the really nice and thorough feedback!

Best,

Dawid

On 08/09/2020 14:47, Kurt Young wrote:
Regarding #1, yes, the state backend is definitely hash-based execution.
However, there are some differences from batch hash-based execution. The key
difference is *random access & read/write mixed workload*. For example, by
using a state backend in streaming execution, one has to mix the read and
write operations, and all of them are actually random access. But in a batch
hash execution, we could divide the phases into write and read. For example,
we can build the hash table first, with only write operations. And once the
build is done, we can start to read and trigger the user code.
Take the hash aggregation which the Blink planner implemented as an example:
during the build phase, as long as the hash map fits into memory, we update
the accumulators directly in the hash map. And once we run out of memory, we
fall back to sort-based execution. It improves the performance a lot if the
incoming data can be processed in memory.
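
A rough sketch of that build/read split and the fallback, just to illustrate
the control flow (the method names below are made up and do not correspond to
the actual Blink planner internals):

     // Build phase: write-only accesses to an in-memory hash map of accumulators.
     Map<String, Integer> sums = new HashMap<>();
     for (Tuple2<String, Integer> record : input) {
         if (memoryBudgetExceeded()) {             // hypothetical memory check
             // Running out of memory: sort the remaining input by key, merge it
             // with what is already in the map and emit from there instead.
             sortBasedFallback(input, sums, out);  // hypothetical fallback path
             return;
         }
         sums.merge(record.f0, record.f1, Integer::sum);
     }
     // Read phase: only after the build is done do we read and emit results.
     for (Map.Entry<String, Integer> entry : sums.entrySet()) {
         out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
     }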

Regarding #2, IIUC you are actually describing a binary format of the key,
not a normalized key as it is used in DataSet. Let me take String as an
example. Say we have lots of keys with length all greater than, let's say,
20. In your proposal, you would encode the whole string in the prefix of
your composed data ( <key> + <timestamp> + <record> ). And when you compare
records, you would actually compare the *whole* key of the record. A
normalized key, on the other hand, is fixed-length in this case; IIRC it
takes 8 bytes to represent the string. The sorter stores the normalized key
and the record offset in a dedicated array, and when doing the sorting, it
only sorts this *small* array. If the normalized keys are different, you can
immediately tell which is greater from the normalized keys alone. You only
have to compare the full keys if the normalized keys are equal and you know
that in this case the normalized key couldn't fully represent the key. The
reason why DataSet is doing this is that sorting the *small* array is super
cache-efficient. The idea is borrowed from this paper [1]. Let me know if I
missed or misunderstood anything.

[1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
cache-sensitive parallel external sort)
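
To illustrate, a minimal sketch of that entry layout (the names are made up
and do not match the actual DataSet sorter classes):

     // One compact, fixed-size entry per record: an 8-byte normalized key plus
     // the offset of the full record in the sort buffer. Sorting only touches
     // the array of these entries, which is what makes it so cache-friendly.
     static final class SortEntry {
         final long normalizedKey;   // first 8 bytes of the serialized key
         final int recordOffset;     // where the full record lives in the buffer

         SortEntry(long normalizedKey, int recordOffset) {
             this.normalizedKey = normalizedKey;
             this.recordOffset = recordOffset;
         }
     }

     static int compare(SortEntry a, SortEntry b) {
         int cmp = Long.compareUnsigned(a.normalizedKey, b.normalizedKey);
         if (cmp != 0) {
             return cmp;   // decided from the 8-byte prefix alone
         }
         // Prefixes are equal but may not represent the full key: fall back to
         // comparing the full serialized keys at a.recordOffset / b.recordOffset.
         return compareFullKeysAt(a.recordOffset, b.recordOffset);   // hypothetical
     }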

Best,
Kurt


On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Hey Kurt,

Thank you for comments!

Ad. 1 I might have missed something here, but as far as I can see, using
the current execution stack with regular state backends (RocksDB in
particular, if we want to have spilling capabilities) is equivalent to
hash-based execution. I can see a different spilling state backend
implementation in the future, but I think it is not batch specific. Or am
I missing something?

Ad. 2 Totally agree that normalized keys are important for the
performance. I think, though, that TypeComparators are not a necessity to
have that. Actually, this proposal is heading towards only ever performing
"normalized keys" comparison. I have not included in the proposal the
binary format which we will use for sorting (partially because I forgot,
and partially because I thought it was too much of an implementation
detail). Let me include it here though, as it might clear up the situation
a bit.

In DataSet, we at times have KeySelectors which extract keys based on
field indices or names. This allows, in certain situations, extracting the
key from serialized records. Compared to DataSet, in DataStream the key
is always described with a black-box KeySelector, in other words with a
function which extracts a key from a deserialized record. In turn there
is no way to create a comparator that could compare records by
extracting the key from a serialized record (neither with, nor without
key normalization). We suggest that the input for the sorter will be

<key> + <timestamp> + <record>

Without having the key prepended we would have to deserialize the record
for every key comparison.

Therefore if we agree that we perform binary comparison for keys (which
are always prepended), it is actually equivalent to a DataSet with
TypeComparators that support key normalization.
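
For reference, producing that composed binary input could look roughly like
this, reusing the existing serializers (the variable names are illustrative
only):

     // Write <key> + <timestamp> + <record> into one byte sequence so the sorter
     // can group records by comparing the leading key bytes without ever
     // deserializing the record itself.
     DataOutputSerializer buffer = new DataOutputSerializer(64);
     keySerializer.serialize(keySelector.getKey(record), buffer);   // <key>
     buffer.writeLong(timestamp);                                   // <timestamp>
     recordSerializer.serialize(record, buffer);                    // <record>
     byte[] sortInput = buffer.getCopyOfBuffer();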

Let me know if that is clear, or I have missed something here.

Best,

Dawid

On 08/09/2020 03:39, Kurt Young wrote:
Hi Dawid, thanks for bringing this up, it's really exciting to see that
batch execution is introduced in DataStream. From the FLIP, it seems
we are sticking with the sort-based execution mode (at least for now), which
will sort the whole input data before any *keyed* operation is
executed. I have two comments here:

1. Do we want to introduce hash-based execution in the future? Sort is a
safe choice but not the best in lots of cases. IIUC we only need
to make sure that before the framework finishes dealing with one key, the
operator doesn't see any data belonging to other keys, thus
hash-based execution would also do the trick. One tricky thing the
framework might need to deal with is the memory constraint and spilling
in the hash map, but Flink also has some good knowledge about this stuff.
2. Going back to sort-based execution and how to sort keys. From my
experience, the performance of sorting would be one of the most important
things if we want to achieve good performance for batch execution. And
normalized keys are actually the key to the performance of sorting.
If we want to get rid of TypeComparator, I think we still need to find a
way to bring this back.

Best,
Kurt


On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org>
wrote:
Yes, I think we can address the problem of indeterminacy in a separate
FLIP because we're already in it.

Aljoscha

On 07.09.20 17:00, Dawid Wysakowicz wrote:
@Seth That's a very good point. I agree that RocksDB has the same
problem. I think we can use the same approach for the sorted shuffles
then. @Aljoscha I agree we should think about making it more resilient,
as I guess users might have problems already if they use keys with
non-deterministic binary representation. How do you feel about
addressing that separately purely to limit the scope of this FLIP?

@Aljoscha I tend to agree with you that the best place to actually place
the sorting would be in the InputProcessor(s). If there are no more
suggestions with respect to that issue, I'll put this proposal up for
voting.

@all Thank you for the feedback so far. I'd like to start a voting
thread on the proposal tomorrow. Therefore I'd appreciate it if you comment
before that, if you still have some outstanding ideas.

Best,

Dawid

On 04/09/2020 17:13, Aljoscha Krettek wrote:
Seth is right, I was just about to write that as well. There is a
problem, though, because some of our TypeSerializers are not
deterministic even though we use them as if they were. Beam excludes
the FloatCoder, for example, and the AvroCoder in certain cases. I'm
pretty sure there is also weirdness going on in our KryoSerializer.

On 04.09.20 14:59, Seth Wiesman wrote:
There is already an implicit assumption that the TypeSerializer for keys is
stable/deterministic: RocksDB compares keys using their serialized byte
strings. I think this is a non-issue (or at least it's not changing the
status quo).

On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org>
wrote:
+1 for getting rid of the TypeComparator interface and relying on the
serialized representation for grouping.

Adding a new type to the DataStream API is quite difficult at the moment due
to too many components that are required: TypeInformation (tries to deal
with logical fields for TypeComparators), TypeSerializer (incl. its
snapshot interfaces), and TypeComparator (with many methods and
internals such as normalized keys etc.).

If necessary, we can add simple comparison-related methods to the
TypeSerializer interface itself in the future (like
TypeSerializer.isDeterministic).
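
Just to illustrate the shape of such an addition (isDeterministic is only a
suggested name, nothing like this exists on TypeSerializer today):

     // Sketch only: a possible addition to TypeSerializer, returning whether
     // serialize() always produces the same bytes for equal values, i.e. whether
     // the serialized form is safe to use for grouping/sorting.
     public boolean isDeterministic() {
         return false;   // a conservative default
     }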

Regards,
Timo


On 04.09.20 11:48, Aljoscha Krettek wrote:
Thanks for publishing the FLIP!

On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:
     1. How to sort/group keys? What representation of the key should we
        use? Should we sort on the binary form or should we depend on
        Comparators being available.
Initially, I suggested to Dawid (in private) to do the sorting/grouping by
using the binary representation. Then my opinion switched and I thought we
should use TypeComparator/Comparator because that's what the DataSet API
uses. After talking to Stephan, I'm again encouraged in my opinion to use
the binary representation because it means we can eventually get rid of the
TypeComparator interface, which is a bit complicated, and because we don't
need any good order in our sort, we only need the grouping.

This comes with some problems, though: we need to ensure that the
TypeSerializer of the type we're sorting is stable/deterministic. Beam has
infrastructure for this in the form of Coder.verifyDeterministic() [1]
which we don't have right now and should add if we go down this path.
     2. Where in the stack should we apply the sorting (this is rather a
        discussion about internals)

Here, I'm gravitating towards the third option of implementing it in the
layer of the StreamTask, which probably means implementing a custom
InputProcessor. I think it's best to do it in this layer because we would
not mix concerns of different layers, as we would if we implemented this as
a custom StreamOperator. I think this solution is also best when it comes
to multi-input operators.
     3. How should we deal with custom implementations of StreamOperators

I think the cleanest solution would be to go through the complete operator
lifecycle for every key, because then the watermark would not oscillate
between -Inf and +Inf and we would not break the semantic guarantees that
we gave to operators so far, namely that the watermark is strictly
monotonically increasing. However, I don't think this solution is feasible
because it would come with too much overhead. We should solve this problem
via documentation and maybe educate people not to query the current
watermark or rely on the watermark being monotonically increasing in
operator implementations, to allow the framework more freedom in how user
programs are executed.
Aljoscha

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184


