> Ok, got your point now. I agree that it makes more sense to
> make StateBackend return a contract instead of a particular
> implementation. How about we name the new interface as
> `CheckpointableKeyedStateBackend`? We could make
> `BoundedStreamStateBackend` implement
> `CheckpointableKeyedStateBackend` but without checkpoint related
> operations yet, whereas reserving the possibility that the bounded
> stream also supports checkpoint in future. What do you think?
Sounds good to me. Will update the FLIP with the new name.
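
For reference, here is a rough sketch of what separating the contract from the implementation could look like. Only the names `CheckpointableKeyedStateBackend` and `BoundedStreamStateBackend` come from this thread. The methods (`setCurrentKey`, `getCurrentKey`, `snapshot`), their simplified signatures, and the `OperatorSketch` class are assumptions made up for illustration and are not the actual Flink API.

```java
// Hypothetical, heavily simplified contract; the real Flink interfaces differ.
interface CheckpointableKeyedStateBackend<K> {
    void setCurrentKey(K key);

    K getCurrentKey();

    // Checkpoint-related operation; this signature is an assumption of the sketch.
    byte[] snapshot(long checkpointId);
}

// Bounded-stream implementation: fulfils the contract but cannot checkpoint yet.
final class BoundedStreamStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public byte[] snapshot(long checkpointId) {
        // Reserved for the future; bounded streams do not checkpoint in this sketch.
        throw new UnsupportedOperationException("checkpointing is not supported yet");
    }
}

// Hypothetical operator: it depends on the contract only, never on a concrete class.
final class OperatorSketch<K> {
    private final CheckpointableKeyedStateBackend<K> backend;

    OperatorSketch(CheckpointableKeyedStateBackend<K> backend) {
        this.backend = backend;
    }

    K process(K key) {
        backend.setCurrentKey(key);
        return backend.getCurrentKey();
    }
}
```

An operator written this way works with any backend that implements the interface, which is the transparency for StreamOperator implementations discussed below.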

On 18/09/2020 15:31, Yu Li wrote:
> /bq. The problem is that I could not use this "state backend" in a
> StreamOperator./
> Ok, got your point now. I agree that it makes more sense to
> make StateBackend return a contract instead of a particular
> implementation. How about we name the new interface as
> `CheckpointableKeyedStateBackend`? We could make
> `BoundedStreamStateBackend` implement
> `CheckpointableKeyedStateBackend` but without checkpoint related
> operations yet, whereas reserving the possibility that the bounded
> stream also supports checkpoint in future. What do you think?
>
> /bq. Correct, the goal though is not to outperform the
> HeapStateBackend. The single key state backend requires sorted inputs
> which come with a price. The main goal is to outperform
> RocksDBStateBackend, which is necessary for large states./
> Personally I think the main benefit of introducing a bounded stream
> specific state backend is that we could remove the data after
> processing a key, thus reducing the cost of state storage a lot,
> rather than the routine performance of state processing (smile).
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Sep 2020 at 20:48, Dawid Wysakowicz <dwysakow...@apache.org
> <mailto:dwysakow...@apache.org>> wrote:
>
>>     ===============================================
>>     class BoundedStreamInternalStateBackend<K> implements
>>             KeyedStateBackend<K>,
>>             SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>             Closeable,
>>             CheckpointListener {
>>     ===============================================
>>
>     The problem is that I could not use this "state backend" in a
>     StreamOperator. The goal of this effort is that it is mostly
>     transparent to all the implementations of StreamOperator(s). Right
>     now StreamOperator retrieves AbstractKeyedStateBackend through
>     StreamOperatorContext which instantiates it in
>     StreamTaskInitializer etc. The problem is that a big chunk of the
>     current code base uses the AbstractKeyedStateBackend, whereas it
>     really just needs an interface not that particular implementation.
>     The change is really only about separating the contract
>     (InternalKeyedStateBackend) from the implementation
>     (AbstractKeyedStateBackend). My thinking is that it is only an
>     approach to fix a mistake of the past that StateBackend returns a
>     particular implementation rather than a contract.
>
>     I do agree I don't need the `SnapshotStrategy` and
>     `CheckpointListener` interfaces. The thing though is that the
>     runtime expects those contracts from an AbstractKeyedStateBackend.
>
>     BTW, if you'd like to see what this change really looks like
>     you can check the PR I already opened for it:
>     https://github.com/apache/flink/pull/13405/files
>
>>     Checking the FLIP more closely I found below description: "With a
>>     high number of keys it (HeapStateBackend) suffers a significant
>>     penalty and becomes even less performant for that particular case
>>     than the sorting approach", does it mean "HeapStateBackend"
>>     outperformed "SingleKeyStateBackend" when the number of keys is
>>     relatively small
>     Correct, the goal though is not to outperform the
>     HeapStateBackend. The single key state backend requires sorted
>     inputs which come with a price. The main goal is to outperform
>     RocksDBStateBackend, which is necessary for large states.
>
>>     Thanks for the summary. I think it's more specific and could help
>>     readers to better understand why we cannot use
>>     HeapKeyedStateBackend directly, than the single line description
>>     "when the StateBackend observes a new incoming key it will reset
>>     all acquired state objects so far". What do you think?
>     Sure, I can add it to the document.
>
>     Best,
>
>     Dawid
>
>     On 18/09/2020 14:29, Yu Li wrote:
>>     Thanks for the clarification Dawid. Some of my thoughts:
>>
>>     /bq. The results are times for end-to-end execution of a job.
>>     Therefore the sorting part is included. The actual target of the
>>     replacement is RocksDB, which does the serialization and key
>>     bytes comparison as well./
>>     I see. Checking the FLIP more closely I found below description:
>>     "With a high number of keys it (HeapStateBackend) suffers a
>>     significant penalty and becomes even less performant for that
>>     particular case than the sorting approach", does it mean
>>     "HeapStateBackend" outperformed "SingleKeyStateBackend" when the
>>     number of keys is relatively small? The micro-benchmark of
>>     ValueState removes the key shuffling phase, so its result could
>>     be self-explained.
>>
>>     About `InternalKeyedStateBackend`, let me rephrase my question:
>>     why don't we add the new state backend like below instead of
>>     adding a new interface (and IMHO there's no need to implement the
>>     `SnapshotStrategy` and `CheckpointListener` interfaces since it
>>     doesn't support checkpoint)? Reserved for adding more internal
>>     state backends in future?
>>     ===============================================
>>     class BoundedStreamInternalStateBackend<K> implements
>>             KeyedStateBackend<K>,
>>             SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
>>             Closeable,
>>             CheckpointListener {
>>     ===============================================
>>
>>
>>     /bq. Let me though quickly summarize and if you find it useful I
>>     can add it to the FLIP itself./
>>     Thanks for the summary. I think it's more specific and could help
>>     readers to better understand why we cannot use
>>     HeapKeyedStateBackend directly, than the single line description
>>     "when the StateBackend observes a new incoming key it will reset
>>     all acquired state objects so far". What do you think?
>>
>>     Thanks.
>>
>>     Best Regards,
>>     Yu
>>
>>
>>     On Thu, 17 Sep 2020 at 23:38, Dawid Wysakowicz
>>     <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>>
>>         Thanks for the comments Yu.
>>
>>         > First of all, for the performance testing result, I'm
>>         wondering whether the
>>         > sorting cost is counted in the result for both DataSet and
>>         refined
>>         > DataStream implementations. I could think of the saving of
>>         hash computation
>>         > and final iteration to emit the word-count result
>>         (processing a key at a
>>         > time could save such iteration), but not sure whether these
>>         cost savings
>>         > are at the same grade of comparing the key bytes.
>>         The results are times for end-to-end execution of a job.
>>         Therefore the
>>         sorting part is included. The actual target of the replacement is
>>         RocksDB, which does the serialization and key bytes
>>         comparison as well.
>>         On top of that it adds all the RocksDB bookkeeping.
>>
>>         > However, I'm not fully convinced to introduce a new
>>         > `InternalKeyedStateBackend` interface. I agree that we
>>         don't need to take
>>         > the overhead of `AbstractKeyedStateBackend` since we don't
>>         plan to support
>>         > checkpoint for now, but why don't we directly write a state
>>         backend
>>         > implementation for bounded stream? Or are we planning to
>>         introduce more
>>         > internal state backends in future? What's more, the current
>>         design of
>>         > `InternalKeyedStateBackend` in the FLIP document seems to
>>         be extending as
>>         > many interfaces as `AbstractKeyedStateBackend`
>>         implements, which I guess
>>         > is a typo.
>>         Maybe I was not clear enough about the change. This change
>>         does not
>>         "strip" the AbstractKeyedStateBackend of any functionalities.
>>         My intent
>>         is not to remove any methods of the
>>         AbstractKeyedStateBackend. The
>>         problem here is that the AbstractKeyedStateBackend is an
>>         abstract class
>>         (duh ;)), which does have some predefined implementation.
>>         Moreover it
>>         requires objects such as InternalKeyContext, CloseableRegistry
>>         etc. to be
>>         constructed, which we don't need/want e.g. in the single key
>>         state
>>         backend. My intention here is to make the StateBackend return
>>         only pure
>>         interfaces. (AbstractKeyedStateBackend is the only
>>         non-interface that
>>         StateBackend returns). In other words I just want to make
>>         AbstractKeyedStateBackend a proper interface. It is not a
>>         typo that
>>         InternalKeyedStateBackend extends the same interfaces as
>>         AbstractKeyedStateBackend does.
>>
>>         > Thirdly, I suggest we name the special state backend as
>>         > `BoundedStreamInternalStateBackend`. And from our existing
>>         javadoc of
>>         > `StateBackend` it actually cannot be called a complete
>>         state backend...: "A
>>         > State Backend defines how the state of a streaming
>>         application is stored
>>         > and checkpointed".
>>         Thanks for the suggestion. Sure I can use that name. Yes I do
>>         agree it
>>         is not a full fledged StateBackend. I do want it to be an
>>         internal
>>         class, that is never used explicitly by users.
>>
>>         > Lastly, I didn't find a detailed design of the
>>         "SingleKeyStateBackend" in
>>         > the FLIP,
>>         I did not put it into the design, because 1) I found it
>>         internal. It
>>         does not touch any public facing interfaces. 2) It is rather
>>         straightforward. Let me though quickly summarize and if you
>>         find it
>>         useful I can add it to the FLIP itself.
>>
>>         > as how to detect the key switching
>>         That is rather straightforward. The state backend works only
>>         with the
>>         assumption that the keys are sorted/grouped together. We keep the
>>         current key and in the setCurrentKey we check if the new key is
>>         different than the current one. Side note: yes, custom user
>>         operators
>>         which call setCurrentKey explicitly might not work in this setup.
>>
>>         > remove the data (especially in the non-windowing
>>         > case), etc.
>>         We only ever keep a single value for a state object. Therefore
>>         ValueState is a very thin wrapper for a value, MapState for a
>>         HashMap,
>>         ListState for a List etc. When the key changes we simply set
>>         the wrapped
>>         value/map/state to null.
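
The key-switch detection and the reset described above can be sketched roughly as follows. The class names `SingleKeyValueState` and `SingleKeyBackendSketch` are hypothetical and only one value state is modelled; this is an illustration under those assumptions, not the code from the PR.

```java
import java.util.Objects;

// Very thin wrapper around a single value (hypothetical name).
final class SingleKeyValueState<T> {
    private T value;

    T value() {
        return value;
    }

    void update(T newValue) {
        this.value = newValue;
    }

    void clear() {
        this.value = null;
    }
}

// Hypothetical single-key backend; it only works if the input is sorted/grouped by key.
final class SingleKeyBackendSketch<K> {
    private K currentKey;
    private final SingleKeyValueState<Integer> counter = new SingleKeyValueState<>();

    void setCurrentKey(K newKey) {
        // A different key means the previous key is finished, so its state is dropped.
        if (!Objects.equals(currentKey, newKey)) {
            counter.clear();
            currentKey = newKey;
        }
    }

    SingleKeyValueState<Integer> counter() {
        return counter;
    }
}
```

Setting the same key twice keeps the value, while switching to another key resets it to null. This is also why user operators that jump back and forth between keys via setCurrentKey would lose state in this setup.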
>>
>>         I hope this clarifies a few things. Let me know if you have
>>         any questions.
>>
>>         Best,
>>
>>         Dawid
>>
>>         On 17/09/2020 15:28, Yu Li wrote:
>>         > Hi all,
>>         >
>>         > Sorry for being late to the discussion, but I just noticed
>>         there are some
>>         > state backend related changes proposed in this FLIP, so
>>         would like to share
>>         > my two cents.
>>         >
>>         > First of all, for the performance testing result, I'm
>>         wondering whether the
>>         > sorting cost is counted in the result for both DataSet and
>>         refined
>>         > DataStream implementations. I could think of the saving of
>>         hash computation
>>         > and final iteration to emit the word-count result
>>         (processing a key at a
>>         > time could save such iteration), but not sure whether these
>>         cost savings
>>         > are at the same grade of comparing the key bytes.
>>         >
>>         > Regardless of the performance result, I agree that the
>>         capability of
>>         > removing the data after processing a key could prominently
>>         reduce the space
>>         > required by state, so introducing a new state backend for
>>         bounded stream
>>         > makes sense.
>>         >
>>         > However, I'm not fully convinced to introduce a new
>>         > `InternalKeyedStateBackend` interface. I agree that we
>>         don't need to take
>>         > the overhead of `AbstractKeyedStateBackend` since we don't
>>         plan to support
>>         > checkpoint for now, but why don't we directly write a state
>>         backend
>>         > implementation for bounded stream? Or are we planning to
>>         introduce more
>>         > internal state backends in future? What's more, the current
>>         design of
>>         > `InternalKeyedStateBackend` in the FLIP document seems to
>>         be extending as
>>         > many interfaces as `AbstractKeyedStateBackend`
>>         implements, which I guess
>>         > is a typo.
>>         >
>>         > Thirdly, I suggest we name the special state backend as
>>         > `BoundedStreamInternalStateBackend`. And from our existing
>>         javadoc of
>>         > `StateBackend` it actually cannot be called a complete
>>         state backend...: "A
>>         > State Backend defines how the state of a streaming
>>         application is stored
>>         > and checkpointed".
>>         >
>>         > Lastly, I didn't find a detailed design of the
>>         "SingleKeyStateBackend" in
>>         > the FLIP, and suggest we write the key design down, such as
>>         how to detect
>>         > the key switching and remove the data (especially in the
>>         non-windowing
>>         > case), etc.
>>         >
>>         > Thanks.
>>         >
>>         > Best Regards,
>>         > Yu
>>         >
>>         >
>>         > On Wed, 9 Sep 2020 at 17:18, Kurt Young <ykt...@gmail.com
>>         <mailto:ykt...@gmail.com>> wrote:
>>         >
>>         >> Yes, I didn't intend to block this FLIP, and some of the
>>         comments are
>>         >> actually implementation details.
>>         >> And all of them are handled internally, not visible to
>>         users, thus we can
>>         >> also change or improve them
>>         >> in the future.
>>         >>
>>         >> Best,
>>         >> Kurt
>>         >>
>>         >>
>>         >> On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek
>>         <aljos...@apache.org <mailto:aljos...@apache.org>>
>>         >> wrote:
>>         >>
>>         >>> I think Kurt's concerns/comments are very valid and we
>>         need to implement
>>         >>> such things in the future. However, I also think that we
>>         need to get
>>         >>> started somewhere and I think what's proposed in this
>>         FLIP is a good
>>         >>> starting point that we can build on. So we should not get
>>         paralyzed by
>>         >>> thinking too far ahead into the future. Does that make sense?
>>         >>>
>>         >>> Best,
>>         >>> Aljoscha
>>         >>>
>>         >>> On 08.09.20 16:59, Dawid Wysakowicz wrote:
>>         >>>> Ad. 1
>>         >>>>
>>         >>>> Yes, you are right in principle.
>>         >>>>
>>         >>>> Let me though clarify my proposal a bit. The proposed
>>         sort-style
>>         >>>> execution aims at a generic KeyedProcessFunction where
>>         all the
>>         >>>> "aggregations" are actually performed in the user code.
>>         It tries to
>>         >>>> improve the performance by actually removing the need to
>>         use RocksDB
>>         >>> e.g.:
>>         >>>>      private static final class Summer<K>
>>         >>>>              extends KeyedProcessFunction<
>>         >>>>                      K, Tuple2<K, Integer>, Tuple2<K, Integer>> {
>>         >>>>
>>         >>>>          ....
>>         >>>>
>>         >>>>          @Override
>>         >>>>          public void processElement(
>>         >>>>                  Tuple2<K, Integer> value,
>>         >>>>                  Context ctx,
>>         >>>>                  Collector<Tuple2<K, Integer>> out) throws Exception {
>>         >>>>              if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>>         >>>>                  ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>>         >>>>                  timerRegistered.update(true);
>>         >>>>              }
>>         >>>>              Integer v = counter.value();
>>         >>>>              Integer incomingValue = value.f1;
>>         >>>>              if (v != null) {
>>         >>>>                  v += incomingValue;
>>         >>>>              } else {
>>         >>>>                  v = incomingValue;
>>         >>>>              }
>>         >>>>              counter.update(v);
>>         >>>>          }
>>         >>>>
>>         >>>>          ....
>>         >>>>
>>         >>>>     }
>>         >>>>
>>         >>>> Therefore I don't think the first part of your reply
>>         with separating
>>         >> the
>>         >>>> write and read workload applies here. We do not aim to
>>         create a
>>         >>>> competing API with the Table API. We think operations
>>         such as joins or
>>         >>>> analytical aggregations should be performed in Table API.
>>         >>>>
>>         >>>> As for the second part I agree it would be nice to fall
>>         back to the
>>         >>>> sorting approach only if a certain threshold of memory
>>         in a State
>>         >>>> Backend is used. This has some problems though. We would
>>         need a way to
>>         >>>> estimate the size of the occupied memory to tell when
>>         the threshold is
>>         >>>> reached. That is not easily doable by default e.g. in a
>>         >>>> MemoryStateBackend, as we do not serialize the values in
>>         the state
>>         >>>> backend by default. We would have to add that, but this
>>         would add the
>>         >>>> overhead of the serialization.
>>         >>>>
>>         >>>> This proposal aims at the cases where we do have a large
>>         state that
>>         >> will
>>         >>>> not fit into the memory and without the change users are
>>         forced to use
>>         >>>> RocksDB. If the state fits in memory I agree it will be
>>         better to do
>>         >>>> hash-based aggregations e.g. using the
>>         MemoryStateBackend. Therefore I
>>         >>>> think it is important to give users the choice to use
>>         one or the other
>>         >>>> approach. We might discuss which approach should be the
>>         default for
>>         >>>> RuntimeMode.BATCH proposed in FLIP-134. Should it be
>>         hash-based with
>>         >>>> user configured state backend or sorting-based with a
>>         single key at a
>>         >>>> time backend. Moreover we could think if we should let
>>         users choose the
>>         >>>> sort vs hash "state backend" per operator. Would that
>>         suffice?
>>         >>>>
>>         >>>> Ad. 2
>>         >>>>
>>         >>>> I still think we can just use the first X bytes of the
>>         serialized form
>>         >>>> as the normalized key and fallback to comparing full
>>         keys on clashes.
>>         >> It
>>         >>>> is because we are actually not interested in a logical
>>         order, but we
>>         >>>> care only about the "grouping" aspect of the sorting.
>>         Therefore I think
>>         >>>> it's enough to compare only parts of the full key as the
>>         normalized key.
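
A minimal sketch of that idea, assuming keys are available as serialized byte arrays: compare the first X bytes and fall back to the full serialized key only when the prefixes clash. The class name `PrefixGroupingComparator` and the constructor parameter are hypothetical, and the sorter in the actual proposal may look quite different.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical comparator over serialized keys; only the grouping aspect matters.
final class PrefixGroupingComparator implements Comparator<byte[]> {
    private final int prefixLength; // the "X" bytes used as the normalized key

    PrefixGroupingComparator(int prefixLength) {
        this.prefixLength = prefixLength;
    }

    @Override
    public int compare(byte[] a, byte[] b) {
        int aEnd = Math.min(prefixLength, a.length);
        int bEnd = Math.min(prefixLength, b.length);
        // Cheap path: compare only the first X bytes of the serialized form.
        int byPrefix = Arrays.compareUnsigned(a, 0, aEnd, b, 0, bEnd);
        if (byPrefix != 0) {
            return byPrefix;
        }
        // Clash on the prefix: fall back to comparing the full serialized keys.
        return Arrays.compareUnsigned(a, b);
    }
}
```

The resulting order is a plain byte order with no logical meaning, which is fine here because equal keys still end up next to each other.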
>>         >>>>
>>         >>>> Thanks again for the really nice and thorough feedback!
>>         >>>>
>>         >>>> Best,
>>         >>>>
>>         >>>> Dawid
>>         >>>>
>>         >>>> On 08/09/2020 14:47, Kurt Young wrote:
>>         >>>>> Regarding #1, yes the state backend is definitely
>>         hash-based
>>         >> execution.
>>         >>>>> However there are some differences between it and
>>         >>>>> batch hash-based execution. The key difference is
>>         *random access &
>>         >>>>> read/write mixed workload*. For example, by using
>>         >>>>> state backend in streaming execution, one has to mix
>>         the read and
>>         >> write
>>         >>>>> operations and all of them are actually random
>>         >>>>> access. But in a batch hash execution, we could divide
>>         the phases into
>>         >>>>> write and read. For example, we can build the
>>         >>>>> hash table first, with only write operations. And once
>>         the build is
>>         >>> done,
>>         >>>>> we can start to read and trigger the user codes.
>>         >>>>> Take hash aggregation which blink planner implemented
>>         as an example,
>>         >>> during
>>         >>>>> building phase, as long as the hash map
>>         >>>>> could fit into memory, we will update the accumulators
>>         directly in the
>>         >>> hash
>>         >>>>> map. And once we are running out of memory,
>>         >>>>> we then fall back to sort based execution. It improves the
>>         >> performance a
>>         >>>>> lot if the incoming data can be processed in
>>         >>>>> memory.
>>         >>>>>
>>         >>>>> Regarding #2, IIUC you are actually describing a binary
>>         format of key,
>>         >>> not
>>         >>>>> normalized key which is used in DataSet. I will
>>         >>>>> take String for example. If we have lots of keys with
>>         length all
>>         >> greater
>>         >>>>> than, let's say 20. In your proposal, you will encode
>>         >>>>> the whole string in the prefix of your composed data (
>>         <key> +
>>         >>> <timestamp>
>>         >>>>> + <record> ). And when you compare
>>         >>>>> records, you will actually compare the *whole* key of
>>         the record. For
>>         >>>>> normalized key, it's fixed-length in this case, IIRC it
>>         will
>>         >>>>> take 8 bytes to represent the string. And the sorter
>>         will store the
>>         >>>>> normalized key and offset in a dedicated array. When doing
>>         >>>>> the sorting, it only sorts this *small* array. If the
>>         normalized keys
>>         >>> are
>>         >>>>> different, you could immediately tell which is greater from
>>         >>>>> normalized keys. You only have to compare the full keys
>>         if the
>>         >>> normalized
>>         >>>>> keys are equal and you know in this case the normalized
>>         >>>>> key couldn't represent the full key. The reason why
>>         Dataset is doing
>>         >>> this
>>         >>>>> is it's super cache efficient by sorting the *small* array.
>>         >>>>> The idea is borrowed from this paper [1]. Let me know
>>         if I missed or
>>         >>>>> misunderstood anything.
>>         >>>>>
>>         >>>>> [1] https://dl.acm.org/doi/10.5555/615232.615237
>>         (AlphaSort: a
>>         >>>>> cache-sensitive parallel external sort)
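
The normalized-key sorting described above can be sketched roughly like this: a small index array holds a fixed-length normalized key plus the record offset, only that array is sorted, and full keys are compared only when the normalized keys are equal. The 8-byte width follows the description above; the class and method names are hypothetical and this is not the DataSet sorter itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class NormalizedKeySortSketch {

    // One entry of the small index array: normalized key plus offset of the record.
    static final class IndexEntry {
        final long normalizedKey;
        final int offset;

        IndexEntry(long normalizedKey, int offset) {
            this.normalizedKey = normalizedKey;
            this.offset = offset;
        }
    }

    static byte[] fullKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Packs the first 8 bytes of the UTF-8 form into a long, zero-padded if shorter.
    static long normalize(String key) {
        byte[] bytes = fullKey(key);
        long result = 0L;
        for (int i = 0; i < 8; i++) {
            int b = i < bytes.length ? (bytes[i] & 0xFF) : 0;
            result = (result << 8) | b;
        }
        return result;
    }

    static String[] sort(String[] keys) {
        IndexEntry[] index = new IndexEntry[keys.length];
        for (int i = 0; i < keys.length; i++) {
            index[i] = new IndexEntry(normalize(keys[i]), i);
        }
        // Only the small index array is sorted; the records themselves do not move.
        Arrays.sort(index, (x, y) -> {
            int byNormalized = Long.compareUnsigned(x.normalizedKey, y.normalizedKey);
            if (byNormalized != 0) {
                return byNormalized;
            }
            // Equal normalized keys cannot decide the order, so compare the full keys.
            return Arrays.compareUnsigned(fullKey(keys[x.offset]), fullKey(keys[y.offset]));
        });
        String[] sorted = new String[keys.length];
        for (int i = 0; i < keys.length; i++) {
            sorted[i] = keys[index[i].offset];
        }
        return sorted;
    }
}
```

For keys sharing the same first 8 bytes (for example "streaming-a" and "streaming-b") the comparison falls through to the full keys; for all others the single long comparison is enough.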
>>         >>>>>
>>         >>>>> Best,
>>         >>>>> Kurt
>>         >>>>>
>>         >>>>>
>>         >>>>> On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz <
>>         >> dwysakow...@apache.org <mailto:dwysakow...@apache.org>
>>         >>>>> wrote:
>>         >>>>>
>>         >>>>>> Hey Kurt,
>>         >>>>>>
>>         >>>>>> Thank you for comments!
>>         >>>>>>
>>         >>>>>> Ad. 1 I might have missed something here, but as far
>>         as I see it is
>>         >>> that
>>         >>>>>> using the current execution stack with regular state
>>         backends
>>         >> (RocksDB
>>         >>>>>> in particular if we want to have spilling
>>         capabilities) is equivalent
>>         >>> to
>>         >>>>>> hash-based execution. I can see a different spilling
>>         state backend
>>         >>>>>> implementation in the future, but I think it is not
>>         batch specific. Or
>>         >>> am
>>         >>>>>> I missing something?
>>         >>>>>>
>>         >>>>>> Ad. 2 Totally agree that normalized keys are important
>>         to the
>>         >>>>>> performance. I think though TypeComparators are not a
>>         necessity to
>>         >> have
>>         >>>>>> that. Actually  this proposal is heading towards only
>>         ever performing
>>         >>>>>> "normalized keys" comparison. I have not included in
>>         the proposal the
>>         >>>>>> binary format which we will use for sorting (partially
>>         because I
>>         >>> forgot,
>>         >>>>>> and partially because I thought it was too much of an
>>         implementation
>>         >>>>>> detail). Let me include it here though, as it might
>>         clear the
>>         >> situation
>>         >>>>>> a bit here.
>>         >>>>>>
>>         >>>>>> In DataSet, at times we have KeySelectors which
>>         extract keys based on
>>         >>>>>> field indices or names. This allows in certain
>>         situation to extract
>>         >> the
>>         >>>>>> key from serialized records. Compared to DataSet, in
>>         DataStream, the
>>         >>> key
>>         >>>>>> is always described with a black-box KeySelector, or
>>         differently
>>         >> with a
>>         >>>>>> function which extracts a key from a deserialized
>>         record.  In turn
>>         >>> there
>>         >>>>>> is no way to create a comparator that could compare
>>         records by
>>         >>>>>> extracting the key from a serialized record (neither
>>         with, nor
>>         >> without
>>         >>>>>> key normalization). We suggest that the input for the
>>         sorter will be
>>         >>>>>>
>>         >>>>>> <key> + <timestamp> + <record>
>>         >>>>>>
>>         >>>>>> Without having the key prepended we would have to
>>         deserialize the
>>         >>> record
>>         >>>>>> for every key comparison.
>>         >>>>>>
>>         >>>>>> Therefore if we agree that we perform binary
>>         comparison for keys
>>         >> (which
>>         >>>>>> are always prepended), it is actually equivalent to a
>>         DataSet with
>>         >>>>>> TypeComparators that support key normalization.
>>         >>>>>>
>>         >>>>>> Let me know if that is clear, or I have missed
>>         something here.
>>         >>>>>>
>>         >>>>>> Best,
>>         >>>>>>
>>         >>>>>> Dawid
>>         >>>>>>
>>         >>>>>> On 08/09/2020 03:39, Kurt Young wrote:
>>         >>>>>>> Hi Dawid, thanks for bringing this up, it's really
>>         exciting to see
>>         >>> that
>>         >>>>>>> batch execution is introduced in DataStream. From the
>>         flip, it seems
>>         >>>>>>> we are sticking with sort based execution mode (at
>>         least for now),
>>         >>> which
>>         >>>>>>> will sort the whole input data before any *keyed*
>>         operation is
>>         >>>>>>> executed. I have two comments here:
>>         >>>>>>>
>>         >>>>>>> 1. Do we want to introduce hash-based execution in
>>         the future? Sort
>>         >>> is a
>>         >>>>>>> safe choice but not the best in lots of cases. IIUC
>>         we only need
>>         >>>>>>> to make sure that before the framework finishes
>>         dealing with one
>>         >> key,
>>         >>> the
>>         >>>>>>> operator doesn't see any data belonging to other
>>         keys, thus
>>         >>>>>>> hash-based execution would also do the trick. One
>>         tricky thing the
>>         >>>>>>> framework might need to deal with is memory
>>         constraint and spilling
>>         >>>>>>> in the hash map, but Flink also has some good
>>         knowledge about these
>>         >>>>>> stuff.
>>         >>>>>>> 2. Going back to sort-based execution and how to sort
>>         keys. From my
>>         >>>>>>> experience, the performance of sorting would be one
>>         of the most
>>         >> important
>>         >>>>>>> things if we want to achieve good performance of
>>         batch execution.
>>         >> And
>>         >>>>>>> normalized keys are actually the key of the
>>         performance of sorting.
>>         >>>>>>> If we want to get rid of TypeComparator, I think we
>>         still need to
>>         >>> find a
>>         >>>>>>> way to introduce this back.
>>         >>>>>>>
>>         >>>>>>> Best,
>>         >>>>>>> Kurt
>>         >>>>>>>
>>         >>>>>>>
>>         >>>>>>> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek <
>>         >> aljos...@apache.org <mailto:aljos...@apache.org>>
>>         >>>>>> wrote:
>>         >>>>>>>> Yes, I think we can address the problem of
>>         indeterminacy in a
>>         >>> separate
>>         >>>>>>>> FLIP because we're already in it.
>>         >>>>>>>>
>>         >>>>>>>> Aljoscha
>>         >>>>>>>>
>>         >>>>>>>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>         >>>>>>>>> @Seth That's a very good point. I agree that
>>         RocksDB has the same
>>         >>>>>>>>> problem. I think we can use the same approach for
>>         the sorted
>>         >>> shuffles
>>         >>>>>>>>> then. @Aljoscha I agree we should think about
>>         making it more
>>         >>> resilient,
>>         >>>>>>>>> as I guess users might have problems already if
>>         they use keys with
>>         >>>>>>>>> non-deterministic binary representation. How do you
>>         feel about
>>         >>>>>>>>> addressing that separately purely to limit the
>>         scope of this FLIP?
>>         >>>>>>>>>
>>         >>>>>>>>> @Aljoscha I tend to agree with you that the best
>>         place to actually
>>         >>>>>> place
>>         >>>>>>>>> the sorting would be in the InputProcessor(s). If
>>         there are no
>>         >> more
>>         >>>>>>>>> suggestions in respect to that issue. I'll put this
>>         proposal for
>>         >>>>>> voting.
>>         >>>>>>>>> @all Thank you for the feedback so far. I'd like to
>>         start a voting
>>         >>>>>>>>> thread on the proposal tomorrow. Therefore I'd
>>         appreciate if you
>>         >>>>>> comment
>>         >>>>>>>>> before that, if you still have some outstanding ideas.
>>         >>>>>>>>>
>>         >>>>>>>>> Best,
>>         >>>>>>>>>
>>         >>>>>>>>> Dawid
>>         >>>>>>>>>
>>         >>>>>>>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
>>         >>>>>>>>>> Seth is right, I was just about to write that as
>>         well. There is a
>>         >>>>>>>>>> problem, though, because some of our
>>         TypeSerializers are not
>>         >>>>>>>>>> deterministic even though we use them as if they
>>         were. Beam
>>         >>> excludes
>>         >>>>>>>>>> the FloatCoder, for example, and the AvroCoder in
>>         certain cases.
>>         >>> I'm
>>         >>>>>>>>>> pretty sure there is also weirdness going on in our
>>         >> KryoSerializer.
>>         >>>>>>>>>> On 04.09.20 14:59, Seth Wiesman wrote:
>>         >>>>>>>>>>> There is already an implicit assumption the
>>         TypeSerializer for
>>         >>> keys
>>         >>>>>> is
>>         >>>>>>>>>>> stable/deterministic, RocksDB compares keys using
>>         their
>>         >> serialized
>>         >>>>>> byte
>>         >>>>>>>>>>> strings. I think this is a non-issue (or at least
>>         it's not
>>         >>> changing
>>         >>>>>> the
>>         >>>>>>>>>>> status quo).
>>         >>>>>>>>>>>
>>         >>>>>>>>>>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther
>>         <twal...@apache.org <mailto:twal...@apache.org>
>>         >>>>>>>> wrote:
>>         >>>>>>>>>>>> +1 for getting rid of the TypeComparator
>>         interface and relying on
>>         >>> the
>>         >>>>>>>>>>>> serialized representation for grouping.
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> Adding a new type to DataStream API is quite
>>         difficult at the
>>         >>> moment
>>         >>>>>>>>>>>> due
>>         >>>>>>>>>>>> to too many components that are required:
>>         TypeInformation
>>         >> (tries
>>         >>> to
>>         >>>>>>>>>>>> deal
>>         >>>>>>>>>>>> with logical fields for TypeComparators),
>>         TypeSerializer (incl.
>>         >>> its
>>         >>>>>>>>>>>> snapshot interfaces), and TypeComparator (with
>>         many methods and
>>         >>>>>>>>>>>> internals such as normalized keys etc.).
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> If necessary, we can add more simple
>>         comparison-related methods
>>         >>> to
>>         >>>>>> the
>>         >>>>>>>>>>>> TypeSerializer interface itself in the future (like
>>         >>>>>>>>>>>> TypeSerializer.isDeterministic).
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> Regards,
>>         >>>>>>>>>>>> Timo
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>>
>>         >>>>>>>>>>>> On 04.09.20 11:48, Aljoscha Krettek wrote:
>>         >>>>>>>>>>>>> Thanks for publishing the FLIP!
>>         >>>>>>>>>>>>>
>>         >>>>>>>>>>>>> On 2020/09/01 06:49:06, Dawid Wysakowicz <
>>         >>> dwysakow...@apache.org>
>>         >>>>>>>>>>>>> wrote:
>>         >>>>>>>>>>>>>>      1. How to sort/group keys? What
>>         representation of the
>>         >> key
>>         >>>>>>>>>>>>>> should we
>>         >>>>>>>>>>>>>>         use? Should we sort on the binary form
>>         or should we
>>         >>> depend
>>         >>>>>> on
>>         >>>>>>>>>>>>>>         Comparators being available.
>>         >>>>>>>>>>>>> Initially, I suggested to Dawid (in private) to
>>         do the
>>         >>>>>>>>>>>>> sorting/grouping
>>         >>>>>>>>>>>> by using the binary representation. Then my
>>         opinion switched
>>         >> and
>>         >>> I
>>         >>>>>>>>>>>> thought
>>         >>>>>>>>>>>> we should use TypeComparator/Comparator because
>>         that's what the
>>         >>>>>>>>>>>> DataSet API
>>         >>>>>>>>>>>> uses. After talking to Stephan, I'm again
>>         encouraged in my
>>         >>> opinion
>>         >>>>>>>>>>>> to use
>>         >>>>>>>>>>>> the binary representation because it means we
>>         can eventually
>>         >> get
>>         >>> rid
>>         >>>>>>>>>>>> of the
>>         >>>>>>>>>>>> TypeComparator interface, which is a bit
>>         complicated, and
>>         >>> because we
>>         >>>>>>>>>>>> don't
>>         >>>>>>>>>>>> need any good order in our sort, we only need
>>         the grouping.
>>         >>>>>>>>>>>>> This comes with some problems, though: we need
>>         to ensure that
>>         >>> the
>>         >>>>>>>>>>>> TypeSerializer of the type we're sorting is
>>         >> stable/deterministic.
>>         >>>>>>>>>>>> Beam has
>>         >>>>>>>>>>>> infrastructure for this in the form of
>>         >>> Coder.verifyDeterministic()
>>         >>>>>> [1]
>>         >>>>>>>>>>>> which we don't have right now and should add if
>>         we go down this
>>         >>>>>> path.
>>         >>>>>>>>>>>>>>      2. Where in the stack should we apply the
>>         sorting (this is
>>         >>>>>> rather a
>>         >>>>>>>>>>>>>>         discussion about internals)
>>         >>>>>>>>>>>>> Here, I'm gravitating towards the third option
>>         of implementing
>>         >>> it
>>         >>>>>>>>>>>>> in the
>>         >>>>>>>>>>>> layer of the StreamTask, which probably means
>>         implementing a
>>         >>> custom
>>         >>>>>>>>>>>> InputProcessor. I think it's best to do it in
>>         this layer
>>         >> because
>>         >>> we
>>         >>>>>>>>>>>> would
>>         >>>>>>>>>>>> not mix concerns of different layers as we would
>>         if we
>>         >>> implemented
>>         >>>>>>>>>>>> this as
>>         >>>>>>>>>>>> a custom StreamOperator. I think this solution
>>         is also best
>>         >> when
>>         >>> it
>>         >>>>>>>>>>>> comes
>>         >>>>>>>>>>>> to multi-input operators.
>>         >>>>>>>>>>>>>>      3. How should we deal with custom
>>         implementations of
>>         >>>>>>>>>>>>>> StreamOperators
>>         >>>>>>>>>>>>> I think the cleanest solution would be to go
>>         through the
>>         >>> complete
>>         >>>>>>>>>>>> operator lifecycle for every key, because then
>>         the watermark
>>         >>> would
>>         >>>>>> not
>>         >>>>>>>>>>>> oscillate between -Inf and +Inf and we would not
>>         break the
>>         >>>>>> semantic
>>         >>>>>>>>>>>> guarantees that we gave to operators so far, in
>>         that the
>>         >>> watermark
>>         >>>>>> is
>>         >>>>>>>>>>>> strictly monotonically increasing. However, I
>>         don't think this
>>         >>>>>>>>>>>> solution is
>>         >>>>>>>>>>>> feasible because it would come with too much
>>         overhead. We
>>         >> should
>>         >>>>>>>>>>>> solve this
>>         >>>>>>>>>>>> problem via documentation and maybe educate
>>         people to not query
>>         >>> the
>>         >>>>>>>>>>>> current
>>         >>>>>>>>>>>> watermark or not rely on the watermark being
>>         monotonically
>>         >>>>>>>>>>>> increasing in
>>         >>>>>>>>>>>> operator implementations to allow the framework
>>         more freedom
>>         >> in
>>         >>> how
>>         >>>>>>>>>>>> user
>>         >>>>>>>>>>>> programs are executed.
>>         >>>>>>>>>>>>> Aljoscha
>>         >>>>>>>>>>>>>
>>         >>>>>>>>>>>>> [1]
>>         >>
>>         
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L184
>>         >>>
>>
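
A minimal sketch of the "group by serialized form" idea from point 1 of the
quoted discussion, in plain Java. The names BinaryKeyGrouping and
KeySerializer are hypothetical and only for illustration; this is not Flink's
TypeSerializer API. The sketch assumes the serializer is deterministic, i.e.
equal keys always produce identical bytes. Without that property, equal keys
could end up in different groups.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BinaryKeyGrouping {

    /** Hypothetical stand-in for a key serializer (not Flink's TypeSerializer). */
    interface KeySerializer<K> {
        void serialize(K key, DataOutputStream out) throws IOException;
    }

    /** Serializes a single key into a byte array. */
    static <K> byte[] toBytes(K key, KeySerializer<K> serializer) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            serializer.serialize(key, new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /**
     * Sorts keys by their serialized bytes and returns runs of equal keys.
     * The resulting order has no meaning for the key type itself; it only
     * guarantees that equal keys are adjacent.
     */
    static <K> List<List<K>> groupBySerializedForm(List<K> keys, KeySerializer<K> serializer) {
        List<byte[]> bytes = new ArrayList<>();
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            bytes.add(toBytes(keys.get(i), serializer));
            order.add(i);
        }
        order.sort((x, y) -> compareBytes(bytes.get(x), bytes.get(y)));

        List<List<K>> groups = new ArrayList<>();
        byte[] previous = null;
        for (int idx : order) {
            byte[] current = bytes.get(idx);
            // A change in the serialized bytes marks the start of a new key group.
            if (previous == null || !Arrays.equals(previous, current)) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(keys.get(idx));
            previous = current;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("b", "a", "b", "c", "a");
        System.out.println(groupBySerializedForm(keys, (k, out) -> out.writeUTF(k)));
    }
}
```

Note that no Comparator for the key type is involved: the sort only has to
bring equal keys next to each other, which is why a "good" order is not
needed here.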
