I doubt that any sorting algorithm would work if it only knows that two keys
are different, without any information about which one is greater.
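
To make this concrete, here is a minimal sketch of the scheme under
discussion: compare the first X bytes of the serialized key and fall back to
the full serialized key when the prefixes tie. It is only an illustration, not
Flink code. The class name, PREFIX_LEN and the use of UTF-8 strings as the
"serialized form" are all assumptions. Note that the comparison still has to
say which key is greater; it uses the unsigned byte order rather than a
logical order, which is enough to make equal keys adjacent.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class PrefixSortSketch {
    // Hypothetical prefix length standing in for "the first X bytes".
    static final int PREFIX_LEN = 8;

    // Unsigned lexicographic comparison of the first `len` bytes.
    // Keys shorter than `len` are treated as if padded with zero bytes.
    static int comparePrefix(byte[] a, byte[] b, int len) {
        for (int i = 0; i < len; i++) {
            int x = i < a.length ? (a[i] & 0xFF) : 0;
            int y = i < b.length ? (b[i] & 0xFF) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // Unsigned lexicographic comparison of the complete byte arrays.
    static int compareFull(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Cheap prefix comparison first; full comparison only on a prefix tie.
    static final Comparator<byte[]> BY_SERIALIZED_KEY = (a, b) -> {
        int c = comparePrefix(a, b, PREFIX_LEN);
        return c != 0 ? c : compareFull(a, b);
    };

    // Serializes the keys (UTF-8 as a stand-in), sorts them, decodes them again.
    static List<String> sortForGrouping(List<String> keys) {
        List<byte[]> serialized = new ArrayList<>();
        for (String key : keys) {
            serialized.add(key.getBytes(StandardCharsets.UTF_8));
        }
        serialized.sort(BY_SERIALIZED_KEY);
        List<String> result = new ArrayList<>();
        for (byte[] bytes : serialized) {
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        // Equal keys end up next to each other after the sort.
        System.out.println(sortForGrouping(Arrays.asList(
                "customer-0002", "a", "customer-0001", "customer-0002", "customer-0001")));
    }
}
```

In this example all the "customer-..." keys share the same 8-byte prefix, so
they are only separated by the full-key fallback.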

Best,
Kurt


On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Ad. 1
>
> Yes, you are right in principle.
>
> Let me though clarify my proposal a bit. The proposed sort-style
> execution aims at a generic KeyedProcessFunction where all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by actually removing the need to use RocksDB e.g.:
>
>     private static final class Summer<K>
>             extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>         ....
>
>         @Override
>         public void processElement(
>                 Tuple2<K, Integer> value,
>                 Context ctx,
>                 Collector<Tuple2<K, Integer>> out) throws Exception {
>             if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                 ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                 timerRegistered.update(true);
>             }
>             Integer v = counter.value();
>             Integer incomingValue = value.f1;
>             if (v != null) {
>                 v += incomingValue;
>             } else {
>                 v = incomingValue;
>             }
>             counter.update(v);
>         }
>
>         ....
>
>    }
>
> Therefore I don't think the first part of your reply with separating the
> write and read workload applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in Table API.
>
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will
> not fit into the memory and without the change users are forced to use
> RocksDB. If the state fits in memory I agree it will be better to do
> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134: should it be hash-based with a
> user-configured state backend, or sorting-based with a
> single-key-at-a-time backend? Moreover, we could consider letting users
> choose the sort vs hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fall back to comparing full keys on clashes. This
> works because we are not actually interested in a logical order; we
> care only about the "grouping" aspect of the sorting. Therefore I think
> it's enough to compare only parts of the full key as the normalized key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes, the state backend is definitely hash-based execution.
> > However, there are some differences from batch hash-based execution.
> > The key difference is *random access & read/write mixed workload*. For
> > example, when using a state backend in streaming execution, one has to
> > mix the read and write operations, and all of them are actually random
> > access. But in batch hash execution, we could divide the work into
> > write and read phases. For example, we can build the hash table first,
> > with only write operations. Once the build is done, we can start to
> > read and trigger the user code. Take the hash aggregation implemented
> > by the Blink planner as an example: during the build phase, as long as
> > the hash map fits into memory, we update the accumulators directly in
> > the hash map. Once we run out of memory, we fall back to sort-based
> > execution. It improves the performance a lot if the incoming data can
> > be processed in memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of the
> > key, not the normalized key which is used in DataSet. I will take
> > String as an example. Say we have lots of keys whose lengths are all
> > greater than, let's say, 20. In your proposal, you will encode the
> > whole string in the prefix of your composed data ( <key> + <timestamp>
> > + <record> ). And when you compare records, you will actually compare
> > the *whole* key of the record. A normalized key is fixed-length in
> > this case; IIRC it will take 8 bytes to represent the string. The
> > sorter will store the normalized key and offset in a dedicated array.
> > When doing the sorting, it only sorts this *small* array. If the
> > normalized keys are different, you can immediately tell which is
> > greater from the normalized keys. You only have to compare the full
> > keys if the normalized keys are equal and you know that in this case
> > the normalized key couldn't represent the full key. The reason DataSet
> > does this is that sorting the *small* array is super cache efficient.
> > The idea is borrowed from this paper [1]. Let me know if I missed or
> > misunderstood anything.
> >
> > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> > cache-sensitive parallel external sort)
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> >> Hey Kurt,
> >>
> >> Thank you for the comments!
> >>
> >> Ad. 1 I might have missed something here, but as far as I can see,
> >> using the current execution stack with regular state backends (RocksDB
> >> in particular if we want to have spilling capabilities) is equivalent to
> >> hash-based execution. I can see a different spilling state backend
> >> implementation in the future, but I think it is not batch specific. Or am
> >> I missing something?
> >>
> >> Ad. 2 Totally agree that normalized keys are important to the
> >> performance. I think, though, that TypeComparators are not a necessity to
> >> have that. Actually, this proposal is heading towards only ever performing
> >> "normalized keys" comparison. I have not included in the proposal the
> >> binary format which we will use for sorting (partially because I forgot,
> >> and partially because I thought it was too much of an implementation
> >> detail). Let me include it here though, as it might clear up the
> >> situation a bit.
> >>
> >> In DataSet, at times we have KeySelectors which extract keys based on
> >> field indices or names. This allows us, in certain situations, to extract
> >> the key from serialized records. Compared to DataSet, in DataStream the
> >> key is always described with a black-box KeySelector, or, put differently,
> >> with a function which extracts a key from a deserialized record. In turn,
> >> there is no way to create a comparator that could compare records by
> >> extracting the key from a serialized record (neither with, nor without
> >> key normalization). We suggest that the input for the sorter will be
> >>
> >> <key> + <timestamp> + <record>
> >>
> >> Without having the key prepended we would have to deserialize the record
> >> for every key comparison.
> >>
> >> Therefore if we agree that we perform binary comparison for keys (which
> >> are always prepended), it is actually equivalent to a DataSet with
> >> TypeComparators that support key normalization.
> >>
> >> Let me know if that is clear, or if I have missed something here.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 08/09/2020 03:39, Kurt Young wrote:
> >>> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> >>> batch execution is introduced in DataStream. From the FLIP, it seems
> >>> we are sticking with sort-based execution mode (at least for now), which
> >>> will sort the whole input data before any *keyed* operation is
> >>> executed. I have two comments here:
> >>>
> >>> 1. Do we want to introduce hash-based execution in the future? Sort is a
> >>> safe choice but not the best in lots of cases. IIUC we only need
> >>> to make sure that before the framework finishes dealing with one key, the
> >>> operator doesn't see any data belonging to other keys, thus
> >>> hash-based execution would also do the trick. One tricky thing the
> >>> framework might need to deal with is memory constraints and spilling
> >>> in the hash map, but Flink also has some good knowledge about this.
> >>> 2. Going back to sort-based execution and how to sort keys. From my
> >>> experience, the performance of sorting would be one of the most important
> >>> things if we want to achieve good performance of batch execution. And
> >>> normalized keys are actually the key to the performance of sorting.
> >>> If we want to get rid of TypeComparator, I think we still need to find a
> >>> way to introduce this back.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>>> Yes, I think we can address the problem of indeterminacy in a separate
> >>>> FLIP because we're already in it.
> >>>>
> >>>> Aljoscha
> >>>>
> >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>> @Seth That's a very good point. I agree that RocksDB has the same
> >>>>> problem. I think we can use the same approach for the sorted shuffles
> >>>>> then. @Aljoscha I agree we should think about making it more resilient,
> >>>>> as I guess users might have problems already if they use keys with
> >>>>> non-deterministic binary representation. How do you feel about
> >>>>> addressing that separately purely to limit the scope of this FLIP?
> >>>>>
> >>>>> @Aljoscha I tend to agree with you that the best place to actually place
> >>>>> the sorting would be in the InputProcessor(s). If there are no more
> >>>>> suggestions with respect to that issue, I'll put this proposal up for
> >>>>> voting.
> >>>>>
> >>>>> @all Thank you for the feedback so far. I'd like to start a voting
> >>>>> thread on the proposal tomorrow. Therefore I'd appreciate it if you
> >>>>> comment before that, if you still have some outstanding ideas.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Dawid
> >>>>>
> >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>> Seth is right, I was just about to write that as well. There is a
> >>>>>> problem, though, because some of our TypeSerializers are not
> >>>>>> deterministic even though we use them as if they were. Beam excludes
> >>>>>> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> >>>>>> pretty sure there is also weirdness going on in our KryoSerializer.
> >>>>>>
> >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>> There is already an implicit assumption that the TypeSerializer for keys
> >>>>>>> is stable/deterministic: RocksDB compares keys using their serialized
> >>>>>>> byte strings. I think this is a non-issue (or at least it's not changing
> >>>>>>> the status quo).
> >>>>>>>
> >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org>
> >>>> wrote:
> >>>>>>>> +1 for getting rid of the TypeComparator interface and relying on the
> >>>>>>>> serialized representation for grouping.
> >>>>>>>>
> >>>>>>>> Adding a new type to the DataStream API is quite difficult at the
> >>>>>>>> moment due to too many components that are required: TypeInformation
> >>>>>>>> (tries to deal with logical fields for TypeComparators), TypeSerializer
> >>>>>>>> (incl. its snapshot interfaces), and TypeComparator (with many methods
> >>>>>>>> and internals such as normalized keys etc.).
> >>>>>>>>
> >>>>>>>> If necessary, we can add more simple comparison-related methods to the
> >>>>>>>> TypeSerializer interface itself in the future (like
> >>>>>>>> TypeSerializer.isDeterministic).
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>
> >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>>     1. How to sort/group keys? What representation of the key
> >>>>>>>>>>        should we use? Should we sort on the binary form or should we
> >>>>>>>>>>        depend on Comparators being available.
> >>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>>>> sorting/grouping by using the binary representation. Then my opinion
> >>>>>>>>> switched and I thought we should use TypeComparator/Comparator because
> >>>>>>>>> that's what the DataSet API uses. After talking to Stephan, I'm again
> >>>>>>>>> encouraged in my opinion to use the binary representation because it
> >>>>>>>>> means we can eventually get rid of the TypeComparator interface, which
> >>>>>>>>> is a bit complicated, and because we don't need any good order in our
> >>>>>>>>> sort, we only need the grouping.
> >>>>>>>>>
> >>>>>>>>> This comes with some problems, though: we need to ensure that the
> >>>>>>>>> TypeSerializer of the type we're sorting is stable/deterministic.
> >>>>>>>>> Beam has infrastructure for this in the form of
> >>>>>>>>> Coder.verifyDeterministic() [1] which we don't have right now and
> >>>>>>>>> should add if we go down this path.
> >>>>>>>>>>     2. Where in the stack should we apply the sorting (this is
> >>>>>>>>>>        rather a discussion about internals)
> >>>>>>>>> Here, I'm gravitating towards the third option of implementing it
> >>>>>>>>> in the layer of the StreamTask, which probably means implementing a
> >>>>>>>>> custom InputProcessor. I think it's best to do it in this layer
> >>>>>>>>> because we would not mix concerns of different layers as we would if
> >>>>>>>>> we implemented this as a custom StreamOperator. I think this solution
> >>>>>>>>> is also best when it comes to multi-input operators.
> >>>>>>>>>>     3. How should we deal with custom implementations of
> >>>>>>>>>>        StreamOperators
> >>>>>>>>> I think the cleanest solution would be to go through the complete
> >>>>>>>>> operator lifecycle for every key, because then the watermark would
> >>>>>>>>> not oscillate between -Inf and +Inf and we would not break the
> >>>>>>>>> semantic guarantees that we gave to operators so far, in that the
> >>>>>>>>> watermark is strictly monotonically increasing. However, I don't think
> >>>>>>>>> this solution is feasible because it would come with too much
> >>>>>>>>> overhead. We should solve this problem via documentation and maybe
> >>>>>>>>> educate people to not query the current watermark or not rely on the
> >>>>>>>>> watermark being monotonically increasing in operator implementations,
> >>>>>>>>> to allow the framework more freedom in how user programs are
> >>>>>>>>> executed.
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>
>
