Zooming in from generic philosophy, to be clear: adding a time-ordered buffer to the Fn state API is *not* a shortcut. It has benefits that cannot be achieved by an SDK-side implementation on top of either an ordered or an unordered multimap. Are those benefits worth expanding the API? I don't know.
A change to allow a runner to have a specialized implementation for time-buffered state would be one or more new StateKey types, right? Reuven, maybe put this and your Java API in a doc? A BIP? It seems like there are at least the following to explore:

- how that Java API would map to an SDK-side implementation on top of the multimap state key
- how that Java API would map to a new StateKey
- whether there's actually more than one relevant implementation of that StateKey
- whether an SDK-side implementation on some other state key would be performant enough in all SDK languages (present and future)

Zooming back out to generic philosophy: a proliferation of StateKey types tuned by runners (which can very easily still share implementation) is probably better than a proliferation of complex SDK-side implementations with varying completeness and performance.

Kenn

On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <[email protected]> wrote:
> It might help for me to describe what I have in mind. I'm still proposing that we build multimap, just not a globally-sorted multimap.
>
> My previous proposal was that we provide a Multimap<Key, Value> state type, sorted by key. This would have two additional operations: multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey, endKey). The primary use case was timestamp sorting, but I felt that a sorted multimap provided a nice generalization; after all, you can simply key the multimap by timestamp to get timestamp sorting.
>
> This approach immediately had some issues that would take some work to solve. Since a multimap key can have any type and a runner will only be able to sort by the encoded value, we would need to introduce a concept of order-preserving coders into Beam and plumb that through. Robert pointed out that even our existing standard coders for simple integral types don't preserve order, so there will likely be surprises here.
>
> My current proposal is for a multimap that is not sorted by key, but that can support ordered values for a single key. Remember that a multimap maps K -> Iterable<V>, so this means that each individual Iterable<V> is ordered, but the keys have no specific order relative to each other. This is not too different from many multimap implementations where the keys are unordered, but the list of values for a single key at least has a stable order.
>
> The interface would look like this:
>
> public interface MultimapState<K, V> extends State {
>   // Add a value with a default timestamp.
>   void put(K key, V value);
>
>   // Add a timestamped value.
>   void put(K key, TimestampedValue<V> value);
>
>   // Remove all values for a key.
>   void remove(K key);
>
>   // Remove all values for a key with timestamps within the specified range.
>   void removeRange(K key, Instant startTs, Instant endTs);
>
>   // Get an Iterable of the values for a key. The Iterable will be returned sorted by timestamp.
>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>
>   // Get an Iterable of the values for a key in the specified range. The Iterable will be returned sorted by timestamp.
>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant startTs, Instant endTs);
>
>   ReadableState<Iterable<K>> keys();
>   ReadableState<Iterable<TimestampedValue<V>>> values();
>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
> }
>
> We can of course provide helper functions that allow using MultimapState without dealing with TimestampedValue for users who only want a multimap and don't want sorting.
>
> I think many users will only need a single sorted list, not a full multimap. It's worth offering this as well, and we can simply build it on top of MultimapState.
> It will look like an extension of BagState:
>
> public interface TimestampSortedListState<T> extends State {
>   void add(TimestampedValue<T> value);
>   Iterable<TimestampedValue<T>> read();
>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
>   void clearRange(Instant startTs, Instant endTs);
> }
>
>
> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <[email protected]> wrote:
>
>> The portability layer is meant to live across multiple versions of Beam, and I don't think it should be approached by doing the simple and useful thing now, since I believe that will lead to a proliferation of the API.
>>
>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> I have thoughts on the subject of whether to have APIs just for the lowest-level building blocks versus having APIs for higher-level constructs. Specifically, this applies to providing only an unsorted multimap vs. what I will call a "time-ordered buffer". TL;DR: I'd vote to focus on the time-ordered buffer; if it turns out to be easy to go all the way to a sorted multimap, that's nice-to-have; if it turns out to be easy to implement on top of unsorted map state, that should probably be under the hood.
>>>
>>> Reasons to build a low-level multimap in the runner & Fn API and layer higher-level things in the SDK:
>>>
>>> - It is less implementation work for runners if they only have to provide a few lower-level building blocks like multimap state.
>>> - There are many more runners than SDKs (and there will be more and more), so this saves work overall.
>>>
>>> Reasons to build higher-level constructs directly in the runner and Fn API:
>>>
>>> - Having multiple higher-level state types may actually be less implementation than one complex state type, especially if they map to runner primitives.
>>> - The runner may have better specialized implementations, especially for something like a time-ordered buffer.
>>> - The particular access patterns in an SDK-based implementation may not be ideal for each runner's underlying implementation of the low-level building block.
>>> - There may be excessive gRPC overhead even for optimal access patterns.
>>>
>>> There are ways to have the best of both worlds, like:
>>>
>>> 1. Define multiple state types according to fundamental access patterns, like we did before portability.
>>> 2. If it is easy to layer one on top of the other, do that inside the runner. Provide shared code so that runners providing the lowest-level primitive get all the types for free.
>>>
>>> I understand that this is an oversimplification. It still creates some more work. And APIs are a maintenance burden, so it is good to introduce as few as possible. But it has performance benefits and also unblocks "just doing the simple and useful thing now", which I always like to do as long as it is compatible with future changes. If the APIs are fundamental, like sets, maps, and timestamp ordering, then it is safe to guess that they will rarely change and will be useful forever.
>>>
>>> Kenn
>>>
>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> I would be glad to take a stab at how to provide sorting on top of unsorted multimap state.
>>>> Based upon your description, you want integer keys representing timestamps and arbitrary user values for the values, is that correct?
>>>> What kinds of operations do you need on the sorted map state, in order of efficiency requirements? (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
>>>> What kinds of operations do we expect the underlying unsorted map state to be able to provide? (at a minimum Get(K), Append(K), Clear(K), but what else, e.g. enumerate(K)?)
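One possible answer to Luke's questions, shown as a minimal in-memory sketch rather than a proposal: assume the unsorted map offers only Get(K), Append(K), Clear(K), and an unordered enumerate(K). Timestamps are grouped into fixed-width buckets (the bucket width is a hypothetical tuning knob). A range read enumerates the keys, keeps the overlapping buckets, then sorts the bucket ids and each bucket's contents in memory. `UnorderedMultimap`, `TimeBucketedBuffer`, and `Entry` are illustrative names, not Beam APIs, and timestamps are plain millisecond `long`s.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for runner-provided unsorted multimap state: Get(K), Append(K), Clear(K), enumerate(K). */
final class UnorderedMultimap<K, V> {
  private final Map<K, List<V>> data = new HashMap<>();

  List<V> get(K key) { return data.getOrDefault(key, List.of()); }

  void append(K key, V value) { data.computeIfAbsent(key, k -> new ArrayList<>()).add(value); }

  void clear(K key) { data.remove(key); }

  Iterable<K> keys() { return data.keySet(); }  // enumeration order is unspecified
}

/** Hypothetical time-ordered buffer layered on UnorderedMultimap via fixed-width timestamp buckets. */
final class TimeBucketedBuffer<V> {
  /** A value with its event timestamp in millis (stand-in for Beam's TimestampedValue). */
  static final class Entry<V> {
    final long ts;
    final V value;

    Entry(long ts, V value) { this.ts = ts; this.value = value; }
  }

  private final UnorderedMultimap<Long, Entry<V>> state = new UnorderedMultimap<>();
  private final long bucketWidthMillis;  // hypothetical tuning knob

  TimeBucketedBuffer(long bucketWidthMillis) { this.bucketWidthMillis = bucketWidthMillis; }

  private long bucketOf(long ts) { return Math.floorDiv(ts, bucketWidthMillis); }

  void add(long ts, V value) { state.append(bucketOf(ts), new Entry<>(ts, value)); }

  /** Values with startTs <= ts < endTs, sorted by timestamp. */
  List<Entry<V>> readRange(long startTs, long endTs) {
    List<Entry<V>> out = new ArrayList<>();
    for (long bucket : overlappingBuckets(startTs, endTs)) {
      List<Entry<V>> values = new ArrayList<>(state.get(bucket));
      values.sort(Comparator.comparingLong((Entry<V> e) -> e.ts));  // sort within the bucket in memory
      for (Entry<V> e : values) {
        if (e.ts >= startTs && e.ts < endTs) out.add(e);
      }
    }
    return out;
  }

  /** Removes values with startTs <= ts < endTs; each overlapping bucket is read, cleared and rewritten. */
  void clearRange(long startTs, long endTs) {
    for (long bucket : overlappingBuckets(startTs, endTs)) {
      List<Entry<V>> keep = new ArrayList<>();
      for (Entry<V> e : state.get(bucket)) {
        if (e.ts < startTs || e.ts >= endTs) keep.add(e);
      }
      state.clear(bucket);
      for (Entry<V> e : keep) state.append(bucket, e);
    }
  }

  /** Enumerates all keys (the enumerate(K) operation) and keeps overlapping bucket ids, sorted ascending. */
  private List<Long> overlappingBuckets(long startTs, long endTs) {
    List<Long> result = new ArrayList<>();
    if (startTs >= endTs) return result;
    long first = bucketOf(startTs);
    long last = bucketOf(endTs - 1);
    for (Long bucket : state.keys()) {
      if (bucket >= first && bucket <= last) result.add(bucket);
    }
    result.sort(null);
    return result;
  }
}
```

The cost profile reflects the concerns raised later in the thread. Enumerating keys is linear in the number of buckets, and a partial `clearRange` rewrites whole buckets. A runner with a native sorted structure could avoid both.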
>>>>
>>>> I went through a similar exercise of how to provide a list-like side input view over a multimap[1] side input, which efficiently allowed computation of the size and provided random access while only having access to get(K) and an enumeration of the K's.
>>>>
>>>> 1: https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>>>
>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Bringing this subject up again,
>>>>>
>>>>> I've spent some time looking into implementing this for the Dataflow runner. I'm unable to find a way to implement the arbitrary sorted multimap efficiently for the case where there are large numbers of unique keys. Since the primary driving use case is timestamp ordering (i.e. the key is the event timestamp), you would expect to have nearly one new key per element. I considered Luke's suggestion above, but unfortunately it doesn't really solve this issue.
>>>>>
>>>>> The primary use case for sorting always seems to be sorting by timestamp. I want to propose that instead of building the fully general sorted multimap, we instead focus on a state type where the sort key is an integral type (like a timestamp or an integer). There is still a valid use case for multimap, but we can provide that as an unordered state. At least for Dataflow, it will be much easier.
>>>>>
>>>>> While my difficulties here may be specific to the Dataflow runner, any such support would have to be built into other runners as well, and limiting sorting to integral types likely makes it easier for other runners to implement this.
>>>>> Also, if you look at this Flink comment <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> pointed out by Aljoscha, for Flink the main use case identified was also timestamp sorting. This will also simplify the API design for this feature: a sorted multimap with arbitrary keys would require us to introduce a way of mapping natural ordering to encoded ordering (i.e. a new OrderPreservingCoder), but if we limit sort keys to integral types, the API design is simpler, as integral types can be represented directly.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> This sounds to me like a potential runner strategy. However, if a runner can natively support sorted maps (e.g. we expect the Dataflow runner to be able to do so, and I think it would be useful for other runners as well), then it's probably preferable to allow the runner to use its native capabilities.
>>>>>>
>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> For the API that you proposed, the map key is always "void" and the sort key == user key.
>>>>>>> So in my example of:
>>>>>>> key: dummy value
>>>>>>> key.000: token, (0001, value4)
>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>> key.01: token
>>>>>>> key.1: token, (1011, value3)
>>>>>>> you would have:
>>>>>>> "void": dummy value
>>>>>>> "void".000: token, (0001, value4)
>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>>>>>> "void".01: token
>>>>>>> "void".1: token, (1011, value3)
>>>>>>>
>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the prefixes until you find a common prefix for limit, and then filtering for values that have a sort key <= limit. Using the example above, to find entriesUntil(0010) you would:
>>>>>>> look for key."", miss
>>>>>>> look for key.0, miss
>>>>>>> look for key.00, miss
>>>>>>> look for key.000, hit, sort all contained values using the secondary key, provide value4 to the user
>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using the secondary key, filter out value2, and provide value1
>>>>>>>
>>>>>>> void removeUntil(K limit) also translates into walking the prefixes, but instead we clear them when we have a "hit", with some special logic for when the prefix is a prefix of the limit. Using the example above, to removeUntil(0010) you would:
>>>>>>> look for key."", miss
>>>>>>> look for key.0, miss
>>>>>>> look for key.00, miss
>>>>>>> look for key.000, hit, clear
>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using the secondary key, store in memory all values that are > 0010, clear, and append the values stored in memory.
>>>>>>>
>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you explain how fetching and deleting ranges of keys would work with this data structure?
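Lukasz's layout can be sketched in memory as follows. His insertion and splitting walk-through is in his 9:50 AM message, and the entriesUntil/removeUntil walks are in his 11:05 AM reply. This is an illustrative reading of the scheme, not Beam code, and several details are assumptions:

- `PrefixTrieSortedBag` and `Item` are hypothetical names.
- Sort keys are fixed-width bit strings.
- A `HashMap` stands in for the unordered map state.
- An empty leaf list plays the role of the "token", and an absent prefix means "internal node, descend".
- The per-user-key dimension ("key." in the examples) is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the prefix-trie layout for a single user key. Map keys are bit-string
 * prefixes of the sort key ("" is the root). A present prefix is a leaf (an empty list acts as the
 * "token"); an absent prefix inside a non-empty trie is an internal node with children p+"0", p+"1".
 */
final class PrefixTrieSortedBag<V> {
  static final class Item<V> {
    final String sortKey;  // fixed-width bit string, e.g. "0010"
    final V value;

    Item(String sortKey, V value) { this.sortKey = sortKey; this.value = value; }
  }

  private final Map<String, List<Item<V>>> leaves = new HashMap<>();  // stands in for unordered map state
  private final int width;     // number of bits in every sort key
  private final int capacity;  // max values per leaf before it is split

  PrefixTrieSortedBag(int width, int capacity) { this.width = width; this.capacity = capacity; }

  void insert(String sortKey, V value) {
    if (leaves.isEmpty()) leaves.put("", new ArrayList<>());  // the "is key set?" check
    String p = "";
    while (!leaves.containsKey(p)) p = sortKey.substring(0, p.length() + 1);  // walk the prefixes
    leaves.get(p).add(new Item<>(sortKey, value));
    maybeSplit(p);
  }

  // Splits an over-full leaf by the next bit, recursing while a child is still too full.
  private void maybeSplit(String p) {
    List<Item<V>> items = leaves.get(p);
    if (items.size() <= capacity || p.length() == width) return;
    leaves.remove(p);  // clear the parent prefix
    List<Item<V>> zero = new ArrayList<>();
    List<Item<V>> one = new ArrayList<>();
    for (Item<V> it : items) (it.sortKey.charAt(p.length()) == '0' ? zero : one).add(it);
    leaves.put(p + "0", zero);
    leaves.put(p + "1", one);
    maybeSplit(p + "0");
    maybeSplit(p + "1");
  }

  /** Values with sortKey <= limit, in sort-key order; a null limit reads everything. */
  List<V> entriesUntil(String limit) {
    List<V> out = new ArrayList<>();
    if (!leaves.isEmpty()) read("", limit, out);
    return out;
  }

  private void read(String p, String limit, List<V> out) {
    if (p.length() > width) return;
    if (limit != null && p.compareTo(limit.substring(0, p.length())) > 0) return;  // subtree is past limit
    List<Item<V>> leaf = leaves.get(p);
    if (leaf == null) {  // miss: internal node, visit children in order
      read(p + "0", limit, out);
      read(p + "1", limit, out);
      return;
    }
    List<Item<V>> sorted = new ArrayList<>(leaf);
    sorted.sort(Comparator.comparing((Item<V> it) -> it.sortKey));  // sort by the secondary (sort) key
    for (Item<V> it : sorted) {
      if (limit == null || it.sortKey.compareTo(limit) <= 0) out.add(it.value);
    }
  }

  /** Removes all values with sortKey <= limit. */
  void removeUntil(String limit) {
    if (!leaves.isEmpty()) remove("", limit);
  }

  private void remove(String p, String limit) {
    if (p.length() > width) return;
    int cmp = p.compareTo(limit.substring(0, p.length()));
    if (cmp > 0) return;
    List<Item<V>> leaf = leaves.get(p);
    if (leaf == null) {
      remove(p + "0", limit);
      remove(p + "1", limit);
      return;
    }
    if (cmp < 0) {  // every key under p is < limit: clear, but keep the empty leaf as the token
      leaf.clear();
      return;
    }
    List<Item<V>> keep = new ArrayList<>();  // p is a prefix of limit: keep only values > limit
    for (Item<V> it : leaf) {
      if (it.sortKey.compareTo(limit) > 0) keep.add(it);
    }
    leaves.put(p, keep);
  }
}
```

Feeding in the four example inserts with width 4 and capacity 2 yields leaves 000, 001, 01 (empty), and 1, as in the example. `entriesUntil("0010")` then returns value4 followed by value1. Each `leaves.get(p)` corresponds to one state read, which is why a deeply split trie costs more (mostly empty) lookups per scan.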
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Reuven, for the example, I assume that we never want to store more than 2 values at a given sort key prefix, and if we do, then we will create a new, longer prefix, splitting up the values based upon the sort key.
>>>>>>>>>
>>>>>>>>> The tuple representation in the examples below is (key, sort key, value), and . is a character outside of the alphabet, which can be represented by using an escaping encoding that wraps the key + sort key encoding.
>>>>>>>>>
>>>>>>>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010, finding one that is not empty. In this case it's "", so we append value1 to the map at key."", ending up with (we also set the key to any dummy value to know that it contains values):
>>>>>>>>> key: dummy value
>>>>>>>>> key."": token, (0010, value1)
>>>>>>>>> Now we insert (key, 0011, value2). We again look up "key" + all the prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>>>>>>>> key: dummy value
>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>>>>>>>> Now we insert (key, 1011, value3). We again look up "key" + all the prefixes of 1011, finding "", but notice that it is full, so we partition all the values into two prefixes, 0 and 1.
>>>>>>>>> We also clear the "" prefix, ending up with:
>>>>>>>>> key: dummy value
>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>>>>>>> key.1: token, (1011, value3)
>>>>>>>>> Now we insert (key, 0001, value4). We again look up "key" + all the prefixes of 0001, finding 0, but notice that it is full, so we partition all the values into two prefixes, 00 and 01. Notice that this doesn't help us, since 00 would be too full, so we split 00 again into 000 and 001. We also clear the 0 prefix, ending up with:
>>>>>>>>> key: dummy value
>>>>>>>>> key.000: token, (0001, value4)
>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>>>> key.01: token
>>>>>>>>> key.1: token, (1011, value3)
>>>>>>>>>
>>>>>>>>> We are effectively building a trie[1] where we only have values at the leaves and control how full each leaf can be. There are other trie representations, like a radix tree, that may be better.
>>>>>>>>>
>>>>>>>>> Looking up the values in sorted order for "key" would go like this:
>>>>>>>>> is key set? yes
>>>>>>>>> look for key."", miss
>>>>>>>>> look for key.0, miss
>>>>>>>>> look for key.00, miss
>>>>>>>>> look for key.000, hit, sort all contained values using the secondary key, provide value4 to the user
>>>>>>>>> look for key.001, hit, sort all contained values using the secondary key, provide value1 followed by value2 to the user
>>>>>>>>> look for key.01, hit, empty, return no values to the user
>>>>>>>>> look for key.1, hit, sort all contained values using the secondary key, provide value3 to the user
>>>>>>>>> we have walked the entire prefix space, signal the end of the iterable
>>>>>>>>>
>>>>>>>>> Some notes on the above:
>>>>>>>>> * The dummy value is used to know that the key contains values, and the token is used to know whether there are any values deeper in the trie, so that we know when to stop searching.
>>>>>>>>> * If we can recalculate the sort key from the combination of the key and value, then we don't need to store it.
>>>>>>>>> * Keys with lots of values will perform worse than keys with fewer values, since we have to look up more keys, but those will be empty reads. The number of misses can be controlled by how many elements we are willing to store at a given node before we subdivide.
>>>>>>>>>
>>>>>>>>> In reality, you could build a lot of structures (e.g. a red-black tree or a binary tree) using the sort key; the issue is the cost of rebalancing/reorganizing the structure in map form and whether it has a convenient pre-order traversal for lookups.
>>>>>>>>>
>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Some great comments!
>>>>>>>>>>
>>>>>>>>>> *Aljoscha*: absolutely, this would have to be implemented by runners to be efficient. We can of course provide a default (inefficient) implementation, but ideally runners would provide better ones.
>>>>>>>>>>
>>>>>>>>>> *Jan*: Exactly. I think MapState can be dropped or backed by this. E.g.
>>>>>>>>>>
>>>>>>>>>> *Robert*: Great point about standard coders not satisfying this. That's why I suggested that we provide a way to tag the coders that do preserve order, and only accept those as key coders. Alternatively, we could present a more limited API (e.g. only allowing a hard-coded set of types to be used as keys), but that seems counter to the direction Beam usually goes.
>>>>>>>>>> So users will have two ways of creating multimap state specs:
>>>>>>>>>>
>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>>>     StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>>>
>>>>>>>>>> or
>>>>>>>>>>
>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>>>     StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>>>
>>>>>>>>>> The second one will validate that the key coder preserves order, and fail otherwise (similar to coder determinism checking in GroupByKey). (BTW, we would also have versions of these functions that use coder inference to "guess" the coder, but those will do the same checking.)
>>>>>>>>>>
>>>>>>>>>> Also, the API I proposed did support random access! We could separate out OrderedBagState again if we think the use cases are fundamentally different. I merged the proposal into that of MultimapState because there seemed to be 99% overlap.
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <[email protected]> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <[email protected]> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>>>>>>>>>> >>>>> 1. Removing the elements already processed from the bag requires clearing and rewriting the entire bag. This is O(n^2) in the number of input trades.
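Rui's question (why not O(2 * n)) can be answered by counting writes over the whole input rather than per trade. This assumes, as a simplification, that each incoming trade triggers one clear-and-rewrite of the bag. It also assumes the worst case, in which no buffered trade can be released early, so the k-th trade rewrites k elements:

```latex
W(n) \;=\; \sum_{k=1}^{n} k \;=\; \frac{n(n+1)}{2} \;=\; O(n^2)
```

Each individual clear-and-rewrite is linear, as Rui suggests. It is repeated once per input trade, however, which is what makes the total quadratic.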
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Why is it not O(2 * n) to clear and rewrite the trade state?
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>>>>>>> >>>>>   // Add a value to the map.
>>>>>>>>>>> >>>>>   void put(K key, V value);
>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>>>>>>> >>>>>   // Return all entries in the map.
>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit. Returned elements are sorted by the key.
>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>>   // Remove all values with the given key.
>>>>>>>>>>> >>>>>   void remove(K key);
>>>>>>>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>>>>>>>>>> >>>>> }
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would remove all entries in the map with keys < limit.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In order to make this easier for users, I propose that we introduce a new tag on Coders: PreservesOrder. A Coder that contains this tag guarantees that the encoded value preserves the same ordering as the base Java type.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Could you clarify what "encoded value preserves the same ordering as the base Java type" means?
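The intended semantics of the `SortedMultimapState` interface above, including Rui's inclusive-versus-exclusive distinction, can be pinned down with a hypothetical in-memory reference built on `TreeMap`. This sketch orders keys by their natural Java ordering rather than by any encoding, so it sidesteps (rather than solves) the encoded-ordering question just raised. `InMemorySortedMultimap` is an illustrative name, `java.util.Map.Entry` stands in for Beam's `KV`, and `ReadableState` wrappers are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory reference for the proposed semantics; keys use natural Java ordering. */
final class InMemorySortedMultimap<K extends Comparable<K>, V> {
  private final NavigableMap<K, List<V>> map = new TreeMap<>();

  void put(K key, V value) { map.computeIfAbsent(key, k -> new ArrayList<>()).add(value); }

  List<V> get(K key) { return map.getOrDefault(key, List.of()); }

  /** All entries with key <= limit, sorted by key; values under one key keep insertion order. */
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (Map.Entry<K, List<V>> e : map.headMap(limit, true).entrySet()) {
      for (V v : e.getValue()) out.add(Map.entry(e.getKey(), v));
    }
    return out;
  }

  void remove(K key) { map.remove(key); }

  /** Removes all entries with key <= limit (inclusive, as in the proposal). */
  void removeUntil(K limit) { map.headMap(limit, true).clear(); }

  /** Rui's suggested variant: removes all entries with key < limit. */
  void removeUntilExcl(K limit) { map.headMap(limit, false).clear(); }
}
```

With a sorted structure, the two variants differ only in the `inclusive` flag passed to `headMap`, so offering both seems cheap.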
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Let's say A and B represent two different instances of the same Java type, like a double. Then A < B (using the language's comparison operator) iff encode(A) < encode(B) (note that the encoded versions are compared lexicographically).
>>>>>>>>>>> >>
>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B) property to hold for all languages we support? What happens if A and B sort differently in different languages?
>>>>>>>>>>> >
>>>>>>>>>>> > That would have to be a property of the coder (which means that this property probably needs to be represented in the portability representation of the coder). I imagine the common use cases will be for simple coders like int, long, string, etc., which are likely to sort the same in most languages.
>>>>>>>>>>>
>>>>>>>>>>> The standard coders for both double and integral types do not respect the natural ordering (consider negative values). KV coders violate the "natural" lexicographic ordering on components as well. I think implicitly sorting on the encoded value would yield many surprises. (The state could, of course, take an order-preserving callable producing bytes (strings?) as a parameter.) (As for naming, I'd probably call this OrderedBagState or something like that, rather than Map, which tends to imply random access.)
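Robert's point about negative values can be illustrated with the common sign-flip trick. Lukasz's definition (A < B iff encode(A) < encode(B), compared lexicographically) can be checked against it directly. This is a minimal sketch that assumes fixed-width 8-byte big-endian encodings compared as unsigned bytes. `OrderPreservingEncodings` and its methods are hypothetical helpers, not Beam coders, and `naiveLong` is only a plain two's-complement layout for contrast, not a claim about any specific Beam coder's bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helpers illustrating order-preserving, fixed-width encodings (not Beam coders). */
final class OrderPreservingEncodings {
  /** Plain big-endian two's complement: negative values compare as larger bytes, breaking order. */
  static byte[] naiveLong(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  /** Flipping the sign bit makes unsigned byte order match signed numeric order. */
  static byte[] encodeLong(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  /**
   * IEEE 754 variant: flip all bits of negative values and only the sign bit of non-negative ones.
   * The result orders like Double.compare (-0.0 sorts before 0.0, and NaN sorts last), which is
   * not the same as the language's < operator.
   */
  static byte[] encodeDouble(double d) {
    long bits = Double.doubleToLongBits(d);  // canonicalizes NaN
    bits ^= (bits < 0) ? -1L : Long.MIN_VALUE;
    return ByteBuffer.allocate(Long.BYTES).putLong(bits).array();
  }

  /** Lexicographic comparison of encodings, treating bytes as unsigned. */
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

Even this small case touches Ahmet's cross-language concern. Every SDK would have to agree byte for byte on sign handling, NaN canonicalization, and -0.0. And, as Robert notes, composite encodings such as KV do not preserve component-wise order by default.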
