Hi Kenn,

Thanks for the pointers; that is really interesting reading that probably could (should) be part of the Beam docs. On the other hand, Beam is no longer Dataflow-only, and that could mean that some of the concepts can be reiterated, possibly?

I don't quite understand where the difference lies between a "source watermark", where the "source SDF" can output any downstream watermark, and an "iterative SDF", which cannot. This feels like it should be the same.

On 6/24/21 12:47 AM, Kenneth Knowles wrote:
Most of the theory is particularly well-treated in "Timely Dataflow" and "Differential Dataflow". There is a brief summary of the latter at https://blog.acolyer.org/2015/06/17/differential-dataflow/ but I recommend actually reading both papers. It uses clock ticks rather than Beam's continuous style of watermark, but I don't think this changes the general approach.

There are very few implementations of watermark-correct cycles AFAIK. For Beam runners where the watermark is simulated (for example using Spark's state) we could possibly implement them at the Beam layer. For engines where the Beam watermark is implemented more directly (for example Dataflow & Flink) there would be a lot of added complexity and probably a performance loss, if it could be done at all.

Kenn

On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <re...@google.com> wrote:



    On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:

        On 6/23/21 11:13 PM, Reuven Lax wrote:


        On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:

            The most qualitatively important use-case I see is ACID
            transactions - transactions naturally involve cycles,
            because the most natural implementation would be
            something like "optimistic locking", where the transaction
            is allowed to progress until a downstream "commit" sees a
            conflict, at which point it needs to return the transaction
            back to the beginning (pretty much the same as how git
            resolves a conflict in a push).

        True, however within a transform one could use timers to
        implement this (there are currently some bugs around looping
        timers I believe, but those are easier to fix than
        implementing a brand new programming model). Iterative is
        only really necessary if you need to iterate an entire
        subgraph, including GroupByKeys, etc.

        There necessarily needs to be GBK to support transactions. If
        we take the most prominent example of a transaction - moving
        some amount of cash between two bank accounts - we can have a
        state for the current amount of cash per user. This state
        would be keyed; a request to transfer some amount would come as
        an atomic event "move amount X from user A to user B". This
        would lead to updates of state in keyed state of A and B, but
        we don't know if A or B have the required amount on their
        account. And this is where we need to do two GBKs - one will
        assign a sequential ID to each transaction (that is a
        serialized order) and the other downstream will verify that
        the result was computed from the correct data.
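As a rough illustration of the scheme described above (outside of Beam, and not a proposal for an actual API), the two stages and the retry cycle could be sketched as follows. The names `Transfer` and `run_transfers`, and the per-account version counters used to detect stale reads, are assumptions made for this example only.

```python
from dataclasses import dataclass


@dataclass
class Transfer:
    src: str
    dst: str
    amount: int


def run_transfers(initial, transfers):
    """Optimistically apply transfers; return (balances, rejected_ids, rounds)."""
    balances = dict(initial)
    versions = {user: 0 for user in balances}  # bumped on every committed update
    # Stage 1: assign a sequential ID to each transaction (the serialized order).
    pending = list(enumerate(transfers))
    rejected = []
    rounds = 0
    while pending:
        rounds += 1
        # Every pending transfer reads the same snapshot of the keyed state.
        snapshot = {u: (balances[u], versions[u]) for u in balances}
        retry = []
        # Stage 2: commit in ID order, verifying that the data read is still current.
        for seq, t in sorted(pending, key=lambda p: p[0]):
            src_balance, src_version = snapshot[t.src]
            _, dst_version = snapshot[t.dst]
            if versions[t.src] != src_version or versions[t.dst] != dst_version:
                retry.append((seq, t))  # conflict: send back around the cycle
                continue
            if src_balance < t.amount:
                rejected.append(seq)  # not enough cash on the source account
                continue
            balances[t.src] -= t.amount
            balances[t.dst] += t.amount
            versions[t.src] += 1
            versions[t.dst] += 1
        pending = retry
    return balances, rejected, rounds
```

The lowest pending ID always sees a current snapshot, so each round commits or rejects at least one transfer and the loop terminates.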


    Fair point. This could be done with state/timers in an
    eventually-consistent way (though not fully ACID) by simply
    sending messages. However in these sorts of workflow scenarios,
    the need for back edges will probably come up regardless (e.g. if
    a failure happens and you want to cancel, you might need a back
    edge to tell the previous key to cancel).

    However I'm still not convinced watermarks are needed.

        This is maybe too complex to describe in short, but there
        definitely has to be a GBK (actually a GroupAll operation)
        downstream and a cycle when a post-condition fails.


            Another application would be graph algorithms on changing
            graphs, where adding or removing an edge might trigger an
            iterative algorithm on the graph (and I'm absolutely not
            sure that the discussed approach can do that, this is
            just something that would be cool to do :)).

        Yes, that's what I had in mind. I'm just not sure that these
        algorithms lend themselves to windowing. I.e. if we added
        iterative support, but did not have support for windowing or
        watermarks across iterations, have we solved most of the
        problem?

        I don't think there is any windowing involved. When a new road
        is built between cities A and B it _immediately_ makes
        traveling between these two cities faster. There is no
        discrete boundary.

        I don't understand why we would drop support for watermarks -
        they would be perfectly supported, every iteration key will
        have a watermark hold that would be released when the key
        finishes iterating - or is terminated due to a timeout. I'm not
        sure if windowing as such plays any role in this, but maybe it
        can.


    You'd have to make sure things don't deadlock. If a step inside
    the transform that was being iterated had an event-time timer,
    what triggers that timer? If that timer is triggered by the
    watermark of the previous step and that watermark is being held up
    by the entire iteration, then this timer will never fire and the
    whole transform could deadlock. This was one reason for
    multi-dimensional watermarks - the timer can fire based on the
    watermark from the previous iterations, and so never deadlocks
    (though figuring out how to efficiently implement watermarks of
    unbounded dimensionality might be difficult).


            On 6/23/21 10:53 PM, Reuven Lax wrote:
            One question I have is whether the use cases for cyclic
            graphs overlap substantially with the use cases for
            event-time watermarks. Many of the uses I'm aware of are
            ML-type algorithms (e.g. clustering) or iterative
            algorithms on large graphs (connected components, etc.),
            and it's unclear how many of these problems have a
            natural time component.

            On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:

                Reuven, can you please elaborate a little on that?
                Why do you need a watermark per iteration? Letting the
                watermark progress as soon as all the keys arriving
                before the upstream watermark terminate the cycle
                seems like a valid definition without the need to
                make the watermark multidimensional. Yes, it
                introduces (possibly unbounded) latency in
                downstream processing, but that is something that
                should probably be expected. The unboundedness of the
                latency can be limited by either a fixed timeout or
                a maximum number of iterations.
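A minimal sketch of the single-dimensional definition above, assuming a hypothetical `IterationWatermarkHold` helper (this is not a Beam class): the output watermark is the minimum of the upstream watermark and the timestamps of all keys still iterating, and a key is forcibly released after a maximum number of iterations.

```python
class IterationWatermarkHold:
    """Holds the output watermark back while any key is still iterating."""

    def __init__(self, max_iterations):
        self.max_iterations = max_iterations
        self.upstream_watermark = float("-inf")
        self.in_flight = {}  # key -> (element timestamp, iterations so far)

    def on_element(self, key, timestamp):
        # A new key enters the cycle and starts holding the watermark.
        self.in_flight[key] = (timestamp, 0)

    def on_iteration(self, key, done):
        # Returns True when the key leaves the cycle (finished or limit reached).
        timestamp, iterations = self.in_flight[key]
        iterations += 1
        if done or iterations >= self.max_iterations:
            del self.in_flight[key]
            return True
        self.in_flight[key] = (timestamp, iterations)
        return False

    def advance_upstream(self, watermark):
        # Watermarks never move backwards.
        self.upstream_watermark = max(self.upstream_watermark, watermark)

    def output_watermark(self):
        held = min((ts for ts, _ in self.in_flight.values()), default=float("inf"))
        return min(self.upstream_watermark, held)
```

The iteration limit is what bounds the latency here; a fixed timeout could be substituted in the same place.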

                On 6/23/21 8:39 PM, Reuven Lax wrote:
                This was explored in the past, though the design
                started getting very complex (watermarks of
                unbounded dimension, where each iteration has its
                own watermark dimension). At the time, the
                exploration petered out.

                On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:

                    Hi,

                    I'd like to discuss a very rough idea. I didn't walk
                    through all the corner cases and the whole idea has a
                    lot of rough edges, so please bear with me. I was
                    thinking about non-IO applications of splittable DoFn,
                    and the main idea - and why it is called splittable -
                    is that it can handle unbounded outputs per element.
                    Then I was thinking about what can generate unbounded
                    outputs per element _without reading from an external
                    source_ (as that would be an IO application) - and then
                    I realized that the data can - at least theoretically -
                    come from a downstream transform. It would have to be
                    passed over an RPC (probably gRPC) connection, it would
                    probably require some sort of service discovery - as
                    the feedback loop would have to be correctly targeted
                    based on key - and so on (those are the rough edges).

                    But supposing this can be solved - what iterations
                    actually mean is that we have a side channel that
                    comes from downstream processing - and we need a
                    watermark estimator for this channel that is able to
                    hold the watermark back until the very last element
                    (at a certain watermark) finishes the iteration. The
                    idea is then that we could - in theory - create an
                    Iteration PTransform that would take another
                    PTransform (probably something like
                    PTransform<PCollection<KV<K, V>>,
                    PCollection<KV<K, IterationResult<K, V>>>>, where
                    the IterationResult<K, V> would contain the original
                    KV<K, V> and a stopping condition (true, false)), and
                    by creating the feedback loop from the output of this
                    PCollection we could actually implement this without
                    any need for support on the side of runners.
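To make the shape of the proposed contract concrete, here is a small in-memory sketch, with no Beam involved and with the RPC feedback channel replaced by a plain list. `IterationResult` mirrors the type named above; `iterate`, `halve` and the `max_iterations` cut-off are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass
from typing import Any, Hashable


@dataclass
class IterationResult:
    key: Hashable
    value: Any
    done: bool  # the stopping condition


def iterate(elements, step, max_iterations=100):
    """Feed (key, value) pairs through `step` until each key reports done."""
    finished = {}
    pending = list(elements)
    for _ in range(max_iterations):
        if not pending:
            break
        feedback = []  # stands in for the feedback loop from downstream
        for key, value in pending:
            result = step(key, value)
            if result.done:
                finished[result.key] = result.value
            else:
                feedback.append((result.key, result.value))
        pending = feedback
    # Keys that hit the iteration limit are emitted with their current value.
    for key, value in pending:
        finished[key] = value
    return finished


def halve(key, value):
    # Example step: halve the value and stop once it reaches 1 or less.
    new_value = value // 2
    return IterationResult(key, new_value, new_value <= 1)
```

For example, `iterate([("a", 8), ("b", 3)], halve)` sends key "a" around the loop three times while key "b" finishes on its first pass.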

                    Does that seem like something that might be
                    worth exploring?

                      Jan
