Most of the theory is particularly well-treated in "Timely Dataflow"
and "Differential Dataflow". There is a brief summary of the latter
at https://blog.acolyer.org/2015/06/17/differential-dataflow/ but I
recommend actually reading both papers. They use clock ticks rather
than Beam's continuous style of watermark, but I don't think this
changes the general approach.
There are very few implementations of watermark-correct cycles AFAIK.
For Beam runners where the watermark is simulated (for example using
Spark's state), we could possibly implement it at the Beam layer. For
engines where the Beam watermark is implemented more directly (for
example Dataflow and Flink), there would be a lot of added complexity
and probably a performance loss, if it could be done at all.
Kenn
On Wed, Jun 23, 2021 at 3:31 PM Reuven Lax <re...@google.com> wrote:
On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský <je...@seznam.cz> wrote:
On 6/23/21 11:13 PM, Reuven Lax wrote:
On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
The most qualitatively important use case I see is ACID
transactions - transactions naturally involve cycles,
because the most natural implementation would be
something like "optimistic locking", where the
transaction is allowed to progress until a downstream
"commit" sees a conflict, at which point it needs to send the
transaction back to the beginning (pretty much the same
as how git resolves conflicts on a push).
True; however, within a transform one could use timers to
implement this (there are currently some bugs around looping
timers, I believe, but those are easier to fix than
implementing a brand new programming model). Iteration is
only really necessary if you need to iterate over an entire
subgraph, including GroupByKeys, etc.
There necessarily needs to be a GBK to support transactions. If
we take the most prominent example of a transaction - moving
some amount of cash between two bank accounts - we can have a
state for the current amount of cash per user. This state
would be keyed, and a request to transfer some amount would come as
an atomic event "move amount X from user A to user B". This
would lead to updates of the keyed state of A and B, but
we don't know if A or B have the required amount in their
accounts. And this is where we need two GBKs - one will
assign a sequential ID to each transaction (that is, a
serialized order) and the other downstream will verify that
the result was computed from the correct data.
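Stripped of Beam, the two-stage check described above behaves like optimistic concurrency control. The sketch below is a single-process Java 16+ model under that assumption: the per-account version counters, the class `OptimisticTransferSketch`, and its methods are hypothetical. `sequence` stands in for the GBK that assigns the serial order, `commit` for the downstream GBK that verifies the inputs, and the `while` loop in `run` plays the back edge that returns stale transfers to the beginning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OptimisticTransferSketch {
    // Hypothetical request: "move amount from user `from` to user `to`".
    record Transfer(String from, String to, long amount) {}

    // A transfer after sequencing: serial id plus the state it was computed from.
    record Sequenced(long seq, Transfer t, long fromVersion, long toVersion, long fromBalance) {}

    final Map<String, Long> balance = new HashMap<>();
    final Map<String, Long> version = new HashMap<>(); // bumped on every committed write
    long nextSeq = 0;
    int retries = 0;

    // Stand-in for the first GBK: assign a serial order and snapshot what each transfer read.
    List<Sequenced> sequence(List<Transfer> batch) {
        List<Sequenced> out = new ArrayList<>();
        for (Transfer t : batch) {
            out.add(new Sequenced(nextSeq++, t,
                    version.getOrDefault(t.from(), 0L),
                    version.getOrDefault(t.to(), 0L),
                    balance.getOrDefault(t.from(), 0L)));
        }
        return out;
    }

    // Stand-in for the downstream GBK: verify in serial order and return transfers that must loop.
    List<Transfer> commit(List<Sequenced> ordered) {
        List<Transfer> feedback = new ArrayList<>();
        for (Sequenced s : ordered) {
            Transfer t = s.t();
            boolean stale = version.getOrDefault(t.from(), 0L) != s.fromVersion()
                    || version.getOrDefault(t.to(), 0L) != s.toVersion();
            if (stale) {                       // computed from outdated data: send it back
                feedback.add(t);
                retries++;
                continue;
            }
            if (s.fromBalance() < t.amount()) continue; // insufficient funds: reject
            balance.merge(t.from(), -t.amount(), Long::sum);
            balance.merge(t.to(), t.amount(), Long::sum);
            version.merge(t.from(), 1L, Long::sum);
            version.merge(t.to(), 1L, Long::sum);
        }
        return feedback;
    }

    // The cycle: keep going around until no transfer needs another attempt.
    void run(List<Transfer> input) {
        List<Transfer> pending = input;
        while (!pending.isEmpty()) {
            pending = commit(sequence(pending));
        }
    }

    public static void main(String[] args) {
        OptimisticTransferSketch bank = new OptimisticTransferSketch();
        bank.balance.put("A", 100L);
        bank.run(List.of(new Transfer("A", "B", 70), new Transfer("A", "C", 50)));
        System.out.println(bank.balance + " retries=" + bank.retries);
    }
}
```

With 100 in A, both transfers are first computed from the same snapshot; the second one is detected as stale, goes around the loop once, and is then rejected for insufficient funds instead of overdrawing A.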
Fair point. This could be done with state/timers in an
eventually-consistent way (though not fully ACID) by simply
sending messages. However, in these sorts of workflow scenarios,
the need for back edges will probably come up regardless (e.g. if
a failure happens and you want to cancel, you might need a back
edge to tell the previous key to cancel).
However I'm still not convinced watermarks are needed.
This is perhaps too complex to describe briefly, but there
definitely has to be a GBK (actually a GroupAll operation)
downstream, and a cycle for when a post-condition fails.
Another application would be graph algorithms on
changing graphs, where adding or removing an edge might
trigger an iterative algorithm on the graph (and I'm
not at all sure that the discussed approach can do
that; this is just something that would be cool to do :)).
Yes, that's what I had in mind. I'm just not sure that these
algorithms lend themselves to windowing. That is, if we added
iterative support but did not have support for windowing or
watermarks across iterations, would we have solved most of the
problem?
I don't think there is any windowing involved. When a new
road is built between cities A and B, it _immediately_ makes
traveling between these two cities faster. There is no
discrete boundary.
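The road example maps onto incremental shortest paths: a new edge can only shorten routes, so only distances that improve through it need revisiting, and that revisiting is itself an iteration that runs until nothing changes. A minimal label-correcting (Bellman-Ford-style) sketch in plain Java, assuming undirected roads, non-negative travel times, and a single fixed source city; the class and method names are hypothetical, and removing a road (where distances can grow) is not covered.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IncrementalShortestPaths {
    // Undirected road network: city -> (neighbouring city -> travel time).
    final Map<String, Map<String, Long>> roads = new HashMap<>();
    // Best known travel time from the fixed source city to every reachable city.
    final Map<String, Long> dist = new HashMap<>();

    IncrementalShortestPaths(String source) {
        dist.put(source, 0L);
    }

    // A new road can only shorten routes, so relaxation starts at its two endpoints
    // and keeps looping while any city's travel time still improves.
    void addRoad(String a, String b, long time) {
        roads.computeIfAbsent(a, k -> new HashMap<>()).merge(b, time, Math::min);
        roads.computeIfAbsent(b, k -> new HashMap<>()).merge(a, time, Math::min);
        Set<String> frontier = new HashSet<>(List.of(a, b));
        while (!frontier.isEmpty()) {             // one pass = one trip around the cycle
            Set<String> next = new HashSet<>();
            for (String u : frontier) {
                Long du = dist.get(u);
                if (du == null) continue;         // u is not reachable from the source yet
                for (Map.Entry<String, Long> e : roads.getOrDefault(u, Map.of()).entrySet()) {
                    long candidate = du + e.getValue();
                    if (candidate < dist.getOrDefault(e.getKey(), Long.MAX_VALUE)) {
                        dist.put(e.getKey(), candidate);
                        next.add(e.getKey());     // improved city: revisit its neighbours
                    }
                }
            }
            frontier = next;
        }
    }

    public static void main(String[] args) {
        IncrementalShortestPaths g = new IncrementalShortestPaths("A");
        g.addRoad("A", "C", 10);
        g.addRoad("C", "D", 1);
        g.addRoad("A", "B", 1);
        g.addRoad("B", "C", 1); // the new road immediately shortens A->C and A->D
        System.out.println(g.dist);
    }
}
```

Adding the B-C road lowers the travel time from A to C from 10 to 2 and from A to D from 11 to 3 as soon as the loop reaches its fixpoint, with no window boundary involved.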
I don't understand why we would drop support for watermarks -
they would be perfectly supported: every iteration key would
have a watermark hold that would be released when the key
finishes iterating, or is terminated due to a timeout. I'm
not sure whether windowing as such plays any role in this, but
maybe it can.
You'd have to make sure things don't deadlock. If a step inside
the transform that was being iterated had an event-time timer,
what triggers that timer? If that timer is triggered by the
watermark of the previous step and that watermark is being held
up by the entire iteration, then this timer will never fire and
the whole transform could deadlock. This was one reason for
multi-dimensional watermarks - the timer can fire based on the
watermark from the previous iterations, and so never deadlocks
(though figuring out how to efficiently implement watermarks of
unbounded dimensionality might be difficult).
On 6/23/21 10:53 PM, Reuven Lax wrote:
One question I have is whether the use cases for cyclic
graphs overlap substantially with the use cases for
event-time watermarks. Many of the uses I'm aware of
are ML-type algorithms (e.g. clustering) or iterative
algorithms on large graphs (connected components,
etc.), and it's unclear how many of these problems have
a natural time component.
On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
Reuven, can you please elaborate a little on that?
Why do you need watermark per iteration? Letting
the watermark progress as soon as all the keys
arriving before the upstream watermark terminate
the cycle seems like a valid definition without the
need to make the watermark multidimensional. Yes,
it introduces (possibly unbounded) latency in
downstream processing, but that is something that
should probably be expected. The unboundedness of the
latency can be limited by either a fixed timeout or a
maximum number of iterations.
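The definition proposed here (the output watermark advances once every key that arrived before the upstream watermark has left the cycle, with an iteration cap bounding the latency) can be sketched as per-key watermark holds. A minimal single-threaded model, assuming one hold per key at its event time; the class, its fields, and the cap of 3 used in `main` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class IterationWatermarkSketch {
    // Hypothetical bookkeeping for one key that is currently inside the loop.
    static final class InFlight {
        final long eventTime;   // watermark hold for this key
        int iterations;         // trips around the loop so far
        InFlight(long eventTime) { this.eventTime = eventTime; }
    }

    final Map<String, InFlight> inFlight = new HashMap<>();
    final int maxIterations;
    long upstreamWatermark = Long.MIN_VALUE;

    IterationWatermarkSketch(int maxIterations) { this.maxIterations = maxIterations; }

    // A key enters the loop and places a hold at its event time.
    void enter(String key, long eventTime) {
        inFlight.put(key, new InFlight(eventTime));
    }

    // One trip around the loop; the hold is released when the key converges or hits the cap.
    boolean iterate(String key, boolean converged) {
        InFlight f = inFlight.get(key);
        f.iterations++;
        if (converged || f.iterations >= maxIterations) {
            inFlight.remove(key);
            return true;
        }
        return false;
    }

    // Downstream watermark: the upstream watermark, held back by the oldest in-flight key.
    long outputWatermark() {
        long w = upstreamWatermark;
        for (InFlight f : inFlight.values()) w = Math.min(w, f.eventTime);
        return w;
    }

    public static void main(String[] args) {
        IterationWatermarkSketch loop = new IterationWatermarkSketch(3);
        loop.upstreamWatermark = 100;
        loop.enter("k1", 10);
        loop.enter("k2", 50);
        System.out.println(loop.outputWatermark()); // 10: held by k1
        loop.iterate("k1", true);                   // k1 converges after one trip
        System.out.println(loop.outputWatermark()); // 50: held by k2
        for (int i = 0; i < 3; i++) loop.iterate("k2", false); // k2 hits the cap
        System.out.println(loop.outputWatermark()); // 100: all holds released
    }
}
```

The latency downstream is exactly the time the slowest key spends in the loop, which is why the cap (or a timeout) is what keeps it bounded.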
On 6/23/21 8:39 PM, Reuven Lax wrote:
This was explored in the past, though the design
started getting very complex (watermarks of
unbounded dimension, where each iteration has its
own watermark dimension). At the time, the
exploration petered out.
On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to discuss a very rough idea. I
didn't walk through all the
corner cases and the whole idea has a lot of
rough edges, so please bear
with me. I was thinking about non-IO
applications of splittable DoFn,
and the main idea - and why it is called
splittable - is that it can
handle unbounded outputs per element. Then I
was thinking about what can
generate unbounded outputs per element
_without reading from an external
source_ (as that would be an IO application) -
and then I realized that the
data can - at least theoretically - come from
a downstream transform. It
would have to be passed over an RPC (gRPC
probably) connection, and it would
probably require some sort of service
discovery - as the feedback loop
would have to be correctly targeted based on
key - and so on (those are
the rough edges).
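"Correctly targeted based on key" means every element fed back for a key must reach the same worker that consumes the feedback for that key. Assuming a static worker list that all workers agree on (the actual service discovery, the hard part, is left out), one simple routing rule is hash partitioning; `FeedbackRouting`, `ownerOf`, and the worker names are placeholders for illustration.

```java
import java.util.List;

public class FeedbackRouting {
    // Picks the worker that owns a key, assuming a fixed, agreed-upon list of workers.
    static String ownerOf(Object key, List<String> workers) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return workers.get(Math.floorMod(key.hashCode(), workers.size()));
    }

    public static void main(String[] args) {
        List<String> workers = List.of("worker-0", "worker-1", "worker-2");
        // Every element fed back for the same key lands on the same worker.
        System.out.println(ownerOf("user-A", workers));
        System.out.println(ownerOf("user-A", workers));
    }
}
```

A fixed modulus breaks as soon as workers are added or removed, which is one reason the feedback channel would presumably need real service discovery rather than a static list.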
But supposing this can be solved - what
iterations actually mean is that
we have a side channel that comes from
downstream processing - and we
need a watermark estimator for this channel
that is able to hold the
watermark back until the very last element (at
a certain watermark)
finishes the iteration. The idea is then that we
could - in theory - create
an Iteration PTransform that would take
another PTransform (probably something like
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, IterationResult<K, V>>>>),
where the IterationResult<K, V> would contain
the original KV<K, V> and a stopping condition
(true, false), and by
creating the feedback loop from the output of
this PTransform we could
actually implement this without any need for
support on the side of runners.
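The intended semantics of such an Iteration transform can be modeled without any runner. In this plain Java 16+ sketch, `KV` is a local stand-in for Beam's `KV`, and `IterationResult` and `iterate` are hypothetical; it reads the KV carried in `IterationResult` as the element's updated value (an assumption about the proposal), and adds the iteration cap discussed elsewhere in the thread. Watermarks and distribution are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class IterationSketch {
    // Stand-in for Beam's KV<K, V> so the sketch runs without Beam on the classpath.
    record KV<K, V>(K key, V value) {}

    // Hypothetical IterationResult from the proposal: an element plus a stop flag.
    record IterationResult<K, V>(KV<K, V> kv, boolean stop) {}

    // Runs `body` on every element and feeds results that are not stopped back in,
    // up to maxIterations trips around the loop.
    static <K, V> List<KV<K, V>> iterate(List<KV<K, V>> input,
                                         Function<KV<K, V>, IterationResult<K, V>> body,
                                         int maxIterations) {
        List<KV<K, V>> done = new ArrayList<>();
        List<KV<K, V>> loop = new ArrayList<>(input);
        for (int i = 0; i < maxIterations && !loop.isEmpty(); i++) {
            List<KV<K, V>> feedback = new ArrayList<>();
            for (KV<K, V> kv : loop) {
                IterationResult<K, V> r = body.apply(kv);
                if (r.stop()) {
                    done.add(r.kv());
                } else {
                    feedback.add(r.kv()); // back edge: input of the next iteration
                }
            }
            loop = feedback;
        }
        done.addAll(loop); // elements still iterating at the cap are emitted with their latest value
        return done;
    }

    public static void main(String[] args) {
        // Example body: halve the value until it drops below 2.
        Function<KV<String, Integer>, IterationResult<String, Integer>> halve = kv -> {
            int next = kv.value() / 2;
            return new IterationResult<>(new KV<>(kv.key(), next), next < 2);
        };
        List<KV<String, Integer>> input = List.of(new KV<>("a", 40), new KV<>("b", 3));
        System.out.println(iterate(input, halve, 10));
        System.out.println(iterate(input, halve, 2));
    }
}
```

With a cap of 10 both keys converge to 1; with a cap of 2, key "a" is cut off at 10 while "b" has already stopped, which mirrors the timeout/iteration-limit trade-off raised in the thread.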
Does that seem like something that might be
worth exploring?
Jan