My questions were just an example. I fully agree there is a fundamental need for a sorted state (of some form, and I also think this links to efficient implementation of retractions) - I was reacting to Kenn's question about BIP. This would be a pretty nice example of why it would be good to have such a "process" - not everything can be solved on the ML, and there are fundamental decisions that might need closer attention.

On 6/18/20 5:28 PM, Reuven Lax wrote:
Jan - my proposal is exactly TimeSortedBagState (more accurately - TimeSortedListState), though I went a bit further and also proposed a way to have a dynamic number of tagged TimeSortedBagStates.

You are correct that the runner doesn't really have to store the data time sorted - what's actually needed is the ability to fetch and remove timestamp ranges of data (though that does include fetching the entire list); TimeOrderedState is probably a more accurate name than TimeSortedState. I don't think we could get away with operations that only act on the smallest timestamp; however, we could limit the API to only being able to fetch and remove prefixes of data (ordered by timestamp). But if we support prefixes, we might as well support arbitrary subranges.

On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    Big +1 for a BIP, as this might really help clarify all the pros
    and cons of all possibilities. There seem to be questions that
    need answering and motivating use cases - do we need sorted map
    state or can we solve our use cases by something simpler - e.g.
    the mentioned TimeSortedBagState? Does that really have to be
    time-sorted structure, or does it "only" have to have operations
    that can efficiently find and remove element with smallest
    timestamp (like a PriorityQueue)?

    Jan
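
To make the PriorityQueue question concrete, here is a minimal sketch (not a proposed Beam API) of a buffer that only supports efficient access to the smallest timestamp. The class name MinTimestampBuffer, the Stamped element type, and the use of plain long milliseconds are assumptions made for illustration. It shows that removing a timestamp prefix is natural on a heap, while fetching an arbitrary subrange would require draining or scanning.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class MinTimestampBuffer {
  // Hypothetical element type: a value paired with an event timestamp in millis.
  static final class Stamped {
    final long timestampMillis;
    final String value;

    Stamped(long timestampMillis, String value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  // Min-heap ordered by timestamp; peek() and poll() see the smallest timestamp.
  private final PriorityQueue<Stamped> heap =
      new PriorityQueue<>(Comparator.comparingLong((Stamped s) -> s.timestampMillis));

  void add(long timestampMillis, String value) {
    heap.add(new Stamped(timestampMillis, value));
  }

  // Removes and returns, in timestamp order, every value with timestamp < limitMillis.
  // This is a prefix removal; the order among equal timestamps is unspecified.
  List<String> pollBefore(long limitMillis) {
    List<String> out = new ArrayList<>();
    while (!heap.isEmpty() && heap.peek().timestampMillis < limitMillis) {
      out.add(heap.poll().value);
    }
    return out;
  }

  int size() {
    return heap.size();
  }
}
```

A range that does not start at the smallest timestamp has no comparably cheap operation here, which is the gap the range-based proposal addresses.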

    On 6/18/20 5:32 AM, Kenneth Knowles wrote:
    Zooming in from generic philosophy to be clear: adding time
    ordered buffer to the Fn state API is *not* a shortcut. It has
    benefits that will not be achieved by SDK-side implementation on
    top of either ordered or unordered multimap. Are those benefits
    worth expanding the API? I don't know.

    A change to allow a runner to have a specialized implementation
    for time-buffered state would be one or more StateKey types,
    right? Reuven, maybe put this and your Java API in a doc? A BIP?
    Seems like there's at least the following to explore:

     - how that Java API would map to an SDK-side implementation on
    top of multimap state key
     - how that Java API would map to a new StateKey
     - whether there's actually more than one relevant implementation
    of that StateKey
     - whether SDK-side implementation on some other state key would
    be performant enough in all SDK languages (present and future)

    Zooming back out to generic philosophy: Proliferation of StateKey
    types tuned by runners (which can very easily still share
    implementation) is probably better than proliferation of complex
    SDK-side implementations with varying completeness and performance.

    Kenn

    On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <[email protected]
    <mailto:[email protected]>> wrote:

        It might help for me to describe what I have in mind. I'm
        still proposing that we build multimap, just not a
        globally-sorted multimap.

        My previous proposal was that we provide a Multimap<Key,
        Value> state type, sorted by key. This would have two
        additional operations - multimap.getRange(startKey, endKey)
        and multimap.deleteRange(startKey, endKey). The primary use
        case was timestamp sorting, but I felt that a sorted multimap
        provided a nice generalization - after all, you can simply
        key the multimap by timestamp to get timestamp sorting.

        This approach had some issues immediately that would take
        some work to solve. Since a multimap key can have any type
        and a runner will only be able to sort by encoded type, we
        would need to introduce a concept of order-preserving coders
        into Beam and plumb that through. Robert pointed out that
        even our existing standard coders for simple integral types
        don't preserve order, so there will likely be surprises here.
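
To illustrate the order-preservation issue described above, the sketch below compares two encodings of a long under unsigned lexicographic byte comparison, which is assumed here to be how a runner would sort encoded keys. The varint method is a generic base-128 encoding written for this example (it is not Beam's coder code), and orderPreserving is one common fixed-width alternative; both names are hypothetical. Arrays.compareUnsigned requires Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class OrderPreservingEncodingDemo {
  // Base-128 varint: 7 bits per byte, least-significant group first.
  // Intended for non-negative inputs in this demo.
  static byte[] varint(long v) {
    byte[] buf = new byte[10];
    int n = 0;
    while ((v & ~0x7FL) != 0) {
      buf[n++] = (byte) ((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    buf[n++] = (byte) v;
    return Arrays.copyOf(buf, n);
  }

  // Fixed-width big-endian encoding with the sign bit flipped, so that
  // unsigned byte-wise comparison matches signed numeric comparison.
  static byte[] orderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // True if the encoded byte order agrees with the numeric order of a and b.
  static boolean sameOrder(long a, long b, byte[] encodedA, byte[] encodedB) {
    return Integer.signum(Long.compare(a, b))
        == Integer.signum(Arrays.compareUnsigned(encodedA, encodedB));
  }
}
```

With this varint, 255 encodes as the bytes FF 01 and 256 as 80 02, so the encoded form of 255 sorts after that of 256 even though 255 < 256.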

        My current proposal is for a multimap that is not sorted by
        key, but that can support ordered values for a single key.
        Remember that a multimap maps K -> Iterable<V>, so this means
        that each individual Iterable<V> is ordered, but the keys
        have no specific order relative to each other. This is not
        too different from many multimap implementations where the
        keys are unordered, but the list of values for a single key
        at least has a stable order.

        The interface would look like this:

        public interface MultimapState<K, V> extends State {
          // Add a value with a default timestamp.
          void put(K key, V value);

          // Add a timestamped value.
          void put(K key, TimestampedValue<V> value);

          // Remove all values for a key.
          void remove(K key);

          // Remove all values for a key with timestamps within the
          // specified range.
          void removeRange(K key, Instant startTs, Instant endTs);

          // Get an Iterable of values for a key. The Iterable will be
          // returned sorted by timestamp.
          ReadableState<Iterable<TimestampedValue<V>>> get(K key);

          // Get an Iterable of values for a key in the specified range.
          // The Iterable will be returned sorted by timestamp.
          ReadableState<Iterable<TimestampedValue<V>>> getRange(
              K key, Instant startTs, Instant endTs);

          ReadableState<Iterable<K>> keys();
          ReadableState<Iterable<TimestampedValue<V>>> values();
          ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
        }

        We can of course provide helper functions that allow using
        MultimapState without dealing with TimestampedValue for users
        who only want a multimap and don't want sorting.

        I think many users will only need a single sorted list - not
        a full multimap. It's worth offering this as well, and we can
        simply build it on top of MultimapState. It will look like an
        extension of BagState:

        public interface TimestampSortedListState<T> extends State {
          void add(TimestampedValue<T> value);
          Iterable<TimestampedValue<T>> read();
          Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
          void clearRange(Instant startTs, Instant endTs);
        }
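
As a rough illustration of the semantics of TimestampSortedListState, here is an in-memory model backed by a TreeMap. It is only a sketch under stated assumptions: timestamps are plain long milliseconds rather than Instant, ranges are treated as half-open [startTs, endTs), and the class name InMemoryTimestampSortedList is hypothetical. A real runner would back this with its own state storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class InMemoryTimestampSortedList<T> {
  // Values grouped by timestamp; TreeMap keeps the timestamps in ascending order.
  private final NavigableMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  // All values, ordered by timestamp (insertion order within one timestamp).
  List<T> read() {
    return flatten(byTimestamp);
  }

  // Values with startTs <= timestamp < endTs (assumed half-open range).
  // Note that subMap throws IllegalArgumentException if startTs > endTs.
  List<T> readRange(long startTs, long endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false));
  }

  // Removes values with startTs <= timestamp < endTs.
  void clearRange(long startTs, long endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private static <T> List<T> flatten(Map<Long, List<T>> m) {
    List<T> out = new ArrayList<>();
    for (List<T> values : m.values()) {
      out.addAll(values);
    }
    return out;
  }
}
```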


        On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <[email protected]
        <mailto:[email protected]>> wrote:

            The portability layer is meant to live across multiple
            versions of Beam, and I don't think we should approach it
            by doing the simple and useful thing now, since I believe
            that will lead to a proliferation of the API.

            On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles
            <[email protected] <mailto:[email protected]>> wrote:

                I have thoughts on the subject of whether to have
                APIs just for the lowest-level building blocks versus
                having APIs for higher-level constructs. Specifically
                this applies to providing only unsorted multimap vs
                what I will call "time-ordered buffer". TL;DR: I'd
                vote to focus on time-ordered buffer; if it turns out
                to be easy to go all the way to sorted multimap
                that's nice-to-have; if it turns out to be easy to
                implement on top of unsorted map state that should
                probably be under the hood.

                Reasons to build low-level multimap in the runner &
                fn api and layer higher-level things in the SDK:

                 - It is less implementation for runners if they have
                to only provide fewer lower-level building blocks
                like multimap state.
                 - There are many more runners than SDKs (and will be
                even more and more) so this saves overall.

                Reasons to build higher-level constructs directly in
                the runner and fn api:

                 - Having multiple higher-level state types may
                actually be less implementation than one complex
                state type, especially if they map to runner primitives.
                 - The runner may have better specialized
                implementations, especially for something like a
                time-ordered buffer.
                 - The particular access patterns in an SDK-based
                implementation may not be ideal for each runner's
                underlying implementation of the low-level building
                block.
                 - There may be excessive gRPC overhead even for
                optimal access patterns.

                There are ways to have best of both worlds, like:

                1. Define multiple state types according to
                fundamental access patterns, like we did before
                portability.
                2. If it is easy to layer one on top of the other, do
                that inside the runner. Provide shared code so for
                runners providing the lowest-level primitive they get
                all the types for free.

                I understand that this is an oversimplification. It
                still creates some more work. And APIs are a burden
                so it is good to introduce as few as possible for
                maintenance. But it has performance benefits and also
                unblocks "just doing the simple and useful thing now"
                which I always like to do as long as it is compatible
                with future changes. If the APIs are fundamental,
                like sets, maps, timestamp ordering, then it is safe
                to guess that they will change rarely and be useful
                forever.

                Kenn

                On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik
                <[email protected] <mailto:[email protected]>> wrote:

                    I would be glad to take a stab at how to provide
                    sorting on top of unsorted multimap state.
                    Based upon your description, you want integer
                    keys representing timestamps and arbitrary user
                    value for the values, is that correct?
                    What kinds of operations do you need on the
                    sorted map state in order of efficiency requirements?
                    (e.g. Next(x), Previous(x), GetAll(Range[x, y)),
                    ClearAll(Range[x, y)))
                    What kinds of operations do we expect the
                    underlying unsorted map state to be able to provide?
                    (at a minimum Get(K), Append(K), Clear(K) but
                    what else e.g. enumerate(K)?)
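
For reference, the four sorted-map operations named above can be pinned down against java.util.NavigableMap. This is only a sketch of the intended semantics, not an implementation over unsorted state; it assumes Next and Previous are strict (they exclude x itself) and that Range[x, y) is half-open. The class name SortedOpsSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

final class SortedOpsSketch {
  // Next(x): smallest key strictly greater than x, or null if there is none.
  static Long next(NavigableMap<Long, String> m, long x) {
    return m.higherKey(x);
  }

  // Previous(x): largest key strictly less than x, or null if there is none.
  static Long previous(NavigableMap<Long, String> m, long x) {
    return m.lowerKey(x);
  }

  // GetAll(Range[x, y)): values whose keys fall in [x, y), in key order.
  static List<String> getAll(NavigableMap<Long, String> m, long x, long y) {
    return new ArrayList<>(m.subMap(x, true, y, false).values());
  }

  // ClearAll(Range[x, y)): removes every entry whose key falls in [x, y).
  static void clearAll(NavigableMap<Long, String> m, long x, long y) {
    m.subMap(x, true, y, false).clear();
  }
}
```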

                    I went through a similar exercise of how to
                    provide a list like side input view over a
                    multimap[1] side input which efficiently allowed
                    computation of size and provided random access
                    while only having access to get(K) and enumerate K's.

                    1:
                    https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568

                    On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax
                    <[email protected] <mailto:[email protected]>> wrote:

                        Bringing this subject up again,

                        I've spent some time looking into
                        implementing this for the Dataflow runner.
                        I'm unable to find a way to implement the
                        arbitrary sorted multimap efficiently for the
                        case where there are large numbers of unique
                        keys. Since the primary driving use case is
                        timestamp ordering (i.e. key is event
                        timestamp), you would expect to have nearly a
                        new key per element. I considered Luke's
                        suggestion above, but unfortunately it
                        doesn't really solve this issue.

                        The primary use case for sorting always seems
                        to be sorting by timestamp. I want to propose
                        that instead of building the fully-general
                        sorted multimap, we instead focus on a state
                        type where the sort key is an integral type
                        (like a timestamp or an integer). There is
                        still a valid use case for multimap, but we
                        can provide that as an unordered state. At
                        least for Dataflow, it will be much easier.

                        While my difficulties here may be specific to
                        the Dataflow runner, any such support would
                        have to be built into other runners as well,
                        and limiting to integral sorting likely makes
                        it easier for other runners to implement
                        this. Also, if you look at this Flink comment
                        <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
                        pointed out by Aljoscha, for Flink
                        the main use case identified was also
                        timestamp sorting. This will also simplify
                        the API design for this feature: Sorted
                        multimap with arbitrary keys would require us
                        to introduce a way of mapping natural
                        ordering to encoded ordering (i.e. a new
                        OrderPreservingCoder), but if we limit sort
                        keys to integral types, the API design is
                        simpler as integral types can be represented
                        directly.

                        Reuven

                        On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax
                        <[email protected] <mailto:[email protected]>>
                        wrote:

                            This sounds to me like a potential runner
                            strategy. However if a runner can
                            natively support sorted maps (e.g. we
                            expect the Dataflow runner to be able to
                            do so, and I think it would be useful for
                            other runners as well), then it's
                            probably preferable to allow the runner
                            to use its native capabilities.

                            On Fri, May 24, 2019 at 11:05 AM Lukasz
                            Cwik <[email protected]
                            <mailto:[email protected]>> wrote:

                                For the API that you proposed, the
                                map key is always "void" and the sort
                                key == user key. So in my example of
                                key: dummy value
                                key.000: token, (0001, value4)
                                key.001: token, (0010, value1),
                                (0011, value2)
                                key.01: token
                                key.1: token, (1011, value3)
                                you would have:
                                "void": dummy value
                                "void".000: token, (0001, value4)
                                "void".001: token, (0010, value1),
                                (0011, value2)
                                "void".01: token
                                "void".1: token, (1011, value3)

                                Iterable<KV<K, V>> entriesUntil(K
                                limit) translates into walking the
                                prefixes until you find a common
                                prefix for K and then filtering for
                                values that have a sort key <=
                                K. Using the example above, to find
                                entriesUntil(0010) you would:
                                look for key."", miss
                                look for key.0, miss
                                look for key.00, miss
                                look for key.000, hit, sort all
                                contained values using secondary key,
                                provide value4 to user
                                look for key.001, hit, notice that
                                001 is a prefix of 0010 so we sort
                                all contained values using secondary
                                key, filter out value2 and provide value1

                                void removeUntil(K limit) also
                                translates into walking the prefixes
                                but instead we will clear them when
                                we have a "hit" with some special
                                logic for when the sort key is a
                                prefix of the key. Using the example,
                                to removeUntil(0010) you would:
                                look for key."", miss
                                look for key.0, miss
                                look for key.00, miss
                                look for key.000, hit, clear
                                look for key.001, hit, notice that
                                001 is a prefix of 0010 so we sort
                                all contained values using secondary
                                key, store in memory all values that
                                are > 0010, clear and append values
                                stored in memory.
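
The entriesUntil and removeUntil walks described above can be sketched as follows, over a HashMap standing in for unsorted multimap state. Everything here is an assumption made for illustration: the class and method names, sort keys as fixed-width bit strings, a present map key playing the role of the "token", and removeUntil leaving an empty leaf in place (rather than deleting the token) so that later walks still terminate. Map.entry requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PrefixRangeOps {
  // Stand-in for unsorted multimap state: prefix -> (sortKey, value) pairs.
  // A present key, even with an empty list, marks a leaf (the "token").
  final Map<String, List<Map.Entry<String, String>>> state = new HashMap<>();
  final int bits; // fixed width of every sort key, e.g. 4 for "0010"

  PrefixRangeOps(int bits) {
    this.bits = bits;
  }

  // Test helper: installs a leaf from alternating sortKey, value arguments.
  void putLeaf(String prefix, String... pairs) {
    List<Map.Entry<String, String>> leaf = new ArrayList<>();
    for (int i = 0; i + 1 < pairs.length; i += 2) {
      leaf.add(Map.entry(pairs[i], pairs[i + 1]));
    }
    state.put(prefix, leaf);
  }

  // Values with sort key <= limit, in sort-key order.
  List<String> entriesUntil(String limit) {
    List<String> out = new ArrayList<>();
    if (!state.isEmpty()) walk("", limit, false, out);
    return out;
  }

  // Removes every value with sort key <= limit.
  void removeUntil(String limit) {
    if (!state.isEmpty()) walk("", limit, true, new ArrayList<>());
  }

  private void walk(String prefix, String limit, boolean remove, List<String> out) {
    // Prune subtrees in which every sort key is greater than the limit.
    if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) return;
    List<Map.Entry<String, String>> leaf = state.get(prefix);
    if (leaf == null) { // miss: an internal node, so descend into both children
      if (prefix.length() < bits) {
        walk(prefix + "0", limit, remove, out);
        walk(prefix + "1", limit, remove, out);
      }
      return;
    }
    // Hit: sort the leaf by sort key, then split it at the limit.
    List<Map.Entry<String, String>> sorted = new ArrayList<>(leaf);
    sorted.sort(Map.Entry.comparingByKey());
    List<Map.Entry<String, String>> kept = new ArrayList<>();
    for (Map.Entry<String, String> e : sorted) {
      if (e.getKey().compareTo(limit) <= 0) {
        out.add(e.getValue());
      } else {
        kept.add(e);
      }
    }
    if (remove) state.put(prefix, kept); // clear, then re-append the survivors
  }
}
```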

                                On Fri, May 24, 2019 at 10:36 AM
                                Reuven Lax <[email protected]
                                <mailto:[email protected]>> wrote:

                                    Can you explain how fetching and
                                    deleting ranges of keys would
                                    work with this data structure?

                                    On Fri, May 24, 2019 at 9:50 AM
                                    Lukasz Cwik <[email protected]
                                    <mailto:[email protected]>> wrote:

                                        Reuven, for the example, I
                                        assume that we never want to
                                        store more than 2 values at a
                                        given sort key prefix, and if
                                        we do then we will create a
                                        new longer prefix splitting
                                        up the values based upon the
                                        sort key.

                                        Tuple representation in
                                        examples below is (key, sort
                                        key, value) and . is a
                                        character outside of the
                                        alphabet which can be
                                        represented by using an
                                        escaping encoding that wraps
                                        the key + sort key encoding.

                                        To insert (key, 0010,
                                        value1), we lookup "key" +
                                        all the prefixes of 0010
                                        finding one that is not
                                        empty. In this case it's "", so
                                        we append value1 to the map at
                                        key."" ending up with (we also
                                        set the key to any dummy
                                        value to know that it
                                        contains values):
                                        key: dummy value
                                        key."": token, (0010, value1)
                                        Now we insert (key, 0011,
                                        value2), we again lookup
                                        "key" + all the prefixes of
                                        0011, finding "", so we
                                        append value2 to key.""
                                        ending up with:
                                        key: dummy value
                                        key."": token, (0010,
                                        value1), (0011, value2)
                                        Now we insert (key, 1011,
                                        value3), we again lookup
                                        "key" + all the prefixes of
                                        1011 finding "" but notice
                                        that it is full, so we
                                        partition all the values into
                                        two prefixes 0 and 1. We also
                                        clear the "" prefix ending up
                                        with:
                                        key: dummy value
                                        key.0: token, (0010, value1),
                                        (0011, value2)
                                        key.1: token, (1011, value3)
                                        Now we insert (key, 0001,
                                        value4), we again lookup
                                        "key" + all the prefixes of
                                        the value finding 0 but
                                        notice that it is full, so we
                                        partition all the values into
                                        two prefixes 00 and 01 but
                                        notice this doesn't help us
                                        since 00 will be too full so
                                        we split 00 again to 000,
                                        001. We also clear the 0
                                        prefix ending up with:
                                        key: dummy value
                                        key.000: token, (0001, value4)
                                        key.001: token, (0010,
                                        value1), (0011, value2)
                                        key.01: token
                                        key.1: token, (1011, value3)
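
The insertion walk-through above can be reproduced with a small sketch over a HashMap standing in for unsorted multimap state. The names are hypothetical, and several details are assumptions: sort keys are fixed-width bit strings, a present map key acts as the "token", an empty map models the unset dummy "key" entry, and a leaf at full key length is allowed to exceed the capacity because it cannot be split further. Map.entry requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PrefixTrieInsert {
  static final int CAPACITY = 2; // max values per leaf, as in the example

  // Stand-in for unsorted multimap state: prefix -> (sortKey, value) pairs.
  final Map<String, List<Map.Entry<String, String>>> state = new HashMap<>();

  void insert(String sortKey, String value) {
    if (state.isEmpty()) state.put("", new ArrayList<>()); // first insert creates the root leaf
    // Look up each prefix of the sort key, shortest first, until one exists.
    for (int len = 0; len <= sortKey.length(); len++) {
      String prefix = sortKey.substring(0, len);
      List<Map.Entry<String, String>> leaf = state.get(prefix);
      if (leaf == null) continue; // miss: keep trying longer prefixes
      leaf.add(Map.entry(sortKey, value));
      splitIfFull(prefix, sortKey.length());
      return;
    }
    throw new IllegalStateException("no leaf covers " + sortKey);
  }

  // Replaces an overfull leaf with two children, recursing while a child is overfull.
  private void splitIfFull(String prefix, int bits) {
    List<Map.Entry<String, String>> leaf = state.get(prefix);
    if (leaf.size() <= CAPACITY || prefix.length() == bits) return;
    state.remove(prefix); // clear the old prefix
    List<Map.Entry<String, String>> zeros = new ArrayList<>();
    List<Map.Entry<String, String>> ones = new ArrayList<>();
    for (Map.Entry<String, String> e : leaf) {
      (e.getKey().charAt(prefix.length()) == '0' ? zeros : ones).add(e);
    }
    // Both children get a token, even if one of them is empty.
    state.put(prefix + "0", zeros);
    state.put(prefix + "1", ones);
    splitIfFull(prefix + "0", bits);
    splitIfFull(prefix + "1", bits);
  }
}
```

Inserting 0010, 0011, 1011 and 0001 in that order leaves exactly the prefixes 000, 001, 01 and 1, matching the final layout above.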

                                        We are effectively building a
                                        trie[1] where we only have
                                        values at the leaves and
                                        control how full each leaf
                                        can be. There are other trie
                                        representations like a radix
                                        tree that may be better.

                                        Looking up the values in
                                        sorted order for "key" would
                                        go like this:
                                        Is key set, yes
                                        look for key."", miss
                                        look for key.0, miss
                                        look for key.00, miss
                                        look for key.000, hit, sort
                                        all contained values using
                                        secondary key, provide value4
                                        to user
                                        look for key.001, hit, sort
                                        all contained values using
                                        secondary key, provide value1
                                        followed by value2 to user
                                        look for key.01, hit, empty,
                                        return no values to user
                                        look for key.1, hit, sort all
                                        contained values using
                                        secondary key, provide value3
                                        to user
                                        we have walked the entire
                                        prefix space, signal end of
                                        iterable
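
The sorted lookup just described is a pre-order walk of the prefix space. A minimal sketch, assuming the same stand-in representation (a plain Map from prefix to (sortKey, value) pairs, where a present key is the "token") and fixed-width bit-string sort keys; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PrefixTrieRead {
  // Returns all values in sort-key order; bits is the fixed sort-key width.
  static List<String> readSorted(
      Map<String, List<Map.Entry<String, String>>> state, int bits) {
    List<String> out = new ArrayList<>();
    if (!state.isEmpty()) { // models the "is key set" check
      visit(state, "", bits, out);
    }
    return out;
  }

  private static void visit(
      Map<String, List<Map.Entry<String, String>>> state,
      String prefix,
      int bits,
      List<String> out) {
    List<Map.Entry<String, String>> leaf = state.get(prefix);
    if (leaf != null) { // hit: sort the leaf by sort key and emit its values
      List<Map.Entry<String, String>> sorted = new ArrayList<>(leaf);
      sorted.sort(Map.Entry.comparingByKey());
      for (Map.Entry<String, String> e : sorted) {
        out.add(e.getValue());
      }
      return;
    }
    if (prefix.length() < bits) { // miss: visit the 0 child before the 1 child
      visit(state, prefix + "0", bits, out);
      visit(state, prefix + "1", bits, out);
    }
  }
}
```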

                                        Some notes for the above:
                                        * The dummy value is used to
                                        know that the key contains
                                        values and the token is to
                                        know whether there are any
                                        values deeper in the trie so
                                        that we know when to stop
                                        searching.
                                        * If we can recalculate the
                                        sort key from the combination
                                        of the key and value, then we
                                        don't need to store it.
                                        * Keys with lots of values
                                        will perform worse than keys
                                        with fewer values since we
                                        have to look up more keys but
                                        they will be empty reads. The
                                        number of misses can be
                                        controlled by how many
                                        elements we are willing to
                                        store at a given node before
                                        we subdivide.

                                        In reality you could build a
                                        lot of structures (e.g. red
                                        black tree, binary tree)
                                        using the sort key, the issue
                                        is the cost of
                                        rebalancing/re-organizing the
                                        structure in map form and
                                        whether it has a convenient
                                        pre-order traversal for lookups.



                                        On Fri, May 24, 2019 at 8:14
                                        AM Reuven Lax
                                        <[email protected]
                                        <mailto:[email protected]>> wrote:

                                            Some great comments!

                                            *Aljoscha*: absolutely
                                            this would have to be
                                            implemented by runners to
                                            be efficient. We can of
                                            course provide a default
                                            (inefficient)
                                            implementation, but
                                            ideally runners would
                                            provide better ones.

                                            *Jan* Exactly. I think
                                            MapState can be dropped
                                            or backed by this. E.g.

                                            *Robert* Great point
                                            about standard coders not
                                            satisfying this. That's
                                            why I suggested that we
                                            provide a way to tag the
                                            coders that do preserve
                                            order, and only accept
                                            those as key coders.
                                            Alternatively we could
                                            present a more limited
                                            API - e.g. only allowing
                                            a hard-coded set of types
                                            to be used as keys - but
                                            that seems counter to the
                                            direction Beam usually
                                            goes. So users will have
                                            two ways of creating
                                            multimap state specs:

                                                private final StateSpec<MultimapState<Long, String>> state =
                                                    StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());

                                            or

                                                private final StateSpec<MultimapState<Long, String>> state =
                                                    StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());

                                            The second one will
                                            validate that the key
                                            coder preserves order,
                                            and fail otherwise
                                            (similar to coder
                                            determinism checking in
                                            GroupByKey). (BTW we
                                            would also have versions
                                            of these functions that
                                            use coder inference to
                                            "guess" the coder, but
                                            those will do the same
                                            checking)
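
One way the validation described above could look is sketched below. This is purely illustrative: Coder, OrderPreservingCoder, Spec, and both factory methods are hypothetical stand-ins defined inside the example, not existing Beam types, and tagging order-preserving coders with a marker interface is just one assumed mechanism.

```java
final class OrderedSpecSketch {
  // Hypothetical stand-in for a coder type.
  interface Coder<T> {}

  // Hypothetical marker for coders whose encoded bytes sort like their values.
  interface OrderPreservingCoder<T> extends Coder<T> {}

  // Hypothetical state spec carrying the coders and whether ordering was requested.
  static final class Spec<K, V> {
    final Coder<K> keyCoder;
    final Coder<V> valueCoder;
    final boolean ordered;

    Spec(Coder<K> keyCoder, Coder<V> valueCoder, boolean ordered) {
      this.keyCoder = keyCoder;
      this.valueCoder = valueCoder;
      this.ordered = ordered;
    }
  }

  // Unordered multimap: any key coder is accepted.
  static <K, V> Spec<K, V> multimap(Coder<K> keyCoder, Coder<V> valueCoder) {
    return new Spec<>(keyCoder, valueCoder, false);
  }

  // Ordered multimap: rejects key coders that are not tagged as order-preserving.
  static <K, V> Spec<K, V> orderedMultimap(Coder<K> keyCoder, Coder<V> valueCoder) {
    if (!(keyCoder instanceof OrderPreservingCoder)) {
      throw new IllegalArgumentException("key coder does not preserve order: " + keyCoder);
    }
    return new Spec<>(keyCoder, valueCoder, true);
  }
}
```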

                                            Also the API I proposed
                                            did support
                                            random access! We could
                                            separate out
                                            OrderedBagState again if
                                            we think the use cases
                                            are fundamentally
                                            different. I merged the
                                            proposal into that of
                                            MultimapState because
                                            there seemed to be 99% overlap.

                                            Reuven

                                            On Fri, May 24, 2019 at
                                            6:19 AM Robert Bradshaw
                                            <[email protected]
                                            <mailto:[email protected]>>
                                            wrote:

                                                On Fri, May 24, 2019
                                                at 5:32 AM Reuven Lax
                                                <[email protected]
                                                <mailto:[email protected]>>
                                                wrote:
                                                >
                                                > On Thu, May 23,
                                                2019 at 1:53 PM Ahmet
                                                Altay
                                                <[email protected]
                                                <mailto:[email protected]>>
                                                wrote:
                                                >>
                                                >>
                                                >>
                                                >> On Thu, May 23,
                                                2019 at 1:38 PM
                                                Lukasz Cwik
                                                <[email protected]
                                                <mailto:[email protected]>>
                                                wrote:
                                                >>>
                                                >>>
                                                >>>
                                                >>> On Thu, May 23,
                                                2019 at 11:37 AM Rui
                                                Wang
                                                <[email protected]
                                                <mailto:[email protected]>>
                                                wrote:
                                                >>>>>
                                                >>>>> A few obvious
                                                problems with this code:
                                                >>>>>   1. Removing
                                                the elements already
                                                processed from the
                                                bag requires clearing
                                                and rewriting the
                                                entire bag. This is
                                                O(n^2) in the number
                                                of input trades.
                                                >>>>
                                                >>>> Why isn't it O(2
                                                * n) to clear and
                                                rewrite the trade state?
                                                >>>>
                                                >>>>>
                                                >>>>> public
                                                interface
                                                SortedMultimapState<K,
                                                V> extends State {
                                                >>>>>   // Add a
                                                value to the map.
                                                >>>>>   void put(K
                                                key, V value);
                                                >>>>>   // Get all
                                                values for a given key.
                                                >>>>>
                                                 ReadableState<Iterable<V>>
                                                get(K key);
                                                >>>>>  // Return all
                                                entries in the map.
                                                >>>>>
                                                 ReadableState<Iterable<KV<K,
                                                V>>> allEntries();
                                                >>>>>   // Return all
                                                entries in the map
                                                with keys <= limit.
                                                Returned elements are
                                                sorted by the key.
                                                >>>>>
                                                 ReadableState<Iterable<KV<K,
                                                V>>> entriesUntil(K
                                                limit);
                                                >>>>>
                                                >>>>>  // Remove all
                                                values with the given
                                                key;
                                                >>>>>   void remove(K
                                                key);
                                                >>>>>  // Remove all
                                                entries in the map
                                                with keys <= limit.
                                                >>>>>   void
                                                removeUntil(K limit);
                                                >>>>
                                                >>>> Would
                                                removeUntilExcl(K
                                                limit) also be useful?
                                                It will remove all
                                                entries in the map
                                                with keys < limit.
                                                >>>>
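The inclusive and exclusive removal semantics discussed above can be sketched in memory. This is only an illustration, assuming a java.util.TreeMap as the backing store; the class name InMemorySortedMultimap and the helper valuesUntil are hypothetical, and nothing here is the proposed Beam state API or how a runner would implement it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed sorted multimap state.
class InMemorySortedMultimap<K extends Comparable<K>, V> {
  // TreeMap keeps keys in their natural order.
  private final NavigableMap<K, List<V>> entries = new TreeMap<>();

  // Add a value under the given key.
  void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Values for all keys <= limit, in key order.
  List<V> valuesUntil(K limit) {
    List<V> out = new ArrayList<>();
    for (List<V> values : entries.headMap(limit, true).values()) {
      out.addAll(values);
    }
    return out;
  }

  // Remove all entries with keys <= limit (inclusive bound).
  void removeUntil(K limit) {
    entries.headMap(limit, true).clear();
  }

  // Remove all entries with keys < limit (exclusive bound).
  void removeUntilExcl(K limit) {
    entries.headMap(limit, false).clear();
  }
}
```

With keys 1, 2 and 3 stored, removeUntilExcl(2) leaves keys 2 and 3, while removeUntil(2) leaves only key 3.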
                                                >>>>>
                                                >>>>> Runners will
                                                sort based on the
                                                encoded value of the
                                                key. In order to make
                                                this easier for
                                                users, I propose that
                                                we introduce a new
                                                tag on Coders:
                                                PreservesOrder. A
                                                Coder that contains
                                                this tag guarantees
                                                that the encoded
                                                value preserves the
                                                same ordering as the
                                                base Java type.
                                                >>>>
                                                >>>>
                                                >>>> Could you
                                                clarify what
                                                "encoded value
                                                preserves the same
                                                ordering as the base
                                                Java type" means?
                                                >>>
                                                >>>
                                                >>> Let's say A and B
                                                represent two
                                                different instances
                                                of the same Java type
                                                like a double, then A
                                                < B (using the
                                                language's comparison
                                                operator) iff
                                                encode(A) < encode(B)
                                                (note the encoded
                                                versions are compared
                                                lexicographically).
                                                >>
                                                >>
                                                >> Since coders are
                                                shared across SDKs,
                                                do we expect the A < B
                                                iff e(A) < e(B)
                                                property to hold for
                                                all languages we
                                                support? What happens
                                                if A and B sort differently
                                                in different languages?
                                                >
                                                >
                                                > That would have to
                                                be the property of
                                                the coder (which
                                                means that this
                                                property probably
                                                needs to be
                                                represented in the
                                                portability
                                                representation of the
                                                coder). I imagine the
                                                common use cases will
                                                be for simple coders
                                                like int, long,
                                                string, etc., which
                                                are likely to sort
                                                the same in most
                                                languages.

                                                The standard coders
                                                for both double and
                                                integral types do not
                                                respect
                                                the natural ordering
                                                (consider negative
                                                values). KV coders
                                                violate the
                                                "natural"
                                                lexicographic
                                                ordering on
                                                components as well. I
                                                think
                                                implicitly sorting on
                                                encoded value would
                                                yield many surprises.
                                                (The
                                                state could, of course,
                                                take an
                                                order-preserving, bytes
                                                (string?)-producing
                                                callable as a
                                                parameter.)
                                                (As for
                                                naming, I'd probably
                                                call this
                                                OrderedBagState or
                                                something like
                                                that...rather than
                                                Map which tends to
                                                imply random access.)
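
The point about negative values can be illustrated with a small sketch. It assumes a plain 8-byte big-endian two's complement encoding as a stand-in, which is not necessarily what any Beam coder emits; the class and method names are hypothetical. The second encoder flips the sign bit, one common way to make unsigned lexicographic byte order agree with numeric order.

```java
import java.nio.ByteBuffer;

// Hypothetical demo; neither encoder is a Beam coder.
class OrderPreservingEncodingSketch {
  // Plain big-endian two's complement encoding (assumed stand-in).
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flip the sign bit so negatives sort before non-negatives
  // when the bytes are compared as unsigned values.
  static byte[] encodeOrderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison of byte arrays, treating bytes as unsigned.
  static int compareLex(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // -1 < 1 numerically, but the plain encoding of -1 is all 0xFF bytes.
    System.out.println(compareLex(encodeBigEndian(-1L), encodeBigEndian(1L)) > 0);
    // With the sign bit flipped, encoded order matches numeric order.
    System.out.println(compareLex(encodeOrderPreserving(-1L), encodeOrderPreserving(1L)) < 0);
  }
}
```

Under these assumptions both lines print true: the plain encoding puts -1 after 1, and the sign-flipped encoding restores the numeric order.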
