On 6/23/21 11:13 PM, Reuven Lax wrote:


On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    The most qualitatively important use-case I see is ACID
    transactions. Transactions naturally involve cycles, because the
    most natural implementation would be something like "optimistic
    locking", where the transaction is allowed to progress until a
    downstream "commit" sees a conflict, at which point it needs to
    return the transaction back to the beginning (pretty much the same
    as how git resolves a conflict in a push).

True, however within a transform one could use timers to implement this (there are currently some bugs around looping timers, I believe, but those are easier to fix than implementing a brand new programming model). Iteration is only really necessary if you need to iterate an entire subgraph, including GroupByKeys, etc.

There necessarily needs to be a GBK to support transactions. Take the most prominent example of a transaction: moving some amount of cash between two bank accounts. We can have a keyed state holding the current amount of cash per user. A request to transfer some amount would come as an atomic event "move amount X from user A to user B". This would lead to updates of the keyed state of both A and B, but we don't know in advance whether A or B has the required amount in their account. And this is where we need two GBKs - one will assign a sequential ID to each transaction (that is, a serialized order) and the other, downstream, will verify that the result was computed from the correct data.

This is maybe too complex to describe in short, but there definitely has to be a GBK (actually a GroupAll operation) downstream and a cycle for when a post-condition fails.
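The sequence / verify / retry cycle described above can be sketched in memory as follows. This is only an illustration in plain Python, not Beam code: `Transfer`, `run_transfers` and the per-user version counters are hypothetical names, and the "rounds" stand in for passes through the feedback loop. Every transfer in a round reads the same snapshot, and the commit step sends a transfer back to the beginning if the state it read has changed.

```python
from dataclasses import dataclass


@dataclass
class Transfer:
    src: str
    dst: str
    amount: int


def run_transfers(balances, transfers):
    """Apply transfers optimistically; conflicting ones loop back for a retry."""
    balances = dict(balances)
    versions = {user: 0 for user in balances}  # bumped on every committed write
    committed, rejected = [], []
    pending = list(transfers)
    rounds = 0
    # The first pending transfer of a round never conflicts, so every round
    # commits or rejects at least one transfer and the loop terminates.
    while pending:
        rounds += 1
        snapshot = dict(versions)  # optimistic read shared by the whole round
        retry = []
        # Iterating the list in order stands in for the serialized order.
        for t in pending:
            stale = (versions[t.src] != snapshot[t.src]
                     or versions[t.dst] != snapshot[t.dst])
            if stale:
                retry.append(t)      # conflict: return to the beginning
            elif balances[t.src] < t.amount:
                rejected.append(t)   # insufficient funds: fail for good
            else:
                balances[t.src] -= t.amount
                balances[t.dst] += t.amount
                versions[t.src] += 1
                versions[t.dst] += 1
                committed.append(t)
        pending = retry
    return balances, committed, rejected, rounds
```

In a real pipeline the snapshot and the verification would live in separate keyed and global steps; the sketch collapses them into one loop to show only the control flow.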


    Another application would be graph algorithms on changing graphs,
    where adding or removing an edge might trigger an iterative
    algorithm on the graph (and I'm absolutely not sure that the
    discussed approach can do that, this is just something that would
    be cool to do :)).

Yes, that's what I had in mind. I'm just not sure that these algorithms lend themselves to windowing. I.e. if we added iterative support, but did not have support for windowing or watermarks across iterations, have we solved most of the problem?

I don't think there is any windowing involved. When a new road is built between cities A and B it _immediately_ makes traveling between these two cities faster. There is no discrete boundary.
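As a small illustration of the road example, the sketch below repairs single-source shortest distances when one edge is added, by iterating only over the nodes whose distance actually improves. It is plain Python under stated assumptions: `add_edge`, `graph` and `dist` are hypothetical names, edges are undirected with positive weights, and edge removal is not handled.

```python
import heapq


def add_edge(graph, dist, u, v, w):
    """Add undirected edge u-v (weight w > 0) and repair distances in `dist`.

    `graph` maps node -> {neighbour: weight}; `dist` maps node -> current
    shortest distance from a fixed source. Returns the set of nodes whose
    distance improved.
    """
    graph.setdefault(u, {})[v] = w
    graph.setdefault(v, {})[u] = w
    inf = float("inf")
    heap = []
    # Seed the iteration with whichever endpoint the new edge improves.
    for a, b in ((u, v), (v, u)):
        if dist.get(a, inf) + w < dist.get(b, inf):
            dist[b] = dist[a] + w
            heapq.heappush(heap, (dist[b], b))
    updated = set()
    # Propagate improvements until nothing changes any more.
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist[node]:
            continue  # stale heap entry, a better distance was found already
        updated.add(node)
        for nbr, weight in graph[node].items():
            if d + weight < dist.get(nbr, inf):
                dist[nbr] = d + weight
                heapq.heappush(heap, (dist[nbr], nbr))
    return updated
```

The number of iterations depends on how far the improvement spreads, which is the part that does not map onto any window boundary.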

I don't understand why we would drop support for watermarks - they would be perfectly supported. Every iteration key would have a watermark hold that would be released when the key finishes iterating or is terminated due to a timeout. I'm not sure if windowing as such plays any role in this, but maybe it can.
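A minimal sketch of the per-key watermark hold described above, in plain Python rather than any runner's API. `IterationWatermark` and its methods are hypothetical names, and measuring the timeout in event time against the upstream watermark is an assumption made only for the example.

```python
class IterationWatermark:
    """Output watermark held back by keys that are still iterating."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.upstream = float("-inf")
        self.holds = {}  # key -> event timestamp at which it entered the loop

    def start(self, key, timestamp):
        # Keep the earliest hold if the same key enters more than once.
        self.holds[key] = min(timestamp, self.holds.get(key, timestamp))

    def finish(self, key):
        # The key met its stopping condition: release its hold.
        self.holds.pop(key, None)

    def advance_upstream(self, watermark):
        """Advance the upstream watermark; return keys terminated by timeout."""
        self.upstream = max(self.upstream, watermark)
        expired = [k for k, ts in self.holds.items()
                   if ts + self.timeout <= self.upstream]
        for k in expired:
            del self.holds[k]
        return expired

    def output_watermark(self):
        # Never ahead of upstream, never ahead of the oldest active hold.
        return min([self.upstream, *self.holds.values()])
```

With this definition the watermark stays one-dimensional: downstream latency grows while keys iterate, and the timeout bounds how long a single key can hold it back.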

    On 6/23/21 10:53 PM, Reuven Lax wrote:
    One question I have is whether the use cases for cyclic graphs
    overlap substantially with the use cases for event-time
    watermarks. Many of the uses I'm aware of are ML-type algorithms
    (e.g. clustering) or iterative algorithms on large graphs
    (connected components, etc.), and it's unclear how many of these
    problems have a natural time component.

    On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Reuven, can you please elaborate a little on that? Why do you
        need watermark per iteration? Letting the watermark progress
        as soon as all the keys arriving before the upstream
        watermark terminate the cycle seems like a valid definition
        without the need to make the watermark multidimensional. Yes,
        it introduces (possibly unbounded) latency in downstream
        processing, but that is something that should probably be
        expected. The unboundedness of the latency can be limited by
        either a fixed timeout or a maximum number of iterations.

        On 6/23/21 8:39 PM, Reuven Lax wrote:
        This was explored in the past, though the design started
        getting very complex (watermarks of unbounded dimension,
        where each iteration has its own watermark dimension). At
        the time, the exploration petered out.

        On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

            Hi,

            I'd like to discuss a very rough idea. I didn't walk through all
            the corner cases and the whole idea has a lot of rough edges, so
            please bear with me. I was thinking about non-IO applications of
            splittable DoFn, and the main idea - and why it is called
            splittable - is that it can handle unbounded outputs per element.
            Then I was thinking about what can generate unbounded outputs per
            element _without reading from an external source_ (as that would
            be an IO application) - and then I realized that the data can - at
            least theoretically - come from a downstream transform. It would
            have to be passed over an RPC (gRPC probably) connection, it
            would probably require some sort of service discovery - as the
            feedback loop would have to be correctly targeted based on key -
            and so on (those are the rough edges).

            But supposing this can be solved - what iterations actually mean
            is that we have a side channel that comes from downstream
            processing - and we need a watermark estimator for this channel
            that is able to hold the watermark back until the very last
            element (at a certain watermark) finishes the iteration. The idea
            is that we could then - in theory - create an Iteration
            PTransform that would take another PTransform (probably something
            like PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
            IterationResult<K, V>>>>), where the IterationResult<K, V> would
            contain the original KV<K, V> and a stopping condition (true,
            false). By creating the feedback loop from the output of this
            PCollection we could actually implement this without any need
            for support on the side of runners.

            Does that seem like something that might be worth exploring?

              Jan
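The shape of the proposed Iteration transform can be sketched in memory as below. This is plain Python, not the Beam SDK: `IterationResult` mirrors the type named in the original message, while `iterate`, the per-element `body` callable and the `max_iterations` cap are assumptions made for the example. The queue plays the role of the feedback channel from the downstream output back to the input.

```python
from collections import deque
from dataclasses import dataclass
from typing import Generic, TypeVar

K = TypeVar("K")
V = TypeVar("V")


@dataclass
class IterationResult(Generic[K, V]):
    key: K
    value: V
    done: bool  # stopping condition reported by the loop body


def iterate(inputs, body, max_iterations=1000):
    """Feed (key, value) pairs through `body` until each one reports done."""
    queue = deque((k, v, 0) for k, v in inputs)
    finished = []
    while queue:
        key, value, n = queue.popleft()
        result = body(key, value)
        if result.done or n + 1 >= max_iterations:
            finished.append((result.key, result.value))  # leaves the loop
        else:
            queue.append((result.key, result.value, n + 1))  # fed back
    return finished


def halve(key, value):
    """Example body: halve the value, stop once it reaches 1 or less."""
    nxt = value // 2
    return IterationResult(key, nxt, nxt <= 1)
```

A call such as `iterate([("a", 8), ("b", 3)], halve)` runs each key through the body until it stops. The cap on iterations corresponds to the bound on latency discussed earlier in the thread.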
