Big +1 for a BIP, as this might really help clarify the pros and cons of all the possibilities. There seem to be questions that need answering and motivating use cases: do we need sorted map state, or can we solve our use cases with something simpler, e.g. the mentioned TimeSortedBagState? Does that really have to be a time-sorted structure, or does it "only" have to have operations that can efficiently find and remove the element with the smallest timestamp (like a PriorityQueue)?
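
To make the last question concrete, here is a minimal plain-Java sketch of the priority-queue-style access pattern: add elements in any order, then find and remove everything up to a given timestamp, smallest first. It is not a Beam API; MinTimestampBuffer, drainUpTo and the use of long millis for timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; not part of any Beam API.
final class MinTimestampBuffer<T> {
  private static final class Stamped<T> {
    final long timestampMillis;
    final T value;

    Stamped(long timestampMillis, T value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  // Min-heap ordered by timestamp: cheap access to the smallest element only.
  private final PriorityQueue<Stamped<T>> heap =
      new PriorityQueue<>(Comparator.comparingLong((Stamped<T> s) -> s.timestampMillis));

  void add(long timestampMillis, T value) {
    heap.add(new Stamped<>(timestampMillis, value));
  }

  // Removes and returns every value with timestamp <= watermarkMillis, smallest first.
  List<T> drainUpTo(long watermarkMillis) {
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty() && heap.peek().timestampMillis <= watermarkMillis) {
      out.add(heap.poll().value);
    }
    return out;
  }

  int size() {
    return heap.size();
  }
}
```

Unlike a sorted structure, this cannot efficiently answer a range read in the middle of the buffer without first removing the elements in front of it, which is roughly the distinction the question is about.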

Jan

On 6/18/20 5:32 AM, Kenneth Knowles wrote:
Zooming in from generic philosophy to be clear: adding time ordered buffer to the Fn state API is *not* a shortcut. It has benefits that will not be achieved by SDK-side implementation on top of either ordered or unordered multimap. Are those benefits worth expanding the API? I don't know.

A change to allow a runner to have a specialized implementation for time-buffered state would be one or more StateKey types, right? Reuven, maybe put this and your Java API in a doc? A BIP? Seems like there's at least the following to explore:

 - how that Java API would map to an SDK-side implementation on top of multimap state key
 - how that Java API would map to a new StateKey
 - whether there's actually more than one relevant implementation of that StateKey
 - whether SDK-side implementation on some other state key would be performant enough in all SDK languages (present and future)

Zooming back out to generic philosophy: Proliferation of StateKey types tuned by runners (which can very easily still share implementation) is probably better than proliferation of complex SDK-side implementations with varying completeness and performance.

Kenn

On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:

    It might help for me to describe what I have in mind. I'm still
    proposing that we build multimap, just not a globally-sorted
    multimap.

    My previous proposal was that we provide a Multimap<Key, Value>
    state type, sorted by key. This would have two additional
    operations - multimap.getRange(startKey, endKey) and
    multimap.deleteRange(startKey, endKey). The primary use case was
    timestamp sorting, but I felt that a sorted multimap provided a
    nice generalization - after all, you can simply key the multimap
    by timestamp to get timestamp sorting.

    This approach had some issues immediately that would take some
    work to solve. Since a multimap key can have any type and a runner
    will only be able to sort by encoded type, we would need to
    introduce a concept of order-preserving coders into Beam and
    plumb that through. Robert pointed out that even our existing
    standard coders for simple integral types don't preserve order, so
    there will likely be surprises here.
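
    As a small illustration of the kind of surprise meant here, the sketch below compares two ways of encoding a long and checks whether comparing the encoded bytes agrees with comparing the numbers. The varint method is a generic base-128 variable-length encoding written out by hand (an assumption about the general shape of such coders, not a copy of any Beam coder), and bigEndianSignFlipped is one well-known order-preserving alternative. All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical demo; none of these methods are Beam APIs.
final class OrderPreservingEncodingDemo {
  // Base-128 varint, least-significant 7-bit group first.
  static byte[] varint(long v) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80)); // continuation bit set
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  // Fixed-width big-endian with the sign bit flipped, so that unsigned
  // byte-wise comparison matches signed numeric comparison.
  static byte[] bigEndianSignFlipped(long v) {
    return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison treating each byte as unsigned, which is
  // how a runner that only sees encoded keys would have to sort.
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

    With these definitions, 255 encodes as the varint bytes FF 01 and 256 as 80 02, so the encoded 255 sorts after the encoded 256, while the sign-flipped big-endian form keeps the numeric order.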

    My current proposal is for a multimap that is not sorted by key,
    but that can support ordered values for a single key. Remember
    that a multimap maps K -> Iterable<V>, so this means that each
    individual Iterable<V> is ordered, but the keys have no specific
    order relative to each other. This is not too different from many
    multimap implementations where the keys are unordered, but the
    list of values for a single key at least has a stable order.

    The interface would look like this:

    public interface MultimapState<K, V> extends State {
      // Add a value with a default timestamp.
      void put(K key, V value);

      // Add a timestamped value.
      void put(K key, TimestampedValue<V> value);

      // Remove all values for a key.
      void remove(K key);

      // Remove all values for a key with timestamps within the
      // specified range.
      void removeRange(K key, Instant startTs, Instant endTs);

      // Get an Iterable of values for a key. The Iterable will be
      // returned sorted by timestamp.
      ReadableState<Iterable<TimestampedValue<V>>> get(K key);

      // Get an Iterable of values for a key in the specified range. The
      // Iterable will be returned sorted by timestamp.
      ReadableState<Iterable<TimestampedValue<V>>> getRange(
          K key, Instant startTs, Instant endTs);

      ReadableState<Iterable<K>> keys();
      ReadableState<Iterable<TimestampedValue<V>>> values();
      ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
    }

    We can of course provide helper functions that allow using
    MultimapState without dealing with TimestampedValue for users who
    only want a multimap and don't want sorting.

    I think many users will only need a single sorted list - not a
    full multimap. It's worth offering this as well, and we can simply
    build it on top of MultimapState. It will look like an extension
    of BagState:

    public interface TimestampSortedListState<T> extends State {
      void add(TimestampedValue<T> value);
      Iterable<TimestampedValue<T>> read();
      Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
      void clearRange(Instant startTs, Instant endTs);
    }
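
    For anyone who wants to play with the semantics, a minimal in-memory sketch of the single sorted list follows. It is plain Java backed by a TreeMap rather than runner state, it uses long millis in place of Instant and TimestampedValue, and it assumes ranges are [startTs, endTs) (start inclusive, end exclusive), which the proposal above does not specify. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in; a real implementation would sit on runner state.
final class InMemoryTimestampSortedList<T> {
  // Timestamp -> values added with that timestamp, in insertion order.
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  // All values, sorted by timestamp.
  List<T> read() {
    return flatten(byTimestamp.values());
  }

  // Values with startTs <= timestamp < endTs (assumed convention), sorted by timestamp.
  List<T> readRange(long startTs, long endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false).values());
  }

  // Removes values with startTs <= timestamp < endTs; the subMap view writes through.
  void clearRange(long startTs, long endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private static <T> List<T> flatten(Iterable<List<T>> groups) {
    List<T> out = new ArrayList<>();
    for (List<T> group : groups) {
      out.addAll(group);
    }
    return out;
  }
}
```

    Note that TreeMap.subMap throws IllegalArgumentException when startTs is greater than endTs, so a real API would need to decide how to treat such ranges.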


    On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com
    <mailto:lc...@google.com>> wrote:

        The portability layer is meant to live across multiple
        versions of Beam, and I don't think it should be approached by
        just doing the simple and useful thing now, since I believe
        that will lead to a proliferation of the API.

        On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles
        <k...@apache.org <mailto:k...@apache.org>> wrote:

            I have thoughts on the subject of whether to have APIs
            just for the lowest-level building blocks versus having
            APIs for higher-level constructs. Specifically this
            applies to providing only unsorted multimap vs what I will
            call "time-ordered buffer". TL;DR: I'd vote to focus on
            time-ordered buffer; if it turns out to be easy to go all
            the way to sorted multimap that's nice-to-have; if it
            turns out to be easy to implement on top of unsorted map
            state, that should probably be under the hood.

            Reasons to build low-level multimap in the runner & fn api
            and layer higher-level things in the SDK:

             - It is less implementation for runners if they only have
            to provide a few lower-level building blocks like
            multimap state.
             - There are many more runners than SDKs (and there will be
            even more), so this saves work overall.

            Reasons to build higher-level constructs directly in the
            runner and fn api:

             - Having multiple higher-level state types may actually
            be less implementation than one complex state type,
            especially if they map to runner primitives.
             - The runner may have better specialized implementations,
            especially for something like a time-ordered buffer.
             - The particular access patterns in an SDK-based
            implementation may not be ideal for each runner's
            underlying implementation of the low-level building block.
             - There may be excessive gRPC overhead even for optimal
            access patterns.

            There are ways to have best of both worlds, like:

            1. Define multiple state types according to fundamental
            access patterns, as we did before portability.
            2. If it is easy to layer one on top of the other, do that
            inside the runner. Provide shared code so that runners
            providing the lowest-level primitive get all the
            types for free.

            I understand that this is an oversimplification. It still
            creates some more work. And APIs are a burden so it is
            good to introduce as few as possible for maintenance. But
            it has performance benefits and also unblocks "just doing
            the simple and useful thing now" which I always like to do
            as long as it is compatible with future changes. If the
            APIs are fundamental, like sets, maps, timestamp ordering,
            then it is safe to guess that they will change rarely and
            be useful forever.

            Kenn

            On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik
            <lc...@google.com <mailto:lc...@google.com>> wrote:

                I would be glad to take a stab at how to provide
                sorting on top of unsorted multimap state.
                Based upon your description, you want integer keys
                representing timestamps and arbitrary user value for
                the values, is that correct?
                What kinds of operations do you need on the sorted map
                state in order of efficiency requirements?
                (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
                ClearAll(Range[x, y))
                What kinds of operations do we expect the underlying
                unsorted map state to be able to provide?
                (at a minimum Get(K), Append(K), Clear(K) but what
                else e.g. enumerate(K)?)

                I went through a similar exercise of how to provide a
                list like side input view over a multimap[1] side
                input which efficiently allowed computation of size
                and provided random access while only having access to
                get(K) and enumerate K's.

                1:
                https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568

                On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax
                <re...@google.com <mailto:re...@google.com>> wrote:

                    Bringing this subject up again,

                    I've spent some time looking into implementing
                    this for the Dataflow runner. I'm unable to find a
                    way to implement the arbitrary sorted multimap
                    efficiently for the case where there are large
                    numbers of unique keys. Since the primary driving
                    use case is timestamp ordering (i.e. key is event
                    timestamp), you would expect to have nearly a new
                    key per element. I considered Luke's suggestion
                    above, but unfortunately it doesn't really solve
                    this issue.

                    The primary use case for sorting always seems to
                    be sorting by timestamp. I want to propose that
                    instead of building the fully-general sorted
                    multimap, we instead focus on a state type where
                    the sort key is an integral type (like a timestamp
                    or an integer). There is still a valid use case
                    for multimap, but we can provide that as an
                    unordered state. At least for Dataflow, it will be
                    much easier.

                    While my difficulties here may be specific to the
                    Dataflow runner, any such support would have to be
                    built into other runners as well, and limiting to
                    integral sorting likely makes it easier for other
                    runners to implement this. Also, if you look at
                    this Flink comment
                    <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
                    pointed out by Aljoscha, for Flink the
                    main use case identified was also timestamp
                    sorting. This will also simplify the API design
                    for this feature: Sorted multimap with arbitrary
                    keys would require us to introduce a way of
                    mapping natural ordering to encoded ordering (i.e.
                    a new OrderPreservingCoder), but if we limit sort
                    keys to integral types, the API design is simpler
                    as integral types can be represented directly.

                    Reuven

                    On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax
                    <re...@google.com <mailto:re...@google.com>> wrote:

                        This sounds to me like a potential runner
                        strategy. However if a runner can natively
                        support sorted maps (e.g. we expect the
                        Dataflow runner to be able to do so, and I
                        think it would be useful for other runners as
                        well), then it's probably preferable to allow
                        the runner to use its native capabilities.

                        On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik
                        <lc...@google.com <mailto:lc...@google.com>>
                        wrote:

                            For the API that you proposed, the map key
                            is always "void" and the sort key == user
                            key. So in my example of
                            key: dummy value
                            key.000: token, (0001, value4)
                            key.001: token, (0010, value1), (0011, value2)
                            key.01: token
                            key.1: token, (1011, value3)
                            you would have:
                            "void": dummy value
                            "void".000: token, (0001, value4)
                            "void".001: token, (0010, value1), (0011,
                            value2)
                            "void".01: token
                            "void".1: token, (1011, value3)

                            Iterable<KV<K, V>> entriesUntil(K limit)
                            translates into walking the prefixes
                            until you find a common prefix for limit and
                            then filtering for values that have a
                            sort key <= limit. Using the example above, to
                            find entriesUntil(0010) you would:
                            look for key."", miss
                            look for key.0, miss
                            look for key.00, miss
                            look for key.000, hit, sort all contained
                            values using secondary key, provide value4
                            to user
                            look for key.001, hit, notice that 001 is
                            a prefix of 0010 so we sort all contained
                            values using secondary key, filter out
                            value2 and provide value1

                            void removeUntil(K limit) also translates
                            into walking the prefixes but instead we
                            will clear them when we have a "hit" with
                            some special logic for when the sort key
                            is a prefix of the key. Using the example,
                            to removeUntil(0010) you would:
                            look for key."", miss
                            look for key.0, miss
                            look for key.00, miss
                            look for key.000, hit, clear
                            look for key.001, hit, notice that 001 is
                            a prefix of 0010 so we sort all contained
                            values using secondary key, store in
                            memory all values that are > 0010, clear and
                            append values stored in memory.
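
The two walks above can be sketched in plain Java. This is only an illustration under stated assumptions: a HashMap stands in for the unsorted map state, a present (possibly empty) list stands in for the "token", sort keys are fixed-width 4-bit strings as in the example, and a cleared leaf keeps its token so later walks still know where to stop. PrefixRangeSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; a HashMap stands in for unsorted map state.
final class PrefixRangeSketch {
  static final int BITS = 4; // sort keys are fixed-width bit strings such as "0010"

  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Leaf prefix -> entries. A present (possibly empty) list plays the role of the token.
  final Map<String, List<Entry>> leaves = new HashMap<>();

  // Installs a leaf from alternating sortKey, value arguments.
  PrefixRangeSketch leaf(String prefix, String... pairs) {
    List<Entry> entries = new ArrayList<>();
    for (int i = 0; i + 1 < pairs.length; i += 2) {
      entries.add(new Entry(pairs[i], pairs[i + 1]));
    }
    leaves.put(prefix, entries);
    return this;
  }

  // The trie state from the example in this thread.
  static PrefixRangeSketch example() {
    return new PrefixRangeSketch()
        .leaf("000", "0001", "value4")
        .leaf("001", "0010", "value1", "0011", "value2")
        .leaf("01")
        .leaf("1", "1011", "value3");
  }

  // Values with sort key <= limit, in sort-key order.
  List<String> entriesUntil(String limit) {
    List<String> out = new ArrayList<>();
    walk("", limit, out, false);
    return out;
  }

  // Drops every entry with sort key <= limit, keeping the leaves (tokens) themselves.
  void removeUntil(String limit) {
    walk("", limit, new ArrayList<>(), true);
  }

  private void walk(String prefix, String limit, List<String> out, boolean remove) {
    if (prefix.length() > BITS) {
      return; // guard against a malformed or empty trie
    }
    // A prefix greater than the limit truncated to the same depth means the
    // whole subtree lies above the limit.
    if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) {
      return;
    }
    List<Entry> entries = leaves.get(prefix);
    if (entries == null) { // miss: descend into both children, smaller first
      walk(prefix + "0", limit, out, remove);
      walk(prefix + "1", limit, out, remove);
      return;
    }
    if (remove) {
      entries.removeIf(e -> e.sortKey.compareTo(limit) <= 0);
    } else {
      entries.stream()
          .filter(e -> e.sortKey.compareTo(limit) <= 0)
          .sorted(Comparator.comparing((Entry e) -> e.sortKey))
          .forEach(e -> out.add(e.value));
    }
  }
}
```

On the example state, entriesUntil("0010") yields value4 then value1, and removeUntil("0010") leaves only (0011, value2) and (1011, value3) behind.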

                            On Fri, May 24, 2019 at 10:36 AM Reuven
                            Lax <re...@google.com
                            <mailto:re...@google.com>> wrote:

                                Can you explain how fetching and
                                deleting ranges of keys would work
                                with this data structure?

                                On Fri, May 24, 2019 at 9:50 AM Lukasz
                                Cwik <lc...@google.com
                                <mailto:lc...@google.com>> wrote:

                                    Reuven, for the example, I assume
                                    that we never want to store more
                                    than 2 values at a given sort key
                                    prefix, and if we do then we will
                                    create a new longer prefix
                                    splitting up the values based upon
                                    the sort key.

                                    Tuple representation in examples
                                    below is (key, sort key, value)
                                    and . is a character outside of
                                    the alphabet which can be
                                    represented by using an escaping
                                    encoding that wraps the key + sort
                                    key encoding.

                                    To insert (key, 0010, value1), we
                                    look up "key" + all the prefixes of
                                    0010, looking for one that is not
                                    empty. Nothing is stored yet, so we
                                    start at the empty prefix "" and
                                    append value1 to the map at key.""
                                    ending up with (we also set the
                                    key to any dummy value to know
                                    that it contains values):
                                    key: dummy value
                                    key."": token, (0010, value1)
                                    Now we insert (key, 0011, value2),
                                    we again look up "key" + all the
                                    prefixes of 0011, finding "", so
                                    we append value2 to key."" ending
                                    up with:
                                    key: dummy value
                                    key."": token, (0010, value1),
                                    (0011, value2)
                                    Now we insert (key, 1011, value3),
                                    we again lookup "key" + all the
                                    prefixes of 1011 finding "" but
                                    notice that it is full, so we
                                    partition all the values into two
                                    prefixes 0 and 1. We also clear
                                    the "" prefix ending up with:
                                    key: dummy value
                                    key.0: token, (0010, value1),
                                    (0011, value2)
                                    key.1: token, (1011, value3)
                                    Now we insert (key, 0001, value4),
                                    we again lookup "key" + all the
                                    prefixes of the value finding 0
                                    but notice that it is full, so we
                                    partition all the values into two
                                    prefixes 00 and 01 but notice this
                                    doesn't help us since 00 will be
                                    too full so we split 00 again to
                                    000, 001. We also clear the 0
                                    prefix ending up with:
                                    key: dummy value
                                    key.000: token, (0001, value4)
                                    key.001: token, (0010, value1),
                                    (0011, value2)
                                    key.01: token
                                    key.1: token, (1011, value3)

                                    We are effectively building a
                                    trie[1] where we only have values
                                    at the leaves and control how full
                                    each leaf can be. There are other
                                    trie representations like a radix
                                    tree that may be better.

                                    Looking up the values in sorted
                                    order for "key" would go like this:
                                    Is key set, yes
                                    look for key."", miss
                                    look for key.0, miss
                                    look for key.00, miss
                                    look for key.000, hit, sort all
                                    contained values using secondary
                                    key, provide value4 to user
                                    look for key.001, hit, sort all
                                    contained values using secondary
                                    key, provide value1 followed by
                                    value2 to user
                                    look for key.01, hit, empty,
                                    return no values to user
                                    look for key.1, hit, sort all
                                    contained values using secondary
                                    key, provide value3 to user
                                    we have walked the entire prefix
                                    space, signal end of iterable
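
A compact way to check the insert and lookup walkthroughs is to run them. The sketch below is plain Java with a HashMap standing in for the unsorted map state; it assumes 4-bit sort keys and a leaf capacity of 2 as in the example, and it adds the new value first and then splits while a leaf is over capacity, which ends in the same state as splitting a full leaf before inserting. The per-key dummy value is omitted, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the leaf-only trie kept in a flat map.
final class PrefixTrieSketch {
  static final int BITS = 4;         // width of the sort key, e.g. "0010"
  static final int MAX_PER_LEAF = 2; // capacity used in the example

  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  // Leaf prefix -> entries; a present (possibly empty) list is the "token".
  private final Map<String, List<Entry>> leaves = new HashMap<>();

  void insert(String sortKey, String value) {
    if (leaves.isEmpty()) {
      leaves.put("", new ArrayList<>()); // first value starts at the empty prefix
    }
    // Walk the prefixes of sortKey until we hit the leaf that covers it.
    String prefix = "";
    while (!leaves.containsKey(prefix)) {
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    leaves.get(prefix).add(new Entry(sortKey, value));
    splitWhileOverfull(prefix);
  }

  private void splitWhileOverfull(String prefix) {
    List<Entry> entries = leaves.get(prefix);
    if (entries.size() <= MAX_PER_LEAF || prefix.length() == BITS) {
      return; // fits, or cannot be subdivided any further
    }
    leaves.remove(prefix); // clear the old prefix
    List<Entry> zero = new ArrayList<>();
    List<Entry> one = new ArrayList<>();
    for (Entry e : entries) {
      (e.sortKey.charAt(prefix.length()) == '0' ? zero : one).add(e);
    }
    leaves.put(prefix + "0", zero);
    leaves.put(prefix + "1", one);
    splitWhileOverfull(prefix + "0");
    splitWhileOverfull(prefix + "1");
  }

  // Pre-order walk: a miss means "descend", a hit means "leaf, emit sorted".
  List<String> readSorted() {
    List<String> out = new ArrayList<>();
    if (!leaves.isEmpty()) {
      walk("", out);
    }
    return out;
  }

  private void walk(String prefix, List<String> out) {
    if (prefix.length() > BITS) {
      return; // guard against a malformed trie
    }
    List<Entry> entries = leaves.get(prefix);
    if (entries == null) {
      walk(prefix + "0", out);
      walk(prefix + "1", out);
      return;
    }
    entries.stream()
        .sorted(Comparator.comparing((Entry e) -> e.sortKey))
        .forEach(e -> out.add(e.value));
  }

  Set<String> leafPrefixes() {
    return new TreeSet<>(leaves.keySet());
  }
}
```

Inserting value1 through value4 in the order used above leaves the prefixes 000, 001, 01 and 1, and the sorted read returns value4, value1, value2, value3.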

                                    Some notes for the above:
                                    * The dummy value is used to know
                                    that the key contains values and
                                    the token is to know whether there
                                    are any values deeper in the trie
                                    so that we know when to stop
                                    searching.
                                    * If we can recalculate the sort
                                    key from the combination of the
                                    key and value, then we don't need
                                    to store it.
                                    * Keys with lots of values will
                                    perform worse than keys with fewer
                                    values since we have to look up
                                    more keys but they will be empty
                                    reads. The number of misses can be
                                    controlled by how many elements we
                                    are willing to store at a given
                                    node before we subdivide.

                                    In reality you could build a lot
                                    of structures (e.g. red black
                                    tree, binary tree) using the sort
                                    key, the issue is the cost of
                                    rebalancing/re-organizing the
                                    structure in map form and whether
                                    it has a convenient pre-order
                                    traversal for lookups.



                                    On Fri, May 24, 2019 at 8:14 AM
                                    Reuven Lax <re...@google.com
                                    <mailto:re...@google.com>> wrote:

                                        Some great comments!

                                        *Aljoscha*: absolutely this
                                        would have to be implemented
                                        by runners to be efficient. We
                                        can of course provide a
                                        default (inefficient)
                                        implementation, but ideally
                                        runners would provide better ones.

                                        *Jan* Exactly. I think
                                        MapState can be dropped or
                                        backed by this. E.g.

                                        *Robert* Great point about
                                        standard coders not satisfying
                                        this. That's why I suggested
                                        that we provide a way to tag
                                        the coders that do preserve
                                        order, and only accept those
                                        as key coders. Alternatively we
                                        could present a more limited
                                        API - e.g. only allowing a
                                        hard-coded set of types to be
                                        used as keys - but that seems
                                        counter to the direction Beam
                                        usually goes. So users will
                                        have two ways of creating
                                        multimap state specs:

                                           private final
                                        StateSpec<MultimapState<Long,
                                        String>> state =
                                        StateSpecs.multimap(VarLongCoder.of(),
                                        StringUtf8Coder.of());

                                        or
                                           private final
                                        StateSpec<MultimapState<Long,
                                        String>> state =
                                        StateSpecs.orderedMultimap(VarLongCoder.of(),
                                        StringUtf8Coder.of());

                                        The second one will validate
                                        that the key coder preserves
                                        order, and fails otherwise
                                        (similar to coder determinism
                                        checking in GroupByKey). (BTW
                                        we would also have versions of
                                        these functions that use coder
                                        inference to "guess" the
                                        coder, but those will do the
                                        same checking)

                                        Also the API I proposed did
                                        support random access! We
                                        could separate out
                                        OrderedBagState again if we
                                        think the use cases are
                                        fundamentally different. I
                                        merged the proposal into that
                                        of MultimapState because there
                                        seemed to be 99% overlap.

                                        Reuven

                                        On Fri, May 24, 2019 at 6:19
                                        AM Robert Bradshaw
                                        <rober...@google.com
                                        <mailto:rober...@google.com>>
                                        wrote:

                                            On Fri, May 24, 2019 at
                                            5:32 AM Reuven Lax
                                            <re...@google.com
                                            <mailto:re...@google.com>>
                                            wrote:
                                            >
                                            > On Thu, May 23, 2019 at
                                            1:53 PM Ahmet Altay
                                            <al...@google.com
                                            <mailto:al...@google.com>>
                                            wrote:
                                            >>
                                            >>
                                            >>
                                            >> On Thu, May 23, 2019 at
                                            1:38 PM Lukasz Cwik
                                            <lc...@google.com
                                            <mailto:lc...@google.com>>
                                            wrote:
                                            >>>
                                            >>>
                                            >>>
                                            >>> On Thu, May 23, 2019
                                            at 11:37 AM Rui Wang
                                            <ruw...@google.com
                                            <mailto:ruw...@google.com>>
                                            wrote:
                                            >>>>>
                                            >>>>> A few obvious
                                            problems with this code:
                                            >>>>>  1. Removing the
                                            elements already processed
                                            from the bag requires
                                            clearing and rewriting the
                                            entire bag. This is O(n^2)
                                            in the number of input trades.
                                            >>>>
                                            >>>> Why isn't it O(2 * n)
                                            to clear and rewrite the
                                            trade state?
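
A single clear-and-rewrite is indeed linear; the quadratic total only appears when it is repeated. As a worked count, assume (a worst case chosen for illustration, not something stated in the thread) that each firing consumes exactly one element and writes back everything that remains. The class and method names are hypothetical.

```java
// Hypothetical cost model; counts element writes only.
final class BagRewriteCost {
  // Total elements written back if each of n firings removes one element
  // from the bag and rewrites the rest.
  static long rewriteWrites(int n) {
    long writes = 0;
    for (int remaining = n; remaining > 0; remaining--) {
      writes += remaining - 1; // clear the bag, then write back what is left
    }
    return writes;
  }
}
```

Under that assumption the total is n * (n - 1) / 2 writes, e.g. 499500 for n = 1000, whereas a state type with a range delete would avoid the write-backs entirely.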
                                            >>>>
                                            >>>>>
                                            >>>>> public interface
                                            SortedMultimapState<K, V>
                                            extends State {
                                            >>>>>  // Add a value to
                                            the map.
                                            >>>>>  void put(K key, V
                                            value);
                                            >>>>>  // Get all values
                                            for a given key.
                                            >>>>>
                                             ReadableState<Iterable<V>>
                                            get(K key);
                                            >>>>> // Return all
                                            entries in the map.
                                            >>>>>
                                             ReadableState<Iterable<KV<K,
                                            V>>> allEntries();
                                            >>>>>  // Return all
                                            entries in the map with
                                            keys <= limit. returned
                                            elements are sorted by the
                                            key.
                                            >>>>>
                                             ReadableState<Iterable<KV<K,
                                            V>>> entriesUntil(K limit);
                                            >>>>>
                                            >>>>> // Remove all values
                                            with the given key;
                                            >>>>>  void remove(K key);
                                            >>>>> // Remove all
                                            entries in the map with
                                            keys <= limit.
                                            >>>>>  void removeUntil(K
                                            limit);
                                            >>>>
                                            >>>> Would
                                            removeUntilExcl(K limit)
                                            also be useful? It would
                                            remove all entries in the
                                            map with keys < limit.
                                            >>>>
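To make the proposed semantics concrete, here is a minimal in-memory sketch of the operations listed above, including the suggested removeUntilExcl. It is only an illustration, not the proposed Beam API: the class name InMemorySortedMultimap is hypothetical, plain Java collections stand in for ReadableState and KV, and keys are ordered by Comparable rather than by encoded bytes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed state type (not Beam code).
final class InMemorySortedMultimap<K extends Comparable<K>, V> {
  // Keys are kept sorted; each key maps to the values added under it.
  private final TreeMap<K, List<V>> entries = new TreeMap<>();

  // Add a value to the map.
  void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<V>()).add(value);
  }

  // Get all values for a given key (empty list if the key is absent).
  List<V> get(K key) {
    List<V> values = entries.get(key);
    return values == null ? new ArrayList<V>() : new ArrayList<V>(values);
  }

  // Return all entries with keys <= limit, sorted by key.
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    List<Map.Entry<K, V>> result = new ArrayList<>();
    for (Map.Entry<K, List<V>> e : entries.headMap(limit, true).entrySet()) {
      for (V v : e.getValue()) {
        result.add(new SimpleImmutableEntry<>(e.getKey(), v));
      }
    }
    return result;
  }

  // Remove all values with the given key.
  void remove(K key) {
    entries.remove(key);
  }

  // Remove all entries with keys <= limit.
  void removeUntil(K limit) {
    entries.headMap(limit, true).clear();
  }

  // Remove all entries with keys < limit (the exclusive variant asked about).
  void removeUntilExcl(K limit) {
    entries.headMap(limit, false).clear();
  }
}
```

With a sorted structure like this, the inclusive and exclusive variants differ only in the boolean passed to headMap, so supporting both costs little on the SDK side; whether a runner-side implementation can do the same is a separate question.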
                                            >>>>>
                                            >>>>> Runners will sort
                                            based on the encoded value
                                            of the key. In order to
                                            make this easier for
                                            users, I propose that we
                                            introduce a new tag on
                                            Coders PreservesOrder. A
                                            Coder that contains this
                                            tag guarantees that the
                                            encoded value preserves
                                            the same ordering as the
                                            base Java type.
                                            >>>>
                                            >>>>
                                            >>>> Could you clarify
                                            what "encoded value
                                            preserves the same
                                            ordering as the base Java
                                            type" means?
                                            >>>
                                            >>>
                                            >>> Let's say A and B
                                            represent two different
                                            instances of the same Java
                                            type like a double, then A
                                            < B (using the language's
                                            comparison operator) iff
                                            encode(A) < encode(B)
                                            (note the encoded versions
                                            are compared
                                            lexicographically).
                                            >>
                                            >>
                                            >> Since coders are shared
                                            across SDKs, do we expect
                                            the A < B iff e(A) < e(B)
                                            property to hold for all
                                            languages we support? What
                                            happens if A and B sort
                                            differently in different
                                            languages?
                                            >
                                            >
                                            > That would have to be
                                            the property of the coder
                                            (which means that this
                                            property probably needs to
                                            be represented in the
                                            portability representation
                                            of the coder). I imagine
                                            the common use cases will
                                            be for simple coders like
                                            int, long, string, etc.,
                                            which are likely to sort
                                            the same in most languages.

                                            The standard coders for
                                            both double and integral
                                            types do not respect
                                            the natural ordering
                                            (consider negative
                                            values). KV coders violate the
                                            "natural" lexicographic
                                            ordering on components as
                                            well. I think
                                            implicitly sorting on
                                            encoded value would yield
                                            many surprises. (The
                                            state, of course, could
                                            take an order-preserving,
                                            bytes (string?)-producing
                                            callable as a parameter.)
                                            (As for naming, I'd
                                            probably call
                                            this OrderedBagState or
                                            something like
                                            that...rather than Map
                                            which tends to imply
                                            random access.)
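
The point about negative values can be illustrated with a small sketch. It uses a plain 4-byte big-endian two's-complement encoding as a stand-in (an assumption for illustration, not a claim about the byte layout of any particular Beam coder) and contrasts it with a sign-bit-flipped encoding, which is one example of an order-preserving, bytes-producing function. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not Beam code): compares two fixed-width int encodings.
final class OrderPreservingEncodingSketch {
  // Plain big-endian two's complement. Negative values start with a 1 bit,
  // so under unsigned byte comparison they sort after all non-negative values.
  static byte[] encodeBigEndian(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  // Flipping the sign bit maps Integer.MIN_VALUE..Integer.MAX_VALUE onto
  // 0x00000000..0xFFFFFFFF monotonically, so byte order matches numeric order.
  static byte[] encodeOrderPreserving(int value) {
    return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
  }

  // Lexicographic comparison that treats each byte as unsigned.
  static int compareLex(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Here compareLex(encodeBigEndian(-1), encodeBigEndian(1)) is positive even though -1 < 1, while the sign-flipped encoding compares in the expected direction. Doubles and composite keys such as KV would each need their own transformation, which is why an explicit user-supplied function may be less surprising than implicitly sorting on whatever bytes the coder produces.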
