The portability layer is meant to live across multiple versions of Beam and
I don't think it should be treated by doing the simple and useful thing now
since I believe it will lead to a proliferation of the API.

On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <[email protected]> wrote:

> I have thoughts on the subject of whether to have APIs just for the
> lowest-level building blocks versus having APIs for higher-level
> constructs. Specifically this applies to providing only unsorted multimap
> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
> time-ordered buffer; if it turns out to be easy to go all the way to sorted
> multimap that's nice-to-have; if it turns out to be easy to implement on
> top of unsorted map state that should probably be under the hood
>
> Reasons to build low-level multimap in the runner & fn api and layer
> higher-level things in the SDK:
>
>  - It is less implementation for runners if they have to only provide
> fewer lower-level building blocks like multimap state.
>  - There are many more runners than SDKs (and will be even more and more)
> so this saves overall.
>
> Reasons to build higher-level constructs directly in the runner and fn api:
>
>  - Having multiple higher-level state types may actually be less
> implementation than one complex state type, especially if they map to
> runner primitives.
>  - The runner may have better specialized implementations, especially for
> something like a time-ordered buffer.
>  - The particular access patterns in an SDK-based implementation may not
> be ideal for each runner's underlying implementation of the low-level
> building block.
>  - There may be excessive gRPC overhead even for optimal access patterns.
>
> There are ways to have best of both worlds, like:
>
> 1. Define multiple state types according to fundamental access patterns,
> like we did this before portability.
> 2. If it is easy to layer one on top of the other, do that inside the
> runner. Provide shared code so for runners providing the lowest-level
> primitive they get all the types for free.
>
> I understand that this is an oversimplification. It still creates some
> more work. And APIs are a burden so it is good to introduce as few as
> possible for maintenance. But it has performance benefits and also unblocks
> "just doing the simple and useful thing now" which I always like to do as
> long as it is compatible with future changes. If the APIs are fundamental,
> like sets, maps, timestamp ordering, then it is safe to guess that they
> will change rarely and be useful forever.
>
> Kenn
>
> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <[email protected]> wrote:
>
>> I would be glad to take a stab at how to provide sorting on top of
>> unsorted multimap state.
>> Based upon your description, you want integer keys representing
>> timestamps and arbitrary user value for the values, is that correct?
>> What kinds of operations do you need on the sorted map state in order of
>> efficiency requirements?
>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y))
>> What kinds of operations do we expect the underlying unsorted map state
>> to be able to provide?
>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>> enumerate(K)?)
>>
>> I went through a similar exercise of how to provide a list like side
>> input view over a multimap[1] side input which efficiently allowed
>> computation of size and provided random access while only having access to
>> get(K) and enumerate K's.
>>
>> 1:
>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>
>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <[email protected]> wrote:
>>
>>> Bringing this subject up again,
>>>
>>> I've spent some time looking into implementing this for the Dataflow
>>> runner. I'm unable to find a way to implement the arbitrary sorted multimap
>>> efficiently for the case where there are large numbers of unique keys.
>>> Since the primary driving use case is timestamp ordering (i.e. key is event
>>> timestamp), you would expect to have nearly a new key per element. I
>>> considered Luke's suggestion above, but unfortunately it doesn't really
>>> solve this issue.
>>>
>>> The primary use case for sorting always seems to be sorting by
>>> timestamp. I want to propose that instead of building the fully-general
>>> sorted multimap, we instead focus on a state type where the sort key is an
>>> integral type (like a timestamp or an integer). There is still a valid use
>>> case for multimap, but we can provide that as an unordered state. At least
>>> for Dataflow, it will be much easier
>>>
>>> While my difficulties here may be specific to the Dataflow runner, any
>>> such support would have to be built into other runners as well, and
>>> limiting to integral sorting likely makes it easier for other runners to
>>> implement this. Also, if you look at this
>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
>>>  Flink
>>> comment pointed out by Aljoscha, for Flink the main use case identified was
>>> also timestamp sorting. This will also simplify the API design for this
>>> feature: Sorted multimap with arbitrary keys would require us to introduce
>>> a way of mapping natural ordering to encoded ordering (i.e. a new
>>> OrderPreservingCoder), but if we limit sort keys to integral types, the API
>>> design is simpler as integral types can be represented directly.
>>>
>>> Reuven
>>>
>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> This sounds to me like a potential runner strategy. However if a runner
>>>> can natively support sorted maps (e.g. we expect the Dataflow runner to be
>>>> able to do so, and I think it would be useful for other runners as well),
>>>> then it's probably preferable to allow the runner to use its native
>>>> capabilities.
>>>>
>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> For the API that you proposed, the map key is always "void" and the
>>>>> sort key == user key. So in my example of
>>>>> key: dummy value
>>>>> key.000: token, (0001, value4)
>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>> key.01: token
>>>>> key.1: token, (1011, value3)
>>>>> you would have:
>>>>> "void": dummy value
>>>>> "void".000: token, (0001, value4)
>>>>> "void".001: token, (0010, value1), (0011, value2)
>>>>> "void".01: token
>>>>> "void".1: token, (1011, value3)
>>>>>
>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the
>>>>> the prefixes until you find a common prefix for K and then filter for
>>>>> values where they have a sort key <= K. Using the example above, to find
>>>>> entriesUntil(0010) you would:
>>>>> look for key."", miss
>>>>> look for key.0, miss
>>>>> look for key.00, miss
>>>>> look for key.000, hit, sort all contained values using secondary key,
>>>>> provide value4 to user
>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort
>>>>> all contained values using secondary key, filter out value2 and provide
>>>>> value1
>>>>>
>>>>> void removeUntil(K limit) also translates into walking the prefixes
>>>>> but instead we will clear them when we have a "hit" with some special 
>>>>> logic
>>>>> for when the sort key is a prefix of the key. Used the example, to
>>>>> removeUntil(0010) you would:
>>>>> look for key."", miss
>>>>> look for key.0, miss
>>>>> look for key.00, miss
>>>>> look for key.000, hit, clear
>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort
>>>>> all contained values using secondary key, store in memory all values that 
>>>>> >
>>>>> 0010, clear and append values stored in memory.
>>>>>
>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Can you explain how fetching and deleting ranges of keys would work
>>>>>> with this data structure?
>>>>>>
>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Reuven, for the example, I assume that we never want to store more
>>>>>>> then 2 values at a given sort key prefix, and if we do then we will 
>>>>>>> create
>>>>>>> a new longer prefix splitting up the values based upon the sort key.
>>>>>>>
>>>>>>> Tuple representation in examples below is (key, sort key, value) and
>>>>>>> . is a character outside of the alphabet which can be represented by 
>>>>>>> using
>>>>>>> an escaping encoding that wraps the key + sort key encoding.
>>>>>>>
>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the prefixes of
>>>>>>> 0010 finding one that is not empty. In this case its 0, so we append 
>>>>>>> value
>>>>>>> to the map at key.0 ending up with (we also set the key to any dummy 
>>>>>>> value
>>>>>>> to know that it it contains values):
>>>>>>> key: dummy value
>>>>>>> key."": token, (0010, value1)
>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all the
>>>>>>> prefixes of 0010, finding "", so we append value2 to key."" ending up 
>>>>>>> with:
>>>>>>> key: dummy value
>>>>>>> key."": token, (0010, value1), (0011, value2)
>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all the
>>>>>>> prefixes of 1011 finding "" but notice that it is full, so we partition 
>>>>>>> all
>>>>>>> the values into two prefixes 0 and 1. We also clear the "" prefix 
>>>>>>> ending up
>>>>>>> with:
>>>>>>> key: dummy value
>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>>>>> key.1: token, (1011, value3)
>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all the
>>>>>>> prefixes of the value finding 0 but notice that it is full, so we 
>>>>>>> partition
>>>>>>> all the values into two prefixes 00 and 01 but notice this doesn't help 
>>>>>>> us
>>>>>>> since 00 will be too full so we split 00 again to 000, 001. We also 
>>>>>>> clear
>>>>>>> the 0 prefix ending up with:
>>>>>>> key: dummy value
>>>>>>> key.000: token, (0001, value4)
>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>> key.01: token
>>>>>>> key.1: token, (1011, value3)
>>>>>>>
>>>>>>> We are effectively building a trie[1] where we only have values at
>>>>>>> the leaves and control how full each leaf can be. There are other trie
>>>>>>> representations like a radix tree that may be better.
>>>>>>>
>>>>>>> Looking up the values in sorted order for "key" would go like this:
>>>>>>> Is key set, yes
>>>>>>> look for key."", miss
>>>>>>> look for key.0, miss
>>>>>>> look for key.00, miss
>>>>>>> look for key.000, hit, sort all contained values using secondary
>>>>>>> key, provide value4 to user
>>>>>>> look for key.001, hit, sort all contained values using secondary
>>>>>>> key, provide value1 followed by value2 to user
>>>>>>> look for key.01, hit, empty, return no values to user
>>>>>>> look for key.1, hit, sort all contained values using secondary key,
>>>>>>> provide value3 to user
>>>>>>> we have walked the entire prefix space, signal end of iterable
>>>>>>>
>>>>>>> Some notes for the above:
>>>>>>> * The dummy value is used to know that the key contains values and
>>>>>>> the token is to know whether there are any values deeper in the trie so
>>>>>>> when we know when to stop searching.
>>>>>>> * If we can recalculate the sort key from the combination of the key
>>>>>>> and value, then we don't need to store it.
>>>>>>> * Keys with lots of values will perform worse then keys with less
>>>>>>> values since we have to look up more keys but they will be empty reads. 
>>>>>>> The
>>>>>>> number of misses can be controlled by how many elements we are willing 
>>>>>>> to
>>>>>>> store at a given node before we subdivide.
>>>>>>>
>>>>>>> In reality you could build a lot of structures (e.g. red black tree,
>>>>>>> binary tree) using the sort key, the issue is the cost of
>>>>>>> rebalancing/re-organizing the structure in map form and whether it has a
>>>>>>> convenient pre-order traversal for lookups.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Some great comments!
>>>>>>>>
>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
>>>>>>>> runners to be efficient. We can of course provide a default 
>>>>>>>> (inefficient)
>>>>>>>> implementation, but ideally runners would provide better ones.
>>>>>>>>
>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by this.
>>>>>>>> E.g.
>>>>>>>>
>>>>>>>> *Robert* Great point about standard coders not satisfying this.
>>>>>>>> That's why I suggested that we provide a way to tag the coders that do
>>>>>>>> preserve order, and only accept those as key coders Alternatively we 
>>>>>>>> could
>>>>>>>> present a more limited API - e.g. only allowing a hard-coded set of 
>>>>>>>> types
>>>>>>>> to be used as keys - but that seems counter to the direction Beam 
>>>>>>>> usually
>>>>>>>> goes. So users will have two ways .of creating multimap state specs:
>>>>>>>>
>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>
>>>>>>>> or
>>>>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>
>>>>>>>> The second one will validate that the key coder preserves order,
>>>>>>>> and fails otherwise (similar to coder determinism checking in 
>>>>>>>> GroupByKey).
>>>>>>>> (BTW we would also have versions of these functions that use coder
>>>>>>>> inference to "guess" the coder, but those will do the same checking)
>>>>>>>>
>>>>>>>> Also the API I proposed did support random access! We could
>>>>>>>> separate out OrderedBagState again if we think the use cases are
>>>>>>>> fundamentally different. I merged the proposal into that of 
>>>>>>>> MultimapState
>>>>>>>> because there seemed be 99% overlap.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> A few obvious problems with this code:
>>>>>>>>> >>>>>   1. Removing the elements already processed from the bag
>>>>>>>>> requires clearing and rewriting the entire bag. This is O(n^2) in the
>>>>>>>>> number of input trades.
>>>>>>>>> >>>>
>>>>>>>>> >>>> why it's not O(2 * n) to clearing and rewriting trade state?
>>>>>>>>> >>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>>>>> >>>>>   // Add a value to the map.
>>>>>>>>> >>>>>   void put(K key, V value);
>>>>>>>>> >>>>>   // Get all values for a given key.
>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>>>>> >>>>>  // Return all entries in the map.
>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit.
>>>>>>>>> returned elements are sorted by the key.
>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>  // Remove all values with the given key;
>>>>>>>>> >>>>>   void remove(K key);
>>>>>>>>> >>>>>  // Remove all entries in the map with keys <= limit.
>>>>>>>>> >>>>>   void removeUntil(K limit);
>>>>>>>>> >>>>
>>>>>>>>> >>>> Will removeUntilExcl(K limit) also useful? It will remove all
>>>>>>>>> entries in the map with keys < limit.
>>>>>>>>> >>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In
>>>>>>>>> order to make this easier for users, I propose that we introduce a 
>>>>>>>>> new tag
>>>>>>>>> on Coders PreservesOrder. A Coder that contains this tag guarantees 
>>>>>>>>> that
>>>>>>>>> the encoded value preserves the same ordering as the base Java type.
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> Could you clarify what is  "encoded value preserves the same
>>>>>>>>> ordering as the base Java type"?
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> Lets say A and B represent two different instances of the same
>>>>>>>>> Java type like a double, then A < B (using the languages comparison
>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are 
>>>>>>>>> compared
>>>>>>>>> lexicographically)
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> Since coders are shared across SDKs, do we expect A < B iff
>>>>>>>>> e(A) < e(P) property to hold for all languages we support? What 
>>>>>>>>> happens A,
>>>>>>>>> B sort differently in different languages?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > That would have to be the property of the coder (which means
>>>>>>>>> that this property probably needs to be represented in the portability
>>>>>>>>> representation of the coder). I imagine the common use cases will be 
>>>>>>>>> for
>>>>>>>>> simple coders like int, long, string, etc., which are likely to sort 
>>>>>>>>> the
>>>>>>>>> same in most languages.
>>>>>>>>>
>>>>>>>>> The standard coders for both double and integral types do not
>>>>>>>>> respect
>>>>>>>>> the natural ordering (consider negative values). KV coders violate
>>>>>>>>> the
>>>>>>>>> "natural" lexicographic ordering on components as well. I think
>>>>>>>>> implicitly sorting on encoded value would yield many surprises.
>>>>>>>>> (The
>>>>>>>>> state, of course, could take a order-preserving, bytes
>>>>>>>>> (string?)-producing callable as a parameter of course). (As for
>>>>>>>>> naming, I'd probably call this OrderedBagState or something like
>>>>>>>>> that...rather than Map which tends to imply random access.)
>>>>>>>>>
>>>>>>>>

Reply via email to