Hi all,
Yun, David M, David A, and I had an offline discussion and talked through a
couple of details that emerged from the discussion here. We believe we have
found a consensus on these points and would like to share it for
further feedback:
Let me try to go through the points that were raised, in arbitrary order:
1. We want to offer a generic interface for sorted state, not just temporal
state as proposed initially. We would like to...
a) ...offer a single new state type similar to what TemporalListState was
offering (so not offering something like TemporalValueState to keep the API
slim).
b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values
(I'll go into the API further below). The name stresses "Binary" because
we want to make clear that this is what the sort will be based on (see below).
c) ...have our own state descriptor (BinarySortedMultiMapStateDescriptor<UK,
UV>) similar to MapStateDescriptor<UK, UV>
d) ...require TypeSerializer implementations for the key to extend from
LexicographicalTypeSerializer (details below)
2. LexicographicalTypeSerializer basically defines the sort order when
retrieving values: it is based on the serialized binaries, comparing them
byte by byte in an unsigned fashion. For heap-based state backends, these
serializers can also optionally define a Comparator that doesn't require
serialization but needs to retain the same sort order. If no Comparator is
provided, we would supply implementations of the range-based operations that
iterate based on binary keys (by simply converting all relevant keys to
their binary form and using an appropriate comparator).
```
public interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
    // Optional comparator for heap-based backends; must match the binary sort order.
    default Optional<Comparator<T>> findComparator() {
        return Optional.empty();
    }
}
```
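To illustrate what "unsigned, byte by byte" means for a typical key such as a timestamp, here is a minimal sketch. The class and method names are made up for this email and are not part of Flink; it assumes keys are encoded big-endian with the sign bit flipped so that the binary order matches the numeric order.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper (not a Flink class): order-preserving encoding of long keys. */
final class LexicographicLongEncoding {

    /** Big-endian encoding with the sign bit flipped, so negative values sort first. */
    static byte[] encode(long value) {
        // ByteBuffer writes big-endian by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Compares two serialized keys byte by byte, treating each byte as unsigned. */
    static int compareSerialized(byte[] left, byte[] right) {
        return Arrays.compareUnsigned(left, right); // available since Java 9
    }
}
```

Without the sign-bit flip, negative longs would sort after positive ones under an unsigned comparison, which is the kind of detail a LexicographicalTypeSerializer implementation would have to take care of.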
3. BinarySortedMultiMap<UK, UV> should offer the following API
```
public interface BinarySortedMultiMap<UK, UV> extends State {
    void put(UK key, Collection<UV> values) throws Exception;
    void add(UK key, UV value) throws Exception;

    Map.Entry<UK, UV> entryAt(UK key) throws Exception;
    Map.Entry<UK, UV> firstEntry() throws Exception;
    Map.Entry<UK, UV> lastEntry() throws Exception;

    Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

    Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey) throws Exception;
}
```
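To make the intended semantics a bit more tangible, here is a rough in-memory stand-in for a subset of these operations. This is only a sketch under assumptions: the class name is hypothetical, it sorts by the keys' natural order rather than by serialized bytes, it returns the values of a key as a List, and clearRange returns nothing.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in that mimics part of the proposed API. */
final class InMemorySortedMultiMap<UK extends Comparable<UK>, UV> {
    // Assumption: natural key order stands in for the binary sort order.
    private final TreeMap<UK, List<UV>> entries = new TreeMap<>();

    /** Replaces all values stored under the key. */
    void put(UK key, Collection<UV> values) {
        entries.put(key, new ArrayList<>(values));
    }

    /** Appends one value without touching the values already stored under the key. */
    void add(UK key, UV value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Both bounds are inclusive. */
    Iterable<Map.Entry<UK, List<UV>>> readRange(UK fromKey, UK toKey) {
        return entries.subMap(fromKey, true, toKey, true).entrySet();
    }

    Iterable<Map.Entry<UK, List<UV>>> readRangeUntil(UK endUserKey) {
        return entries.headMap(endUserKey, true).entrySet();
    }

    Iterable<Map.Entry<UK, List<UV>>> readRangeFrom(UK startUserKey) {
        return entries.tailMap(startUserKey, true).entrySet();
    }

    /** Removes all entries in the inclusive range. */
    void clearRange(UK fromKey, UK toKey) {
        entries.subMap(fromKey, true, toKey, true).clear();
    }
}
```

For example, after add(1L, "a"), add(1L, "b") and add(5L, "c"), a call to readRange(1L, 3L) would yield a single entry for key 1 holding both values.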
That's the core of the points. Below are a few more things that came up,
along with some arguments about the "why":
A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
-> We looked at various use cases and didn't find a strong need because you
could always fall back to readRange*. In the interest of a slim API, we
thought it would be best to start without these (we can always add them later)
A2) Should we support iterating backwards?
-> We haven't found a compelling use case that needs this. If you need it, at
least for some (?) use cases, you could negate the sort order through the
serializer and achieve the same thing (unless you need to walk in two
directions). Let's rather start with a slim API.
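As a sketch of what "negating the sort order through the serializer" in A2 could look like: complementing every byte of the serialized key reverses the unsigned byte-wise order, as long as all keys have the same length. The helper below is hypothetical and not part of the proposal.

```java
/** Hypothetical helper: reverses the binary sort order of fixed-length keys. */
final class DescendingKeyEncoding {

    /** Returns a copy with every byte complemented; applying it twice restores the input. */
    static byte[] invert(byte[] serializedKey) {
        byte[] inverted = new byte[serializedKey.length];
        for (int i = 0; i < serializedKey.length; i++) {
            inverted[i] = (byte) ~serializedKey[i];
        }
        return inverted;
    }
}
```

For variable-length keys this is not enough, because a key that is a prefix of another key still sorts first after inversion.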
A3) Lazy vs. eager iteration
-> Let's implement our iterators similarly to RocksDBMapIterator by eagerly
retrieving a couple of values (defaults to 128 here) into a cache. This seems
to be the best of both worlds without bloating the API
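A simplified sketch of the chunked iteration idea from A3 follows. It is not how RocksDBMapIterator is implemented: the class name is hypothetical, and the fetch function takes an (offset, limit) pair, whereas a real backend would seek by key.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

/** Hypothetical iterator that eagerly loads a chunk of entries into a small cache. */
final class ChunkedIterator<T> implements Iterator<T> {
    private final BiFunction<Integer, Integer, List<T>> fetch; // (offset, limit) -> chunk
    private final int cacheSize;
    private final ArrayDeque<T> cache = new ArrayDeque<>();
    private int nextOffset = 0;
    private boolean exhausted = false;

    ChunkedIterator(BiFunction<Integer, Integer, List<T>> fetch, int cacheSize) {
        this.fetch = fetch;
        this.cacheSize = cacheSize;
    }

    @Override
    public boolean hasNext() {
        if (cache.isEmpty() && !exhausted) {
            // Refill the cache only when it has been fully consumed.
            List<T> chunk = fetch.apply(nextOffset, cacheSize);
            cache.addAll(chunk);
            nextOffset += chunk.size();
            // A short chunk means the underlying store has no more entries.
            exhausted = chunk.size() < cacheSize;
        }
        return !cache.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return cache.poll();
    }
}
```

With a cache size of 128, iterating over 300 entries would hit the underlying store three times instead of 300 times, while a caller that stops after a few entries never loads more than the first chunk.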
A4) ChangelogStateBackend
-> Since we require TypeSerializer implementations for the key and those know
the length to serialize (from other requirements, e.g. in the network stack),
it isn't affected by our change except for delegating the new operations to
the underlying state backend.
A5) Defining the binary sort order as one-by-one with unsigned bytes is fine
because it is a very common thing among storage systems. Should a different
binary-based state backend require something else, there could be a mapping
function translating between different definitions.
A6) How to handle duplicates
-> We let the user handle this by storing a multi-map, i.e. multiple values
for the (primary) sort key. If needed, users can sort these values manually.
As long as we don't have a strong use case where this is not feasible, we
don't need any implicit duplicate handling by the framework (Flink).
A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
-> We propose to use readRange*** because that seems more explicit/intuitive
in what this is doing.
A8) readRange*** with inclusive/exclusive switch
-> In the interest of a slim API, let's not provide that. The API above will
interpret all keys as _inclusive_ and should a user need exclusive behaviour,
they would in the worst case read one more entry - in most of the cases,
however, this would be served from the cache anyway, so it's not much of a
problem
A9) Why don't we want to provide a BinarySortedMap with value-like semantics
similar to TemporalValueState?
-> We'd like to keep the code overhead in Flink small and not provide two more
state primitives but instead only a single one. For use cases where you don't
want to handle lists, you can use the BinarySortedMultiMap with its put()
method and a list with a single entry that would overwrite the old one. While
retrieving the value(s), you can then assume the list is either empty or has a
single entry similar to what you are currently doing in a
WindowProcessFunction. You can also always add a thin wrapper to provide that
under a more convenient API if you need to.
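The thin wrapper mentioned in A9 could look roughly like the sketch below. Everything here is hypothetical: the class name is made up, and a TreeMap of lists stands in for the proposed state, whereas a real wrapper would delegate to BinarySortedMultiMap.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical wrapper giving single-value semantics on top of a multi-map. */
final class SingleValueView<K extends Comparable<K>, V> {
    // Stand-in for the proposed state type.
    private final TreeMap<K, List<V>> multiMap = new TreeMap<>();

    /** Overwrites whatever was stored under the key with a single-entry list. */
    void set(K key, V value) {
        multiMap.put(key, Collections.singletonList(value));
    }

    /** The list is either empty or has exactly one entry; values are assumed non-null. */
    Optional<V> get(K key) {
        List<V> values = multiMap.getOrDefault(key, Collections.emptyList());
        return values.isEmpty() ? Optional.empty() : Optional.of(values.get(0));
    }
}
```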
A10) Effects on the CEPOperator?
-> We don't have an overview yet. The buffering of events inside its
`MapState<Long, List<IN>> elementQueueState`, however, is a pattern that would
benefit from our MultiMap since a single add() operation wouldn't require you
to read the whole list again.
Sorry for the long email - we'd be happy to get more feedback and will
incorporate this into the FLIP description soon.
Nico
--
Dr. Nico Kruber | Solutions Architect