Bringing this subject up again,

I've spent some time looking into implementing this for the Dataflow
runner. I'm unable to find a way to implement the arbitrary sorted multimap
efficiently for the case where there are large numbers of unique keys.
Since the primary driving use case is timestamp ordering (i.e. key is event
timestamp), you would expect to have nearly a new key per element. I
considered Luke's suggestion above, but unfortunately it doesn't really
solve this issue.

The primary use case for sorting always seems to be sorting by timestamp. I
want to propose that instead of building the fully-general sorted multimap,
we instead focus on a state type where the sort key is an integral type
(like a timestamp or an integer). There is still a valid use case for
multimap, but we can provide that as an unordered state. At least for
Dataflow, this will be much easier to implement.
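
A rough sketch of what such a state type could look like, assuming a long
sort key such as event-time millis. The interface name TimestampOrderedState
and its methods are hypothetical, not an existing Beam API, and the
TreeMap-backed class is only an in-memory illustration of the intended
semantics, not how a runner would actually store the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical interface (not an existing Beam API). The sort key is a plain
// long, e.g. event-time millis, so no order-preserving coder is required.
interface TimestampOrderedState<V> {
  // Add a value under the given sort key; duplicate keys are allowed.
  void add(long sortKey, V value);

  // Return all values with sortKey <= limit, in ascending sort-key order.
  List<V> readUntil(long limit);

  // Remove all values with sortKey <= limit.
  void removeUntil(long limit);
}

// In-memory reference implementation, only meant to pin down the semantics.
final class InMemoryTimestampOrderedState<V> implements TimestampOrderedState<V> {
  private final TreeMap<Long, List<V>> entries = new TreeMap<>();

  @Override
  public void add(long sortKey, V value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<V> readUntil(long limit) {
    List<V> result = new ArrayList<>();
    // headMap(limit, true) is a view of all keys <= limit, in ascending order.
    for (List<V> values : entries.headMap(limit, true).values()) {
      result.addAll(values);
    }
    return result;
  }

  @Override
  public void removeUntil(long limit) {
    // Clearing the view removes the matching entries from the backing map.
    entries.headMap(limit, true).clear();
  }
}
```

The typical timestamp-ordering pattern would then be: add elements as they
arrive, and when the watermark advances call readUntil(watermark) followed by
removeUntil(watermark).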

While my difficulties here may be specific to the Dataflow runner, any such
support would have to be built into other runners as well, and limiting to
integral sorting likely makes it easier for other runners to implement
this. Also, if you look at this Flink comment pointed out by Aljoscha
<https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>,
the main use case identified for Flink was also timestamp sorting. This will
also simplify the API design for this
feature: Sorted multimap with arbitrary keys would require us to introduce
a way of mapping natural ordering to encoded ordering (i.e. a new
OrderPreservingCoder), but if we limit sort keys to integral types, the API
design is simpler as integral types can be represented directly.
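
To illustrate the encoding point: for a fixed-width long, a byte encoding
whose lexicographic order matches numeric order is easy to write directly.
The sketch below is only an assumption about how a runner might encode an
integral sort key; it is not Beam's VarLongCoder or any other existing coder,
and the class and method names are made up for illustration. It contrasts
plain big-endian two's complement, where negative values sort after positive
ones, with the same bytes after flipping the sign bit.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Beam.
final class OrderPreservingLongEncoding {
  private OrderPreservingLongEncoding() {}

  // Plain big-endian two's complement. Compared as unsigned bytes, negative
  // values (leading 0xFF...) sort AFTER positive values.
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flipping the sign bit makes unsigned lexicographic byte order agree with
  // the numeric order of the original long.
  static byte[] encodeOrdered(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  static long decodeOrdered(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  // Unsigned lexicographic comparison, as a byte-oriented store would do it.
  static int compareEncoded(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

With an arbitrary key type, the equivalent guarantee would have to come from
the user's coder, which is where the OrderPreservingCoder machinery comes in.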

Reuven

On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote:

> This sounds to me like a potential runner strategy. However if a runner
> can natively support sorted maps (e.g. we expect the Dataflow runner to be
> able to do so, and I think it would be useful for other runners as well),
> then it's probably preferable to allow the runner to use its native
> capabilities.
>
> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> For the API that you proposed, the map key is always "void" and the sort
>> key == user key. So in my example of
>> key: dummy value
>> key.000: token, (0001, value4)
>> key.001: token, (0010, value1), (0011, value2)
>> key.01: token
>> key.1: token, (1011, value3)
>> you would have:
>> "void": dummy value
>> "void".000: token, (0001, value4)
>> "void".001: token, (0010, value1), (0011, value2)
>> "void".01: token
>> "void".1: token, (1011, value3)
>>
>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the
>> prefixes until you find a common prefix for K and then filter for values
>> where they have a sort key <= K. Using the example above, to find
>> entriesUntil(0010) you would:
>> look for key."", miss
>> look for key.0, miss
>> look for key.00, miss
>> look for key.000, hit, sort all contained values using secondary key,
>> provide value4 to user
>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
>> contained values using secondary key, filter out value2 and provide value1
>>
>> void removeUntil(K limit) also translates into walking the prefixes but
>> instead we will clear them when we have a "hit" with some special logic for
>> when the sort key is a prefix of the key. Using the example above, to
>> removeUntil(0010) you would:
>> look for key."", miss
>> look for key.0, miss
>> look for key.00, miss
>> look for key.000, hit, clear
>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
>> contained values using secondary key, store in memory all values that are >
>> 0010, clear and append values stored in memory.
>>
>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Can you explain how fetching and deleting ranges of keys would work with
>>> this data structure?
>>>
>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Reuven, for the example, I assume that we never want to store more than
>>>> 2 values at a given sort key prefix, and if we do then we will create a new
>>>> longer prefix splitting up the values based upon the sort key.
>>>>
>>>> Tuple representation in examples below is (key, sort key, value) and .
>>>> is a character outside of the alphabet which can be represented by using an
>>>> escaping encoding that wraps the key + sort key encoding.
>>>>
>>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of
>>>> 0010, finding one that is not empty. In this case it's "", so we append
>>>> value1 to the map at key."", ending up with (we also set the key to any
>>>> dummy value to know that it contains values):
>>>> key: dummy value
>>>> key."": token, (0010, value1)
>>>> Now we insert (key, 0011, value2), we again look up "key" + all the
>>>> prefixes of 0011, finding "", so we append value2 to key."" ending up with:
>>>> key: dummy value
>>>> key."": token, (0010, value1), (0011, value2)
>>>> Now we insert (key, 1011, value3), we again lookup "key" + all the
>>>> prefixes of 1011 finding "" but notice that it is full, so we partition all
>>>> the values into two prefixes 0 and 1. We also clear the "" prefix ending up
>>>> with:
>>>> key: dummy value
>>>> key.0: token, (0010, value1), (0011, value2)
>>>> key.1: token, (1011, value3)
>>>> Now we insert (key, 0001, value4), we again lookup "key" + all the
>>>> prefixes of the value finding 0 but notice that it is full, so we partition
>>>> all the values into two prefixes 00 and 01 but notice this doesn't help us
>>>> since 00 will be too full so we split 00 again to 000, 001. We also clear
>>>> the 0 prefix ending up with:
>>>> key: dummy value
>>>> key.000: token, (0001, value4)
>>>> key.001: token, (0010, value1), (0011, value2)
>>>> key.01: token
>>>> key.1: token, (1011, value3)
>>>>
>>>> We are effectively building a trie[1] where we only have values at the
>>>> leaves and control how full each leaf can be. There are other trie
>>>> representations like a radix tree that may be better.
>>>>
>>>> Looking up the values in sorted order for "key" would go like this:
>>>> Is key set, yes
>>>> look for key."", miss
>>>> look for key.0, miss
>>>> look for key.00, miss
>>>> look for key.000, hit, sort all contained values using secondary key,
>>>> provide value4 to user
>>>> look for key.001, hit, sort all contained values using secondary key,
>>>> provide value1 followed by value2 to user
>>>> look for key.01, hit, empty, return no values to user
>>>> look for key.1, hit, sort all contained values using secondary key,
>>>> provide value3 to user
>>>> we have walked the entire prefix space, signal end of iterable
>>>>
>>>> Some notes for the above:
>>>> * The dummy value is used to know that the key contains values and the
>>>> token is to know whether there are any values deeper in the trie, so we
>>>> know when to stop searching.
>>>> * If we can recalculate the sort key from the combination of the key
>>>> and value, then we don't need to store it.
>>>> * Keys with lots of values will perform worse than keys with fewer
>>>> values since we have to look up more keys but they will be empty reads. The
>>>> number of misses can be controlled by how many elements we are willing to
>>>> store at a given node before we subdivide.
>>>>
>>>> In reality you could build a lot of structures (e.g. red black tree,
>>>> binary tree) using the sort key, the issue is the cost of
>>>> rebalancing/re-organizing the structure in map form and whether it has a
>>>> convenient pre-order traversal for lookups.
>>>>
>>>>
>>>>
>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Some great comments!
>>>>>
>>>>> *Aljoscha*: absolutely this would have to be implemented by runners
>>>>> to be efficient. We can of course provide a default (inefficient)
>>>>> implementation, but ideally runners would provide better ones.
>>>>>
>>>>> *Jan* Exactly. I think MapState can be dropped or backed by this.
>>>>> E.g.
>>>>>
>>>>> *Robert* Great point about standard coders not satisfying this.
>>>>> That's why I suggested that we provide a way to tag the coders that do
>>>>> preserve order, and only accept those as key coders. Alternatively we could
>>>>> present a more limited API - e.g. only allowing a hard-coded set of types
>>>>> to be used as keys - but that seems counter to the direction Beam usually
>>>>> goes. So users will have two ways of creating multimap state specs:
>>>>>
>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>
>>>>> or
>>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>
>>>>> The second one will validate that the key coder preserves order, and
>>>>> fail otherwise (similar to coder determinism checking in GroupByKey).
>>>>> (BTW we would also have versions of these functions that use coder
>>>>> inference to "guess" the coder, but those will do the same checking.)
>>>>>
>>>>> Also the API I proposed did support random access! We could separate
>>>>> out OrderedBagState again if we think the use cases are fundamentally
>>>>> different. I merged the proposal into that of MultimapState because there
>>>>> seemed to be 99% overlap.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>> >
>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <al...@google.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <ruw...@google.com>
>>>>>> wrote:
>>>>>> >>>>>
>>>>>> >>>>> A few obvious problems with this code:
>>>>>> >>>>>   1. Removing the elements already processed from the bag
>>>>>> requires clearing and rewriting the entire bag. This is O(n^2) in the
>>>>>> number of input trades.
>>>>>> >>>>
>>>>>> >>>> Why is it not O(2 * n) to clear and rewrite the trade state?
>>>>>> >>>>
>>>>>> >>>>>
>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>> >>>>>   // Add a value to the map.
>>>>>> >>>>>   void put(K key, V value);
>>>>>> >>>>>   // Get all values for a given key.
>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>> >>>>>  // Return all entries in the map.
>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>> >>>>>   // Return all entries in the map with keys <= limit. returned
>>>>>> elements are sorted by the key.
>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>> >>>>>
>>>>>> >>>>>  // Remove all values with the given key;
>>>>>> >>>>>   void remove(K key);
>>>>>> >>>>>  // Remove all entries in the map with keys <= limit.
>>>>>> >>>>>   void removeUntil(K limit);
>>>>>> >>>>
>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It will remove all
>>>>>> entries in the map with keys < limit.
>>>>>> >>>>
>>>>>> >>>>>
>>>>>> >>>>> Runners will sort based on the encoded value of the key. In
>>>>>> order to make this easier for users, I propose that we introduce a new
>>>>>> tag on Coders, PreservesOrder. A Coder that contains this tag guarantees
>>>>>> that the encoded value preserves the same ordering as the base Java type.
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> Could you clarify what "encoded value preserves the same
>>>>>> ordering as the base Java type" means?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Let's say A and B represent two different instances of the same
>>>>>> Java type like a double, then A < B (using the language's comparison
>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are
>>>>>> compared lexicographically).
>>>>>> >>
>>>>>> >>
>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff e(A) <
>>>>>> e(B) property to hold for all languages we support? What happens if A and B
>>>>>> sort differently in different languages?
>>>>>> >
>>>>>> >
>>>>>> > That would have to be the property of the coder (which means that
>>>>>> this property probably needs to be represented in the portability
>>>>>> representation of the coder). I imagine the common use cases will be for
>>>>>> simple coders like int, long, string, etc., which are likely to sort the
>>>>>> same in most languages.
>>>>>>
>>>>>> The standard coders for both double and integral types do not respect
>>>>>> the natural ordering (consider negative values). KV coders violate the
>>>>>> "natural" lexicographic ordering on components as well. I think
>>>>>> implicitly sorting on encoded value would yield many surprises. (The
>>>>>> state, of course, could take an order-preserving, bytes
>>>>>> (string?)-producing callable as a parameter). (As for
>>>>>> naming, I'd probably call this OrderedBagState or something like
>>>>>> that...rather than Map which tends to imply random access.)
>>>>>>
>>>>>
