Hi Yun & Nico,

A few thoughts on the discussion:

1) Making the TemporalListState generic

This is not possible with the current type serializer infrastructure,
because the sorting key *needs to be comparable on the binary level*
(in its serialized form).

What I could imagine is introducing some kind of `Sorted(List|Map)State`
with explicit binary keys. Users would either have to work directly with
`byte[]` keys or provide a function that transforms keys into a binary
representation that can be sorted (this would have to be different from
`TypeSerializer`, which is free to use a fancier binary representation,
e.g. varints to save space).
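
To illustrate why a space-saving format cannot double as a sort key, here
is a minimal sketch. It assumes an LEB128-style varint (7 payload bits per
byte, least significant group first); `VarintOrderingDemo`, `encodeVarint`
and `compareBinary` are hypothetical names, and this is not a claim about
the exact layout any Flink serializer uses.

```java
import java.util.Arrays;

/** Hypothetical demo, not Flink code: varint bytes do not sort like the numbers they encode. */
final class VarintOrderingDemo {

    /** Encodes a non-negative int as an LEB128-style varint (assumed layout, low bits first). */
    static byte[] encodeVarint(int value) {
        byte[] buf = new byte[5]; // an int needs at most 5 groups of 7 bits
        int n = 0;
        while ((value & ~0x7F) != 0) {
            buf[n++] = (byte) ((value & 0x7F) | 0x80); // set the continuation bit
            value >>>= 7;
        }
        buf[n++] = (byte) value;
        return Arrays.copyOf(buf, n);
    }

    /** Lexicographic comparison that treats every byte as unsigned (Java 9+). */
    static int compareBinary(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // 255 < 256 numerically, but the encoded form of 255 starts with a larger byte.
        int cmp = compareBinary(encodeVarint(255), encodeVarint(256));
        System.out.println(cmp > 0 ? "binary order disagrees" : "binary order agrees");
    }
}
```

The compact encoding is fine for storage, but a range scan over such keys
would visit them in the wrong order.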

This kind of interface might be really hard for pipeline authors to grasp.
It requires a deeper understanding of how the byte comparison works (e.g.
it needs to be different from the Java byte comparison, which compares
bytes as `signed`). This could perhaps be partially mitigated by
providing predefined `to binary sorting key` functions for the common
primitives / types.
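
As a rough idea of what such a predefined function could look like for
`long`, here is a sketch under stated assumptions: a fixed-width,
big-endian encoding with the sign bit flipped, compared as unsigned bytes.
`SortableLongKey` and its methods are hypothetical names, not an existing
Flink API; `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of Flink: order-preserving binary keys for long values. */
final class SortableLongKey {

    /** Encodes a long as 8 big-endian bytes with the sign bit flipped. */
    static byte[] encode(long value) {
        // Flipping the sign bit makes negative values sort before positive
        // ones when the bytes are compared as unsigned.
        long flipped = value ^ Long.MIN_VALUE;
        byte[] out = new byte[Long.BYTES];
        for (int i = Long.BYTES - 1; i >= 0; i--) {
            out[i] = (byte) flipped; // lowest byte goes last (big-endian)
            flipped >>>= 8;
        }
        return out;
    }

    /** Byte-wise lexicographic comparison, treating each byte as 0..255. */
    static int compareUnsigned(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        long[] ascending = {Long.MIN_VALUE, -1L, 0L, 1L, 255L, 256L, Long.MAX_VALUE};
        for (int i = 1; i < ascending.length; i++) {
            // The binary order has to agree with the numeric order of each adjacent pair.
            if (compareUnsigned(encode(ascending[i - 1]), encode(ascending[i])) >= 0) {
                throw new AssertionError("order broken at index " + i);
            }
        }
        System.out.println("binary order matches numeric order");
    }
}
```

With a signed byte comparison, the encoded form of `-1` (first byte `0x7F`)
would sort after the encoded form of `0` (first byte `0x80`), which is
exactly the pitfall mentioned above.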

2) Duplicates

> I guess, this all boils down to dealing with duplicates / values for the
> same timestamp.

We should never have duplicates. Let's try to elaborate on what having
duplicates really means in this context.

a) Temporal Table point of view

There can be only a single snapshot of the table at any given point in
time (~ physics). If we allow duplicates, we violate this, as it's no longer
certain what the actual state of the table is at that point in time. In
the case of a temporal join, what should the elements from the other side
join against?

If we happen to have a duplicate, it actually brings us to b) causality
(which could actually answer the previous question).

b) Causality

When building any kind of state machine, it's important to think about
causality (if we switch the order of events, state transitions no longer
result in the same state). A temporal table is a specific type of state
machine.

There are several approaches to mitigate this:
I) nanosecond precision -> the chance that two events affecting the same
thing happen at exactly the same nanosecond is negligible (from the
physical standpoint)
II) the sorting key is a tuple of (timestamp, sequential id) -> an example
could be early firings (you get speculative results from a windowed
aggregation with timestamp = EOW, but you can easily assign the order in
which these have happened)
III) last write with the same timestamp wins -> this is a special case of
II) when we're sure that elements with a duplicate timestamp come in order
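
A minimal heap-only sketch of approach II), assuming a plain `TreeMap`
stands in for the state; `SequencedKeyExample`, `SequencedKey` and
`valueAtOrBefore` are hypothetical names for illustration, not proposed
API:

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch, not Flink API: ordering entries by (timestamp, sequence id). */
final class SequencedKeyExample {

    /** Composite sorting key; sequenceId breaks ties between equal timestamps. */
    static final class SequencedKey {
        final long timestamp;
        final long sequenceId;

        SequencedKey(long timestamp, long sequenceId) {
            this.timestamp = timestamp;
            this.sequenceId = sequenceId;
        }
    }

    /** Orders by timestamp first, then by sequence id. */
    static final Comparator<SequencedKey> ORDER =
            Comparator.<SequencedKey>comparingLong(k -> k.timestamp)
                    .thenComparingLong(k -> k.sequenceId);

    static <V> TreeMap<SequencedKey, V> newState() {
        return new TreeMap<>(ORDER);
    }

    /** Returns the value with the greatest key at or before the timestamp, or null if none. */
    static <V> V valueAtOrBefore(TreeMap<SequencedKey, V> state, long timestamp) {
        // Long.MAX_VALUE as the sequence id selects the last write for that timestamp.
        Map.Entry<SequencedKey, V> entry =
                state.floorEntry(new SequencedKey(timestamp, Long.MAX_VALUE));
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<SequencedKey, String> state = newState();
        state.put(new SequencedKey(10L, 1L), "second early firing");
        state.put(new SequencedKey(10L, 0L), "first early firing");
        state.put(new SequencedKey(20L, 0L), "next window");
        System.out.println(valueAtOrBefore(state, 10L));
        System.out.println(valueAtOrBefore(state, 15L));
    }
}
```

Approach III) falls out of this when the sequence id is simply the arrival
order: the lookup always resolves to the last element written for a
timestamp, regardless of the order in which the entries were inserted into
the map.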

c) Secondary Sort

Handling the actual duplicates requires a secondary sort key, which might
complicate the `to binary sorting key` interface discussed in 1) -
basically some kind of user-provided duplicate handler.

d) Temporal Value State

The above points apply to the temporal value state as well; it just pushes
the responsibility away from the state interface. I'm still not convinced
that this is the right direction.

-

For most use cases I've seen, millisecond precision is more than enough
(+ last write wins as a fallback). Supporting the use cases where it's
not enough (I've seen those as well in the past, especially with early
firings) might actually be a good case for the more generic form of the
state that we've discussed in 1).

3) Map vs List

I think this also boils down to the discussion of how to handle duplicates.
From the commonly accepted contracts, list implies that there could be
duplicates and map implies otherwise. One concern about `Map` is that it
also implies that you should be able to do a point query.
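
The two contracts can be sketched with plain Java collections, assuming a
`TreeMap` keyed by timestamp stands in for the state; `MapVsListContract`,
`putValue` and `addValue` are hypothetical names, not proposed API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch, not Flink API: what "map" and "list" imply for equal timestamps. */
final class MapVsListContract {

    /** Map-like contract: one value per timestamp, so a second write replaces the first. */
    static <V> void putValue(TreeMap<Long, V> state, long timestamp, V value) {
        state.put(timestamp, value);
    }

    /** List-like contract: values with the same timestamp accumulate in arrival order. */
    static <V> void addValue(TreeMap<Long, List<V>> state, long timestamp, V value) {
        state.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        TreeMap<Long, String> values = new TreeMap<>();
        putValue(values, 42L, "first");
        putValue(values, 42L, "second");
        System.out.println(values.get(42L)); // point query, expected by a map contract

        TreeMap<Long, List<String>> lists = new TreeMap<>();
        addValue(lists, 42L, "first");
        addValue(lists, 42L, "second");
        System.out.println(lists.get(42L).size());
    }
}
```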

Best,
D.


On Fri, Apr 22, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:

> Hi Nico,
>
> I did not mean that we need to support all APIs in NavigableMap; it is
> indeed too heavy to implement them all.
> Moreover, I also prefer an iterator-like API over the original
> #tailMap-like API. I just used NavigableMap's API to show examples.
>
> I think we can introduce new APIs like:
>
> interface SortedMapState<UK, UV> extends State {
>   Map.Entry<UK, UV> firstEntry() throws Exception;
>   Map.Entry<UK, UV> lastEntry() throws Exception;
>   Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
>   Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
>   Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey)
>       throws Exception;
> }
>
> Since SortedMapState has several new APIs, I prefer to introduce a new
> state descriptor to distinguish it from the original map state.
>
> For the API of SortedMapOfListsState, I am not strongly against it; I
> just want to know the actual benefits if we really want to introduce the
> API.
>
> When talking about the changelog state backend, my concern is about how
> to group keys together in the changelog logger.
> I can share one problem, but first I need to spend some time on how to
> implement a serializer that keeps the order of the serialized bytes the
> same as that of the original Java objects.
> For a fixed-length serializer, such as LongSerializer, we just need to
> ensure all numbers are positive or invert the sign bit.
> However, a non-fixed-length serializer, such as StringSerializer, writes
> the length of the bytes first, which breaks the natural order when
> comparing the bytes. Thus, we might need to avoid writing the length in
> the serialized bytes.
> On the other hand, the changelog logger records operations per key one by
> one in the logs. We need to consider how to distinguish each key in the
> combined serialized byte arrays.
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Nico Kruber <n...@apache.org>
> *Sent:* Thursday, April 21, 2022 23:50
> *To:* dev <dev@flink.apache.org>
> *Cc:* David Morávek <david.mora...@gmail.com>; Yun Tang <myas...@live.com>
> *Subject:* Re: [DISCUSS] FLIP-220: Temporal State
>
> Thanks Yun Tang for your clarifications.
> Let me keep my original structure and reply in these points...
>
> 3. Should we generalise the Temporal***State to offer arbitrary key types
> and not just Long timestamps?
>
> The use cases you detailed do indeed look similar to the ones we were
> optimising in our TemporalState PoC...
>
> I don't think, I'd like to offer a full implementation of NavigableMap
> though because that seems quite some overhead to implement while we can
> cover the mentioned examples with the proposed APIs already when using
> iterators as well as single-value retrievals.
> So far, when we were iterating from the smallest key, we could just use
> Long.MIN_VALUE and start from there. That would be difficult to generalise
> for arbitrary data types because you may not always know the smallest
> possible value for a certain serialized type (unless we put this into the
> appropriate serializer interface).
>
> I see two options here:
> a) a slim API but using NULL as an indicator for smallest/largest
> depending on the context, e.g.
>   - `readRange(null, key)` means from beginning to key
>   - `readRange(key, null)` means from key to end
>   - `readRange(null, null)` means from beginning to end
>   - `value[AtOr]Before(null)` means largest available key
>   - `value[AtOr]After(null)` means smallest available key
> b) a larger API with special methods for each of these use cases similar
> to what NavigableMap has but based on iterators and single-value functions
> only
> > BTW, I prefer to introduce another state descriptor instead of current
> > map state descriptor.
>
> Can you elaborate on this? We currently don't need extra functionality, so
> this would be a plain copy of the MapStateDescriptor...
>
> > For the API of SortedMapOfListsState, I think this is a bit bounded to
> > current implementation of RocksDB state-backend.
>
> Actually, I don't think this is special to RocksDB but generic to all
> state backends that do not hold values in memory and allow fast
> append-like operations.
> Additionally, since this is a very common use case and RocksDB is also
> widely used, I wouldn't want to continue without this specialization. For
> a similar reason, we offer ListState and not just ValueState<List>...
>
>
> 4. ChangelogStateBackend
>
> > For the discussion of ChangelogStateBackend, you can think of changelog
> > state-backend as a write-ahead-log service. And we need to record the
> > changes to any state, thus this should be included in the design doc as
> > we need to introduce another kind of state, especially you might need to
> > consider how to store key bytes serialized by the new serializer (as we
> > might not be able to write the length in the beginning of serialized
> > bytes to make the order of bytes same as natural order).
>
> Since the ChangelogStateBackend "holds the working state in the underlying
> delegatedStateBackend, and forwards state changes to State Changelog", I
> honestly still don't see how this needs special handling. As long as the
> delegated state backend supports sorted state, ChangelogStateBackend
> doesn't have to do anything special except for recording changes to state.
> Our PoC simply uses the namespace for these keys and that's the same thing
> the Window API is already using - so there's nothing special here. The
> order in the log doesn't have to follow the natural order because this is
> only required inside the delegatedStateBackend, isn't it?
>
>
> Nico
>
> On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:
> > Hi Nico,
> >
> > Thanks for your clarification.
> > For the discussion about generalizing Temporal state to sorted map
> > state. I could give some examples of how to use sorted map state in
> > min/max with retract functions.
>
> > As you know, NavigableMap in java has several APIs like:
> >
> >     Map.Entry<K,V> firstEntry();
> >     Map.Entry<K,V> lastEntry();
> >     NavigableMap<K,V> tailMap(K fromKey, boolean inclusive)
> >
> > The #firstEntry API could be used in MinWithRetractAggFunction#updateMin,
> > #lastEntry could be used in MaxWithRetractAggFunction#updateMax, and
> > #tailMap could be used in FirstValueWithRetractAggFunction#retract. If we
> > can introduce SortedMap-like state, these functions could be benefited.
> > BTW, I prefer to introduce another state descriptor instead of current
> > map state descriptor.
> > For the API of SortedMapOfListsState, I think this is a bit bounded to
> > current implementation of RocksDB state-backend.
>
> > For the discussion of ChangelogStateBackend, you can think of changelog
> > state-backend as a write-ahead-log service. And we need to record the
> > changes to any state, thus this should be included in the design doc as
> > we need to introduce another kind of state, especially you might need to
> > consider how to store key bytes serialized by the new serializer (as we
> > might not be able to write the length in the beginning of serialized
> > bytes to make the order of bytes same as natural order).
>
> > Best
> > Yun Tang.
> > ________________________________
> > From: Nico Kruber <n...@apache.org>
> > Sent: Wednesday, April 20, 2022 0:38
> > To: dev <dev@flink.apache.org>
> > Cc: Yun Tang <myas...@live.com>; David Morávek <david.mora...@gmail.com>
> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >
> > Hi all,
> > I have read the discussion points from the last emails and would like to
> > add my two cents on what I believe are the remaining points to solve:
> > 1. Do we need a TemporalValueState?
> >
> > I guess, this all boils down to dealing with duplicates / values for the
> > same timestamp. Either you always have to account for them and thus
> > always have to store a list anyway, or you only need to join with "the
> > latest" (or the only) value for a given timestamp and get a nicer API and
> > lower overhead for that use case.
> > At the easiest, you can make an assumption that there is only a single
> > value for each timestamp by contract, e.g. by increasing the timestamp
> > precision and interpreting them as nanoseconds, or maybe milliseconds are
> > already good enough. If that contract breaks, however, you will get into
> > undefined behaviour.
> > The TemporalRowTimeJoinOperator, for example, currently just assumes that
> > there is only a single value on the right side of the join (rightState)
> > and I believe many use cases can make that assumption or otherwise you'd
> > have to define the expected behaviour for multiple values at the same
> > timestamp, e.g. "join with the most recent value at the time of the left
> > side and if there are multiple values, choose X".
> >
> > I lean towards having a ValueState implementation as well (in addition to
> > lists).
> >
> >
> > 2. User-facing API (Iterators vs. valueAtOr[Before|After])
> >
> > I like the iterable-based APIs that David M was proposing, i.e.
> > - Iterable<TimestampedValue<T>> readRange(long minTimestamp, long
> >   limitTimestamp);
> > - void clearRange(long minTimestamp, long limitTimestamp);
> >
> > However, I find Iterables rather cumbersome to work with if you actually
> > only need a single value, e.g. the most recent one.
> > For iterating over a range of values, however, they feel more natural to
> > me than our proposal.
> >
> > Actually, if we generalise the key type (see below), we may also need to
> > offer additional value[Before|After] functions to cover "+1" iterations
> > where we cannot simply add 1 as we do now.
> >
> > (a) How about offering both Iterables and
> > value[AtOrBefore|AtOrAfter|Before|After]?
> > This would be similar to what NavigableMap [2] is offering but with a
> > more explicit API than "ceiling", "floor",...
> >
> > (b) Our API proposal currently also allows iterating backwards which is
> > not covered by the readRange proposal - we could, however, just do that
> > if minTimestamp > limitTimestamp. What do you think?
> >
> > (c) When implementing the iterators, I actually also see two different
> > modes which may differ in performance: I call them iteration with eager
> > vs. lazy value retrieval. Eager retrieval may retrieve all values in a
> > range at once and make them available in memory, e.g. for smaller data
> > sets similar to what TemporalRowTimeJoinOperator is doing for the right
> > side of the join. This can spare a lot of Java<->JNI calls and let
> > RocksDB iterate only once (as long as things fit into memory). Lazy
> > retrieval would fetch results one-by-one. -> We could set one as default
> > and allow the user to override that behaviour.
> >
> > 3. Should we generalise the Temporal***State to offer arbitrary key types
> > and not just Long timestamps?
> >
> > @Yun Tang: can you describe in more detail where you think this would be
> > needed for SQL users? I don't quite get how this would be beneficial. The
> > example you linked doesn't quite show the same behaviour.
> >
> > Other than this, I could see that you can leverage such a generalisation
> > for arbitrary joins between, for example, IDs and ID ranges which don't
> > have a time component attached to it. Given that this shouldn't be too
> > difficult to expose (the functionality has to exist anyway, but otherwise
> > buried into Flink's internals). We'd just have to find suitable names.
> >
> > (a) I don't think TemporalListState<T> is actually SortedMapState<Long,
> > List<T>> because we need efficient "add to list" primitives which cannot
> > easily be made available with a single generic SortedMapState...
> >
> > (b) So the most expressive (yet kind-of ugly) names could be
> > - SortedMapState<Long, ValueType>
> > - SortedMapOfListsState<Long, List<ValueType>>
> >
> > (c) For both of these, we could then re-use the existing
> > MapStateDescriptor to define key and value/list-element types and require
> > that the key type / serializer implements a certain
> > RetainingSortOrderSerializer interface (and think about a better name for
> > this) which defines the contract that the binary sort order is the same
> > as the Java Object one.
> > -> that can also be verified at runtime to fail early.
> >
> >
> > 4. ChangelogStateBackend: we don't think this needs special attention -
> > it is just delegating to the other backends anyway and these methods are
> > already adapted in our POC code
> >
> >
> > @David M, Yun Tang: let me/us know what you think about these proposals
> >
> >
> >
> > Nico
> >
> >
> > [2] https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html
> >
> > On Thursday, 14 April 2022 14:15:53 CEST Yun Tang wrote:
> >
> > > Hi David Anderson,
> > >
> > >
> > >
> > > I doubt that there is no motivating use case for this generalization to
> > > SortedMapState. From our internal stats, SQL users use min/max with
> > > retract functions [1] much more often than interval joins.
> >
> >
> > > From my understanding, the TemporalListState<T> is actually
> > > SortedMapState<Long, List<T>>, while TemporalValueState<T> is
> > > SortedMapState<Long, T>. As you can see, if we just restrict
> > > SortedMapState with the key type as Long, all current two new
> > > interfaces could be replaced.
> >
> >
> >
> > > Moreover, once we introduce temporal state, the extension would be
> > > limited. Apart from Long, many other types could be comparable, e.g.
> > > TimeStamp, Int, Float and so on. How could we handle these feature
> > > request after TemporalState merged? I don't think introducing too many
> > > state types is a good idea. We can only support Long type for the 1st
> > > version when introducing SortedMapState, and then extends it to many
> > > other more types in the future. This could balance the feature requests
> > > with clean interfaces design. And thus, we can also use sorted map state
> > > in the popular min/max functions.
> >
> >
> >
> > >
> > >
> > > By the way, current FLIP lacks of consideration of the work on
> > > changelog state-backend once two new state types are introduced.
> >
> >
> >
> > >
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/MinWithRetractAggFunction.java
> >
> >
> >
> > > Best,
> > > Yun Tang
> > > ________________________________
> > > From: David Morávek <david.mora...@gmail.com>
> > > Sent: Wednesday, April 13, 2022 19:50
> > > To: dev <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> > >
> > >
> > >
> > > Here is a very naive implementation [1] from a prototype I did few
> > > months back that uses list and insertion sort. Since the list is sorted
> > > we can use binary search to create sub-list, that could leverage the
> > > same thing I've described above.
> > >
> > >
> > >
> > > I think back then I didn't go for the SortedMap as it would be hard to
> > > implement with the current heap state backend internals and would have
> > > bigger memory overhead.
> > >
> > >
> > >
> > > The ideal solution would probably use skip list [2] to lower the
> > > overhead of the binary search, while maintaining a reasonable memory
> > > footprint. Other than that it could be pretty much the same as the
> > > prototype implementation [1].
> > >
> > >
> > >
> > > [1]
> > > https://github.com/dmvk/flink/blob/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapTemporalListState.java
> > > [2] https://en.wikipedia.org/wiki/Skip_list
> >
> > >
> > >
> > > Best,
> > > D.
> > >
> > >
> > >
> > > On Wed, Apr 13, 2022 at 1:27 PM David Morávek <david.mora...@gmail.com>
> > > wrote:
> > >
> > >
> > >
> > >
> > > > Hi David,
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >> It seems to me that at least with the heap-based state backend,
> > > >> readRange is going to have to do a lot of unnecessary work to
> > > >> implement this isEmpty() operation, since it will have to consider
> > > >> the entire range from MIN_VALUE to MAX_VALUE. (Maybe we should add an
> > > >> explicit isEmpty method? I'm not convinced we need it, but it would
> > > >> be cheaper to implement. Or perhaps this join can be rewritten to not
> > > >> need this operation; I haven't thought enough about that
> > > >> alternative.)
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > I think this really boils down to how the returned iterable is going
> > > > to be implemented. Basically for checking whether state is empty, you
> > > > need to do something along the lines of:
> > > >
> > > > Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
> > > > // basically checking `hasNext() == false` or `isEmpty()` in case of
> > > > // `Collection`
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Few notes:
> > > > 1) It could be lazy (the underlying collection doesn't have to be
> > > > materialized - eg. in case of RocksDB);
> > > > 2) For HeapStateBackend it depends on the underlying implementation.
> > > > I'd probably do something along the lines of sorted tree (eg.
> > > > SortedMap / NavigableMap), that allows effective range scans / range
> > > > deletes. Then you could simply do something like (from top of the
> > > > head):
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > @Value // Lombok; the key also needs an ordering (Comparable or a
> > > >        // Comparator on the map): by key first, then by timestamp
> > > > class TimestampedKey<K> {
> > > >   K key;
> > > >   long timestamp;
> > > > }
> > > >
> > > > SortedMap<TimestampedKey<K>, V> internalState;
> > > >
> > > > Iterable<TimestampedValue<V>> readRange(long min, long max) {
> > > >   // subMap includes the lower bound and excludes the upper bound
> > > >   return toIterable(
> > > >       internalState.subMap(
> > > >           new TimestampedKey<>(currentKey(), min),
> > > >           new TimestampedKey<>(currentKey(), max)));
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > This should be fairly cheap. The important bit is that the returned
> > > > iterator is always non-null, but could be empty.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Does that answer your question?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > D.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Apr 13, 2022 at 12:21 PM David Anderson
> > > > <da...@alpinegizmo.com> wrote:
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >> Yun Tang and Jingsong,
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Some flavor of OrderedMapState is certainly feasible, and I do see
> > > >> some appeal in supporting Binary**State.
> > > >>
> > > >> However, I haven't seen a motivating use case for this
> > > >> generalization, and would rather keep this as simple as possible. By
> > > >> handling Longs we can already optimize a wide range of use cases.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> David
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> >  Hi David,
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > Could you share some explanations why SortedMapState cannot work
> > > >> > in details? I just cannot catch up what the statement below means:
> > > >> >
> > > >> > This was rejected as being overly difficult to implement in a way
> > > >> > that would cleanly leverage RocksDB’s iterators.
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > Best
> > > >> > Yun Tang
> > > >> > ________________________________
> > > >> > From: Aitozi <gjying1...@gmail.com>
> > > >> > Sent: Tuesday, April 12, 2022 15:00
> > > >> > To: dev@flink.apache.org <dev@flink.apache.org>
> > > >> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > Hi David
> > > >> >
> > > >> >
> > > >> >
> > > >> >      I have look through the doc, I think it will be a good
> > > >> > improvement to this pattern usage, I'm interested in it. Do you
> > > >> > have some POC work to share for a closer look.
> > > >> > Besides, I have one question that can we support expose the
> > > >> > namespace in the different state type not limited to
> > > >> > `TemporalState`. By this, user can specify the namespace
> > > >> > and the TemporalState is one of the special case that it use
> > > >> > timestamp as the namespace. I think it will be more extendable.
> > > >> >
> > > >> >     What do you think about this ?
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > Best,
> > > >> > Aitozi.
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > David Anderson <dander...@apache.org> wrote on Mon, Apr 11, 2022 at 20:54:
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > > Greetings, Flink developers.
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > I would like to open up a discussion of a proposal [1] to add a
> > > >> > > new kind of state to Flink.
> > > >> > >
> > > >> > > The goal here is to optimize a fairly common pattern, which is
> > > >> > > using
> > > >> > >
> > > >> > > MapState<Long, List<Event>>
> > > >> > >
> > > >> > > to store lists of events associated with timestamps. This pattern
> > > >> > > is used internally in quite a few operators that implement
> > > >> > > sorting and joins, and it also shows up in user code, for
> > > >> > > example, when implementing custom windowing in a
> > > >> > > KeyedProcessFunction.
> > > >> > >
> > > >> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that
> > > >> > > achieves a more than 2x improvement in throughput when performing
> > > >> > > these operations on RocksDB by better leveraging the capabilities
> > > >> > > of the RocksDB state backend.
> > > >> >
> > > >> >
> > > >> >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > See FLIP-220 [1] for details.
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > Best,
> > > >> > > David
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > >
> >
> >
> > Dr. Nico Kruber | Solutions Architect
> >
> > Follow us @VervericaData Ververica
> > --
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> > Wehner
> >
> >
>
>
>
>
>
