+1

> On 15. Nov 2017, at 14:07, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
> Agree !
> 
> Thanks Kenn,
> Regards
> JB
> 
> On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
>> Reviving this again, since it came up again today in yet another context. I
>> think it is time to add this as an experimental annotation. I think we know
>> that we need it, and roughly how it should work, while there are still
>> finer points to discuss about what it means for input to be stable.
>>
>> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
>> https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
>> need to be incorporated into the Beam model's ParDoPayload and the Python
>> SDK as well.
>>
>> Kenn
>>
>> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>>> Well, the Fn API is still being designed, so this is something we'd have to
>>> think about.
>>> 
>>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
>>> rober...@google.com.invalid> wrote:
>>> 
>>>> On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <re...@google.com.invalid>
>>>> wrote:
>>>>> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
>>>>> rober...@google.com.invalid> wrote:
>>>>> 
>>>>>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <re...@google.com.invalid>
>>>>>> wrote:
>>>>>>> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
>>>>>>>> The question here is whether the ordering is part of the "content"
>>>>>>>> of an iterable.
>>>>>>> 
>>>>>>> My initial instinct was to say yes, but maybe it should not be
>>>>>>> until Beam has a first-class notion of sorted values after a GBK?
>>>>>> 
>>>>>> Yeah, I'm not sure on this either. Interestingly, if we consider
>>>>>> ordering to be important, then the composite GBK + ungroup will be
>>>>>> stable despite its components not being so.
>>>>>> 
>>>>>>>>>> As I mentioned above, the iterable is semantically [part of] a
>>>>>>>>>> single element. So just to unpack this, to make sure we are talking
>>>>>>>>>> about the same thing, I think you are talking about GBK as
>>>>>>>>>> implemented via GBKO + GABW.
>>>>>>>>>>
>>>>>>>>>> When the output of GABW is required to be stable but the output of
>>>>>>>>>> GBKO is not, we don't get stability for free in all cases just by
>>>>>>>>>> inserting a GBK; something more is required to make the output of
>>>>>>>>>> GABW stable, in the worst case a full materialization.
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Correct. My point is that there are alternate, cheaper ways of doing
>>>>>>>>> this. If GABW stores state in an ordered list, it can simply
>>>>>>>>> checkpoint a marker into that list to ensure that the output is
>>>>>>>>> stable.
>>>>>>>> 
>>>>>>>> In the presence of non-trivial triggering and/or late data, I'm not
>>>>>>>> so sure this is "easy." E.g., a bundle may fail, and more data may
>>>>>>>> come in from upstream (and get appended to the buffer) before it is
>>>>>>>> retried.
>>>>>>>> 
>>>>>>> 
>>>>>>> That will still work. If the subsequent ParDo has processed the
>>>>>>> Iterable, that means we'll have successfully checkpointed a marker to
>>>>>>> the list (using whatever technique the runner supports). More data
>>>>>>> coming in will get appended after the marker, so we can ensure that
>>>>>>> the retry still sees the same elements in the Iterable.
>>>>>> 
>>>>>> I'm thinking of the following.
>>>>>> 
>>>>>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
>>>>>> the state. A trigger gets set.
>>>>>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
>>>>>> for some reason fails.
>>>>>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
>>>>>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
>>>>>> downstream.
>>>>>> 
>>>>>> 
>>>>> If you add the annotation specifying stable input, then we will not do
>>>>> this. Before we send anything downstream, we will add a marker to the
>>>>> list, and only forward data downstream once the marker has been
>>>>> checkpointed. This adds a bit of cost and latency, of course, but the
>>>>> assumption is that adding this annotation will always add some cost.
>>>> 
>>>> I don't think you can checkpoint anything "before sending data
>>>> downstream" if it's being executed as part of a fused graph, unless we
>>>> add special support for this in the Fn API. I suppose the runner could
>>>> pre-emptively modify the state of any GABW operations before firing
>>>> triggers...
>>>> 
>>>>>> It is unclear when a marker would be added to the list. Is this in
>>>>>> step 2, which, despite failing, would still result in the modified
>>>>>> state [v1, v2, marker]? (And this state modification would have to be
>>>>>> committed before attempting the bundle, in case the "failure" was
>>>>>> something like a VM shutdown.) And only on success is the state
>>>>>> modified to be (say this is accumulating mode) [v1, v2]?
>>>>>> 
>>>>>> I think it could be done, but it may significantly complicate things.
>>>>>> 
>>>> 
>>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
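
The marker scheme Reuven describes, and the retry scenario Robert walks through in steps 1 to 4, can be illustrated with a toy in-memory model. This is a minimal sketch under stated assumptions, not how any Beam runner implements GABW: `KeyBuffer`, `fire_unstable`, `fire_stable`, `commit` and `MARKER` are hypothetical names, and the sketch simply assumes the marker write is durably checkpointed before anything is sent downstream, which is exactly the part Robert notes may need Fn API support in a fused graph.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical sentinel marking how far a fired pane has been "frozen".
MARKER = object()


@dataclass
class KeyBuffer:
    """Toy per-key GABW buffer: an ordered list of values (assumption)."""

    values: List[object] = field(default_factory=list)

    def append(self, value: object) -> None:
        # New or late data is always appended at the end of the list.
        self.values.append(value)

    def fire_unstable(self) -> List[object]:
        # Without a marker, each (re)try reads whatever is buffered right now.
        return [v for v in self.values if v is not MARKER]

    def fire_stable(self) -> List[object]:
        # Place a marker unless one is already there (assumed to be durably
        # checkpointed before emitting), then emit only values before it.
        if MARKER not in self.values:
            self.values.append(MARKER)
        cut = self.values.index(MARKER)
        return self.values[:cut]

    def commit(self) -> None:
        # On success, drop the marker; accumulating mode keeps the values.
        self.values = [v for v in self.values if v is not MARKER]


def run_scenario(stable: bool) -> List[List[object]]:
    buf = KeyBuffer()
    fire = buf.fire_stable if stable else buf.fire_unstable
    attempts = []
    buf.append("v1")          # step 1: (k, v1) and (k, v2) arrive
    buf.append("v2")
    attempts.append(fire())   # step 2: pane sent downstream, bundle fails
    buf.append("v3")          # step 3: (k, v3) arrives before the retry
    attempts.append(fire())   # step 4: the failed bundle is retried
    return attempts


print(run_scenario(stable=False))  # [['v1', 'v2'], ['v1', 'v2', 'v3']]
print(run_scenario(stable=True))   # [['v1', 'v2'], ['v1', 'v2']]
```

Without the marker, the retry sees `[v1, v2, v3]`, so the Iterable handed to the downstream ParDo is not stable. With the marker, the retry sees the same `[v1, v2]`; only after `commit()` would a later firing include `v3`.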
