Agree!

Thanks Kenn,
Regards
JB

On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
Reviving this again, since it came up again today in yet another context. I
think it is time to add this as an experimental annotation. I think we know
that we need it, and roughly how it should work, while there are still
finer points to discuss about what it means for input to be stable.

So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
need to be incorporated into the Beam model's ParDoPayload and the Python
SDK as well.

Kenn

On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax <re...@google.com.invalid> wrote:

Well, the Fn API is still being designed, so this is something we'd have to
think about.

On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <re...@google.com.invalid> wrote:
On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <re...@google.com.invalid> wrote:
On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
The question here is whether the ordering is part of the "content" of an
iterable.

My initial instinct was to say yes, but maybe it should not be until Beam
has a first-class notion of sorted values after a GBK?

Yeah, I'm not sure on this either. Interestingly, if we consider ordering
to be important, then the composite GBK + ungroup will be stable despite
its components not being so.

As I mentioned above, the iterable is semantically [part of] a single
element. So, just to unpack this and make sure we are talking about the
same thing: I think you are talking about GBK as implemented via GBKO
(GroupByKeyOnly) + GABW (GroupAlsoByWindow).

When the output of GABW is required to be stable but the output of GBKO is
not, inserting a GBK does not give us stability for free in all cases; we
need something more to make the output of GABW stable, in the worst case a
full materialization.


Correct. My point is that there are alternate, cheaper ways of doing this.
If GABW stores state in an ordered list, it can simply checkpoint a marker
into that list to ensure that the output is stable.
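A minimal in-memory sketch of the marker idea, assuming the per-key GABW state is an append-only ordered list. `OrderedBuffer`, `checkpoint_marker`, and `stable_view` are hypothetical names for illustration only, not Beam or runner APIs, and "checkpointing" here is just an assignment where a real runner would persist the marker durably.

```python
class OrderedBuffer:
    """Hypothetical model of per-key GABW state kept as an ordered, append-only list."""

    def __init__(self):
        self.values = []   # buffered elements in arrival order
        self.marker = None  # end index of the emitted pane, once checkpointed

    def append(self, value):
        # New data always lands at the end, i.e. after any checkpointed marker.
        self.values.append(value)

    def checkpoint_marker(self):
        # Assumption: a real runner would commit this index durably.
        self.marker = len(self.values)

    def stable_view(self):
        # Everything before the marker; later appends cannot change it.
        if self.marker is None:
            raise RuntimeError("no marker checkpointed yet")
        return list(self.values[: self.marker])


buf = OrderedBuffer()
buf.append("v1")
buf.append("v2")
buf.checkpoint_marker()
buf.append("v3")          # arrives after the marker
print(buf.stable_view())  # ['v1', 'v2']
```

The cost compared to full materialization is a single extra index per key rather than a copy of the buffered values.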

In the presence of non-trivial triggering and/or late data, I'm not so sure
this is "easy." For example, a bundle may fail, and more data may come in
from upstream (and get appended to the buffer) before it is retried.


That will still work. If the subsequent ParDo has processed the Iterable,
that means we'll have successfully checkpointed a marker to the list (using
whatever technique the runner supports). More data coming in will get
appended after the marker, so we can ensure that the retry still sees the
same elements in the Iterable.

I'm thinking of the following.

1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
the state. A trigger gets set.
2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
for some reason fails.
3. (k, v3) comes into the GABW and [v3] gets appended to the state.
4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
downstream.
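The four steps above can be replayed in a few lines of plain Python to show why a retry without stable input sees different input. This is a hypothetical in-memory model: `fire` is an illustrative helper, not a Beam API, and it assumes a firing simply reads whatever is currently buffered for the key.

```python
def fire(key, buffer):
    # Without stable input, a firing reads whatever is buffered right now.
    return (key, list(buffer))


buffer = []
buffer.extend(["v1", "v2"])        # 1. (k, v1) and (k, v2) arrive; a trigger is set
first_attempt = fire("k", buffer)  # 2. sent downstream, but the bundle fails
buffer.append("v3")                # 3. (k, v3) arrives before the retry
retry = fire("k", buffer)          # 4. the trigger fires again

print(first_attempt)  # ('k', ['v1', 'v2'])
print(retry)          # ('k', ['v1', 'v2', 'v3'])
```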


If you add the annotation specifying stable input, then we will not do
this. Before we send anything downstream, we will add a marker to the list,
and only forward data downstream once the marker has been checkpointed.
This adds a bit of cost and latency, of course, but the assumption is that
adding this annotation will always add some cost.
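A minimal sketch of marker-before-forwarding applied to the same four-step scenario, assuming the marker is committed on the first firing attempt and cleared only after downstream success. That is one possible design, not settled behavior, and the sketch ignores the fused-graph and Fn API concerns. `StableGabwState`, `fire`, and `acknowledge_success` are hypothetical names, and `committed_marker` stands in for durably committed runner state.

```python
class StableGabwState:
    """Hypothetical per-key GABW state: buffered values plus a committed marker."""

    def __init__(self):
        self.values = []
        self.committed_marker = None  # stands in for durably committed state

    def append(self, value):
        self.values.append(value)

    def fire(self):
        # Commit a marker before forwarding anything, unless an earlier
        # (possibly failed) attempt already committed one.
        if self.committed_marker is None:
            self.committed_marker = len(self.values)
        return list(self.values[: self.committed_marker])

    def acknowledge_success(self):
        # Downstream processed the pane, so the marker is no longer needed.
        self.committed_marker = None


state = StableGabwState()
state.append("v1")
state.append("v2")
first_attempt = state.fire()  # marker committed; the bundle then fails
state.append("v3")            # late data lands after the marker
retry = state.fire()          # the retry replays exactly the same input
state.acknowledge_success()
next_pane = state.fire()      # accumulating mode: the next pane sees all three

print(retry, next_pane)       # ['v1', 'v2'] ['v1', 'v2', 'v3']
```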

I don't think you can checkpoint anything "before sending data downstream"
if it's being executed as part of a fused graph, unless we add special
support for this in the Fn API. I suppose the runner could preemptively
modify the state of any GABW operations before firing triggers...

It is unclear when a marker would be added to the list. Is this in step 2,
which, despite failing, still results in the modified state [v1, v2,
marker]? (And this state modification would have to be committed before
attempting the bundle, in case the "failure" was something like a VM
shutdown.) And only on success is the state modified to be (say this is
accumulating mode) [v1, v2]?

I think it could be done, but it may significantly complicate things.





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
