This sounds to me like a potential runner strategy. However, if a runner can
natively support sorted maps (e.g. we expect the Dataflow runner to be able
to do so, and I think it would be useful for other runners as well), then
it's probably preferable to allow the runner to use its native capabilities.

On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <> wrote:

> For the API that you proposed, the map key is always "void" and the sort
> key == user key. So in my example of
> key: dummy value
> key.000: token, (0001, value4)
> key.001: token, (0010, value1), (0011, value2)
> key.01: token
> key.1: token, (1011, value3)
> you would have:
> "void": dummy value
> "void".000: token, (0001, value4)
> "void".001: token, (0010, value1), (0011, value2)
> "void".01: token
> "void".1: token, (1011, value3)
> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the
> prefixes until you find a common prefix for K and then filtering for values
> whose sort key is <= K. Using the example above, to find
> entriesUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, sort all contained values using secondary key,
> provide value4 to user
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, filter out value2 and provide value1
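A minimal Java sketch of the entriesUntil walk above, under stated assumptions: sort keys are fixed-length bit strings, the state is a flat `Map` from cell names such as `key.001` to leaf lists (the empty prefix is the cell `key.`), and `EntriesUntilSketch`, `Entry`, and `exampleStore` are hypothetical names. Because a pre-order walk visits prefixes in ascending order, the first prefix that sorts after the matching prefix of the limit can end the scan.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: entriesUntil(limit) over a flat prefix-trie layout.
// Assumes fixed-length bit-string sort keys and cells named userKey + "." + prefix.
final class EntriesUntilSketch {
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // The example layout: key.000, key.001, key.01 (empty token), key.1.
  static Map<String, List<Entry>> exampleStore() {
    Map<String, List<Entry>> store = new HashMap<>();
    store.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
    store.put("key.001", new ArrayList<>(List.of(
        new Entry("0010", "value1"), new Entry("0011", "value2"))));
    store.put("key.01", new ArrayList<>());
    store.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
    return store;
  }

  // Returns the values whose sort key is <= limit, in sort-key order.
  static List<String> entriesUntil(Map<String, List<Entry>> store, String userKey, String limit) {
    List<String> out = new ArrayList<>();
    walk(store, userKey, "", limit, out);
    return out;
  }

  // Pre-order walk; returns false once a prefix sorts after the limit, ending the scan.
  private static boolean walk(Map<String, List<Entry>> store, String userKey,
      String prefix, String limit, List<String> out) {
    int cmp = prefix.compareTo(limit.substring(0, prefix.length()));
    if (cmp > 0) {
      return false;  // every sort key under this prefix is > limit
    }
    List<Entry> leaf = store.get(userKey + "." + prefix);
    if (leaf != null) {  // hit: sort the leaf, filtering only when prefix is a prefix of limit
      leaf.stream()
          .filter(e -> cmp < 0 || e.sortKey.compareTo(limit) <= 0)
          .sorted(Comparator.comparing((Entry e) -> e.sortKey))
          .forEach(e -> out.add(e.value));
      return true;
    }
    if (prefix.length() >= limit.length()) {
      return true;  // miss at full depth: nothing stored here
    }
    // Miss: descend into the "0" child, then the "1" child.
    return walk(store, userKey, prefix + "0", limit, out)
        && walk(store, userKey, prefix + "1", limit, out);
  }
}
```

On the example layout, `entriesUntil(exampleStore(), "key", "0010")` yields value4 then value1, and the walk stops at key.01 without reading it.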
> void removeUntil(K limit) also translates into walking the prefixes but
> instead we will clear them when we have a "hit" with some special logic for
> when the sort key is a prefix of the key. Using the example, to
> removeUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, clear
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, store in memory all values > 0010,
> clear the prefix, and append the values stored in memory.
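The removeUntil walk can be sketched the same way, assuming fixed-length bit-string sort keys, a flat `Map` of leaf cells named `key.<prefix>`, and the hypothetical names `RemoveUntilSketch`, `Entry`, and `exampleStore`. In this sketch a cleared leaf is kept as an empty list, playing the role of the token, so later walks still see it as a hit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: removeUntil(limit) over a flat prefix-trie layout.
// Assumes fixed-length bit-string sort keys and cells named userKey + "." + prefix.
final class RemoveUntilSketch {
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // The example layout: key.000, key.001, key.01 (empty token), key.1.
  static Map<String, List<Entry>> exampleStore() {
    Map<String, List<Entry>> store = new HashMap<>();
    store.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
    store.put("key.001", new ArrayList<>(List.of(
        new Entry("0010", "value1"), new Entry("0011", "value2"))));
    store.put("key.01", new ArrayList<>());
    store.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
    return store;
  }

  static void removeUntil(Map<String, List<Entry>> store, String userKey, String limit) {
    walk(store, userKey, "", limit);
  }

  // Pre-order walk; returns false once a prefix sorts after the limit.
  private static boolean walk(Map<String, List<Entry>> store, String userKey,
      String prefix, String limit) {
    int cmp = prefix.compareTo(limit.substring(0, prefix.length()));
    if (cmp > 0) {
      return false;  // everything from here on is > limit, so stop
    }
    String cell = userKey + "." + prefix;
    List<Entry> leaf = store.get(cell);
    if (leaf != null) {
      List<Entry> kept = new ArrayList<>();
      if (cmp == 0) {
        // prefix is a prefix of limit: hold the values past the limit in memory
        for (Entry e : leaf) {
          if (e.sortKey.compareTo(limit) > 0) {
            kept.add(e);
          }
        }
      }
      store.put(cell, kept);  // clear the leaf (it stays as an empty token), re-append survivors
      return true;
    }
    if (prefix.length() >= limit.length()) {
      return true;
    }
    return walk(store, userKey, prefix + "0", limit)
        && walk(store, userKey, prefix + "1", limit);
  }
}
```

On the example layout, removeUntil(0010) empties key.000, keeps only (0011, value2) in key.001, and leaves key.01 and key.1 untouched.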
> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <> wrote:
>> Can you explain how fetching and deleting ranges of keys would work with
>> this data structure?
>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <> wrote:
>>> Reuven, for the example, I assume that we never want to store more than
>>> 2 values at a given sort key prefix; if we do, then we create a new,
>>> longer prefix, splitting up the values based upon the sort key.
>>> In the examples below, tuples are written as (key, sort key, value), and
>>> "." is a character outside of the alphabet, which can be represented by
>>> using an escaping encoding that wraps the key + sort key encoding.
>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of
>>> 0010, finding the first one that exists. In this case it's "", so we
>>> append value1 to the map at key."" ending up with (we also set the key to
>>> any dummy value to know that it contains values):
>>> key: dummy value
>>> key."": token, (0010, value1)
>>> Now we insert (key, 0011, value2); we again look up "key" + all the
>>> prefixes of 0011, finding "", so we append value2 to key."" ending up with:
>>> key: dummy value
>>> key."": token, (0010, value1), (0011, value2)
>>> Now we insert (key, 1011, value3); we again look up "key" + all the
>>> prefixes of 1011, finding "", but notice that it is full, so we partition
>>> all the values into two prefixes, 0 and 1. We also clear the "" prefix,
>>> ending up with:
>>> key: dummy value
>>> key.0: token, (0010, value1), (0011, value2)
>>> key.1: token, (1011, value3)
>>> Now we insert (key, 0001, value4); we again look up "key" + all the
>>> prefixes of 0001, finding 0, but notice that it is full, so we partition
>>> all the values into two prefixes, 00 and 01. This doesn't help us, since
>>> 00 will be too full, so we split 00 again into 000 and 001. We also clear
>>> the 0 prefix, ending up with:
>>> key: dummy value
>>> key.000: token, (0001, value4)
>>> key.001: token, (0010, value1), (0011, value2)
>>> key.01: token
>>> key.1: token, (1011, value3)
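The insert path above can be sketched in Java. This is an illustration under assumptions, not a proposed implementation: sort keys are fixed-length bit strings, the state backend is a flat `HashMap` from cell names to leaf lists (the empty prefix is the cell `key.`), the per-key dummy value is left out, and `PrefixTrieInsert`, `leafCapacity`, and `keyBits` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the insert path: values live only in leaf cells named
// userKey + "." + prefix; a leaf that grows past leafCapacity is cleared and
// split into its "0" and "1" children. Assumes fixed-length bit-string sort keys.
final class PrefixTrieInsert {
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  final Map<String, List<Entry>> store = new HashMap<>();
  private final String userKey;
  private final int leafCapacity;
  private final int keyBits;

  PrefixTrieInsert(String userKey, int leafCapacity, int keyBits) {
    this.userKey = userKey;
    this.leafCapacity = leafCapacity;
    this.keyBits = keyBits;
    store.put(cell(""), new ArrayList<>());  // the root leaf starts out empty
  }

  private String cell(String prefix) {
    return userKey + "." + prefix;  // "." stands in for an out-of-alphabet separator
  }

  void put(String sortKey, String value) {
    // Look up each prefix of sortKey ("", "0", "00", ...) until one is a leaf.
    for (int len = 0; len <= keyBits; len++) {
      String prefix = sortKey.substring(0, len);
      List<Entry> leaf = store.get(cell(prefix));
      if (leaf != null) {
        leaf.add(new Entry(sortKey, value));
        if (leaf.size() > leafCapacity && len < keyBits) {
          split(prefix, leaf);
        }
        return;
      }
    }
    throw new IllegalStateException("no leaf covers " + sortKey);
  }

  private void split(String prefix, List<Entry> entries) {
    store.remove(cell(prefix));  // clear the overfull leaf
    for (String child : new String[] {prefix + "0", prefix + "1"}) {
      List<Entry> part = new ArrayList<>();
      for (Entry e : entries) {
        if (e.sortKey.startsWith(child)) {
          part.add(e);
        }
      }
      store.put(cell(child), part);  // even an empty child keeps its token
      if (part.size() > leafCapacity && child.length() < keyBits) {
        split(child, part);  // still too full: split again one level deeper
      }
    }
  }

  // Cell name -> stored sort keys, ordered by cell name, for inspection.
  Map<String, List<String>> layout() {
    Map<String, List<String>> view = new TreeMap<>();
    store.forEach((name, leaf) -> {
      List<String> keys = new ArrayList<>();
      for (Entry e : leaf) {
        keys.add(e.sortKey);
      }
      view.put(name, keys);
    });
    return view;
  }
}
```

With `leafCapacity = 2` and `keyBits = 4`, inserting the four example values in order yields the cells key.000, key.001, key.01 (empty), and key.1, matching the layout above.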
>>> We are effectively building a trie[1] where we only have values at the
>>> leaves and control how full each leaf can be. There are other trie
>>> representations like a radix tree that may be better.
>>> Looking up the values in sorted order for "key" would go like this:
>>> Is key set, yes
>>> look for key."", miss
>>> look for key.0, miss
>>> look for key.00, miss
>>> look for key.000, hit, sort all contained values using secondary key,
>>> provide value4 to user
>>> look for key.001, hit, sort all contained values using secondary key,
>>> provide value1 followed by value2 to user
>>> look for key.01, hit, empty, return no values to user
>>> look for key.1, hit, sort all contained values using secondary key,
>>> provide value3 to user
>>> we have walked the entire prefix space, signal end of iterable
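A Java sketch of this sorted scan over the example layout, under these assumptions: sort keys are fixed-length bit strings, the state is a flat map of leaf cells (the empty prefix is the cell `key.`), and `PrefixTrieScan` and `scan` are hypothetical names. Instead of modeling the token, it stops descending at the sort-key length, and it records each probed cell so the misses are visible.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the sorted scan: a pre-order walk over leaf cells named
// userKey + "." + prefix, assuming fixed-length bit-string sort keys.
final class PrefixTrieScan {
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // The layout reached after the four inserts in the walk-through.
  static Map<String, List<Entry>> exampleStore() {
    Map<String, List<Entry>> store = new HashMap<>();
    store.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
    store.put("key.001", new ArrayList<>(List.of(
        new Entry("0010", "value1"), new Entry("0011", "value2"))));
    store.put("key.01", new ArrayList<>());
    store.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
    return store;
  }

  // Appends each probed cell to probes and each value, in sort-key order, to out.
  static void scan(Map<String, List<Entry>> store, String userKey, String prefix,
      int keyBits, List<String> probes, List<String> out) {
    String cell = userKey + "." + prefix;
    probes.add(cell);  // one point read against the state backend
    List<Entry> leaf = store.get(cell);
    if (leaf != null) {  // hit: the leaf holds every value under this prefix
      leaf.stream()
          .sorted(Comparator.comparing((Entry e) -> e.sortKey))
          .forEach(e -> out.add(e.value));
      return;
    }
    if (prefix.length() < keyBits) {  // miss: values live deeper, 0-branch first
      scan(store, userKey, prefix + "0", keyBits, probes, out);
      scan(store, userKey, prefix + "1", keyBits, probes, out);
    }
  }
}
```

Scanning from the empty prefix probes key., key.0, key.00, key.000, key.001, key.01, and key.1, and returns value4, value1, value2, value3, the same sequence as the walk-through.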
>>> Some notes for the above:
>>> * The dummy value is used to know that the key contains values, and the
>>> token is used to know whether there are any values deeper in the trie, so
>>> that we know when to stop searching.
>>> * If we can recalculate the sort key from the combination of the key and
>>> value, then we don't need to store it.
>>> * Keys with lots of values will perform worse than keys with fewer values,
>>> since we have to look up more keys, but they will be empty reads. The
>>> number of misses can be controlled by how many elements we are willing to
>>> store at a given node before we subdivide.
>>> In reality you could build a lot of structures (e.g. red black tree,
>>> binary tree) using the sort key, the issue is the cost of
>>> rebalancing/re-organizing the structure in map form and whether it has a
>>> convenient pre-order traversal for lookups.
>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <> wrote:
>>>> Some great comments!
>>>> *Aljoscha*: absolutely this would have to be implemented by runners to
>>>> be efficient. We can of course provide a default (inefficient)
>>>> implementation, but ideally runners would provide better ones.
>>>> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>>>> *Robert* Great point about standard coders not satisfying this. That's
>>>> why I suggested that we provide a way to tag the coders that do preserve
>>>> order, and only accept those as key coders. Alternatively, we could
>>>> present a more limited API (e.g. only allowing a hard-coded set of types
>>>> to be used as keys), but that seems counter to the direction Beam usually
>>>> goes. So users will have two ways of creating multimap state specs:
>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>        StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>> or
>>>>    private final StateSpec<MultimapState<Long, String>> state =
>>>>        StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>> The second one will validate that the key coder preserves order, and
>>>> fail otherwise (similar to coder determinism checking in GroupByKey).
>>>> (BTW, we would also have versions of these functions that use coder
>>>> inference to "guess" the coder, but those will do the same checking.)
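A minimal sketch of the validation the second spec would perform, using stand-in types: `KeyCoder`, `PreservesOrder`, `OrderedLongCoder`, `PlainLongCoder`, and `checkKeyCoder` are hypothetical names for illustration, not Beam's `Coder` API, and `orderedMultimap` itself is only a proposal in this thread. The check trusts the coder's declared tag rather than testing its encodings.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the check a proposed orderedMultimap would perform.
// KeyCoder and PreservesOrder are illustrative stand-ins, not Beam's Coder API.
final class OrderedSpecCheck {
  // Stand-in for a key coder: only the encoding matters here.
  interface KeyCoder<T> {
    byte[] encode(T value);
  }

  // Tag: a coder implementing this promises that its encoded bytes, compared
  // lexicographically, sort in the same order as the values themselves.
  interface PreservesOrder {}

  // Big-endian with the sign bit flipped, which does preserve order.
  static final class OrderedLongCoder implements KeyCoder<Long>, PreservesOrder {
    @Override
    public byte[] encode(Long value) {
      return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }
  }

  // Plain big-endian two's complement: negative values sort after positive ones.
  static final class PlainLongCoder implements KeyCoder<Long> {
    @Override
    public byte[] encode(Long value) {
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
  }

  // Fails fast at spec construction when the key coder is not tagged.
  static <T> KeyCoder<T> checkKeyCoder(KeyCoder<T> keyCoder) {
    if (!(keyCoder instanceof PreservesOrder)) {
      throw new IllegalArgumentException(
          keyCoder.getClass().getSimpleName() + " is not tagged PreservesOrder");
    }
    return keyCoder;
  }
}
```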
>>>> Also, the API I proposed did support random access! We could separate
>>>> out OrderedBagState again if we think the use cases are fundamentally
>>>> different. I merged the proposal into that of MultimapState because there
>>>> seemed to be 99% overlap.
>>>> Reuven
>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <>
>>>> wrote:
>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <> wrote:
>>>>> >
>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <>
>>>>> wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <>
>>>>> wrote:
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <>
>>>>> wrote:
>>>>> >>>>>
>>>>> >>>>> A few obvious problems with this code:
>>>>> >>>>>   1. Removing the elements already processed from the bag
>>>>> requires clearing and rewriting the entire bag. This is O(n^2) in the
>>>>> number of input trades.
>>>>> >>>>
>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade state?
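The quadratic bound follows if, as assumed here for the worst case, every incoming trade triggers one clear-and-rewrite of the whole bag and the bag holds up to k elements at the k-th trade. A single rewrite of a k-element bag is linear, which matches the O(2 * n) intuition; summed over all n trades it becomes:

```latex
\sum_{k=1}^{n} k \;=\; \frac{n(n+1)}{2} \;=\; O(n^2)
```

So each rewrite is linear, but repeating it once per trade makes the total quadratic.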
>>>>> >>>>
>>>>> >>>>>
>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>> >>>>>   // Add a value to the map.
>>>>> >>>>>   void put(K key, V value);
>>>>> >>>>>   // Get all values for a given key.
>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>> >>>>>   // Return all entries in the map.
>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>> >>>>>   // Return all entries in the map with keys <= limit, sorted by key.
>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>> >>>>>
>>>>> >>>>>   // Remove all values with the given key.
>>>>> >>>>>   void remove(K key);
>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>>>> >>>>>   void removeUntil(K limit);
>>>>> >>>>> }
>>>>> >>>>
>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would remove all
>>>>> entries in the map with keys < limit.
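The thread mentions providing a default (inefficient) implementation for runners without native support. As a reference model for the proposed semantics, here is a hypothetical in-memory version built on `TreeMap`. It returns plain `List` and `Map.Entry` values instead of Beam's `ReadableState` and `KV`, orders keys with `Comparable` rather than by encoded bytes, and includes the exclusive variant suggested just above; `InMemorySortedMultimap` is an illustrative name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, inefficient in-memory stand-in for the proposed SortedMultimapState.
// Uses plain Java types instead of Beam's ReadableState and KV.
final class InMemorySortedMultimap<K extends Comparable<K>, V> {
  private final TreeMap<K, List<V>> map = new TreeMap<>();

  void put(K key, V value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<V> get(K key) {
    return map.getOrDefault(key, List.of());
  }

  List<Map.Entry<K, V>> allEntries() {
    return flatten(map);
  }

  // Entries with key <= limit, sorted by key.
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    return flatten(map.headMap(limit, true));
  }

  void remove(K key) {
    map.remove(key);
  }

  // Removes entries with key <= limit.
  void removeUntil(K limit) {
    map.headMap(limit, true).clear();
  }

  // Exclusive variant: removes entries with key < limit.
  void removeUntilExcl(K limit) {
    map.headMap(limit, false).clear();
  }

  private static <T, U> List<Map.Entry<T, U>> flatten(NavigableMap<T, List<U>> view) {
    List<Map.Entry<T, U>> out = new ArrayList<>();
    view.forEach((k, vs) -> vs.forEach(v -> out.add(new AbstractMap.SimpleImmutableEntry<>(k, v))));
    return out;
  }
}
```

`headMap(limit, true)` and `headMap(limit, false)` return live views, so clearing them removes the corresponding entries from the backing map; the `inclusive` flag is the only difference between `removeUntil` and `removeUntilExcl`.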
>>>>> >>>>
>>>>> >>>>>
>>>>> >>>>> Runners will sort based on the encoded value of the key. In
>>>>> order to make this easier for users, I propose that we introduce a new tag
>>>>> on Coders: PreservesOrder. A Coder that contains this tag guarantees that
>>>>> the encoded value preserves the same ordering as the base Java type.
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> Could you clarify what "encoded value preserves the same ordering as
>>>>> the base Java type" means?
>>>>> >>>
>>>>> >>>
>>>>> >>> Let's say A and B represent two different instances of the same
>>>>> Java type, like a double; then A < B (using the language's comparison
>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are
>>>>> compared lexicographically).
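This property can be checked mechanically on sample values. The Java sketch below (class and method names are illustrative) compares encodings as unsigned bytes with `Arrays.compareUnsigned` (Java 9+). It contrasts a plain big-endian two's-complement encoding of `long` with one that first flips the sign bit; only the latter satisfies the property once negative values are among the samples.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.LongFunction;

// Sketch of the property: a < b iff encode(a) < encode(b), with encodings
// compared lexicographically as unsigned bytes.
final class OrderPreservingLongs {
  // Plain big-endian two's complement (the layout ByteBuffer.putLong writes).
  static byte[] plain(long x) {
    return ByteBuffer.allocate(Long.BYTES).putLong(x).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
  // unsigned 0x00..00..0xFF..FF in the same order.
  static byte[] signFlipped(long x) {
    return plain(x ^ Long.MIN_VALUE);
  }

  // Checks the property on every pair of samples.
  static boolean preservesOrder(LongFunction<byte[]> encode, long... samples) {
    for (long a : samples) {
      for (long b : samples) {
        int expected = Long.compare(a, b);
        int actual = Arrays.compareUnsigned(encode.apply(a), encode.apply(b));
        if (Integer.signum(expected) != Integer.signum(actual)) {
          return false;
        }
      }
    }
    return true;
  }
}
```

A sampled check like this can only refute the property; a coder tagged as order-preserving would still need to guarantee it by construction.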
>>>>> >>
>>>>> >>
>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff
>>>>> e(A) < e(B) property to hold for all languages we support? What happens if
>>>>> A and B sort differently in different languages?
>>>>> >
>>>>> >
>>>>> > That would have to be the property of the coder (which means that
>>>>> this property probably needs to be represented in the portability
>>>>> representation of the coder). I imagine the common use cases will be for
>>>>> simple coders like int, long, string, etc., which are likely to sort the
>>>>> same in most languages.
>>>>> The standard coders for both double and integral types do not respect
>>>>> the natural ordering (consider negative values). KV coders violate the
>>>>> "natural" lexicographic ordering on components as well. I think
>>>>> implicitly sorting on encoded value would yield many surprises. (The
>>>>> state, of course, could take an order-preserving, bytes
>>>>> (string?)-producing callable as a parameter.) (As for
>>>>> naming, I'd probably call this OrderedBagState or something like
>>>>> that...rather than Map which tends to imply random access.)
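To illustrate the point about negative values for doubles, a sketch with illustrative names (Java 9+): a plain big-endian IEEE 754 encoding sorts -2.0 after -1.0 and every negative after every positive, because sign and magnitude bits are compared as unsigned bytes. A common fix, assumed here rather than taken from any Beam coder, flips all bits of negative values and only the sign bit of non-negative ones; the result orders like `Double.compare`, which puts -0.0 before 0.0.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.DoubleFunction;

// Sketch: a plain big-endian IEEE 754 encoding misorders negative doubles,
// while the sign-dependent bit flip below orders encodings like Double.compare.
final class OrderPreservingDoubles {
  static byte[] plain(double d) {
    return ByteBuffer.allocate(Double.BYTES).putDouble(d).array();
  }

  static byte[] ordered(double d) {
    long bits = Double.doubleToLongBits(d);
    // Negative: flip all bits so larger magnitudes sort first.
    // Non-negative: flip only the sign bit so they sort after all negatives.
    bits = bits < 0 ? ~bits : bits ^ Long.MIN_VALUE;
    return ByteBuffer.allocate(Long.BYTES).putLong(bits).array();
  }

  // Checks, on every pair of samples, that byte order matches Double.compare.
  static boolean preservesOrder(DoubleFunction<byte[]> encode, double... samples) {
    for (double a : samples) {
      for (double b : samples) {
        int expected = Double.compare(a, b);
        int actual = Arrays.compareUnsigned(encode.apply(a), encode.apply(b));
        if (Integer.signum(expected) != Integer.signum(actual)) {
          return false;
        }
      }
    }
    return true;
  }
}
```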
