Hey folks, Sry I'm late to this thread but this might be very helpful for the problem we're dealing with. Do we have a design doc or a jira ticket I can follow?
Cheers, Catlyn On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <[email protected]> wrote: > My questions were just an example. I fully agree there is a fundamental > need for a sorted state (of some form, and I also think this links to > efficient implementation of retrations) - I was reacting to Kenn's question > about BIP. This one would be pretty nice example why it would be good to > have such a "process" - not everything can be solved on ML and there are > fundamental decisions that might need a closer attention. > On 6/18/20 5:28 PM, Reuven Lax wrote: > > Jan - my proposal is exactly TimeSortedBagState (more accurately - > TimeSortedListState), though I went a bit further and also proposed a way > to have a dynamic number of tagged TimeSortedBagStates. > > You are correct that the runner doesn't really have to store the data time > sorted - what's actually needed is the ability to fetch and remove > timestamp ranges of data (though that does include fetching the entire > list); TimeOrderedState is probably a more accurate name then > TimeSortedState. I don't think we could get away with operations that only > act on the smallest timestamp, however we could limit the API to only being > able to fetch and remove prefixes of data (ordered by timestamp). However > if we support prefixes, we might as well support arbitrary subranges. > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <[email protected]> wrote: > >> Big +1 for a BIP, as this might really help clarify all the pros and cons >> of all possibilities. There seem to be questions that need answering and >> motivating use cases - do we need sorted map state or can we solve our use >> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does >> that really have to be time-sorted structure, or does it "only" have to >> have operations that can efficiently find and remove element with smallest >> timestamp (like a PriorityQueue)? >> >> Jan >> On 6/18/20 5:32 AM, Kenneth Knowles wrote: >> >> Zooming in from generic philosophy to be clear: adding time ordered >> buffer to the Fn state API is *not* a shortcut.It has benefits that will >> not be achieved by SDK-side implementation on top of either ordered or >> unordered multimap. Are those benefits worth expanding the API? I don't >> know. >> >> A change to allow a runner to have a specialized implementation for >> time-buffered state would be one or more StateKey types, right? Reuven, >> maybe put this and your Java API in a doc? A BIP? Seems like there's at >> least the following to explore: >> >> - how that Java API would map to an SDK-side implementation on top of >> multimap state key >> - how that Java API would map to a new StateKey >> - whether there's actually more than one relevant implementation of that >> StateKey >> - whether SDK-side implementation on some other state key would be >> performant enough in all SDK languages (present and future) >> >> Zooming back out to generic philosophy: Proliferation of StateKey >> types tuned by runners (which can very easily still share implementation) >> is probably better than proliferation of complex SDK-side implementations >> with varying completeness and performance. >> >> Kenn >> >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <[email protected]> wrote: >> >>> It might help for me to describe what I have in mind. I'm still >>> proposing that we build multimap, just not a globally-sorted multimap. >>> >>> My previous proposal was that we provide a Multimap<Key, Value> state >>> type, sorted by key. this would have two additional operations - >>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey, >>> endKey). The primary use case was timestamp sorting, but I felt that a >>> sorted multimap provided a nice generalization - after all, you can simply >>> key the multimap by timestamp to get timestamp sorting. >>> >>> This approach had some issues immediately that would take some work to >>> solve. Since a multimap key can have any type and a runner will only be >>> able to sort by encoded type, we would need to introduce a concept of >>> order-preserving coders into Beam and plumb that through. Robert pointed >>> out that even our existing standard coders for simple integral types don't >>> preserve order, so there will likely be surprises here. >>> >>> My current proposal is for a multimap that is not sorted by key, but >>> that can support.ordered values for a single key. Remember that a multimap >>> maps K -> Iterable<V>, so this means that each individual Iterable<V> is >>> ordered, but the keys have no specific order relative to each other. This >>> is not too different from many multimap implementations where the keys are >>> unordered, but the list of values for a single key at least has a stable >>> order. >>> >>> The interface would look like this: >>> >>> public interface MultimapState<K, V> extends State { >>> // Add a value with a default timestamp. >>> void put(K key, V value); >>> >>> // Add a timestamped value. >>> void put(K, key, TimestampedValue<V> value); >>> >>> // Remove all values for a key. >>> void remove (K key); >>> >>> // Remove all values for a key with timestamps within the specified >>> range. >>> void removeRange(K key, Instant startTs, Instant endTs); >>> >>> // Get an Iterable of values for V. The Iterable will be returned >>> sorted by timestamp. >>> ReadableState<Iterable<TimestampedValue<V>>> get(K key); >>> >>> // Get an Iterable of values for V in the specified range. The >>> Iterable will be returned sorted by timestamp. >>> ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant >>> startTs, Instant endTs); >>> >>> ReadableState<Iterable<K>> keys(); >>> ReadableState<Iterable<TimestampedValue<V>>> values(); >>> ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>> entries; >>> } >>> >>> We can of course provide helper functions that allow using MultimapState >>> without deailing with TimestampValue for users who only want a multimap and >>> don't want sorting. >>> >>> I think many users will only need a single sorted list - not a full >>> multimap. It's worth offering this as well, and we can simply build it on >>> top of MultimapState. It will look like an extension of BagState >>> >>> public interface TimestampSortedListState<T> extends State { >>> void add(TimestampedValue<T> value); >>> Iterable<TimestampedValue<T>> read(); >>> Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant >>> endTs); >>> void clearRange(Instant startTs, Instant endTs); >>> } >>> >>> >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <[email protected]> wrote: >>> >>>> The portability layer is meant to live across multiple versions of Beam >>>> and I don't think it should be treated by doing the simple and useful thing >>>> now since I believe it will lead to a proliferation of the API. >>>> >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <[email protected]> >>>> wrote: >>>> >>>>> I have thoughts on the subject of whether to have APIs just for the >>>>> lowest-level building blocks versus having APIs for higher-level >>>>> constructs. Specifically this applies to providing only unsorted multimap >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on >>>>> time-ordered buffer; if it turns out to be easy to go all the way to >>>>> sorted >>>>> multimap that's nice-to-have; if it turns out to be easy to implement on >>>>> top of unsorted map state that should probably be under the hood >>>>> >>>>> Reasons to build low-level multimap in the runner & fn api and layer >>>>> higher-level things in the SDK: >>>>> >>>>> - It is less implementation for runners if they have to only provide >>>>> fewer lower-level building blocks like multimap state. >>>>> - There are many more runners than SDKs (and will be even more and >>>>> more) so this saves overall. >>>>> >>>>> Reasons to build higher-level constructs directly in the runner and fn >>>>> api: >>>>> >>>>> - Having multiple higher-level state types may actually be less >>>>> implementation than one complex state type, especially if they map to >>>>> runner primitives. >>>>> - The runner may have better specialized implementations, especially >>>>> for something like a time-ordered buffer. >>>>> - The particular access patterns in an SDK-based implementation may >>>>> not be ideal for each runner's underlying implementation of the low-level >>>>> building block. >>>>> - There may be excessive gRPC overhead even for optimal access >>>>> patterns. >>>>> >>>>> There are ways to have best of both worlds, like: >>>>> >>>>> 1. Define multiple state types according to fundamental access >>>>> patterns, like we did this before portability. >>>>> 2. If it is easy to layer one on top of the other, do that inside the >>>>> runner. Provide shared code so for runners providing the lowest-level >>>>> primitive they get all the types for free. >>>>> >>>>> I understand that this is an oversimplification. It still creates some >>>>> more work. And APIs are a burden so it is good to introduce as few as >>>>> possible for maintenance. But it has performance benefits and also >>>>> unblocks >>>>> "just doing the simple and useful thing now" which I always like to do as >>>>> long as it is compatible with future changes. If the APIs are fundamental, >>>>> like sets, maps, timestamp ordering, then it is safe to guess that they >>>>> will change rarely and be useful forever. >>>>> >>>>> Kenn >>>>> >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <[email protected]> wrote: >>>>> >>>>>> I would be glad to take a stab at how to provide sorting on top of >>>>>> unsorted multimap state. >>>>>> Based upon your description, you want integer keys representing >>>>>> timestamps and arbitrary user value for the values, is that correct? >>>>>> What kinds of operations do you need on the sorted map state in order >>>>>> of efficiency requirements? >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)) >>>>>> What kinds of operations do we expect the underlying unsorted map >>>>>> state to be able to provide? >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g. >>>>>> enumerate(K)?) >>>>>> >>>>>> I went through a similar exercise of how to provide a list like side >>>>>> input view over a multimap[1] side input which efficiently allowed >>>>>> computation of size and provided random access while only having access >>>>>> to >>>>>> get(K) and enumerate K's. >>>>>> >>>>>> 1: >>>>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568 >>>>>> >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <[email protected]> wrote: >>>>>> >>>>>>> Bringing this subject up again, >>>>>>> >>>>>>> I've spent some time looking into implementing this for the Dataflow >>>>>>> runner. I'm unable to find a way to implement the arbitrary sorted >>>>>>> multimap >>>>>>> efficiently for the case where there are large numbers of unique keys. >>>>>>> Since the primary driving use case is timestamp ordering (i.e. key is >>>>>>> event >>>>>>> timestamp), you would expect to have nearly a new key per element. I >>>>>>> considered Luke's suggestion above, but unfortunately it doesn't really >>>>>>> solve this issue. >>>>>>> >>>>>>> The primary use case for sorting always seems to be sorting by >>>>>>> timestamp. I want to propose that instead of building the fully-general >>>>>>> sorted multimap, we instead focus on a state type where the sort key is >>>>>>> an >>>>>>> integral type (like a timestamp or an integer). There is still a valid >>>>>>> use >>>>>>> case for multimap, but we can provide that as an unordered state. At >>>>>>> least >>>>>>> for Dataflow, it will be much easier >>>>>>> >>>>>>> While my difficulties here may be specific to the Dataflow runner, >>>>>>> any such support would have to be built into other runners as well, and >>>>>>> limiting to integral sorting likely makes it easier for other runners to >>>>>>> implement this. Also, if you look at this >>>>>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> >>>>>>> Flink >>>>>>> comment pointed out by Aljoscha, for Flink the main use case identified >>>>>>> was >>>>>>> also timestamp sorting. This will also simplify the API design for this >>>>>>> feature: Sorted multimap with arbitrary keys would require us to >>>>>>> introduce >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a new >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral types, the >>>>>>> API >>>>>>> design is simpler as integral types can be represented directly. >>>>>>> >>>>>>> Reuven >>>>>>> >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <[email protected]> wrote: >>>>>>> >>>>>>>> This sounds to me like a potential runner strategy. However if a >>>>>>>> runner can natively support sorted maps (e.g. we expect the Dataflow >>>>>>>> runner >>>>>>>> to be able to do so, and I think it would be useful for other runners >>>>>>>> as >>>>>>>> well), then it's probably preferable to allow the runner to use its >>>>>>>> native >>>>>>>> capabilities. >>>>>>>> >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> For the API that you proposed, the map key is always "void" and >>>>>>>>> the sort key == user key. So in my example of >>>>>>>>> key: dummy value >>>>>>>>> key.000: token, (0001, value4) >>>>>>>>> key.001: token, (0010, value1), (0011, value2) >>>>>>>>> key.01: token >>>>>>>>> key.1: token, (1011, value3) >>>>>>>>> you would have: >>>>>>>>> "void": dummy value >>>>>>>>> "void".000: token, (0001, value4) >>>>>>>>> "void".001: token, (0010, value1), (0011, value2) >>>>>>>>> "void".01: token >>>>>>>>> "void".1: token, (1011, value3) >>>>>>>>> >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking >>>>>>>>> the the prefixes until you find a common prefix for K and then filter >>>>>>>>> for >>>>>>>>> values where they have a sort key <= K. Using the example above, to >>>>>>>>> find >>>>>>>>> entriesUntil(0010) you would: >>>>>>>>> look for key."", miss >>>>>>>>> look for key.0, miss >>>>>>>>> look for key.00, miss >>>>>>>>> look for key.000, hit, sort all contained values using secondary >>>>>>>>> key, provide value4 to user >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we >>>>>>>>> sort all contained values using secondary key, filter out value2 and >>>>>>>>> provide value1 >>>>>>>>> >>>>>>>>> void removeUntil(K limit) also translates into walking the >>>>>>>>> prefixes but instead we will clear them when we have a "hit" with some >>>>>>>>> special logic for when the sort key is a prefix of the key. Used the >>>>>>>>> example, to removeUntil(0010) you would: >>>>>>>>> look for key."", miss >>>>>>>>> look for key.0, miss >>>>>>>>> look for key.00, miss >>>>>>>>> look for key.000, hit, clear >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we >>>>>>>>> sort all contained values using secondary key, store in memory all >>>>>>>>> values >>>>>>>>> that > 0010, clear and append values stored in memory. >>>>>>>>> >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would >>>>>>>>>> work with this data structure? >>>>>>>>>> >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Reuven, for the example, I assume that we never want to store >>>>>>>>>>> more then 2 values at a given sort key prefix, and if we do then we >>>>>>>>>>> will >>>>>>>>>>> create a new longer prefix splitting up the values based upon the >>>>>>>>>>> sort key. >>>>>>>>>>> >>>>>>>>>>> Tuple representation in examples below is (key, sort key, value) >>>>>>>>>>> and . is a character outside of the alphabet which can be >>>>>>>>>>> represented by >>>>>>>>>>> using an escaping encoding that wraps the key + sort key encoding. >>>>>>>>>>> >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this case its 0, >>>>>>>>>>> so we >>>>>>>>>>> append value to the map at key.0 ending up with (we also set the >>>>>>>>>>> key to any >>>>>>>>>>> dummy value to know that it it contains values): >>>>>>>>>>> key: dummy value >>>>>>>>>>> key."": token, (0010, value1) >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all >>>>>>>>>>> the prefixes of 0010, finding "", so we append value2 to key."" >>>>>>>>>>> ending up >>>>>>>>>>> with: >>>>>>>>>>> key: dummy value >>>>>>>>>>> key."": token, (0010, value1), (0011, value2) >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full, so we >>>>>>>>>>> partition >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear the "" >>>>>>>>>>> prefix >>>>>>>>>>> ending up with: >>>>>>>>>>> key: dummy value >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2) >>>>>>>>>>> key.1: token, (1011, value3) >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is full, so >>>>>>>>>>> we >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but notice this >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00 again to >>>>>>>>>>> 000, 001. >>>>>>>>>>> We also clear the 0 prefix ending up with: >>>>>>>>>>> key: dummy value >>>>>>>>>>> key.000: token, (0001, value4) >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2) >>>>>>>>>>> key.01: token >>>>>>>>>>> key.1: token, (1011, value3) >>>>>>>>>>> >>>>>>>>>>> We are effectively building a trie[1] where we only have values >>>>>>>>>>> at the leaves and control how full each leaf can be. There are >>>>>>>>>>> other trie >>>>>>>>>>> representations like a radix tree that may be better. >>>>>>>>>>> >>>>>>>>>>> Looking up the values in sorted order for "key" would go like >>>>>>>>>>> this: >>>>>>>>>>> Is key set, yes >>>>>>>>>>> look for key."", miss >>>>>>>>>>> look for key.0, miss >>>>>>>>>>> look for key.00, miss >>>>>>>>>>> look for key.000, hit, sort all contained values using secondary >>>>>>>>>>> key, provide value4 to user >>>>>>>>>>> look for key.001, hit, sort all contained values using secondary >>>>>>>>>>> key, provide value1 followed by value2 to user >>>>>>>>>>> look for key.01, hit, empty, return no values to user >>>>>>>>>>> look for key.1, hit, sort all contained values using secondary >>>>>>>>>>> key, provide value3 to user >>>>>>>>>>> we have walked the entire prefix space, signal end of iterable >>>>>>>>>>> >>>>>>>>>>> Some notes for the above: >>>>>>>>>>> * The dummy value is used to know that the key contains values >>>>>>>>>>> and the token is to know whether there are any values deeper in the >>>>>>>>>>> trie so >>>>>>>>>>> when we know when to stop searching. >>>>>>>>>>> * If we can recalculate the sort key from the combination of the >>>>>>>>>>> key and value, then we don't need to store it. >>>>>>>>>>> * Keys with lots of values will perform worse then keys with >>>>>>>>>>> less values since we have to look up more keys but they will be >>>>>>>>>>> empty >>>>>>>>>>> reads. The number of misses can be controlled by how many elements >>>>>>>>>>> we are >>>>>>>>>>> willing to store at a given node before we subdivide. >>>>>>>>>>> >>>>>>>>>>> In reality you could build a lot of structures (e.g. red black >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the cost of >>>>>>>>>>> rebalancing/re-organizing the structure in map form and whether it >>>>>>>>>>> has a >>>>>>>>>>> convenient pre-order traversal for lookups. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Some great comments! >>>>>>>>>>>> >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by >>>>>>>>>>>> runners to be efficient. We can of course provide a default >>>>>>>>>>>> (inefficient) >>>>>>>>>>>> implementation, but ideally runners would provide better ones. >>>>>>>>>>>> >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by >>>>>>>>>>>> this. E.g. >>>>>>>>>>>> >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag the >>>>>>>>>>>> coders that >>>>>>>>>>>> do preserve order, and only accept those as key coders >>>>>>>>>>>> Alternatively we >>>>>>>>>>>> could present a more limited API - e.g. only allowing a hard-coded >>>>>>>>>>>> set of >>>>>>>>>>>> types to be used as keys - but that seems counter to the direction >>>>>>>>>>>> Beam >>>>>>>>>>>> usually goes. So users will have two ways .of creating multimap >>>>>>>>>>>> state specs: >>>>>>>>>>>> >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state = >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of()); >>>>>>>>>>>> >>>>>>>>>>>> or >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state = >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), >>>>>>>>>>>> StringUtf8Coder.of()); >>>>>>>>>>>> >>>>>>>>>>>> The second one will validate that the key coder preserves >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism checking >>>>>>>>>>>> in >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these functions >>>>>>>>>>>> that use >>>>>>>>>>>> coder inference to "guess" the coder, but those will do the same >>>>>>>>>>>> checking) >>>>>>>>>>>> >>>>>>>>>>>> Also the API I proposed did support random access! We could >>>>>>>>>>>> separate out OrderedBagState again if we think the use cases are >>>>>>>>>>>> fundamentally different. I merged the proposal into that of >>>>>>>>>>>> MultimapState >>>>>>>>>>>> because there seemed be 99% overlap. >>>>>>>>>>>> >>>>>>>>>>>> Reuven >>>>>>>>>>>> >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <[email protected]> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>> >>>>>>>>>>>>> >>>>> A few obvious problems with this code: >>>>>>>>>>>>> >>>>> 1. Removing the elements already processed from the >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This is >>>>>>>>>>>>> O(n^2) in the >>>>>>>>>>>>> number of input trades. >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>> why it's not O(2 * n) to clearing and rewriting trade >>>>>>>>>>>>> state? >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>>> >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State >>>>>>>>>>>>> { >>>>>>>>>>>>> >>>>> // Add a value to the map. >>>>>>>>>>>>> >>>>> void put(K key, V value); >>>>>>>>>>>>> >>>>> // Get all values for a given key. >>>>>>>>>>>>> >>>>> ReadableState<Iterable<V>> get(K key); >>>>>>>>>>>>> >>>>> // Return all entries in the map. >>>>>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> allEntries(); >>>>>>>>>>>>> >>>>> // Return all entries in the map with keys <= limit. >>>>>>>>>>>>> returned elements are sorted by the key. >>>>>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> entriesUntil(K >>>>>>>>>>>>> limit); >>>>>>>>>>>>> >>>>> >>>>>>>>>>>>> >>>>> // Remove all values with the given key; >>>>>>>>>>>>> >>>>> void remove(K key); >>>>>>>>>>>>> >>>>> // Remove all entries in the map with keys <= limit. >>>>>>>>>>>>> >>>>> void removeUntil(K limit); >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also useful? It will remove >>>>>>>>>>>>> all entries in the map with keys < limit. >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>>> >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. >>>>>>>>>>>>> In order to make this easier for users, I propose that we >>>>>>>>>>>>> introduce a new >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this tag >>>>>>>>>>>>> guarantees >>>>>>>>>>>>> that the encoded value preserves the same ordering as the base >>>>>>>>>>>>> Java type. >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>> >>>>>>>>>>>>> >>>> Could you clarify what is "encoded value preserves the >>>>>>>>>>>>> same ordering as the base Java type"? >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> Lets say A and B represent two different instances of the >>>>>>>>>>>>> same Java type like a double, then A < B (using the languages >>>>>>>>>>>>> comparison >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions >>>>>>>>>>>>> are compared >>>>>>>>>>>>> lexicographically) >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A < B iff >>>>>>>>>>>>> e(A) < e(P) property to hold for all languages we support? What >>>>>>>>>>>>> happens A, >>>>>>>>>>>>> B sort differently in different languages? >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > That would have to be the property of the coder (which means >>>>>>>>>>>>> that this property probably needs to be represented in the >>>>>>>>>>>>> portability >>>>>>>>>>>>> representation of the coder). I imagine the common use cases will >>>>>>>>>>>>> be for >>>>>>>>>>>>> simple coders like int, long, string, etc., which are likely to >>>>>>>>>>>>> sort the >>>>>>>>>>>>> same in most languages. >>>>>>>>>>>>> >>>>>>>>>>>>> The standard coders for both double and integral types do not >>>>>>>>>>>>> respect >>>>>>>>>>>>> the natural ordering (consider negative values). KV coders >>>>>>>>>>>>> violate the >>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I think >>>>>>>>>>>>> implicitly sorting on encoded value would yield many >>>>>>>>>>>>> surprises. (The >>>>>>>>>>>>> state, of course, could take a order-preserving, bytes >>>>>>>>>>>>> (string?)-producing callable as a parameter of course). (As for >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or something >>>>>>>>>>>>> like >>>>>>>>>>>>> that...rather than Map which tends to imply random access.) >>>>>>>>>>>>> >>>>>>>>>>>>
