> If that timer is triggered by the watermark of the previous step and that watermark is being held up by the entire iteration, then this timer will never fire and the whole transform could deadlock. This was one reason for multi-dimensional watermarks - the timer can fire based on the watermark from the previous iterations, and so never deadlocks (though figuring out how to efficiently implement watermarks of unbounded dimensionality might be difficult).

That makes sense. However, if you implement endless "for" cycle, that will cycle for ever - is that something that the tool you are using should avoid? Should we ban for-cycles just because it can lead to infinite loops? Another important detail - there is no change in the Beam model needed. The purpose of this thread is - are we already there? Is really SDF filling the gap in both the source, but as well the iteration "gap"? Iteration is for the time being the domain of batch, which is where the unified approach looses its points.

On 6/24/21 12:31 AM, Reuven Lax wrote:


On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    On 6/23/21 11:13 PM, Reuven Lax wrote:


    On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        The most qualitatively import use-case I see are ACID
        transactions - transactions naturally involve cycles, because
        the most natural implementation would be of something like
        "optimistic locking" where the transaction is allowed to
        progress until a downstream "commit" sees a conflict, when it
        needs to return the transaction back to the beginning (pretty
        much the same as how git resolves conflict in a push).

    True, however within a transform one could use timers to
    implement this (there are currently some bugs around looping
    timers I believe, but those are easier to fix than implementing a
    brand new programming model). Iterative is only really necessary
    if you need to iterate an entire subgraph, including GroupByKeys,
    etc.

    There necessarily needs to be GBK to support transactions. If we
    take the most prominent example of a transaction - moving some
    amount of cash between two bank accounts - we can have a state for
    the current amount of cash per user. This state would be keyed,
    request to transfer some amount would come as a atomic event "move
    amount X from user A to user B". This would lead to updates of
    state in keyed state of A and B, but we don't know if A or B have
    the required amount on their account. And this is where we need to
    do two GBKs - one will assign a sequential ID to each transaction
    (that is a serialized order) and the other downstream will verify
    that the result was computed from the correct data.


Fair point. This could be done with state/timers in an eventually-consistent way (though not fully ACID) by simply sending messages. However in these sorts of workflow scenarios, the need for back edges will probably come up regardless (e.g. if a failure happens and you want to cancel, you might need a back edge to tell the previous key to cancel).

However I'm still not convinced watermarks are needed.

    This is maybe too complex to describe in short, but there
    definitely has to be GBK (actually GroupAll operation) downstream
    and a cycle when a post-condition fails.


        Another application would be graph algorithms on changing
        graphs, where adding or removing an edge might trigger an
        iterative algorithm on the graph (and I'm absolutely not sure
        that the discussed approach can do that, this is just
        something, that would be cool to do :)).

    Yes, that's what I had in mind. I'm just not sure that these
    algorithms lend themselves to windowing. I.e. if we added
    iterative support, but did not have support for windowing or
    watermarks across iterations, have we solved most of the problem?

    I don't think there is any windowing involved. When a new road is
    built between cities A and B it _immediately_ makes traveling
    between these two cities faster. There is no discrete boundary.

    I don't understand why we would drop support for watermarks - they
    would be perfectly supported, every iteration key will have a
    watermark hold that would be released when the key finished
    iterating - or was terminated due to timeout. I'm not sure if
    windowing as such plays any role in this, but maybe can.


You'd have to make sure things don't deadlock. If a step inside the transform that was being iterated had an event-time timer, what triggers that timer? If that timer is triggered by the watermark of the previous step and that watermark is being held up by the entire iteration, then this timer will never fire and the whole transform could deadlock. This was one reason for multi-dimensional watermarks - the timer can fire based on the watermark from the previous iterations, and so never deadlocks (though figuring out how to efficiently implement watermarks of unbounded dimensionality might be difficult).


        YOn 6/23/21 10:53 PM, Reuven Lax wrote:
        One question I have is whether the use cases for cyclic
        graphs overlap substantially with the use cases for
        event-time watermarks. Many of the uses I'm aware of are
        ML-type algorithms (e.g. clustering) or iterative algorithms
        on large graphs (connected components, etc.), and it's
        unclear how many of these problems have a natural time
        component.

        On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:

            Reuven, can you please elaborate a little on that? Why
            do you need watermark per iteration? Letting the
            watermark progress as soon as all the keys arriving
            before the upstream watermark terminate the cycle seems
            like a valid definition without the need to make the
            watermark multidimensional. Yes, it introduces (possibly
            unbounded) latency in downstream processing, but that is
            something that should be probably expected. The
            unboundness of the latency can be limited by either
            fixed timeout or number of iterations.

            On 6/23/21 8:39 PM, Reuven Lax wrote:
            This was explored in the past, though the design
            started getting very complex (watermarks of unbounded
            dimension, where each iteration has its own watermark
            dimension). At the time, the exploration petered out.

            On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                Hi,

                I'd like to discuss a very rough idea. I didn't
                walk through all the
                corner cases and the whole idea has a lot of rough
                edges, so please bear
                with me. I was thinking about non-IO applications
                of splittable DoFn,
                and the main idea - and why it is called splittable
                - is that it can
                handle unbounded outputs per element. Then I was
                thinking about what can
                generate unbounded outputs per element _without
                reading from external
                source_ (as that would be IO application) - and
                then I realized that the
                data can - at least theoretically - come from a
                downstream transform. It
                would have to be passed over an RPC (gRPC probably)
                connection, it would
                probably require some sort of service discovery -
                as the feedback loop
                would have to be correctly targeted based on key -
                and so on (those are
                the rough edges).

                But supposing this can be solved - what iterations
                actually mean is the
                we have a side channel, that come from downstream
                processing - and we
                need a watermark estimator for this channel, that
                is able to hold the
                watermark back until the very last element (at a
                certain watermark)
                finishes the iteration. The idea is then we could -
                in theory - create
                an Iteration PTransform, that would take another
                PTransform (probably
                something like PTransform<PCollection<KV<K, V>>,
                PCollection<KV<K,
                IterationResult<K, V>>>, where the
                IterationResult<K, V> would contain
                the original KV<K, V> and a stopping condition
                (true, false) and by
                creating the feedback loop from the output of this
                PCollection we could
                actually implement this without any need of support
                on the side of runners.

                Does that seem like something that might be worth
                exploring?

                  Jan

Reply via email to