On Wed, Sep 16, 2020 at 8:48 AM Tyson Hamilton <[email protected]> wrote:

> The use case is to support an unbounded stream-stream join, where the
> elements are arriving in roughly time sorted order. Removing a specific
> element from the timestamp indexed collection is necessary when a match is
> found.
>

Just checking - this is an optimization when you already know that the join
is 1:1?

Kenn


> Having clearRange is helpful to expire elements that are no longer
> relevant according to a user-provided time based join predicate (e.g. WHEN
> ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
>
> I'll think a bit more on how to use MapState instead if having a remove()
> like method for a single element isn't an option.
>
> On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <[email protected]> wrote:
>
>> Hi,
>>
>> Currently we only support removing a timestamp range. You can remove a
>> single timestamp of course by removing [ts, ts+1), however if there are
>> multiple elements with the same timestamp this will remove all of those
>> elements.
>>
>> Does this fit your use case? If not, I wonder if MapState is closer to
>> what you are looking for?
>>
>> Reuven
>>
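To make the [ts, ts+1) point concrete, here is a minimal sketch of emulating a single-element remove with only range operations: read the one-tick range, clear it, and re-add everything except the matched element. RangeOnlyList and removeOne are hypothetical names for an in-memory stand-in backed by a TreeMap; this is not Beam's OrderedListState and says nothing about how a runner would implement it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a timestamp-ordered list; not Beam's OrderedListState. */
class RangeOnlyList<T> {
  // Timestamp (millis) -> values sharing that timestamp, in insertion order.
  private final TreeMap<Long, List<T>> byTs = new TreeMap<>();

  void add(long ts, T value) {
    byTs.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
  }

  /** Values with timestamps in [startTs, endTs), ordered by timestamp. */
  List<T> readRange(long startTs, long endTs) {
    List<T> out = new ArrayList<>();
    for (List<T> values : byTs.subMap(startTs, endTs).values()) {
      out.addAll(values);
    }
    return out;
  }

  /** Removes every value with a timestamp in [startTs, endTs). */
  void clearRange(long startTs, long endTs) {
    byTs.subMap(startTs, endTs).clear();
  }

  /** Emulates remove(element): read [ts, ts+1), clear it, re-add all but one match. */
  boolean removeOne(long ts, T value) {
    List<T> sameTs = readRange(ts, ts + 1);
    if (!sameTs.remove(value)) {
      return false; // no match at this timestamp; state is left untouched
    }
    clearRange(ts, ts + 1);
    for (T v : sameTs) {
      add(ts, v);
    }
    return true;
  }

  public static void main(String[] args) {
    RangeOnlyList<String> state = new RangeOnlyList<>();
    state.add(100L, "a");
    state.add(100L, "b");
    state.add(101L, "c");
    state.removeOne(100L, "a");
    System.out.println(state.readRange(0L, 200L)); // prints [b, c]
  }
}
```

The cost is a read and rewrite of every element that shares the timestamp, so this is only reasonable when timestamp collisions are rare.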
>> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <[email protected]>
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> I noticed that there was an implementation of the in-memory
>>> OrderedListState introduced [1]. Where can I find out more regarding the
>>> plan and design? Is there a design doc? I'd like to know more details about
>>> the implementation to see if it fits my use case. I was hoping it would
>>> have a remove(TimestampedValue<T> e) method.
>>>
>>> Thanks,
>>> -Tyson
>>>
>>>
>>> [1]:
>>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>>
>>>
>>> On 2020/08/03 21:41:46, Catlyn Kong <[email protected]> wrote:
>>> > Hey folks,
>>> >
>>> > Sry I'm late to this thread but this might be very helpful for the
>>> problem
>>> > we're dealing with. Do we have a design doc or a jira ticket I can
>>> follow?
>>> >
>>> > Cheers,
>>> > Catlyn
>>> >
>>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <[email protected]> wrote:
>>> >
>>> > > My questions were just an example. I fully agree there is a
>>> fundamental
>>> > > need for a sorted state (of some form, and I also think this links to
>>> > > efficient implementation of retractions) - I was reacting to Kenn's
>>> question
>>> > > about BIP. This one would be pretty nice example why it would be
>>> good to
>>> > > have such a "process" - not everything can be solved on ML and there
>>> are
>>> > > fundamental decisions that might need a closer attention.
>>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>>> > >
>>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>>> > > TimeSortedListState), though I went a bit further and also proposed
>>> a way
>>> > > to have a dynamic number of tagged TimeSortedBagStates.
>>> > >
>>> > > You are correct that the runner doesn't really have to store the
>>> data time
>>> > > sorted - what's actually needed is the ability to fetch and remove
>>> > > timestamp ranges of data (though that does include fetching the
>>> entire
>>> > > list); TimeOrderedState is probably a more accurate name than
>>> > > TimeSortedState. I don't think we could get away with operations
>>> that only
>>> > > act on the smallest timestamp, however we could limit the API to
>>> only being
>>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>>> However
>>> > > if we support prefixes, we might as well support arbitrary subranges.
>>> > >
>>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <[email protected]>
>>> wrote:
>>> > >
>>> > >> Big +1 for a BIP, as this might really help clarify all the pros
>>> and cons
>>> > >> of all possibilities. There seem to be questions that need
>>> answering and
>>> > >> motivating use cases - do we need sorted map state or can we solve
>>> our use
>>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
>>> Does
>>> > >> that really have to be time-sorted structure, or does it "only"
>>> have to
>>> > >> have operations that can efficiently find and remove element with
>>> smallest
>>> > >> timestamp (like a PriorityQueue)?
>>> > >>
>>> > >> Jan
>>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>> > >>
>>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
>>> will
>>> > >> not be achieved by SDK-side implementation on top of either ordered
>>> or
>>> > >> unordered multimap. Are those benefits worth expanding the API? I
>>> don't
>>> > >> know.
>>> > >>
>>> > >> A change to allow a runner to have a specialized implementation for
>>> > >> time-buffered state would be one or more StateKey types, right?
>>> Reuven,
>>> > >> maybe put this and your Java API in a doc? A BIP? Seems like
>>> there's at
>>> > >> least the following to explore:
>>> > >>
>>> > >>  - how that Java API would map to an SDK-side implementation on top
>>> of
>>> > >> multimap state key
>>> > >>  - how that Java API would map to a new StateKey
>>> > >>  - whether there's actually more than one relevant implementation
>>> of that
>>> > >> StateKey
>>> > >>  - whether SDK-side implementation on some other state key would be
>>> > >> performant enough in all SDK languages (present and future)
>>> > >>
>>> > >> Zooming back out to generic philosophy: Proliferation of StateKey
>>> > >> types tuned by runners (which can very easily still share
>>> implementation)
>>> > >> is probably better than proliferation of complex SDK-side
>>> implementations
>>> > >> with varying completeness and performance.
>>> > >>
>>> > >> Kenn
>>> > >>
>>> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <[email protected]>
>>> wrote:
>>> > >>
>>> > >>> It might help for me to describe what I have in mind. I'm still
>>> > >>> proposing that we build multimap, just not a globally-sorted
>>> multimap.
>>> > >>>
>>> > >>> My previous proposal was that we provide a Multimap<Key, Value>
>>> state
>>> > >>> type, sorted by key. This would have two additional operations -
>>> > >>> multimap.getRange(startKey, endKey) and
>>> multimap.deleteRange(startKey,
>>> > >>> endKey). The primary use case was timestamp sorting, but I felt
>>> that a
>>> > >>> sorted multimap provided a nice generalization - after all, you
>>> can simply
>>> > >>> key the multimap by timestamp to get timestamp sorting.
>>> > >>>
>>> > >>> This approach had some issues immediately that would take some
>>> work to
>>> > >>> solve. Since a multimap key can have any type and a runner will
>>> only be
>>> > >>> able to sort by encoded type, we would need to introduce a concept
>>> of
>>> > >>> order-preserving coders into Beam and plumb that through. Robert
>>> pointed
>>> > >>> out that even our existing standard coders for simple integral
>>> types don't
>>> > >>> preserve order, so there will likely be surprises here.
>>> > >>>
>>> > >>> My current proposal is for a multimap that is not sorted by key,
>>> but
>>> > >>> that can support ordered values for a single key. Remember that a
>>> multimap
>>> > >>> maps K -> Iterable<V>, so this means that each individual
>>> Iterable<V> is
>>> > >>> ordered, but the keys have no specific order relative to each
>>> other. This
>>> > >>> is not too different from many multimap implementations where the
>>> keys are
>>> > >>> unordered, but the list of values for a single key at least has a
>>> stable
>>> > >>> order.
>>> > >>>
>>> > >>> The interface would look like this:
>>> > >>>
>>> > >>> public interface MultimapState<K, V> extends State {
>>> > >>>   // Add a value with a default timestamp.
>>> > >>>   void put(K key, V value);
>>> > >>>
>>> > >>>   // Add a timestamped value.
>>> > >>>   void put(K key, TimestampedValue<V> value);
>>> > >>>
>>> > >>>   // Remove all values for a key.
>>> > >>>   void remove(K key);
>>> > >>>
>>> > >>>   // Remove all values for a key with timestamps within the
>>> specified
>>> > >>> range.
>>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>>> > >>>
>>> > >>>   // Get an Iterable of values for V. The Iterable will be returned
>>> > >>> sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>> > >>>
>>> > >>>   // Get an Iterable of values for V in the specified range. The
>>> > >>> Iterable will be returned sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key,
>>> Instant
>>> > >>> startTs, Instant endTs);
>>> > >>>
>>> > >>>   ReadableState<Iterable<K>> keys();
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>>> > >>> }
>>> > >>>
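As a rough illustration of "keys unordered, values per key ordered by timestamp", here is a minimal in-memory sketch. OrderedValuesMultimap is a hypothetical name, timestamps are plain long millis instead of Instant, and results are returned eagerly as lists instead of ReadableState; it only mirrors the shape of the interface above and is not a proposed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: unordered keys, each mapping to timestamp-ordered values. */
class OrderedValuesMultimap<K, V> {
  // HashMap: no order across keys. TreeMap: timestamp order within one key.
  private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

  void put(K key, long ts, V value) {
    data.computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(ts, t -> new ArrayList<>())
        .add(value);
  }

  /** All values for the key, sorted by timestamp. */
  List<V> get(K key) {
    return flatten(data.getOrDefault(key, new TreeMap<>()));
  }

  /** Values for the key with timestamps in [startTs, endTs), sorted by timestamp. */
  List<V> getRange(K key, long startTs, long endTs) {
    return flatten(data.getOrDefault(key, new TreeMap<>()).subMap(startTs, endTs));
  }

  /** Removes all values for the key. */
  void remove(K key) {
    data.remove(key);
  }

  /** Removes the key's values with timestamps in [startTs, endTs). */
  void removeRange(K key, long startTs, long endTs) {
    TreeMap<Long, List<V>> values = data.get(key);
    if (values == null) {
      return;
    }
    values.subMap(startTs, endTs).clear();
    if (values.isEmpty()) {
      data.remove(key); // drop keys that no longer hold any value
    }
  }

  private static <T> List<T> flatten(Map<Long, List<T>> byTs) {
    List<T> out = new ArrayList<>();
    for (List<T> values : byTs.values()) {
      out.addAll(values);
    }
    return out;
  }

  public static void main(String[] args) {
    OrderedValuesMultimap<String, String> state = new OrderedValuesMultimap<>();
    state.put("user1", 30L, "c");
    state.put("user1", 10L, "a");
    state.put("user1", 20L, "b");
    System.out.println(state.get("user1")); // prints [a, b, c]
    state.removeRange("user1", 10L, 21L);
    System.out.println(state.get("user1")); // prints [c]
  }
}
```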
>>> > >>> We can of course provide helper functions that allow using
>>> MultimapState
>>> > >>> without dealing with TimestampedValue for users who only want a
>>> multimap and
>>> > >>> don't want sorting.
>>> > >>>
>>> > >>> I think many users will only need a single sorted list - not a full
>>> > >>> multimap. It's worth offering this as well, and we can simply
>>> build it on
>>> > >>> top of MultimapState. It will look like an extension of BagState:
>>> > >>>
>>> > >>> public interface TimestampSortedListState<T> extends State {
>>> > >>>   void add(TimestampedValue<T> value);
>>> > >>>   Iterable<TimestampedValue<T>> read();
>>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
>>> > >>> endTs);
>>> > >>>   void clearRange(Instant startTs, Instant endTs);
>>> > >>> }
>>> > >>>
>>> > >>>
>>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <[email protected]>
>>> wrote:
>>> > >>>
>>> > >>>> The portability layer is meant to live across multiple versions
>>> of Beam
>>> > >>>> and I don't think it should be treated by doing the simple and
>>> useful thing
>>> > >>>> now since I believe it will lead to a proliferation of the API.
>>> > >>>>
>>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <[email protected]>
>>> > >>>> wrote:
>>> > >>>>
>>> > >>>>> I have thoughts on the subject of whether to have APIs just for
>>> the
>>> > >>>>> lowest-level building blocks versus having APIs for higher-level
>>> > >>>>> constructs. Specifically this applies to providing only unsorted
>>> multimap
>>> > >>>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to
>>> focus on
>>> > >>>>> time-ordered buffer; if it turns out to be easy to go all the
>>> way to sorted
>>> > >>>>> multimap that's nice-to-have; if it turns out to be easy to
>>> implement on
>>> > >>>>> top of unsorted map state that should probably be under the hood.
>>> > >>>>>
>>> > >>>>> Reasons to build low-level multimap in the runner & fn api and
>>> layer
>>> > >>>>> higher-level things in the SDK:
>>> > >>>>>
>>> > >>>>>  - It is less implementation for runners if they have to only
>>> provide
>>> > >>>>> fewer lower-level building blocks like multimap state.
>>> > >>>>>  - There are many more runners than SDKs (and will be even more
>>> and
>>> > >>>>> more) so this saves overall.
>>> > >>>>>
>>> > >>>>> Reasons to build higher-level constructs directly in the runner
>>> and fn
>>> > >>>>> api:
>>> > >>>>>
>>> > >>>>>  - Having multiple higher-level state types may actually be less
>>> > >>>>> implementation than one complex state type, especially if they
>>> map to
>>> > >>>>> runner primitives.
>>> > >>>>>  - The runner may have better specialized implementations,
>>> especially
>>> > >>>>> for something like a time-ordered buffer.
>>> > >>>>>  - The particular access patterns in an SDK-based implementation
>>> may
>>> > >>>>> not be ideal for each runner's underlying implementation of the
>>> low-level
>>> > >>>>> building block.
>>> > >>>>>  - There may be excessive gRPC overhead even for optimal access
>>> > >>>>> patterns.
>>> > >>>>>
>>> > >>>>> There are ways to have best of both worlds, like:
>>> > >>>>>
>>> > >>>>> 1. Define multiple state types according to fundamental access
>>> > >>>>> patterns, as we did before portability.
>>> > >>>>> 2. If it is easy to layer one on top of the other, do that
>>> inside the
>>> > >>>>> runner. Provide shared code so for runners providing the
>>> lowest-level
>>> > >>>>> primitive they get all the types for free.
>>> > >>>>>
>>> > >>>>> I understand that this is an oversimplification. It still
>>> creates some
>>> > >>>>> more work. And APIs are a burden so it is good to introduce as
>>> few as
>>> > >>>>> possible for maintenance. But it has performance benefits and
>>> also unblocks
>>> > >>>>> "just doing the simple and useful thing now" which I always like
>>> to do as
>>> > >>>>> long as it is compatible with future changes. If the APIs are
>>> fundamental,
>>> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess
>>> that they
>>> > >>>>> will change rarely and be useful forever.
>>> > >>>>>
>>> > >>>>> Kenn
>>> > >>>>>
>>> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <[email protected]>
>>> wrote:
>>> > >>>>>
>>> > >>>>>> I would be glad to take a stab at how to provide sorting on top
>>> of
>>> > >>>>>> unsorted multimap state.
>>> > >>>>>> Based upon your description, you want integer keys representing
>>> > >>>>>> timestamps and arbitrary user value for the values, is that
>>> correct?
>>> > >>>>>> What kinds of operations do you need on the sorted map state in
>>> order
>>> > >>>>>> of efficiency requirements?
>>> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
>>> ClearAll(Range[x, y))
>>> > >>>>>> What kinds of operations do we expect the underlying unsorted
>>> map
>>> > >>>>>> state to be able to provide?
>>> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>>> > >>>>>> enumerate(K)?)
>>> > >>>>>>
>>> > >>>>>> I went through a similar exercise of how to provide a list like
>>> side
>>> > >>>>>> input view over a multimap[1] side input which efficiently
>>> allowed
>>> > >>>>>> computation of size and provided random access while only
>>> having access to
>>> > >>>>>> get(K) and enumerate K's.
>>> > >>>>>>
>>> > >>>>>> 1:
>>> > >>>>>>
>>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>> > >>>>>>
>>> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <[email protected]>
>>> wrote:
>>> > >>>>>>
>>> > >>>>>>> Bringing this subject up again,
>>> > >>>>>>>
>>> > >>>>>>> I've spent some time looking into implementing this for the
>>> Dataflow
>>> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary
>>> sorted multimap
>>> > >>>>>>> efficiently for the case where there are large numbers of
>>> unique keys.
>>> > >>>>>>> Since the primary driving use case is timestamp ordering (i.e.
>>> key is event
>>> > >>>>>>> timestamp), you would expect to have nearly a new key per
>>> element. I
>>> > >>>>>>> considered Luke's suggestion above, but unfortunately it
>>> doesn't really
>>> > >>>>>>> solve this issue.
>>> > >>>>>>>
>>> > >>>>>>> The primary use case for sorting always seems to be sorting by
>>> > >>>>>>> timestamp. I want to propose that instead of building the
>>> fully-general
>>> > >>>>>>> sorted multimap, we instead focus on a state type where the
>>> sort key is an
>>> > >>>>>>> integral type (like a timestamp or an integer). There is still
>>> a valid use
>>> > >>>>>>> case for multimap, but we can provide that as an unordered
>>> state. At least
>>> > >>>>>>> for Dataflow, it will be much easier.
>>> > >>>>>>>
>>> > >>>>>>> While my difficulties here may be specific to the Dataflow
>>> runner,
>>> > >>>>>>> any such support would have to be built into other runners as
>>> well, and
>>> > >>>>>>> limiting to integral sorting likely makes it easier for other
>>> runners to
>>> > >>>>>>> implement this. Also, if you look at this
>>> > >>>>>>> <
>>> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
>>> Flink
>>> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case
>>> identified was
>>> > >>>>>>> also timestamp sorting. This will also simplify the API design
>>> for this
>>> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us
>>> to introduce
>>> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a
>>> new
>>> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral
>>> types, the API
>>> > >>>>>>> design is simpler as integral types can be represented
>>> directly.
>>> > >>>>>>>
>>> > >>>>>>> Reuven
>>> > >>>>>>>
>>> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <[email protected]>
>>> wrote:
>>> > >>>>>>>
>>> > >>>>>>>> This sounds to me like a potential runner strategy. However
>>> if a
>>> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the
>>> Dataflow runner
>>> > >>>>>>>> to be able to do so, and I think it would be useful for other
>>> runners as
>>> > >>>>>>>> well), then it's probably preferable to allow the runner to
>>> use its native
>>> > >>>>>>>> capabilities.
>>> > >>>>>>>>
>>> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <
>>> [email protected]>
>>> > >>>>>>>> wrote:
>>> > >>>>>>>>
>>> > >>>>>>>>> For the API that you proposed, the map key is always "void"
>>> and
>>> > >>>>>>>>> the sort key == user key. So in my example of
>>> > >>>>>>>>> key: dummy value
>>> > >>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> key.01: token
>>> > >>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>> you would have:
>>> > >>>>>>>>> "void": dummy value
>>> > >>>>>>>>> "void".000: token, (0001, value4)
>>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> "void".01: token
>>> > >>>>>>>>> "void".1: token, (1011, value3)
>>> > >>>>>>>>>
>>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into
>>> walking
>>> > >>>>>>>>> the prefixes until you find a common prefix for K and
>>> then filter for
>>> > >>>>>>>>> values where they have a sort key <= K. Using the example
>>> above, to find
>>> > >>>>>>>>> entriesUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>> key, provide value4 to user
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>> so we
>>> > >>>>>>>>> sort all contained values using secondary key, filter out
>>> value2 and
>>> > >>>>>>>>> provide value1
>>> > >>>>>>>>>
>>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
>>> > >>>>>>>>> prefixes but instead we will clear them when we have a "hit"
>>> with some
>>> > >>>>>>>>> special logic for when the sort key is a prefix of the key.
>>> Used the
>>> > >>>>>>>>> example, to removeUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, clear
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010
>>> so we
>>> > >>>>>>>>> sort all contained values using secondary key, store in
>>> memory all values
>>> > >>>>>>>>> that are > 0010, clear, and append the values stored in memory.
>>> > >>>>>>>>>
>>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <
>>> [email protected]>
>>> > >>>>>>>>> wrote:
>>> > >>>>>>>>>
>>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys
>>> would
>>> > >>>>>>>>>> work with this data structure?
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <
>>> [email protected]>
>>> > >>>>>>>>>> wrote:
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to
>>> store
>>> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we
>>> do then we will
>>> > >>>>>>>>>>> create a new longer prefix splitting up the values based
>>> upon the sort key.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key,
>>> value)
>>> > >>>>>>>>>>> and . is a character outside of the alphabet which can be
>>> represented by
>>> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key
>>> encoding.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
>>> > >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this
>>> case its 0, so we
>>> > >>>>>>>>>>> append value to the map at key.0 ending up with (we also
>>> set the key to any
>>> > >>>>>>>>>>> dummy value to know that it contains values):
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1)
>>> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to
>>> key."" ending up
>>> > >>>>>>>>>>> with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is
>>> full, so we partition
>>> > >>>>>>>>>>> all the values into two prefixes 0 and 1. We also clear
>>> the "" prefix
>>> > >>>>>>>>>>> ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" +
>>> all
>>> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is
>>> full, so we
>>> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but
>>> notice this
>>> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00
>>> again to 000, 001.
>>> > >>>>>>>>>>> We also clear the 0 prefix ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.01: token
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> We are effectively building a trie[1] where we only have
>>> values
>>> > >>>>>>>>>>> at the leaves and control how full each leaf can be. There
>>> are other trie
>>> > >>>>>>>>>>> representations like a radix tree that may be better.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go
>>> like
>>> > >>>>>>>>>>> this:
>>> > >>>>>>>>>>> Is key set, yes
>>> > >>>>>>>>>>> look for key."", miss
>>> > >>>>>>>>>>> look for key.0, miss
>>> > >>>>>>>>>>> look for key.00, miss
>>> > >>>>>>>>>>> look for key.000, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value4 to user
>>> > >>>>>>>>>>> look for key.001, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value1 followed by value2 to user
>>> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user
>>> > >>>>>>>>>>> look for key.1, hit, sort all contained values using
>>> secondary
>>> > >>>>>>>>>>> key, provide value3 to user
>>> > >>>>>>>>>>> we have walked the entire prefix space, signal end of
>>> iterable
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Some notes for the above:
>>> > >>>>>>>>>>> * The dummy value is used to know that the key contains
>>> values
>>> > >>>>>>>>>>> and the token is to know whether there are any values
>>> deeper in the trie so
>>> > >>>>>>>>>>> we know when to stop searching.
>>> > >>>>>>>>>>> * If we can recalculate the sort key from the combination
>>> of the
>>> > >>>>>>>>>>> key and value, then we don't need to store it.
>>> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys
>>> with
>>> > >>>>>>>>>>> fewer values since we have to look up more keys but they
>>> will be empty
>>> > >>>>>>>>>>> reads. The number of misses can be controlled by how many
>>> elements we are
>>> > >>>>>>>>>>> willing to store at a given node before we subdivide.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red
>>> black
>>> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the
>>> cost of
>>> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and
>>> whether it has a
>>> > >>>>>>>>>>> convenient pre-order traversal for lookups.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>
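The insert and lookup walk described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions: sort keys are fixed-width bit strings, a leaf holds at most 2 entries, and a HashMap stands in for unsorted map state (only exact-key get/put/remove are used). The class and method names are hypothetical, and the "dummy value"/"token" markers are left out because every split creates both children, so each lookup path ends at a leaf.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: sorted reads layered on an unsorted map via bit-string prefixes. */
class PrefixBucketSketch {
  static final int CAPACITY = 2; // max entries per leaf before it is split (assumed)

  /** One (sort key, value) pair; sort keys are fixed-width bit strings such as "0010". */
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Stand-in for unsorted map state, keyed by prefix; iteration order is never relied on.
  final Map<String, List<Entry>> leaves = new HashMap<>();

  PrefixBucketSketch() {
    leaves.put("", new ArrayList<>()); // start with one leaf at the empty prefix
  }

  void insert(String sortKey, String value) {
    String prefix = findLeaf(sortKey);
    // Split full leaves until the target leaf has room or the prefix cannot grow.
    while (leaves.get(prefix).size() >= CAPACITY && prefix.length() < sortKey.length()) {
      List<Entry> old = leaves.remove(prefix);
      leaves.put(prefix + "0", new ArrayList<>());
      leaves.put(prefix + "1", new ArrayList<>());
      for (Entry e : old) {
        leaves.get(e.sortKey.substring(0, prefix.length() + 1)).add(e);
      }
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    leaves.get(prefix).add(new Entry(sortKey, value));
  }

  /** Probes "", then each longer prefix of sortKey, until a leaf is hit. */
  private String findLeaf(String sortKey) {
    for (int len = 0; len <= sortKey.length(); len++) {
      String prefix = sortKey.substring(0, len);
      if (leaves.containsKey(prefix)) {
        return prefix;
      }
    }
    throw new IllegalStateException("no leaf covers " + sortKey);
  }

  /** Pre-order walk of the prefix space; width is the bit length of the sort keys. */
  List<String> readSorted(int width) {
    List<String> out = new ArrayList<>();
    walk("", width, out);
    return out;
  }

  private void walk(String prefix, int width, List<String> out) {
    List<Entry> leaf = leaves.get(prefix);
    if (leaf != null) { // hit: sort the leaf's entries by sort key and emit them
      List<Entry> sorted = new ArrayList<>(leaf);
      sorted.sort(Comparator.comparing((Entry e) -> e.sortKey));
      for (Entry e : sorted) {
        out.add(e.value);
      }
    } else if (prefix.length() < width) { // miss: descend into both children
      walk(prefix + "0", width, out);
      walk(prefix + "1", width, out);
    }
  }

  public static void main(String[] args) {
    PrefixBucketSketch state = new PrefixBucketSketch();
    state.insert("0010", "value1");
    state.insert("0011", "value2");
    state.insert("1011", "value3");
    state.insert("0001", "value4");
    System.out.println(state.readSorted(4)); // prints [value4, value1, value2, value3]
  }
}
```

After the four inserts the leaves are 000, 001, 01 and 1, which matches the layout in the example. The number of misses per read grows with trie depth, which is the trade-off noted above.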
>>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <
>>> [email protected]>
>>> > >>>>>>>>>>> wrote:
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>> Some great comments!
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented
>>> by
>>> > >>>>>>>>>>>> runners to be efficient. We can of course provide a
>>> default (inefficient)
>>> > >>>>>>>>>>>> implementation, but ideally runners would provide better
>>> ones.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed
>>> by
>>> > >>>>>>>>>>>> this. E.g.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Robert* Great point about standard coders not satisfying
>>> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag
>>> the coders that
>>> > >>>>>>>>>>>> do preserve order, and only accept those as key coders
>>> Alternatively we
>>> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a
>>> hard-coded set of
>>> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the
>>> direction Beam
>>> > >>>>>>>>>>>> usually goes. So users will have two ways of creating
>>> multimap state specs:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>> state =
>>> > >>>>>>>>>>>> StateSpecs.multimap(VarLongCoder.of(),
>>> StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> or
>>> > >>>>>>>>>>>>    private final StateSpec<MultimapState<Long, String>>
>>> state =
>>> > >>>>>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(),
>>> StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> The second one will validate that the key coder preserves
>>> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism
>>> checking in
>>> > >>>>>>>>>>>> GroupByKey). (BTW we would also have versions of these
>>> functions that use
>>> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do
>>> the same checking)
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Also the API I proposed did support random access! We
>>> could
>>> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use
>>> cases are
>>> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that
>>> of MultimapState
>>> > >>>>>>>>>>>> because there seemed to be 99% overlap.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Reuven
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>>> > >>>>>>>>>>>> [email protected]> wrote:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <
>>> [email protected]>
>>> > >>>>>>>>>>>>> wrote:
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <
>>> > >>>>>>>>>>>>> [email protected]> wrote:
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <
>>> > >>>>>>>>>>>>> [email protected]> wrote:
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <
>>> > >>>>>>>>>>>>> [email protected]> wrote:
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>> > >>>>>>>>>>>>> >>>>>   1. Removing the elements already processed from
>>> the
>>> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This
>>> is O(n^2) in the
>>> > >>>>>>>>>>>>> number of input trades.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the
>>> trade
>>> > >>>>>>>>>>>>> state?
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends
>>> State
>>> > >>>>>>>>>>>>> {
>>> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
>>> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
>>> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>> > >>>>>>>>>>>>> >>>>>  // Return all entries in the map.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <=
>>> limit.
>>> > >>>>>>>>>>>>> returned elements are sorted by the key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K
>>> > >>>>>>>>>>>>> limit);
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>>  // Remove all values with the given key;
>>> > >>>>>>>>>>>>> >>>>>   void remove(K key);
>>> > >>>>>>>>>>>>> >>>>>  // Remove all entries in the map with keys <=
>>> limit.
>>> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also be useful? It will
>>> remove
>>> > >>>>>>>>>>>>> all entries in the map with keys < limit.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of
>>> the key.
>>> > >>>>>>>>>>>>> In order to make this easier for users, I propose that
>>> we introduce a new
>>> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this
>>> tag guarantees
>>> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as
>>> the base Java type.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves
>>> the
>>> > >>>>>>>>>>>>> same ordering as the base Java type"?
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances
>>> of the
>>> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the
>>> language's comparison
>>> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded
>>> versions are compared
>>> > >>>>>>>>>>>>> lexicographically)
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect A <
>>> B iff
>>> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we
>>> support? What happens if A and
>>> > >>>>>>>>>>>>> B sort differently in different languages?
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > That would have to be the property of the coder (which
>>> means
>>> > >>>>>>>>>>>>> that this property probably needs to be represented in
>>> the portability
>>> > >>>>>>>>>>>>> representation of the coder). I imagine the common use
>>> cases will be for
>>> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are
>>> likely to sort the
>>> > >>>>>>>>>>>>> same in most languages.
>>> > >>>>>>>>>>>>>
>>> > >>>>>>>>>>>>> The standard coders for both double and integral types
>>> do not
>>> > >>>>>>>>>>>>> respect
>>> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV
>>> coders
>>> > >>>>>>>>>>>>> violate the
>>> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well.
>>> I think
>>> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
>>> > >>>>>>>>>>>>> surprises. (The
>>> > >>>>>>>>>>>>> state, of course, could take an order-preserving, bytes
>>> > >>>>>>>>>>>>> (string?)-producing callable as a parameter).
>>> (As for
>>> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or
>>> something
>>> > >>>>>>>>>>>>> like
>>> > >>>>>>>>>>>>> that...rather than Map which tends to imply random
>>> access.)
>>> > >>>>>>>>>>>>>
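A small illustration of the "consider negative values" point: with a fixed-width big-endian two's-complement encoding, comparing encoded bytes lexicographically (as unsigned bytes) puts -1 after 1. Flipping the sign bit before encoding is one common way to make byte order agree with numeric order. The encoding below is chosen only for illustration and the class name is hypothetical; it is not a statement about how any particular Beam coder lays out bytes.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: when does byte-wise order of encoded longs match numeric order? */
class OrderPreservingLongEncoding {
  /** Plain big-endian two's complement; negative values start with a 1 bit. */
  static byte[] encodePlain(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  /** Flips the sign bit first so unsigned byte order matches signed numeric order. */
  static byte[] encodeOrdered(long v) {
    return encodePlain(v ^ Long.MIN_VALUE);
  }

  /** Lexicographic comparison treating each byte as unsigned (0..255). */
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Plain encoding: -1 encodes as FF..FF and sorts after 1 (00..01).
    System.out.println(compareBytes(encodePlain(-1L), encodePlain(1L)) > 0); // prints true
    // Sign-flipped encoding: -1 sorts before 1, as it does numerically.
    System.out.println(compareBytes(encodeOrdered(-1L), encodeOrdered(1L)) < 0); // prints true
  }
}
```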
>>> > >>>>>>>>>>>>
>>> >
>>>
>>
