FWIW, things seem to work if I make the continuations of AfterPane and
AfterProcessingTime wrapped into Repeatedly.forever() - then, in DISCARDING
mode, they pass the property.
They still fail it in ACCUMULATING mode. I think ACCUMULATING mode is
currently just semantically broken for stacked aggregations and needs to be
forbidden.

On Fri, Dec 8, 2017 at 1:10 PM Reuven Lax <[email protected]> wrote:

> As I've mentioned before, I think OneTriggers are generally a bad idea.
>
> On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
>> The property that stacking more GBKs doesn't drop more data than a first
>> one already drops, nor introduce duplicate data (see previous email,
>> starting with "I'd propose the following property:").
>>
>> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:
>>
>>> Which intuitive property?
>>>
>>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> I created a test demonstrating that our triggers violate this pretty
>>>> intuitive property.
>>>>
>>>> https://github.com/apache/beam/pull/4239
>>>>
>>>> I think we should discuss this: seems like a pretty important property
>>>> to me and seems pretty bad that it's violated. Again, per discussion below,
>>>> some of it should be addressed with changing semantics of individual
>>>> triggers, and some with prohibiting certain combinations of triggers /
>>>> accumulation modes / GBKs.
>>>>
>>>> I tried changing the continuation trigger of some triggers to be the
>>>> "always fire" trigger, however we have a lot of code assuming that it will
>>>> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
>>>> OnceTrigger, and of course that components of a OnceTrigger have to be a
>>>> OnceTrigger) and I didn't yet dig into whether that division is at all
>>>> important or just baked in at compile time.
>>>>
>>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <
>>>>>> [email protected]> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > After a recent investigation of a data loss bug caused by
>>>>>> unintuitive
>>>>>> > behavior of some kinds of triggers, we had a discussion about how
>>>>>> we can
>>>>>> > protect against future issues like this, and I summarized it in
>>>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>>>> >
>>>>>> > Current Beam trigger semantics are rather confusing and in some
>>>>>> cases
>>>>>> > extremely unsafe, especially if the pipeline includes multiple
>>>>>> chained GBKs.
>>>>>> > One example of that is
>>>>>> https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>>> >
>>>>>> > There's multiple issues:
>>>>>> >
>>>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>>>> > "trigger a pane after receiving 10000 elements in the window, and
>>>>>> that's
>>>>>> > it"), but experience from user support shows that this is nearly
>>>>>> always a
>>>>>> > mistake and the user did not intend to drop all further data.
>>>>>> >
>>>>>> > In general, triggers are the only place in Beam where data is being
>>>>>> dropped
>>>>>> > without making a lot of very loud noise about it - a practice for
>>>>>> which the
>>>>>> > PTransform style guide uses the language: "never, ever, ever do
>>>>>> this".
>>>>>> >
>>>>>> > Continuation triggers are still worse. For context: continuation
>>>>>> trigger is
>>>>>> > the trigger that's set on the output of a GBK and controls further
>>>>>> > aggregation of the results of this aggregation by downstream GBKs.
>>>>>> The
>>>>>> > output shouldn't just use the same trigger as the input, because
>>>>>> e.g. if the
>>>>>> > input trigger said "wait for an hour before emitting a pane", that
>>>>>> doesn't
>>>>>> > mean that we should wait for another hour before emitting a result
>>>>>> of
>>>>>> > aggregating the result of the input trigger. Continuation triggers
>>>>>> try to
>>>>>> > simulate the behavior "as if a pane of the input propagated through
>>>>>> the
>>>>>> > entire pipeline", but the implementation of individual continuation
>>>>>> triggers
>>>>>> > doesn't do that. E.g. the continuation of "first N elements in
>>>>>> pane" trigger
>>>>>> > is "first 1 element in pane", and if the results of a first GBK are
>>>>>> further
>>>>>> > grouped by a second GBK onto more coarse key (e.g. if everything is
>>>>>> grouped
>>>>>> > onto the same key), that effectively means that, of the keys of the
>>>>>> first
>>>>>> > GBK, only one survives and all others are dropped (what happened in
>>>>>> the data
>>>>>> > loss bug).
>>>>>> >
>>>>>> > The ultimate fix to all of these things is
>>>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge
>>>>>> model
>>>>>> > change, and meanwhile we have to do something. The options are, in
>>>>>> order of
>>>>>> > increasing backward incompatibility (but incompatibility in a
>>>>>> "rejecting
>>>>>> > something that previously was accepted but extremely dangerous"
>>>>>> kind of
>>>>>> > way):
>>>>>> >
>>>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>>>> trigger.
>>>>>> > Seems that this should be the case for all triggers except the
>>>>>> watermark
>>>>>> > trigger. This will definitely increase safety, but lead to more
>>>>>> eager firing
>>>>>> > of downstream aggregations. It also will violate a user's
>>>>>> expectation that a
>>>>>> > fire-once trigger fires everything downstream only once, but that
>>>>>> > expectation appears impossible to satisfy safely.
>>>>>>
>>>>>> Note that firing more often for multiply stacked triggers, especially
>>>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>>>> data is slightly easier to detect than missing data, but automatically
>>>>>> turning once triggers into more-than-once triggers (automatically, or
>>>>>> manually by prohibiting the former) isn't a straightforward fix.
>>>>>>
>>>>>
>>>>> I agree that the combination of accumulating mode and stacked GBKs is
>>>>> very problematic.
>>>>> I think that the way it's defined now is a semantic bug that produces
>>>>> garbage (duplicated data) with any repeated trigger, but garbage (lost
>>>>> data, per current discussion) with any non-repeated trigger as well.
>>>>>
>>>>> I think we should prohibit the combination of accumulating mode and
>>>>> stacked GBKs until we sort this out, which is again probably Sink 
>>>>> Triggers:
>>>>> per that document, I suppose, the accumulating mode would be set only on
>>>>> the sink step, and upstream steps would get discarding mode [and 
>>>>> downstream
>>>>> steps would have to set something explicitly].
>>>>>
>>>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>>>> should return a PCollection with an invalid trigger, where you have to
>>>>> explicitly configure a trigger before you can GBK again". Does this sound
>>>>> reasonable?
>>>>>
>>>>> If we agree on that, then I think having the continuation of all
>>>>> triggers be the "always-fire" trigger is also safe?
>>>>>
>>>>>
>>>>>>
>>>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>>>> trigger,
>>>>>> > i.e. require the user to set it explicitly: there's in general no
>>>>>> good and
>>>>>> > safe way to infer what a trigger on a second GBK "truly" should be,
>>>>>> based on
>>>>>> > the trigger of the PCollection input into a first GBK. This is
>>>>>> especially
>>>>>> > true for terminating triggers.
>>>>>>
>>>>>> I think this should be on the table until we come up with good
>>>>>> semantics, which may not be until sink triggers. It's backwards
>>>>>> incompatible, but in a bug-fixing and verbose way (as opposed silently
>>>>>> changing semantics/behavior/outputs with the first option). Note that
>>>>>> sometimes for composite operations one does not have access to the
>>>>>> second grouping operation, so we must at least provide an opt-out
>>>>>> flag.
>>>>>>
>>>>> I would argue that the failure in those cases is Working As Intended:
>>>>> the pipeline is having undefined behavior and we're forcing the user to
>>>>> remove the ambiguity - either by specifying a repeated trigger (which they
>>>>> probably should have done in the first place) or by specifying the next
>>>>> trigger explicitly (which, granted, may require changing the composite
>>>>> transform, because the transform hasn't accounted for potential usage in
>>>>> this ambiguous case)
>>>>>
>>>>>
>>>>>>
>>>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>>>> that the
>>>>>> > only data that ever gets dropped is "droppably late" data.
>>>>>>
>>>>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>>>> trigger that we should keep allowing, and has perfectly reasonable
>>>>>> continuation semantics. I suppose we could only allow it if allowed
>>>>>> lateness is 0, but that makes things even more context sensitive.
>>>>>>
>>>>>> > Do people think that these options are sensible?
>>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of
>>>>>> our
>>>>>> > discussion?
>>>>>>
>>>>>> I think this is a fair summary of the discussions we've had.
>>>>>>
>>>>>
>>>>> Do we have any sort of trigger test suite, or high-level properties
>>>>> triggers must satisfy?
>>>>>
>>>>> I'd propose the following property:
>>>>> Given a PCollection with a particular trigger, that is being passed
>>>>> through a GBK, processing through the downstream aggregations by default
>>>>> should happens without dropping or introducing any more data [unless a
>>>>> triggering strategy is explicitly set on a downstream aggregation to
>>>>> something that violates this property].
>>>>>
>>>>> For example:
>>>>> PCollection<V> pc = ...  // e.g. TestStream
>>>>>
>>>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>>   return pc.apply(WithKeys.of(random key in 0..n))
>>>>>
>>>>> .apply(GroupByKey.create()).apply(Values.create()).apply(Flatten.iterables());
>>>>> }
>>>>>
>>>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
>>>>> triggering, windowing and accumulation mode.
>>>>>
>>>>> This property is currently definitely violated for some triggers; and
>>>>> it's definitely violated for the accumulating mode; both aspects must be
>>>>> fixed.
>>>>>
>>>>>
>>>>>> - Robert
>>>>>>
>>>>>
>>>
>

Reply via email to