On Mon, Dec 11, 2017 at 8:47 PM, Kenneth Knowles <[email protected]> wrote:
> In discussion on https://github.com/apache/beam/pull/4135 and offline it
> came up how this should interact with stateful/timely DoFns.
>
> Highlights:
>
>  - It was suggested to support it for @OnTimer which would mean that the
>    state is stable on retry

This sounds good.

>  - But an unfortunate corollary is that for stateful DoFn it should be
>    stable after each element
>  - And also if the state is expected to be stable on replay, then ordering
>    must be stable, including interleaving of timers and elements

I'm not sure why this follows? I think rather we simply need the timer not
to fire _until_ state is stable. Elements (and therefore state writes) might
retry in different orders, but the timer does not fire until the result is
committed.

> And since stateful DoFn are very useful for sinks, and
> @RequiresStableInput is very useful for sinks, I wonder if someone has
> thoughts about how these should interact?
>
> In the special case of @OnWindowExpiry (not yet implemented), the most
> important timer for flushing state, it wouldn't actually be so bad.
>
> Kenn
>
> On Wed, Nov 15, 2017 at 7:28 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> +1
>>
>> > On 15. Nov 2017, at 14:07, Jean-Baptiste Onofré <[email protected]> wrote:
>> >
>> > Agree !
>> >
>> > Thanks Kenn,
>> > Regards
>> > JB
>> >
>> > On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
>> >> Reviving this again, since it came up again today in yet another
>> >> context. I think it is time to add this as an experimental annotation.
>> >> I think we know that we need it, and roughly how it should work, while
>> >> there are still finer points to discuss about what it means for input
>> >> to be stable.
>> >> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped
>> >> up https://github.com/apache/beam/pull/4135 to move it along a smidge.
>> >> It will need to be incorporated into the Beam model's ParDoPayload and
>> >> the Python SDK as well.
>> >> Kenn
>> >> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax <[email protected]> wrote:
>> >>> Well the Fn API is still being designed, so this is something we'd
>> >>> have to think about.
>> >>>
>> >>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <[email protected]> wrote:
>> >>>
>> >>>> On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <[email protected]> wrote:
>> >>>>> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <[email protected]> wrote:
>> >>>>>
>> >>>>>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <[email protected]> wrote:
>> >>>>>>> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
>> >>>>>>>> The question here is whether the ordering is part of the
>> >>>>>>>> "content" of an iterable.
>> >>>>>>>
>> >>>>>>> My initial instinct was to say yes - but maybe it should not be
>> >>>>>>> until Beam has a first-class notion of sorted values after a GBK?
>> >>>>>>
>> >>>>>> Yeah, I'm not sure on this either. Interestingly, if we consider
>> >>>>>> ordering to be important, then the composite gbk + ungroup will be
>> >>>>>> stable despite its components not being so.
>> >>>>>>
>> >>>>>>>>>> As I mention above, the iterable is semantically [part of] a
>> >>>>>>>>>> single element. So just to unpack this, to make sure we are
>> >>>>>>>>>> talking about the same thing, I think you are talking about
>> >>>>>>>>>> GBK as implemented via GBKO + GABW.
>> >>>>>>>>>>
>> >>>>>>>>>> When the output of GABW is required to be stable but the output
>> >>>>>>>>>> of GBKO is not stable, we don't have stability for free in all
>> >>>>>>>>>> cases by inserting a GBK, but require something more to make
>> >>>>>>>>>> the output of GABW stable, in the worst case a full
>> >>>>>>>>>> materialization.
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Correct.
>> >>>>>>>>> My point is that there are alternate, cheaper ways of doing
>> >>>>>>>>> this. If GABW stores state in an ordered list, it can simply
>> >>>>>>>>> checkpoint a marker into that list to ensure that the output is
>> >>>>>>>>> stable.
>> >>>>>>>>
>> >>>>>>>> In the presence of non-trivial triggering and/or late data, I'm
>> >>>>>>>> not so sure this is "easy." E.g. A bundle may fail, and more data
>> >>>>>>>> may come in from upstream (and get appended to the buffer) before
>> >>>>>>>> it is retried.
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>> That will still work. If the subsequent ParDo has processed the
>> >>>>>>> Iterable, that means we'll have successfully checkpointed a marker
>> >>>>>>> to the list (using whatever technique the runner supports). More
>> >>>>>>> data coming in will get appended after the marker, so we can
>> >>>>>>> ensure that the retry still sees the same elements in the
>> >>>>>>> Iterable.
>> >>>>>>
>> >>>>>> I'm thinking of the following.
>> >>>>>>
>> >>>>>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored
>> >>>>>>    in the state. A trigger gets set.
>> >>>>>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
>> >>>>>>    for some reason fails.
>> >>>>>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
>> >>>>>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is
>> >>>>>>    sent downstream.
>> >>>>>>
>> >>>>>>
>> >>>>> If you add the annotation specifying stableinput, then we will not
>> >>>>> do this. Before we send anything downstream, we will add a marker to
>> >>>>> the list, and only forward data downstream once the marker has been
>> >>>>> checkpointed. This adds a bit of cost and latency of course, but the
>> >>>>> assumption is that adding this annotation will always add some cost.
>> >>>>
>> >>>> I don't think you can checkpoint anything "before sending data
>> >>>> downstream" if it's being executed as part of a fused graph, unless we
>> >>>> add special support for this in the Fn API. I suppose the runner could
>> >>>> pre-emptively modify the state of any GABW operations before firing
>> >>>> triggers...
>> >>>>
>> >>>>>> It is unclear when a marker would be added to the list. Is this in
>> >>>>>> step 2 which, despite failing, still results in modified state
>> >>>>>> [v1, v2, marker]? (And this state modification would have to be
>> >>>>>> committed before attempting the bundle, in case the "failure" was
>> >>>>>> something like a VM shutdown.) And only on success the state is
>> >>>>>> modified to be (say this is accumulating mode) [v1, v2]?
>> >>>>>>
>> >>>>>> I think it could be done, but it may significantly complicate
>> >>>>>> things.
>> >>>>>>
>> >>>>
>> >>>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
>
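
To make the marker idea from the quoted exchange concrete, here is a minimal Python sketch of the four-step scenario (v1 and v2 buffered, a failed firing, v3 arriving, a retry). It is only a toy model under stated assumptions: `MarkedBuffer`, `fire`, and `ack` are hypothetical names and not part of any Beam SDK or runner, and the "checkpoint" is a plain in-memory assignment where a real runner would need a durable commit before sending anything downstream.

```python
class MarkedBuffer:
    """Toy stand-in for per-key GABW state: an append-only list plus a marker.

    Hypothetical illustration only; not a Beam API.
    """

    def __init__(self):
        self.values = []    # buffered values, in arrival order
        self.marker = None  # index recorded before a firing; None if no firing is in flight

    def append(self, value):
        # New data always lands at the end, i.e. after any existing marker.
        self.values.append(value)

    def fire(self):
        # Record the marker first (a real runner would durably commit it here),
        # then emit only the values that precede it.
        if self.marker is None:
            self.marker = len(self.values)
        return list(self.values[:self.marker])

    def ack(self):
        # Downstream committed the firing; the next firing may cover newer values.
        self.marker = None


buf = MarkedBuffer()
buf.append("v1")
buf.append("v2")
first_attempt = buf.fire()  # sent downstream, but suppose the bundle fails
buf.append("v3")            # arrives before the retry, lands after the marker
retry = buf.fire()          # the retry sees the same contents as the first attempt
buf.ack()                   # the retry succeeded
next_firing = buf.fire()    # accumulating mode: a later firing includes v3
```

The sketch sidesteps the open question raised in the thread, namely where the marker commit would happen when the trigger firing and the downstream ParDo are fused into one bundle.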
