I doubt that any sorting algorithm would work knowing only that the keys are different, without knowing which one is greater.
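The point at issue here can be made concrete. As noted earlier in the thread, grouping does not need a semantically meaningful order: sorting by any total order on the serialized key bytes (for example, unsigned lexicographic order, as in the sketch below) is enough to make equal keys adjacent. This is an illustrative sketch, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: an arbitrary but consistent total order over serialized key bytes
// suffices for grouping, because sorting by it makes equal keys adjacent.
public class GroupingByByteOrder {

    // Unsigned lexicographic comparison of two serialized keys.
    public static int compareBytes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }

    // True if, after sorting, every distinct key occupies one contiguous run.
    public static boolean equalKeysAdjacent(List<byte[]> keys) {
        List<byte[]> sorted = new ArrayList<>(keys);
        sorted.sort(GroupingByByteOrder::compareBytes);
        Set<String> finishedRuns = new HashSet<>();
        String prev = null;
        for (byte[] k : sorted) {
            String s = Arrays.toString(k);
            if (!s.equals(prev) && !finishedRuns.add(s)) {
                return false; // a key re-appeared after its run ended
            }
            prev = s;
        }
        return true;
    }
}
```

The order produced this way is meaningless from the user's perspective, but it is total and consistent, which is all the grouping needs.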
Best,
Kurt

On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Ad. 1
>
> Yes, you are right in principle.
>
> Let me clarify my proposal a bit, though. The proposed sort-style
> execution aims at a generic KeyedProcessFunction where all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by removing the need to use RocksDB, e.g.:
>
>     private static final class Summer<K>
>             extends KeyedProcessFunction<K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>
>         ....
>
>         @Override
>         public void processElement(
>                 Tuple2<K, Integer> value,
>                 Context ctx,
>                 Collector<Tuple2<K, Integer>> out) throws Exception {
>             if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>                 ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>                 timerRegistered.update(true);
>             }
>             Integer v = counter.value();
>             Integer incomingValue = value.f1;
>             if (v != null) {
>                 v += incomingValue;
>             } else {
>                 v = incomingValue;
>             }
>             counter.update(v);
>         }
>
>         ....
>     }
>
> Therefore I don't think the first part of your reply, about separating
> the write and read workload, applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in the Table API.
>
> As for the second part, I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a state
> backend is used. This has some problems, though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default, e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will
> not fit into the memory, and without the change users are forced to use
> RocksDB.
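The execution model described above can be illustrated without any Flink dependencies. The following sketch (an assumed illustration, not code from the FLIP) shows why sorting the input by key lets the "state backend" hold only a single current key's value: the user function sees each key's records contiguously, and the result is emitted when the key changes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch of the proposed sort-based execution: records are
// sorted by key, so the entire "keyed state" is one value for the
// current key, and results are emitted on key change and at end of input.
public class SortBasedKeyedExecution {

    public static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> input) {
        // Sort so that all records with equal keys are contiguous.
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        Map<String, Integer> results = new LinkedHashMap<>();
        String currentKey = null;
        int acc = 0; // the whole "state backend": one value for the current key
        for (Map.Entry<String, Integer> record : sorted) {
            if (!record.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    results.put(currentKey, acc); // emit on key change
                }
                currentKey = record.getKey();
                acc = 0;
            }
            acc += record.getValue();
        }
        if (currentKey != null) {
            results.put(currentKey, acc); // emit the last key at end of input
        }
        return results;
    }
}
```

The emit-at-end-of-input step corresponds to the Long.MAX_VALUE timer in the Summer example above.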
> If the state fits in memory, I agree it will be better to do
> hash-based aggregations, e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with a
> user-configured state backend, or sorting-based with a
> single-key-at-a-time backend? Moreover, we could think about whether we
> should let users choose the sort vs hash "state backend" per operator.
> Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fall back to comparing full keys on clashes.
> That is because we are actually not interested in a logical order; we
> care only about the "grouping" aspect of the sorting. Therefore I think
> it's enough to compare only parts of the full key as the normalized key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes, the state backend is definitely hash-based
> > execution. However, there are some differences between this and a
> > batch hash-based execution. The key difference is *random access &
> > read/write mixed workload*. For example, by using a state backend in
> > streaming execution, one has to mix the read and write operations,
> > and all of them are actually random access. But in a batch hash
> > execution, we can divide the phases into write and read. For example,
> > we can build the hash table first, with only write operations. And
> > once the build is done, we can start to read and trigger the user
> > code. Take the hash aggregation which the blink planner implemented
> > as an example: during the building phase, as long as the hash map
> > fits into memory, we will update the accumulators directly in the
> > hash map. And once we are running out of memory, we then fall back
> > to sort-based execution.
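The build/read split described above can be sketched as follows. This is an assumed, simplified illustration of the pattern, not the blink planner's actual code: a write-only build phase that updates accumulators in a hash map, followed by a read-only phase that emits results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a batch hash aggregation with separated phases:
// 1) build phase: writes only, accumulators updated in place;
// 2) read phase: the map is complete, emit one result per key.
public class HashAggregation {

    public static List<Map.Entry<String, Integer>> aggregate(
            List<Map.Entry<String, Integer>> input) {
        // Build phase: only write operations against the hash table.
        Map<String, Integer> accumulators = new HashMap<>();
        for (Map.Entry<String, Integer> record : input) {
            accumulators.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        // Read phase: no more writes; trigger downstream consumption.
        return new ArrayList<>(accumulators.entrySet());
    }
}
```

In contrast, a streaming state backend interleaves random-access reads and writes for every record, which is the difference Kurt points out.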
> > It improves the performance a lot if the incoming data can be
> > processed in memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of the
> > key, not the normalized key which is used in DataSet. I will take
> > String as an example. Suppose we have lots of keys with lengths all
> > greater than, let's say, 20. In your proposal, you will encode the
> > whole string in the prefix of your composed data ( <key> +
> > <timestamp> + <record> ). And when you compare records, you will
> > actually compare the *whole* key of the record. The normalized key,
> > in contrast, is fixed-length; in this case IIRC it will take 8 bytes
> > to represent the string. And the sorter will store the normalized key
> > and offset in a dedicated array. When doing the sorting, it only
> > sorts this *small* array. If the normalized keys are different, you
> > can immediately tell which is greater from the normalized keys. You
> > only have to compare the full keys if the normalized keys are equal
> > and you know that in this case the normalized key couldn't represent
> > the full key. The reason why DataSet is doing this is that it's super
> > cache-efficient to sort the *small* array. The idea is borrowed from
> > this paper [1]. Let me know if I missed or misunderstood anything.
> >
> > [1] https://dl.acm.org/doi/10.5555/615232.615237 (AlphaSort: a
> > cache-sensitive parallel external sort)
> >
> > Best,
> > Kurt
> >
> > On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> >> Hey Kurt,
> >>
> >> Thank you for the comments!
> >>
> >> Ad. 1 I might have missed something here, but as far as I can see,
> >> using the current execution stack with regular state backends
> >> (RocksDB in particular, if we want to have spilling capabilities) is
> >> equivalent to hash-based execution. I can see a different spilling
> >> state backend implementation in the future, but I think it is not
> >> batch-specific. Or am I missing something?
> >>
> >> Ad.
> >> 2. Totally agree that normalized keys are important to the
> >> performance. I think, though, that TypeComparators are not a
> >> necessity for that. Actually, this proposal is heading towards only
> >> ever performing "normalized keys" comparison. I have not included in
> >> the proposal the binary format which we will use for sorting
> >> (partially because I forgot, and partially because I thought it was
> >> too much of an implementation detail). Let me include it here
> >> though, as it might clear up the situation a bit.
> >>
> >> In DataSet, at times we have KeySelectors which extract keys based
> >> on field indices or names. This allows, in certain situations,
> >> extracting the key from serialized records. Compared to DataSet, in
> >> DataStream the key is always described with a black-box KeySelector,
> >> or, put differently, with a function which extracts a key from a
> >> deserialized record. In turn, there is no way to create a comparator
> >> that could compare records by extracting the key from a serialized
> >> record (neither with nor without key normalization). We suggest that
> >> the input for the sorter will be
> >>
> >>     <key> + <timestamp> + <record>
> >>
> >> Without having the key prepended, we would have to deserialize the
> >> record for every key comparison.
> >>
> >> Therefore, if we agree that we perform binary comparison for keys
> >> (which are always prepended), it is actually equivalent to a DataSet
> >> with TypeComparators that support key normalization.
> >>
> >> Let me know if that is clear, or if I have missed something here.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 08/09/2020 03:39, Kurt Young wrote:
> >>> Hi Dawid, thanks for bringing this up; it's really exciting to see
> >>> batch execution introduced in DataStream. From the FLIP, it seems
> >>> we are sticking with the sort-based execution mode (at least for
> >>> now), which will sort the whole input data before any *keyed*
> >>> operation is executed.
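The composed sort input and the normalized-key fallback discussed in the thread can be sketched together. The layout details below (the int length prefix, the 8-byte normalized-key width) are assumptions of this sketch, not the FLIP's actual binary format; the point is that entries are compared on serialized key bytes alone, with the record part never deserialized.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of sorting entries laid out as <key> + <timestamp> + <record>,
// using the first 8 key bytes as a fixed-length normalized key and
// falling back to the full serialized key only on prefix clashes.
public class BinarySortInput {

    // Serialize one entry: length-prefixed key, then timestamp, then record.
    public static byte[] compose(byte[] key, long timestamp, byte[] record) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 8 + record.length);
        buf.putInt(key.length);
        buf.put(key);
        buf.putLong(timestamp);
        buf.put(record);
        return buf.array();
    }

    // First 8 key bytes packed into a long (zero-padded): the normalized key.
    static long normalizedKey(byte[] entry) {
        int keyLen = ByteBuffer.wrap(entry).getInt();
        long p = 0;
        for (int i = 0; i < 8; i++) {
            p = (p << 8) | (i < keyLen ? (entry[4 + i] & 0xFF) : 0);
        }
        return p;
    }

    // Full comparison of the serialized key bytes, used only on clashes.
    public static int compareFullKeys(byte[] a, byte[] b) {
        int lenA = ByteBuffer.wrap(a).getInt();
        int lenB = ByteBuffer.wrap(b).getInt();
        for (int i = 0; i < Math.min(lenA, lenB); i++) {
            int cmp = (a[4 + i] & 0xFF) - (b[4 + i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return lenA - lenB;
    }

    // Sort entries so equal keys become adjacent, without deserializing records.
    public static List<byte[]> sortByKey(List<byte[]> entries) {
        List<byte[]> sorted = new ArrayList<>(entries);
        sorted.sort((a, b) -> {
            int cmp = Long.compareUnsigned(normalizedKey(a), normalizedKey(b));
            return cmp != 0 ? cmp : compareFullKeys(a, b);
        });
        return sorted;
    }
}
```

A production sorter would keep the (normalized key, offset) pairs in a dedicated small array, as Kurt describes; here the fallback logic is the part being illustrated.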
> >>> I have two comments here:
> >>>
> >>> 1. Do we want to introduce hash-based execution in the future? Sort
> >>> is a safe choice but not the best in lots of cases. IIUC we only
> >>> need to make sure that before the framework finishes dealing with
> >>> one key, the operator doesn't see any data belonging to other keys;
> >>> thus hash-based execution would also do the trick. One tricky thing
> >>> the framework might need to deal with is memory constraints and
> >>> spilling in the hash map, but Flink also has some good knowledge
> >>> about this stuff.
> >>> 2. Going back to sort-based execution and how to sort keys. From my
> >>> experience, the performance of sorting would be one of the most
> >>> important things if we want to achieve good performance of batch
> >>> execution. And normalized keys are actually the key to the
> >>> performance of sorting. If we want to get rid of TypeComparator, I
> >>> think we still need to find a way to introduce this back.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>> Yes, I think we can address the problem of indeterminacy in a
> >>>> separate FLIP because we're already in it.
> >>>>
> >>>> Aljoscha
> >>>>
> >>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> >>>>> @Seth That's a very good point. I agree that RocksDB has the same
> >>>>> problem. I think we can use the same approach for the sorted
> >>>>> shuffles then. @Aljoscha I agree we should think about making it
> >>>>> more resilient, as I guess users might have problems already if
> >>>>> they use keys with a non-deterministic binary representation. How
> >>>>> do you feel about addressing that separately, purely to limit the
> >>>>> scope of this FLIP?
> >>>>>
> >>>>> @Aljoscha I tend to agree with you that the best place to actually
> >>>>> place the sorting would be in the InputProcessor(s). If there are
> >>>>> no more suggestions regarding that issue, I'll put this proposal
> >>>>> up for voting.
> >>>>>
> >>>>> @all Thank you for the feedback so far. I'd like to start a voting
> >>>>> thread on the proposal tomorrow. Therefore I'd appreciate it if
> >>>>> you comment before that, if you still have some outstanding ideas.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Dawid
> >>>>>
> >>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >>>>>> Seth is right, I was just about to write that as well. There is
> >>>>>> a problem, though, because some of our TypeSerializers are not
> >>>>>> deterministic even though we use them as if they were. Beam
> >>>>>> excludes the FloatCoder, for example, and the AvroCoder in
> >>>>>> certain cases. I'm pretty sure there is also weirdness going on
> >>>>>> in our KryoSerializer.
> >>>>>>
> >>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
> >>>>>>> There is already an implicit assumption that the TypeSerializer
> >>>>>>> for keys is stable/deterministic: RocksDB compares keys using
> >>>>>>> their serialized byte strings. I think this is a non-issue (or
> >>>>>>> at least it's not changing the status quo).
> >>>>>>>
> >>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther <twal...@apache.org>
> >>>>>>> wrote:
> >>>>>>>> +1 for getting rid of the TypeComparator interface and relying
> >>>>>>>> on the serialized representation for grouping.
> >>>>>>>>
> >>>>>>>> Adding a new type to the DataStream API is quite difficult at
> >>>>>>>> the moment due to too many components that are required:
> >>>>>>>> TypeInformation (tries to deal with logical fields for
> >>>>>>>> TypeComparators), TypeSerializer (incl. its snapshot
> >>>>>>>> interfaces), and TypeComparator (with many methods and
> >>>>>>>> internals such as normalized keys etc.).
> >>>>>>>>
> >>>>>>>> If necessary, we can add more simple comparison-related methods
> >>>>>>>> to the TypeSerializer interface itself in the future (like
> >>>>>>>> TypeSerializer.isDeterministic).
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
> >>>>>>>>> Thanks for publishing the FLIP!
> >>>>>>>>>
> >>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>> 1. How to sort/group keys? What representation of the key
> >>>>>>>>>> should we use? Should we sort on the binary form or should
> >>>>>>>>>> we depend on Comparators being available?
> >>>>>>>>> Initially, I suggested to Dawid (in private) to do the
> >>>>>>>>> sorting/grouping by using the binary representation. Then my
> >>>>>>>>> opinion switched and I thought we should use
> >>>>>>>>> TypeComparator/Comparator because that's what the DataSet API
> >>>>>>>>> uses. After talking to Stephan, I'm again inclined to use the
> >>>>>>>>> binary representation because it means we can eventually get
> >>>>>>>>> rid of the TypeComparator interface, which is a bit
> >>>>>>>>> complicated, and because we don't need any good order in our
> >>>>>>>>> sort; we only need the grouping.
> >>>>>>>>> This comes with some problems, though: we need to ensure that
> >>>>>>>>> the TypeSerializer of the type we're sorting is
> >>>>>>>>> stable/deterministic. Beam has infrastructure for this in the
> >>>>>>>>> form of Coder.verifyDeterministic() [1], which we don't have
> >>>>>>>>> right now and should add if we go down this path.
> >>>>>>>>>> 2. Where in the stack should we apply the sorting? (This is
> >>>>>>>>>> rather a discussion about internals.)
> >>>>>>>>> Here, I'm gravitating towards the third option of
> >>>>>>>>> implementing it in the layer of the StreamTask, which
> >>>>>>>>> probably means implementing a custom InputProcessor.
> >>>>>>>>> I think it's best to do it in this layer because we would not
> >>>>>>>>> mix concerns of different layers, as we would if we
> >>>>>>>>> implemented this as a custom StreamOperator. I think this
> >>>>>>>>> solution is also best when it comes to multi-input operators.
> >>>>>>>>>> 3. How should we deal with custom implementations of
> >>>>>>>>>> StreamOperators?
> >>>>>>>>> I think the cleanest solution would be to go through the
> >>>>>>>>> complete operator lifecycle for every key, because then the
> >>>>>>>>> watermark would not oscillate between -Inf and +Inf and we
> >>>>>>>>> would not break the semantic guarantees that we gave to
> >>>>>>>>> operators so far, namely that the watermark is strictly
> >>>>>>>>> monotonically increasing. However, I don't think this
> >>>>>>>>> solution is feasible because it would come with too much
> >>>>>>>>> overhead. We should solve this problem via documentation and
> >>>>>>>>> maybe educate people not to query the current watermark, and
> >>>>>>>>> not to rely on the watermark being monotonically increasing
> >>>>>>>>> in operator implementations, to allow the framework more
> >>>>>>>>> freedom in how user programs are executed.
> >>>>>>>>>
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
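The serializer-determinism problem discussed above (why Beam excludes the FloatCoder, and why byte-wise grouping needs deterministic serialization) can be demonstrated with plain JDK primitives. This is a hedged illustration, not Flink or Beam code: two values that user code treats as equal can serialize to different bytes, so byte-wise grouping would split them into separate groups.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration of non-deterministic binary representations: 0.0 and -0.0
// compare equal as doubles but have different serialized bytes, so a
// byte-wise sort/group would treat them as two distinct keys.
public class SerializerDeterminism {

    // A trivial "serializer" for double keys: the raw 8-byte encoding.
    public static byte[] serialize(double d) {
        return ByteBuffer.allocate(8).putDouble(d).array();
    }

    // True if the two values have identical serialized representations.
    public static boolean sameBytes(double a, double b) {
        return Arrays.equals(serialize(a), serialize(b));
    }
}
```

A `Coder.verifyDeterministic()`-style check, as Aljoscha suggests, would reject such key types (or canonicalize them) before the binary grouping is applied.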