On Mon, Dec 11, 2017 at 8:47 PM, Kenneth Knowles <[email protected]> wrote:

> In discussion on https://github.com/apache/beam/pull/4135 and offline it
> came up how this should interact with stateful/timely DoFns.
>
> Highlights:
>
>  - It was suggested to support it for @OnTimer which would mean that the
> state is stable on retry
>

This sounds good.


>  - But an unfortunate corollary is that for stateful DoFn it should be
> stable after each element
>  - And also if the state is expected to be stable on replay, then ordering
> must be stable, including interleaving of timers and elements
>

I'm not sure why this follows? I think rather we simply need the timer not
to fire _until_ state is stable. Elements (and therefore state writes)
might retry in different orders, but the timer does not fire until the
result is committed.
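
To make that concrete, here is a rough sketch in plain Python. It is not Beam
API: `KeyedState`, `process_bundle` and `fire_timer` are names made up for
illustration, and "commit" is modeled as a simple assignment. The only point is
that a timer reads committed state, so a failed element bundle (in whatever
order) is never visible to it, and a retried timer sees the same state.

```python
class KeyedState:
    """Hypothetical per-key state with an explicit commit step (not a Beam API)."""

    def __init__(self):
        self.committed = []  # durable state; the only thing a timer may read

    def process_bundle(self, elements, fail=False):
        # Writes are staged against a copy and only become visible on commit.
        staged = self.committed + list(elements)
        if fail:
            return False  # bundle failed: staged writes are discarded
        self.committed = staged
        return True

    def fire_timer(self):
        # Assumption: the runner holds the timer until state is committed,
        # so every attempt of the timer observes the same snapshot.
        return tuple(self.committed)


def demo():
    state = KeyedState()
    state.process_bundle(["v2", "v1"], fail=True)  # first attempt, one order
    state.process_bundle(["v1", "v2"])             # retry, different order
    first_attempt = state.fire_timer()
    retried_attempt = state.fire_timer()           # timer retried after a failure
    return first_attempt, retried_attempt
```

The element order that wins is whichever attempt commits; the timer never sees
the attempt that failed.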


>
> And since stateful DoFn are very useful for sinks, and
> @RequiresStableInput is very useful for sinks, I wonder if someone has
> thoughts about how these should interact?
>
> In the special case of @OnWindowExpiry (not yet implemented), the most
> important timer for flushing state, it wouldn't actually be so bad.
>
> Kenn
>
> On Wed, Nov 15, 2017 at 7:28 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> +1
>>
>> > On 15. Nov 2017, at 14:07, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>> >
>> > Agree !
>> >
>> > Thanks Kenn,
>> > Regards
>> > JB
>> >
>> > On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
>> >> Reviving this again, since it came up again today in yet another
>> context. I
>> >> think it is time to add this as an experimental annotation. I think we
>> know
>> >> that we need it, and roughly how it should work, while there are still
>> >> finer points to discuss about what it means for input to be stable.
>> >> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and
>> whipped up
>> >> https://github.com/apache/beam/pull/4135 to move it along a smidge.
>> It will
>> >> need to be incorporated into the Beam model's ParDoPayload and the
>> Python
>> >> SDK as well.
>> >> Kenn
>> >> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax <[email protected]>
>> >> wrote:
>> >>> Well the Fn API is still being designed, so this is something we'd
>> have to
>> >>> think about.
>> >>>
>> >>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
>> >>> [email protected]> wrote:
>> >>>
>> >>>> On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <[email protected]
>> >
>> >>>> wrote:
>> >>>>> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
>> >>>>> [email protected]> wrote:
>> >>>>>
>> >>>>>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax
>> <[email protected]
>> >>>>
>> >>>>>> wrote:
>> >>>>>>> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
>> >>>>>>>> The question here is whether the ordering is part of the
>> "content"
>> >>> of
>> >>>>>>>> an iterable.
>> >>>>>>>
>> >>>>>>> My initial instinct was to say yes - but maybe it should not be
>> >>> until
>> >>>>>> Beam
>> >>>>>>> has a first-class notion of sorted values after a GBK?
>> >>>>>>
>> >>>>>> Yeah, I'm not sure on this either. Interestingly, if we consider
>> >>>>>> ordering to be important, then the composite gbk + ungroup will be
>> >>>>>> stable despite its components not being so.
>> >>>>>>
>> >>>>>>>>>> As I mention above, the iterable is semantically [part of] a
>> >>>> single
>> >>>>>>>>>> element. So just to unpack this, to make sure we are talking
>> >>> about
>> >>>>>> the
>> >>>>>>>> same
>> >>>>>>>>>> thing, I think you are talking about GBK as implemented via
>> >>> GBKO +
>> >>>>>> GABW.
>> >>>>>>>>>>
>> >>>>>>>>>> When the output of GABW is required to be stable but the output
>> >>> of
>> >>>>>> GBKO
>> >>>>>>>> is
>> >>>>>>>>>> not stable, we don't have stability for free in all cases by
>> >>>>>> inserting a
>> >>>>>>>>>> GBK, but require something more to make the output of GABW
>> >>>> stable, in
>> >>>>>>>> the
>> >>>>>>>>>> worst case a full materialization.
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Correct. My point is that there are alternate, cheaper ways of
>> >>>> doing
>> >>>>>>>> this.
>> >>>>>>>>> If GABW stores state in an ordered list, it can simply
>> >>> checkpoint a
>> >>>>>>>> marker
>> >>>>>>>>> into that list to ensure that the output is stable.
>> >>>>>>>>
>> >>>>>>>> In the presence of non-trivial triggering and/or late data, I'm
>> not
>> >>>> so
>> >>>>>>>> sure this is "easy." E.g. A bundle may fail, and more data may
>> come
>> >>>> in
>> >>>>>>>> from upstream (and get appended to the buffer) before it is
>> >>> retried.
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>> That will still work. If the subsequent ParDo has processed the
>> >>>> Iterable,
>> >>>>>>> that means we'll have successfully checkpointed a marker to the
>> list
>> >>>>>> (using
>> >>>>>>> whatever technique the runner supports). More data coming in will
>> >>> get
>> >>>>>>> appended after the marker, so we can ensure that the retry still
>> >>> sees
>> >>>> the
>> >>>>>>> same elements in the Iterable.
>> >>>>>>
>> >>>>>> I'm thinking of the following.
>> >>>>>>
>> >>>>>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored
>> in
>> >>>>>> the state. A trigger gets set.
>> >>>>>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
>> >>>>>> for some reason fails.
>> >>>>>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
>> >>>>>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is
>> sent
>> >>>>>> downstream.
>> >>>>>>
>> >>>>>>
>> >>>>> If you add the annotation specifying stable input, then we will not
>> do
>> >>>> this.
>> >>>>> Before we send anything downstream, we will add a marker to the
>> list,
>> >>> and
>> >>>>> only forward data downstream once the marker has been checkpointed.
>> >>> This
>> >>>>> adds a bit of cost and latency of course, but the assumption is that
>> >>>> adding
>> >>>>> this annotation will always add some cost.
>> >>>>
>> >>>> I don't think you can checkpoint anything "before sending data
>> >>>> downstream" if it's being executed as part of a fused graph, unless we
>> >>>> add special support for this in the Fn API. I suppose the runner
>> could
>> >>>> pre-emptively modify the state of any GABW operations before firing
>> >>>> triggers...
>> >>>>
>> >>>>>> It is unclear when a marker would be added to the list. Is this in
>> >>>>>> step 2 which, despite failing, still results in modified state [v1,
>> v2,
>> >>>>>> marker]? (And this state modification would have to be committed
>> >>>>>> before attempting the bundle, in case the "failure" was something
>> like
>> >>>>>> a VM shutdown.) And only on success the state is modified to be
>> (say
>> >>>>>> this is accumulating mode) [v1, v2]?
>> >>>>>>
>> >>>>>> I think it could be done, but it may significantly complicate
>> things.
>> >>>>>>
>> >>>>
>> >>>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
>>
>
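
For reference, the marker scheme from the quoted discussion (steps 1-4 and the
reply to them) can be sketched roughly as below. This is plain Python under
stated assumptions, not how any runner implements it: `MarkedBuffer`,
`begin_firing` and `ack` are hypothetical names, and setting `self.marker`
stands in for "the marker has been durably checkpointed before anything is
sent downstream".

```python
class MarkedBuffer:
    """Hypothetical ordered buffer for one key in GABW (illustration only)."""

    def __init__(self):
        self.values = []    # ordered list of buffered values
        self.marker = None  # position of the checkpointed marker, if any

    def append(self, value):
        # New data always lands after any existing marker.
        self.values.append(value)

    def begin_firing(self):
        # Assumption: the marker is checkpointed here, before output is sent.
        # A retry reuses the existing marker instead of writing a new one.
        if self.marker is None:
            self.marker = len(self.values)
        return list(self.values[:self.marker])

    def ack(self):
        # Downstream succeeded; the next firing may pick up newer values.
        # Values are kept, i.e. this models accumulating mode.
        self.marker = None


def demo():
    buf = MarkedBuffer()
    buf.append("v1")
    buf.append("v2")
    first = buf.begin_firing()   # sent downstream, but the bundle fails
    buf.append("v3")             # more data arrives before the retry
    retry = buf.begin_firing()   # retry sees the same Iterable as before
    buf.ack()
    later = buf.begin_firing()   # a later firing includes v3
    return first, retry, later
```

The extra cost is the checkpoint in `begin_firing`, which has to complete
before output is forwarded.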
