Hi Reuven,

I noticed that there was an implementation of the in-memory OrderedListState 
introduced [1]. Where can I find out more regarding the plan and design? Is 
there a design doc? I'd like to know more details about the implementation to 
see if it fits my use case. I was hoping it would have a 
remove(TimestampedValue<T> e) method.

Thanks,
-Tyson


[1]: 
https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41


On 2020/08/03 21:41:46, Catlyn Kong <catl...@yelp.com> wrote: 
> Hey folks,
> 
> Sorry I'm late to this thread, but this might be very helpful for the problem
> we're dealing with. Do we have a design doc or a Jira ticket I can follow?
> 
> Cheers,
> Catlyn
> 
> On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> 
> > My questions were just an example. I fully agree there is a fundamental
> > need for a sorted state (of some form, and I also think this links to
> > efficient implementation of retractions) - I was reacting to Kenn's question
> > about BIP. This would be a pretty nice example of why it would be good to
> > have such a "process" - not everything can be solved on the ML and there are
> > fundamental decisions that might need closer attention.
> > On 6/18/20 5:28 PM, Reuven Lax wrote:
> >
> > Jan - my proposal is exactly TimeSortedBagState (more accurately -
> > TimeSortedListState), though I went a bit further and also proposed a way
> > to have a dynamic number of tagged TimeSortedBagStates.
> >
> > You are correct that the runner doesn't really have to store the data time
> > sorted - what's actually needed is the ability to fetch and remove
> > timestamp ranges of data (though that does include fetching the entire
> > list); TimeOrderedState is probably a more accurate name than
> > TimeSortedState. I don't think we could get away with operations that only
> > act on the smallest timestamp, however we could limit the API to only being
> > able to fetch and remove prefixes of data (ordered by timestamp). However
> > if we support prefixes, we might as well support arbitrary subranges.
> >
> > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Big +1 for a BIP, as this might really help clarify all the pros and cons
> >> of all possibilities. There seem to be questions that need answering and
> >> motivating use cases - do we need sorted map state or can we solve our use
> >> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
> >> that really have to be time-sorted structure, or does it "only" have to
> >> have operations that can efficiently find and remove element with smallest
> >> timestamp (like a PriorityQueue)?
> >>
> >> Jan
> >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
> >>
> >> Zooming in from generic philosophy to be clear: adding time ordered
> >> buffer to the Fn state API is *not* a shortcut. It has benefits that will
> >> not be achieved by SDK-side implementation on top of either ordered or
> >> unordered multimap. Are those benefits worth expanding the API? I don't
> >> know.
> >>
> >> A change to allow a runner to have a specialized implementation for
> >> time-buffered state would be one or more StateKey types, right? Reuven,
> >> maybe put this and your Java API in a doc? A BIP? Seems like there's at
> >> least the following to explore:
> >>
> >>  - how that Java API would map to an SDK-side implementation on top of
> >> multimap state key
> >>  - how that Java API would map to a new StateKey
> >>  - whether there's actually more than one relevant implementation of that
> >> StateKey
> >>  - whether SDK-side implementation on some other state key would be
> >> performant enough in all SDK languages (present and future)
> >>
> >> Zooming back out to generic philosophy: Proliferation of StateKey
> >> types tuned by runners (which can very easily still share implementation)
> >> is probably better than proliferation of complex SDK-side implementations
> >> with varying completeness and performance.
> >>
> >> Kenn
> >>
> >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote:
> >>
> >>> It might help for me to describe what I have in mind. I'm still
> >>> proposing that we build multimap, just not a globally-sorted multimap.
> >>>
> >>> My previous proposal was that we provide a Multimap<Key, Value> state
> >>> type, sorted by key. This would have two additional operations -
> >>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
> >>> endKey). The primary use case was timestamp sorting, but I felt that a
> >>> sorted multimap provided a nice generalization - after all, you can simply
> >>> key the multimap by timestamp to get timestamp sorting.
> >>>
> >>> This approach had some issues immediately that would take some work to
> >>> solve. Since a multimap key can have any type and a runner will only be
> >>> able to sort by encoded type, we would need to introduce a concept of
> >>> order-preserving coders into Beam and plumb that through. Robert pointed
> >>> out that even our existing standard coders for simple integral types don't
> >>> preserve order, so there will likely be surprises here.
> >>>
> >>> My current proposal is for a multimap that is not sorted by key, but
> >>> that can support ordered values for a single key. Remember that a multimap
> >>> maps K -> Iterable<V>, so this means that each individual Iterable<V> is
> >>> ordered, but the keys have no specific order relative to each other. This
> >>> is not too different from many multimap implementations where the keys are
> >>> unordered, but the list of values for a single key at least has a stable
> >>> order.
> >>>
> >>> The interface would look like this:
> >>>
> >>> public interface MultimapState<K, V> extends State {
> >>>   // Add a value with a default timestamp.
> >>>   void put(K key, V value);
> >>>
> >>>   // Add a timestamped value.
> >>>   void put(K key, TimestampedValue<V> value);
> >>>
> >>>   // Remove all values for a key.
> >>>   void remove(K key);
> >>>
> >>>   // Remove all values for a key with timestamps within the specified range.
> >>>   void removeRange(K key, Instant startTs, Instant endTs);
> >>>
> >>>   // Get an Iterable of values for a key. The Iterable will be returned
> >>>   // sorted by timestamp.
> >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
> >>>
> >>>   // Get an Iterable of values for a key in the specified range. The
> >>>   // Iterable will be returned sorted by timestamp.
> >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
> >>>       K key, Instant startTs, Instant endTs);
> >>>
> >>>   ReadableState<Iterable<K>> keys();
> >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
> >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
> >>> }
> >>>
> >>> We can of course provide helper functions that allow using MultimapState
> >>> without dealing with TimestampedValue for users who only want a multimap
> >>> and don't want sorting.
> >>>
> >>> I think many users will only need a single sorted list - not a full
> >>> multimap. It's worth offering this as well, and we can simply build it on
> >>> top of MultimapState. It will look like an extension of BagState:
> >>>
> >>> public interface TimestampSortedListState<T> extends State {
> >>>   void add(TimestampedValue<T> value);
> >>>   Iterable<TimestampedValue<T>> read();
> >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
> >>> endTs);
> >>>   void clearRange(Instant startTs, Instant endTs);
> >>> }
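
A minimal in-memory sketch of the TimestampSortedListState shape proposed
above, assuming a TreeMap keyed by epoch milliseconds. The class name
InMemoryTimestampSortedList and the plain long timestamps are illustrative
stand-ins for Beam's TimestampedValue and Instant, and this is not the
implementation from the commit linked at the top of the thread. Treating
ranges as [start, end) is also an assumption; the thread does not settle it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not Beam's OrderedListState implementation.
final class InMemoryTimestampSortedList<T> {
  // Timestamp (epoch millis) -> values added with that timestamp, in insertion order.
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  // All values, ordered by timestamp.
  List<T> read() {
    return flatten(byTimestamp);
  }

  // Values with startMillis <= timestamp < endMillis, ordered by timestamp.
  List<T> readRange(long startMillis, long endMillis) {
    return flatten(byTimestamp.subMap(startMillis, true, endMillis, false));
  }

  // Drops every value with startMillis <= timestamp < endMillis.
  void clearRange(long startMillis, long endMillis) {
    byTimestamp.subMap(startMillis, true, endMillis, false).clear();
  }

  private static <T> List<T> flatten(Map<Long, List<T>> slice) {
    List<T> out = new ArrayList<>();
    for (List<T> bucket : slice.values()) {
      out.addAll(bucket);
    }
    return out;
  }
}
```

Because the sorted map exposes range views, readRange and clearRange touch
only the requested slice, which is the "fetch and remove timestamp ranges"
capability discussed earlier in the thread.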
> >>>
> >>>
> >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
> >>>
> >>>> The portability layer is meant to live across multiple versions of Beam,
> >>>> and I don't think we should approach it by just doing the simple and
> >>>> useful thing now, since I believe that will lead to a proliferation of
> >>>> the API.
> >>>>
> >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <k...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> I have thoughts on the subject of whether to have APIs just for the
> >>>>> lowest-level building blocks versus having APIs for higher-level
> >>>>> constructs. Specifically this applies to providing only unsorted
> >>>>> multimap vs what I will call "time-ordered buffer". TL;DR: I'd vote to
> >>>>> focus on time-ordered buffer; if it turns out to be easy to go all the
> >>>>> way to sorted multimap that's nice-to-have; if it turns out to be easy
> >>>>> to implement on top of unsorted map state that should probably be under
> >>>>> the hood.
> >>>>>
> >>>>> Reasons to build low-level multimap in the runner & fn api and layer
> >>>>> higher-level things in the SDK:
> >>>>>
> >>>>>  - It is less implementation for runners if they only have to provide
> >>>>> fewer, lower-level building blocks like multimap state.
> >>>>>  - There are many more runners than SDKs (and will be even more and
> >>>>> more) so this saves overall.
> >>>>>
> >>>>> Reasons to build higher-level constructs directly in the runner and fn
> >>>>> api:
> >>>>>
> >>>>>  - Having multiple higher-level state types may actually be less
> >>>>> implementation than one complex state type, especially if they map to
> >>>>> runner primitives.
> >>>>>  - The runner may have better specialized implementations, especially
> >>>>> for something like a time-ordered buffer.
> >>>>>  - The particular access patterns in an SDK-based implementation may
> >>>>> not be ideal for each runner's underlying implementation of the
> >>>>> low-level building block.
> >>>>>  - There may be excessive gRPC overhead even for optimal access
> >>>>> patterns.
> >>>>>
> >>>>> There are ways to have the best of both worlds, like:
> >>>>>
> >>>>> 1. Define multiple state types according to fundamental access
> >>>>> patterns, like we did before portability.
> >>>>> 2. If it is easy to layer one on top of the other, do that inside the
> >>>>> runner. Provide shared code so that runners providing the lowest-level
> >>>>> primitive get all the types for free.
> >>>>>
> >>>>> I understand that this is an oversimplification. It still creates some
> >>>>> more work. And APIs are a burden so it is good to introduce as few as
> >>>>> possible for maintenance. But it has performance benefits and also
> >>>>> unblocks "just doing the simple and useful thing now" which I always
> >>>>> like to do as long as it is compatible with future changes. If the APIs
> >>>>> are fundamental, like sets, maps, timestamp ordering, then it is safe to
> >>>>> guess that they will change rarely and be useful forever.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>>> I would be glad to take a stab at how to provide sorting on top of
> >>>>>> unsorted multimap state.
> >>>>>> Based upon your description, you want integer keys representing
> >>>>>> timestamps and arbitrary user value for the values, is that correct?
> >>>>>> What kinds of operations do you need on the sorted map state in order
> >>>>>> of efficiency requirements?
> >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
> >>>>>> What kinds of operations do we expect the underlying unsorted map
> >>>>>> state to be able to provide?
> >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
> >>>>>> enumerate(K)?)
> >>>>>>
> >>>>>> I went through a similar exercise of how to provide a list-like side
> >>>>>> input view over a multimap[1] side input which efficiently allowed
> >>>>>> computation of size and provided random access while only having
> >>>>>> access to get(K) and enumerate K's.
> >>>>>>
> >>>>>> 1:
> >>>>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
> >>>>>>
> >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>
> >>>>>>> Bringing this subject up again,
> >>>>>>>
> >>>>>>> I've spent some time looking into implementing this for the Dataflow
> >>>>>>> runner. I'm unable to find a way to implement the arbitrary sorted
> >>>>>>> multimap efficiently for the case where there are large numbers of
> >>>>>>> unique keys. Since the primary driving use case is timestamp ordering
> >>>>>>> (i.e. key is event timestamp), you would expect to have nearly a new
> >>>>>>> key per element. I considered Luke's suggestion above, but
> >>>>>>> unfortunately it doesn't really solve this issue.
> >>>>>>>
> >>>>>>> The primary use case for sorting always seems to be sorting by
> >>>>>>> timestamp. I want to propose that instead of building the
> >>>>>>> fully-general sorted multimap, we instead focus on a state type where
> >>>>>>> the sort key is an integral type (like a timestamp or an integer).
> >>>>>>> There is still a valid use case for multimap, but we can provide that
> >>>>>>> as an unordered state. At least for Dataflow, it will be much easier.
> >>>>>>>
> >>>>>>> While my difficulties here may be specific to the Dataflow runner,
> >>>>>>> any such support would have to be built into other runners as well,
> >>>>>>> and limiting to integral sorting likely makes it easier for other
> >>>>>>> runners to implement this. Also, if you look at this
> >>>>>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
> >>>>>>> Flink comment pointed out by Aljoscha, for Flink the main use case
> >>>>>>> identified was also timestamp sorting. This will also simplify the API
> >>>>>>> design for this feature: Sorted multimap with arbitrary keys would
> >>>>>>> require us to introduce a way of mapping natural ordering to encoded
> >>>>>>> ordering (i.e. a new OrderPreservingCoder), but if we limit sort keys
> >>>>>>> to integral types, the API design is simpler as integral types can be
> >>>>>>> represented directly.
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>>
> >>>>>>>> This sounds to me like a potential runner strategy. However if a
> >>>>>>>> runner can natively support sorted maps (e.g. we expect the Dataflow
> >>>>>>>> runner to be able to do so, and I think it would be useful for other
> >>>>>>>> runners as well), then it's probably preferable to allow the runner
> >>>>>>>> to use its native capabilities.
> >>>>>>>>
> >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> For the API that you proposed, the map key is always "void" and
> >>>>>>>>> the sort key == user key. So in my example of
> >>>>>>>>> key: dummy value
> >>>>>>>>> key.000: token, (0001, value4)
> >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> >>>>>>>>> key.01: token
> >>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>> you would have:
> >>>>>>>>> "void": dummy value
> >>>>>>>>> "void".000: token, (0001, value4)
> >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
> >>>>>>>>> "void".01: token
> >>>>>>>>> "void".1: token, (1011, value3)
> >>>>>>>>>
> >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking
> >>>>>>>>> the prefixes until you find a common prefix for K and then
> >>>>>>>>> filtering for values that have a sort key <= K. Using the example
> >>>>>>>>> above, to find entriesUntil(0010) you would:
> >>>>>>>>> look for key."", miss
> >>>>>>>>> look for key.0, miss
> >>>>>>>>> look for key.00, miss
> >>>>>>>>> look for key.000, hit, sort all contained values using secondary
> >>>>>>>>> key, provide value4 to user
> >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
> >>>>>>>>> sort all contained values using secondary key, filter out value2 and
> >>>>>>>>> provide value1
> >>>>>>>>>
> >>>>>>>>> void removeUntil(K limit) also translates into walking the
> >>>>>>>>> prefixes but instead we will clear them when we have a "hit" with
> >>>>>>>>> some special logic for when the sort key is a prefix of the key.
> >>>>>>>>> Using the example, to removeUntil(0010) you would:
> >>>>>>>>> look for key."", miss
> >>>>>>>>> look for key.0, miss
> >>>>>>>>> look for key.00, miss
> >>>>>>>>> look for key.000, hit, clear
> >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
> >>>>>>>>> sort all contained values using secondary key, store in memory all
> >>>>>>>>> values that are > 0010, clear and append values stored in memory.
> >>>>>>>>>
> >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would
> >>>>>>>>>> work with this data structure?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Reuven, for the example, I assume that we never want to store
> >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do then
> >>>>>>>>>>> we will create a new longer prefix splitting up the values based
> >>>>>>>>>>> upon the sort key.
> >>>>>>>>>>>
> >>>>>>>>>>> Tuple representation in examples below is (key, sort key, value)
> >>>>>>>>>>> and . is a character outside of the alphabet which can be
> >>>>>>>>>>> represented by using an escaping encoding that wraps the key +
> >>>>>>>>>>> sort key encoding.
> >>>>>>>>>>>
> >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
> >>>>>>>>>>> prefixes of 0010 looking for one that is not empty. In this case
> >>>>>>>>>>> there is none yet, so we append value1 to the map at key.""
> >>>>>>>>>>> ending up with (we also set the key to any dummy value to know
> >>>>>>>>>>> that it contains values):
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key."": token, (0010, value1)
> >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to key.""
> >>>>>>>>>>> ending up with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
> >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full, so we
> >>>>>>>>>>> partition all the values into two prefixes 0 and 1. We also clear
> >>>>>>>>>>> the "" prefix ending up with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
> >>>>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all
> >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is full,
> >>>>>>>>>>> so we partition all the values into two prefixes 00 and 01 but
> >>>>>>>>>>> notice this doesn't help us since 00 will be too full so we split
> >>>>>>>>>>> 00 again to 000, 001. We also clear the 0 prefix ending up with:
> >>>>>>>>>>> key: dummy value
> >>>>>>>>>>> key.000: token, (0001, value4)
> >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
> >>>>>>>>>>> key.01: token
> >>>>>>>>>>> key.1: token, (1011, value3)
> >>>>>>>>>>>
> >>>>>>>>>>> We are effectively building a trie[1] where we only have values
> >>>>>>>>>>> at the leaves and control how full each leaf can be. There are
> >>>>>>>>>>> other trie representations like a radix tree that may be better.
> >>>>>>>>>>>
> >>>>>>>>>>> Looking up the values in sorted order for "key" would go like
> >>>>>>>>>>> this:
> >>>>>>>>>>> Is key set, yes
> >>>>>>>>>>> look for key."", miss
> >>>>>>>>>>> look for key.0, miss
> >>>>>>>>>>> look for key.00, miss
> >>>>>>>>>>> look for key.000, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value4 to user
> >>>>>>>>>>> look for key.001, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value1 followed by value2 to user
> >>>>>>>>>>> look for key.01, hit, empty, return no values to user
> >>>>>>>>>>> look for key.1, hit, sort all contained values using secondary
> >>>>>>>>>>> key, provide value3 to user
> >>>>>>>>>>> we have walked the entire prefix space, signal end of iterable
> >>>>>>>>>>>
> >>>>>>>>>>> Some notes for the above:
> >>>>>>>>>>> * The dummy value is used to know that the key contains values
> >>>>>>>>>>> and the token is to know whether there are any values deeper in
> >>>>>>>>>>> the trie so we know when to stop searching.
> >>>>>>>>>>> * If we can recalculate the sort key from the combination of the
> >>>>>>>>>>> key and value, then we don't need to store it.
> >>>>>>>>>>> * Keys with lots of values will perform worse than keys with
> >>>>>>>>>>> fewer values since we have to look up more keys but they will be
> >>>>>>>>>>> empty reads. The number of misses can be controlled by how many
> >>>>>>>>>>> elements we are willing to store at a given node before we
> >>>>>>>>>>> subdivide.
> >>>>>>>>>>>
> >>>>>>>>>>> In reality you could build a lot of structures (e.g. red-black
> >>>>>>>>>>> tree, binary tree) using the sort key; the issue is the cost of
> >>>>>>>>>>> rebalancing/re-organizing the structure in map form and whether
> >>>>>>>>>>> it has a convenient pre-order traversal for lookups.
> >>>>>>>>>>>
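
The prefix-bucket walkthrough above can be sketched roughly as follows. This
is a simplified illustration under stated assumptions, not the design from the
thread verbatim: a HashMap stands in for the unsorted map state, sort keys are
4-bit integers as in the example, and the dummy value, the token, and the
range operations are omitted. The class name PrefixBucketSortedMap and its
methods are hypothetical. The sketch relies on one invariant: the leaves
always partition the prefix space, so a lookup miss means "descend further".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of sorted reads layered on an unsorted map.
final class PrefixBucketSortedMap {
  static final int KEY_BITS = 4;     // sort key width used in the example
  static final int MAX_PER_LEAF = 2; // at most 2 values per prefix

  // Stand-in for unsorted map state: bit prefix -> bucket. Only leaves are stored.
  final Map<String, List<Map.Entry<Integer, String>>> buckets = new HashMap<>();

  PrefixBucketSortedMap() {
    buckets.put("", new ArrayList<>()); // the empty prefix starts as the only leaf
  }

  // Fixed-width binary rendering of a sort key in [0, 2^KEY_BITS).
  static String bits(int sortKey) {
    StringBuilder sb = new StringBuilder();
    for (int i = KEY_BITS - 1; i >= 0; i--) {
      sb.append((sortKey >> i) & 1);
    }
    return sb.toString();
  }

  void put(int sortKey, String value) {
    String b = bits(sortKey);
    for (int len = 0; len <= KEY_BITS; len++) {
      String prefix = b.substring(0, len);
      List<Map.Entry<Integer, String>> leaf = buckets.get(prefix);
      if (leaf != null) { // hit: this leaf owns the key
        leaf.add(Map.entry(sortKey, value));
        split(prefix);
        return;
      }
    }
    throw new IllegalStateException("leaves should cover every key");
  }

  // Replaces an overfull leaf with its two children, repeating while a child is overfull.
  private void split(String prefix) {
    List<Map.Entry<Integer, String>> leaf = buckets.get(prefix);
    if (leaf.size() <= MAX_PER_LEAF || prefix.length() == KEY_BITS) {
      return; // small enough, or the prefix cannot grow any longer
    }
    buckets.remove(prefix);
    List<Map.Entry<Integer, String>> zeros = new ArrayList<>();
    List<Map.Entry<Integer, String>> ones = new ArrayList<>();
    for (Map.Entry<Integer, String> e : leaf) {
      (bits(e.getKey()).charAt(prefix.length()) == '0' ? zeros : ones).add(e);
    }
    buckets.put(prefix + "0", zeros);
    buckets.put(prefix + "1", ones);
    split(prefix + "0");
    split(prefix + "1");
  }

  // Walks prefixes in order; each leaf is small, so it is sorted in memory.
  List<String> valuesInOrder() {
    List<String> out = new ArrayList<>();
    walk("", out);
    return out;
  }

  private void walk(String prefix, List<String> out) {
    List<Map.Entry<Integer, String>> leaf = buckets.get(prefix);
    if (leaf == null) { // miss: interior prefix, visit both children
      walk(prefix + "0", out);
      walk(prefix + "1", out);
      return;
    }
    List<Map.Entry<Integer, String>> sorted = new ArrayList<>(leaf);
    sorted.sort(Map.Entry.comparingByKey());
    for (Map.Entry<Integer, String> e : sorted) {
      out.add(e.getValue());
    }
  }
}
```

Inserting 0010, 0011, 1011 and 0001 in that order leaves buckets at the
prefixes 000, 001, 01 and 1, matching the final state in the walkthrough, and
valuesInOrder() yields value4, value1, value2, value3.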
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Some great comments!
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
> >>>>>>>>>>>> runners to be efficient. We can of course provide a default
> >>>>>>>>>>>> (inefficient) implementation, but ideally runners would provide
> >>>>>>>>>>>> better ones.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by
> >>>>>>>>>>>> this. E.g.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
> >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag the
> >>>>>>>>>>>> coders that do preserve order, and only accept those as key
> >>>>>>>>>>>> coders. Alternatively we could present a more limited API - e.g.
> >>>>>>>>>>>> only allowing a hard-coded set of types to be used as keys - but
> >>>>>>>>>>>> that seems counter to the direction Beam usually goes. So users
> >>>>>>>>>>>> will have two ways of creating multimap state specs:
> >>>>>>>>>>>>
> >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
> >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
> >>>>>>>>>>>>
> >>>>>>>>>>>> or
> >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
> >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), 
> >>>>>>>>>>>> StringUtf8Coder.of());
> >>>>>>>>>>>>
> >>>>>>>>>>>> The second one will validate that the key coder preserves
> >>>>>>>>>>>> order, and fail otherwise (similar to coder determinism
> >>>>>>>>>>>> checking in GroupByKey). (BTW we would also have versions of
> >>>>>>>>>>>> these functions that use coder inference to "guess" the coder,
> >>>>>>>>>>>> but those will do the same checking.)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also the API I proposed did support random access! We could
> >>>>>>>>>>>> separate out OrderedBagState again if we think the use cases are
> >>>>>>>>>>>> fundamentally different. I merged the proposal into that of
> >>>>>>>>>>>> MultimapState because there seemed to be 99% overlap.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Reuven
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
> >>>>>>>>>>>> rober...@google.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
> >>>>>>>>>>>>> al...@google.com> wrote:
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
> >>>>>>>>>>>>> lc...@google.com> wrote:
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
> >>>>>>>>>>>>> ruw...@google.com> wrote:
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
> >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from the
> >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This is 
> >>>>>>>>>>>>> O(n^2) in the
> >>>>>>>>>>>>> number of input trades.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade
> >>>>>>>>>>>>> state?
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>> >>>>>   // Add a value to the map.
> >>>>>>>>>>>>> >>>>>   void put(K key, V value);
> >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
> >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
> >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit.
> >>>>>>>>>>>>> returned elements are sorted by the key.
> >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
> >>>>>>>>>>>>> limit);
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
> >>>>>>>>>>>>> >>>>>   void remove(K key);
> >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <= limit.
> >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will remove
> >>>>>>>>>>>>> all entries in the map with keys < limit.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>>
> >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key.
> >>>>>>>>>>>>> In order to make this easier for users, I propose that we 
> >>>>>>>>>>>>> introduce a new
> >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this tag 
> >>>>>>>>>>>>> guarantees
> >>>>>>>>>>>>> that the encoded value preserves the same ordering as the base 
> >>>>>>>>>>>>> Java type.
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>>
> >>>>>>>>>>>>> >>>> Could you clarify what "encoded value preserves the
> >>>>>>>>>>>>> same ordering as the base Java type" means?
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>>
> >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of the
> >>>>>>>>>>>>> same Java type like a double, then A < B (using the language's
> >>>>>>>>>>>>> comparison operator) iff encode(A) < encode(B) (note the encoded
> >>>>>>>>>>>>> versions are compared lexicographically).
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >>
> >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff
> >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we support? What
> >>>>>>>>>>>>> happens if A and B sort differently in different languages?
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> >
> >>>>>>>>>>>>> > That would have to be the property of the coder (which means
> >>>>>>>>>>>>> that this property probably needs to be represented in the
> >>>>>>>>>>>>> portability representation of the coder). I imagine the common
> >>>>>>>>>>>>> use cases will be for simple coders like int, long, string,
> >>>>>>>>>>>>> etc., which are likely to sort the same in most languages.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The standard coders for both double and integral types do not
> >>>>>>>>>>>>> respect the natural ordering (consider negative values). KV
> >>>>>>>>>>>>> coders violate the "natural" lexicographic ordering on components
> >>>>>>>>>>>>> as well. I think implicitly sorting on encoded value would yield
> >>>>>>>>>>>>> many surprises. (The state could, of course, take an
> >>>>>>>>>>>>> order-preserving, bytes (string?)-producing callable as a
> >>>>>>>>>>>>> parameter.) (As for naming, I'd probably call this
> >>>>>>>>>>>>> OrderedBagState or something like that...rather than Map which
> >>>>>>>>>>>>> tends to imply random access.)
> >>>>>>>>>>>>>
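
The "consider negative values" point can be illustrated with a small sketch.
It uses a plain fixed-width big-endian encoding chosen for illustration, not
Beam's actual coder wire formats; the class OrderPreservingLongEncoding and
its methods are hypothetical. It shows why comparing encoded bytes
lexicographically misorders negative longs, and one common way an
order-preserving encoding for integral keys could work (flipping the sign
bit).

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; these are not Beam coders.
final class OrderPreservingLongEncoding {
  // Plain big-endian two's complement: negative values start with a 1 bit,
  // so they sort AFTER positive values when bytes are compared as unsigned.
  static byte[] encodeNaive(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
  // 0x00..00 through 0xFF..FF, so unsigned byte order matches numeric order.
  static byte[] encodeOrdered(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison of unsigned bytes, as a byte-sorting store would do.
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

With the naive encoding, -1 compares greater than 1 after encoding; with the
sign-flipped encoding the byte order agrees with the numeric order. A
PreservesOrder-style tag would only be valid for encodings of the second
kind.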
> >>>>>>>>>>>>
> 
