On 6/23/21 11:13 PM, Reuven Lax wrote:
On Wed, Jun 23, 2021 at 2:00 PM Jan Lukavský <je...@seznam.cz> wrote:
The most qualitatively important use case I see is ACID transactions.
Transactions naturally involve cycles, because the most natural
implementation would be something like "optimistic locking", where
the transaction is allowed to progress until a downstream "commit"
sees a conflict, at which point it needs to return the transaction
back to the beginning (pretty much the same as how git resolves a
conflict on a push).
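A minimal, single-process sketch of the optimistic-locking loop described above. It assumes a hypothetical in-memory `VersionedStore` in which every key carries a version counter, and a hypothetical `OptimisticTxn` driver. Neither is a Beam API. The commit fails if the version changed after the read, and the transaction then loops back to its beginning. The retry loop here is unbounded. A real system would bound it with a timeout or an attempt limit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical in-memory keyed store where every key carries a version counter.
final class VersionedStore {
  private final Map<String, Long> values = new HashMap<>();
  private final Map<String, Long> versions = new HashMap<>();

  long read(String key) {
    return values.getOrDefault(key, 0L);
  }

  long version(String key) {
    return versions.getOrDefault(key, 0L);
  }

  // The downstream "commit": succeeds only if nobody else committed to the
  // key since it was read at readVersion.
  boolean commit(String key, long readVersion, long newValue) {
    if (version(key) != readVersion) {
      return false; // conflict detected
    }
    values.put(key, newValue);
    versions.put(key, readVersion + 1);
    return true;
  }
}

final class OptimisticTxn {
  // Reads, computes and tries to commit. On a conflict the transaction goes
  // back to the beginning (the cycle). `concurrentWrite` simulates another
  // writer sneaking in during the first attempt and may be null.
  // Returns the number of attempts it took.
  static int run(VersionedStore store, String key, UnaryOperator<Long> update,
                 Runnable concurrentWrite) {
    int attempts = 0;
    while (true) {
      attempts++;
      long readVersion = store.version(key);
      long newValue = update.apply(store.read(key));
      if (attempts == 1 && concurrentWrite != null) {
        concurrentWrite.run();
      }
      if (store.commit(key, readVersion, newValue)) {
        return attempts;
      }
    }
  }
}
```

With a concurrent write of `5` during the first attempt, the `+10` update conflicts once. It then re-reads `5` and commits `15` on the second attempt.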
True, however within a single transform one could use timers to
implement this (I believe there are currently some bugs around looping
timers, but those are easier to fix than implementing a brand new
programming model). Iteration is only really necessary if you need to
iterate an entire subgraph, including GroupByKeys, etc.
There necessarily needs to be a GBK to support transactions. If we take
the most prominent example of a transaction - moving some amount of cash
between two bank accounts - we can keep a state holding the current
amount of cash per user. This state would be keyed, and a request to
transfer some amount would come as an atomic event "move amount X from
user A to user B". This would lead to updates of the keyed state of A
and B, but we don't know whether A or B has the required amount on
their account. This is where we need two GBKs: one assigns a sequential
ID to each transaction (that is, a serialized order), and the other,
further downstream, verifies that the result was computed from the
correct data. This is maybe too complex to describe in short, but there
definitely has to be a GBK (actually a GroupAll operation) downstream,
and a cycle when a post-condition fails.
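A simplified, in-memory sketch of this "serialize, verify, cycle back" structure. It uses hypothetical `Transfer` and `TransferLoop` classes, which are not Beam APIs. A single process stands in for both GBKs. Transfers receive sequential IDs once, are applied in that order, and any transfer whose computed result would violate the post-condition (here, a negative balance) is fed back for a later round. The number of rounds is capped by an assumed `maxRounds` parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical transfer event: "move `amount` from user `from` to user `to`".
final class Transfer {
  final String from;
  final String to;
  final long amount;
  long seq = -1; // assigned by the sequencing stage (the first "GBK")

  Transfer(String from, String to, long amount) {
    this.from = from;
    this.to = to;
    this.amount = amount;
  }
}

final class TransferLoop {
  // Applies transfers in sequence order. A transfer whose result fails the
  // post-condition is not applied; it is cycled back into the next round.
  // Returns the transfers that still fail after maxRounds rounds.
  static List<Transfer> run(Map<String, Long> balances, List<Transfer> incoming, int maxRounds) {
    long nextSeq = 0;
    for (Transfer t : incoming) {
      t.seq = nextSeq++; // serialized order
    }
    List<Transfer> pending = new ArrayList<>(incoming);
    for (int round = 0; round < maxRounds && !pending.isEmpty(); round++) {
      List<Transfer> feedback = new ArrayList<>();
      for (Transfer t : pending) { // pending stays sorted by seq
        long newFrom = balances.getOrDefault(t.from, 0L) - t.amount;
        if (newFrom < 0) {
          feedback.add(t); // post-condition failed: send it around the cycle
          continue;
        }
        balances.put(t.from, newFrom);
        balances.merge(t.to, t.amount, Long::sum);
      }
      pending = feedback;
    }
    return pending;
  }
}
```

Starting from `A = 100, B = 0`, the transfer `B -> C 50` fails in the first round. It succeeds in the second round because `A -> B 80` was applied in between. An impossible `C -> A 1000` stays in the loop until the round limit is reached.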
Another application would be graph algorithms on changing graphs,
where adding or removing an edge might trigger an iterative
algorithm on the graph (and I'm absolutely not sure that the
discussed approach can do that; this is just something that would
be cool to do :)).
Yes, that's what I had in mind. I'm just not sure that these
algorithms lend themselves to windowing. I.e., if we added iterative
support but did not have support for windowing or watermarks across
iterations, would we have solved most of the problem?
I don't think there is any windowing involved. When a new road is built
between cities A and B, it _immediately_ makes traveling between these
two cities faster. There is no discrete boundary.
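A toy illustration of the road example. It assumes a single-source shortest-path view of the graph and a hypothetical `IncrementalShortestPaths` class, which is not part of Beam. Adding a road re-relaxes distances starting from the road's two endpoints, using a worklist. Every city whose distance improves is fed back into the loop until nothing changes, so the algorithm iterates to a fixpoint and has no window boundary. The sketch only handles insertions of non-negative-length roads, where distances can only decrease. Removing an edge would need a different approach.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical incremental single-source shortest paths over undirected roads.
final class IncrementalShortestPaths {
  private final Map<String, Map<String, Long>> roads = new HashMap<>();
  private final Map<String, Long> dist = new HashMap<>();

  IncrementalShortestPaths(String source) {
    dist.put(source, 0L);
  }

  // Returns null if the city is not reachable from the source (yet).
  Long distance(String city) {
    return dist.get(city);
  }

  // Adds a road (keeping the shorter one if it already exists) and relaxes
  // distances until a fixpoint is reached.
  void addRoad(String a, String b, long length) {
    roads.computeIfAbsent(a, k -> new HashMap<>()).merge(b, length, Long::min);
    roads.computeIfAbsent(b, k -> new HashMap<>()).merge(a, length, Long::min);
    Deque<String> work = new ArrayDeque<>(List.of(a, b));
    while (!work.isEmpty()) {
      String u = work.poll();
      Long du = dist.get(u);
      if (du == null) {
        continue; // u is not reachable, so nothing to propagate
      }
      for (Map.Entry<String, Long> road : roads.getOrDefault(u, Map.of()).entrySet()) {
        long candidate = du + road.getValue();
        Long current = dist.get(road.getKey());
        if (current == null || candidate < current) {
          dist.put(road.getKey(), candidate);
          work.add(road.getKey()); // improved city goes around the loop again
        }
      }
    }
  }
}
```

For example, with roads `A-B 10`, `B-C 10` and `C-D 1`, city `D` is at distance 21 from `A`. Adding a road `A-C 5` immediately lowers `C` to 5, and the feedback step then propagates the improvement to `D`, giving 6.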
I don't understand why we would drop support for watermarks - they would
be perfectly supported: every iteration key would have a watermark hold
that is released when the key finishes iterating, or when it is
terminated due to a timeout. I'm not sure whether windowing as such
plays any role in this, but it might.
On 6/23/21 10:53 PM, Reuven Lax wrote:
One question I have is whether the use cases for cyclic graphs
overlap substantially with the use cases for event-time
watermarks. Many of the uses I'm aware of are ML-type algorithms
(e.g. clustering) or iterative algorithms on large graphs
(connected components, etc.), and it's unclear how many of these
problems have a natural time component.
On Wed, Jun 23, 2021 at 1:49 PM Jan Lukavský <je...@seznam.cz> wrote:
Reuven, can you please elaborate a little on that? Why do you
need a watermark per iteration? Letting the watermark progress
as soon as all the keys that arrived before the upstream
watermark have terminated the cycle seems like a valid definition,
without the need to make the watermark multidimensional. Yes,
it introduces (possibly unbounded) latency in downstream
processing, but that is probably something that should be
expected. The unboundedness of the latency can be limited by
either a fixed timeout or a maximum number of iterations.
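One possible reading of this definition, written as a single-process sketch. It uses a hypothetical `IterationWatermark` tracker, which is not Beam's `WatermarkEstimator` API. Each key that enters the loop places a hold at its timestamp. The hold is released when the key converges or reaches an assumed `maxIterations` limit. The output watermark is the upstream watermark, capped by the earliest outstanding hold. Timestamps are plain `long` values for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical watermark-hold tracker for keys that are inside the loop.
final class IterationWatermark {
  private final int maxIterations;
  private final Map<String, Long> holds = new HashMap<>();         // key -> held timestamp
  private final Map<String, Integer> iterations = new HashMap<>(); // key -> passes so far
  private long upstreamWatermark = Long.MIN_VALUE;

  IterationWatermark(int maxIterations) {
    this.maxIterations = maxIterations;
  }

  // Watermarks only move forward.
  void advanceUpstream(long watermark) {
    upstreamWatermark = Math.max(upstreamWatermark, watermark);
  }

  // A key enters the cycle: hold the output watermark at its timestamp.
  void enter(String key, long timestamp) {
    holds.merge(key, timestamp, Long::min);
    iterations.putIfAbsent(key, 0);
  }

  // Records one pass of `key` through the loop body. Returns true if the key
  // must loop again. Returns false if it converged or hit the iteration limit;
  // in both of those cases its hold is released.
  boolean iterate(String key, boolean converged) {
    int passes = iterations.merge(key, 1, Integer::sum);
    if (converged || passes >= maxIterations) {
      holds.remove(key);
      iterations.remove(key);
      return false;
    }
    return true;
  }

  // The upstream watermark, held back by the earliest key still iterating.
  long outputWatermark() {
    long earliestHold = Long.MAX_VALUE;
    for (long hold : holds.values()) {
      earliestHold = Math.min(earliestHold, hold);
    }
    return Math.min(upstreamWatermark, earliestHold);
  }
}
```

With an upstream watermark of 100 and keys entering at 40 and 60, the output watermark stays at 40 until the first key converges. It then moves to 60. It reaches 100 once the second key is cut off by the iteration limit. The watermark stays one-dimensional throughout.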
On 6/23/21 8:39 PM, Reuven Lax wrote:
This was explored in the past, though the design started
getting very complex (watermarks of unbounded dimension,
where each iteration has its own watermark dimension). At
the time, the exploration petered out.
On Wed, Jun 23, 2021 at 10:13 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to discuss a very rough idea. I didn't walk through all the
corner cases and the whole idea has a lot of rough edges, so please
bear with me. I was thinking about non-IO applications of splittable
DoFn, and the main idea - and why it is called splittable - is that it
can handle unbounded outputs per element. Then I was thinking about
what can generate unbounded outputs per element _without reading from
an external source_ (as that would be an IO application) - and then I
realized that the data can, at least theoretically, come from a
downstream transform. It would have to be passed over an RPC (probably
gRPC) connection, and it would probably require some sort of service
discovery, as the feedback loop would have to be correctly targeted
based on key, and so on (those are the rough edges).
But supposing this can be solved, what iterations actually mean is
that we have a side channel that comes from downstream processing, and
we need a watermark estimator for this channel that is able to hold
the watermark back until the very last element (at a certain
watermark) finishes the iteration. The idea is then that we could, in
theory, create an Iteration PTransform that would take another
PTransform (probably something like
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, IterationResult<K, V>>>>,
where IterationResult<K, V> would contain the original KV<K, V> and a
stopping condition (true, false)). By creating the feedback loop from
the output of this PCollection, we could actually implement this
without any need for support on the side of runners.
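This is a single-process sketch of what such an Iteration wrapper would compute for each element. It uses hypothetical `KV`, `IterationResult` and `Iteration` classes written only for this example. They are not the Beam SDK classes, and the sketch includes no runner, RPC or watermark handling. Elements whose stopping condition is false go back into the body. As suggested elsewhere in the thread, a cap on the number of rounds bounds the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal stand-in for Beam's KV<K, V>; only for this sketch.
final class KV<K, V> {
  final K key;
  final V value;

  private KV(K key, V value) {
    this.key = key;
    this.value = value;
  }

  static <K, V> KV<K, V> of(K key, V value) {
    return new KV<>(key, value);
  }
}

// The (possibly updated) element plus the stopping condition.
final class IterationResult<K, V> {
  final KV<K, V> element;
  final boolean stop;

  IterationResult(KV<K, V> element, boolean stop) {
    this.element = element;
    this.stop = stop;
  }
}

final class Iteration {
  // Runs `body` on every element and feeds back those with stop == false,
  // until none are left or maxIterations rounds have run. Elements still
  // looping at the limit are emitted as they are.
  static <K, V> List<KV<K, V>> run(List<KV<K, V>> input,
                                   Function<KV<K, V>, IterationResult<K, V>> body,
                                   int maxIterations) {
    List<KV<K, V>> done = new ArrayList<>();
    List<KV<K, V>> loop = new ArrayList<>(input);
    for (int i = 0; i < maxIterations && !loop.isEmpty(); i++) {
      List<KV<K, V>> feedback = new ArrayList<>();
      for (KV<K, V> kv : loop) {
        IterationResult<K, V> result = body.apply(kv);
        if (result.stop) {
          done.add(result.element);
        } else {
          feedback.add(result.element); // the feedback edge of the cycle
        }
      }
      loop = feedback;
    }
    done.addAll(loop);
    return done;
  }
}
```

Consider a run with `maxIterations = 3` and a body that halves the value and stops once the value reaches 1. The input `("a", 8), ("b", 3), ("c", 1000)` yields `("b", 1)`, `("a", 1)` and `("c", 125)`, and the last of these is cut off by the iteration limit.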
Does that seem like something that might be worth exploring?
Jan