> If that timer is triggered by the watermark of the previous step and
> that watermark is being held up by the entire iteration, then this timer
> will never fire and the whole transform could deadlock. This was one
> reason for multi-dimensional watermarks - the timer can fire based on
> the watermark from the previous iterations, and so never deadlocks
> (though figuring out how to efficiently implement watermarks of
> unbounded dimensionality might be difficult).
That makes sense. However, if you write an endless "for" loop, it
will loop forever - is that something the tool you are using should
prevent? Should we ban for-loops just because they can lead to
infinite loops? Another important detail - no change to the Beam
model is needed. The purpose of this thread is to ask - are we already
there? Does SDF really fill the gap not only for sources, but also the
iteration "gap"? Iteration is for the time being the domain of batch,
which is where the unified approach loses its points.
On 6/24/21 12:31 AM, Reuven Lax wrote:
On Wed, Jun 23, 2021 at 2:33 PM Jan Lukavský wrote:
On 6/23/21 11:13 PM, Reuven Lax wrote:
On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský wrote:
The most qualitatively important use-case I see is ACID
transactions - transactions naturally involve cycles, because
the most natural implementation would be something like
"optimistic locking", where the transaction is allowed to
progress until a downstream "commit" sees a conflict, at which
point it needs to return the transaction back to the beginning
(pretty much the same as how git resolves a conflict in a push).
True, however within a transform one could use timers to
implement this (there are currently some bugs around looping
timers I believe, but those are easier to fix than implementing a
brand new programming model). Iterative is only really necessary
if you need to iterate an entire subgraph, including GroupByKeys,
etc.
There necessarily needs to be a GBK to support transactions. If we
take the most prominent example of a transaction - moving some
amount of cash between two bank accounts - we can have a state for
the current amount of cash per user. This state would be keyed, and a
request to transfer some amount would come as an atomic event "move
amount X from user A to user B". This would lead to updates of the
keyed state of A and B, but we don't know if A or B have the required
amount on their account. And this is where we need to
do two GBKs - one will assign a sequential ID to each transaction
(that is a serialized order) and the other downstream will verify
that the result was computed from the correct data.
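One way to picture the scheme described above is the plain-Python sketch below. It does not use Beam: a single loop stands in for the two GBKs, and all names (`Transfer`, `run_transfers`, the per-account version counters) are hypothetical. Each transfer gets a sequence ID, reads a snapshot of account versions, and is sent back over a "back edge" if the commit step finds that the snapshot went stale.

```python
from dataclasses import dataclass


@dataclass
class Transfer:
    src: str
    dst: str
    amount: int


def run_transfers(transfers, balances):
    """Optimistically apply transfers; retry those whose snapshot went stale.

    Hypothetical sketch: the enumerate() below plays the role of the first
    GBK (assigning a serialized order), the staleness check plays the role
    of the downstream verification.
    """
    balances = dict(balances)
    versions = {account: 0 for account in balances}
    rejected = []   # sequence IDs refused for insufficient funds
    retries = 0     # how many times a transfer travelled the back edge
    queue = list(enumerate(transfers))
    while queue:
        snapshot = dict(versions)  # what every transfer in this round "read"
        back_edge = []
        for seq_id, t in queue:
            stale = (versions[t.src] != snapshot[t.src]
                     or versions[t.dst] != snapshot[t.dst])
            if stale:
                # Conflict detected at commit time: return to the beginning.
                back_edge.append((seq_id, t))
                retries += 1
                continue
            if balances[t.src] < t.amount:
                rejected.append(seq_id)
                continue
            balances[t.src] -= t.amount
            balances[t.dst] += t.amount
            versions[t.src] += 1
            versions[t.dst] += 1
        queue = back_edge
    return balances, rejected, retries
```

The first transfer of every round always sees a fresh snapshot, so the queue shrinks each round and the loop terminates. In a real pipeline the retry would travel over an actual back edge rather than a local list.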
Fair point. This could be done with state/timers in an
eventually-consistent way (though not fully ACID) by simply sending
messages. However in these sorts of workflow scenarios, the need for
back edges will probably come up regardless (e.g. if a failure happens
and you want to cancel, you might need a back edge to tell the
previous key to cancel).
However I'm still not convinced watermarks are needed.
This is maybe too complex to describe in short, but there
definitely has to be a GBK (actually a GroupAll operation) downstream
and a cycle when a post-condition fails.
Another application would be graph algorithms on changing
graphs, where adding or removing an edge might trigger an
iterative algorithm on the graph (and I'm absolutely not sure
that the discussed approach can do that, this is just
something that would be cool to do :)).
Yes, that's what I had in mind. I'm just not sure that these
algorithms lend themselves to windowing. I.e. if we added
iterative support, but did not have support for windowing or
watermarks across iterations, have we solved most of the problem?
I don't think there is any windowing involved. When a new road is
built between cities A and B it _immediately_ makes traveling
between these two cities faster. There is no discrete boundary.
I don't understand why we would drop support for watermarks - they
would be perfectly supported, every iteration key will have a
watermark hold that would be released when the key finishes
iterating - or is terminated due to a timeout. I'm not sure if
windowing as such plays any role in this, but maybe it can.
You'd have to make sure things don't deadlock. If a step inside the
transform that was being iterated had an event-time timer, what
triggers that timer? If that timer is triggered by the watermark of
the previous step and that watermark is being held up by the entire
iteration, then this timer will never fire and the whole transform
could deadlock. This was one reason for multi-dimensional watermarks -
the timer can fire based on the watermark from the previous
iterations, and so never deadlocks (though figuring out how to
efficiently implement watermarks of unbounded dimensionality might be
difficult).
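The deadlock argument above can be illustrated with a deliberately simplified model in plain Python (not Beam; both function names are hypothetical). In the first function a single watermark covers the whole loop, so the hold placed for an iterating element also gates any event-time timer inside the loop. In the second, the timer is driven by the watermark of the previous iteration, modelled here as the upstream watermark, which the hold does not affect.

```python
def fire_step_single_watermark(upstream_watermarks, hold, timer_ts):
    """Step at which the timer fires, or None if it never fires.

    The loop watermark is min(upstream, hold); the hold is only released
    once the element leaves the loop, which in this model requires the
    timer to fire first.
    """
    for step, upstream in enumerate(upstream_watermarks):
        if min(upstream, hold) >= timer_ts:
            return step
    return None


def fire_step_per_iteration(upstream_watermarks, timer_ts):
    """Same timer, driven by the previous iteration's watermark.

    Assumption of this sketch: the previous iteration's watermark equals
    the upstream watermark and is not held back by the current iteration.
    """
    for step, upstream in enumerate(upstream_watermarks):
        if upstream >= timer_ts:
            return step
    return None
```

With an element at timestamp 10 (so a hold at 10) and a timer set for 15, the single-watermark variant never reaches 15 however far the upstream watermark advances, while the per-iteration variant fires as soon as the upstream watermark passes 15.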
On 6/23/21 10:53 PM, Reuven Lax wrote:
One question I have is whether the use cases for cyclic
graphs overlap substantially with the use cases for
event-time watermarks. Many of the uses I'm aware of are
ML-type algorithms (e.g. clustering) or iterative algorithms
on large graphs (connected components, etc.), and it's
unclear how many of these problems have a natural time
component.
On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský wrote:
Reuven, can you please elaborate a little on that? Why
do you need a watermark per iteration? Letting the
watermark progress as soon as all the keys arriving
before the upstream watermark terminate the cycle seems
like a valid definition without the need to make the
watermark multidimensional. Yes, it introduces (possibly
unbounded) latency in downstream processing, but that is
something that should probably be expected. The
unboundedness of the latency can be limited by either a
fixed timeout or a maximum number of iterations.
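As a rough illustration of this single-dimensional definition, the plain-Python sketch below (not Beam; the class `IterationWatermark` and its methods are hypothetical) keeps one hold per key that is still iterating. The output watermark is the minimum of the upstream watermark and all holds, and a key is evicted once it finishes or hits an iteration limit, which bounds the added latency.

```python
class IterationWatermark:
    """Tracks watermark holds for keys that are still inside the cycle."""

    def __init__(self, max_iterations):
        self.max_iterations = max_iterations
        self.in_flight = {}  # key -> (element timestamp, iterations so far)

    def enter(self, key, timestamp):
        # A key entering the cycle places a hold at its element timestamp.
        self.in_flight[key] = (timestamp, 0)

    def iterate(self, key, finished):
        # Record one pass through the cycle; release the hold when the key
        # finishes or when the iteration limit is reached.
        timestamp, count = self.in_flight[key]
        count += 1
        if finished or count >= self.max_iterations:
            del self.in_flight[key]
        else:
            self.in_flight[key] = (timestamp, count)

    def output_watermark(self, upstream_watermark):
        holds = [timestamp for timestamp, _ in self.in_flight.values()]
        return min([upstream_watermark, *holds])
```

A wall-clock timeout could replace the iteration counter in the same place; the counter is used here only because it keeps the sketch deterministic.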
On 6/23/21 8:39 PM, Reuven Lax wrote:
This was explored in the past, though the design
started getting very complex (watermarks of unbounded
dimension, where each iteration has its own watermark
dimension). At the time, the exploration petered out.
On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský wrote:
Hi,
I'd like to discuss a very rough idea. I didn't walk through all the
corner cases and the whole idea has a lot of rough edges, so please bear
with me. I was thinking about non-IO applications of splittable DoFn,
and the main idea - and why it is called splittable - is that it can
handle unbounded outputs per element. Then I was thinking about what can
generate unbounded outputs per element _without reading from an external
source_ (as that would be an IO application) - and then I realized that
the data can - at least theoretically - come from a downstream
transform. It would have to be passed over an RPC (gRPC probably)
connection, it would probably require some sort of service discovery -
as the feedback loop would have to be correctly targeted based on key -
and so on (those are the rough edges).
But supposing this can be solved - what iterations actually mean is that
we have a side channel that comes from downstream processing - and we
need a watermark estimator for this channel that is able to hold the
watermark back until the very last element (at a certain watermark)
finishes the iteration. The idea is then that we could - in theory -
create an Iteration PTransform that would take another PTransform
(probably something like PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, IterationResult<K, V>>>>, where the
IterationResult<K, V> would contain the original KV<K, V> and a stopping
condition (true, false)), and by creating the feedback loop from the
output of this PCollection we could actually implement this without any
need of support on the side of runners.
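To make the shape of this proposal concrete, here is a minimal plain-Python sketch. It is not Beam code and does not use SDF: `IterationResult`, `iterate`, and `halve_until_small` are hypothetical names, and a local list stands in for the RPC feedback channel. The wrapped body returns the key/value pair plus a stopping flag, and unfinished results are fed back as input to the next round.

```python
from dataclasses import dataclass


@dataclass
class IterationResult:
    key: str
    value: int
    done: bool  # stopping condition: True means the key leaves the loop


def iterate(elements, body, max_rounds=1000):
    """Apply body to each (key, value) until it reports done.

    Returns (finished, pending); pending is non-empty only if max_rounds
    was exhausted, an assumed safety limit against endless loops.
    """
    pending = list(elements)
    finished = []
    for _ in range(max_rounds):
        if not pending:
            break
        results = [body(key, value) for key, value in pending]
        finished.extend((r.key, r.value) for r in results if r.done)
        # The feedback loop: unfinished results become the next input.
        pending = [(r.key, r.value) for r in results if not r.done]
    return finished, pending


def halve_until_small(key, value):
    """Example body: halve the value, stop once it is at most 1."""
    new_value = value // 2
    return IterationResult(key, new_value, new_value <= 1)
```

Calling `iterate([("a", 8), ("b", 3)], halve_until_small)` finishes key "b" after one round and key "a" after three. In the proposal the watermark would be held until the last such key completes.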
Does that seem like something that might be worth exploring?
Jan