As I've mentioned before, I think OnceTriggers are generally a bad idea.

On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <[email protected]>
wrote:

> The property that stacking more GBKs doesn't drop more data than the
> first one already drops, nor introduce duplicate data (see the previous
> email, starting with "I'd propose the following property:").
>
> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:
>
>> Which intuitive property?
>>
>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> I created a test demonstrating that our triggers violate this pretty
>>> intuitive property.
>>>
>>> https://github.com/apache/beam/pull/4239
>>>
>>> I think we should discuss this: it seems like a pretty important
>>> property to me, and it seems pretty bad that it's violated. Again, per
>>> the discussion below, some of it should be addressed by changing the
>>> semantics of individual triggers, and some by prohibiting certain
>>> combinations of triggers / accumulation modes / GBKs.
>>>
>>> I tried changing the continuation trigger of some triggers to be the
>>> "always fire" trigger; however, we have a lot of code assuming that it
>>> will be a OnceTrigger (e.g. that the continuation of a OnceTrigger has
>>> to be a OnceTrigger, and of course that the components of a OnceTrigger
>>> have to be OnceTriggers), and I haven't yet dug into whether that
>>> division is at all important or just baked in at compile time.
>>>
>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]>
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > After a recent investigation of a data loss bug caused by unintuitive
>>>>> > behavior of some kinds of triggers, we had a discussion about how we
>>>>> > can protect against future issues like this, and I summarized it in
>>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>>> >
>>>>> > Current Beam trigger semantics are rather confusing and in some cases
>>>>> > extremely unsafe, especially if the pipeline includes multiple chained
>>>>> > GBKs. One example of that is
>>>>> > https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>> >
>>>>> > There are multiple issues:
>>>>> >
>>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>>> > "trigger a pane after receiving 10000 elements in the window, and
>>>>> > that's it"), but experience from user support shows that this is
>>>>> > nearly always a mistake and the user did not intend to drop all
>>>>> > further data.
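>>>>> >
>>>>> > For illustration, a minimal sketch of what I mean (the input name,
>>>>> > element type V, window size and count are all invented) - this is
>>>>> > accepted today and silently drops everything after the first pane:
>>>>> >
>>>>> >   input.apply(Window.<V>into(FixedWindows.of(Duration.standardMinutes(5)))
>>>>> >       // Terminating trigger: each window fires once, after 10000
>>>>> >       // elements, and all of its later data is silently dropped.
>>>>> >       .triggering(AfterPane.elementCountAtLeast(10000))
>>>>> >       .withAllowedLateness(Duration.ZERO)
>>>>> >       .discardingFiredPanes());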
>>>>> >
>>>>> > In general, triggers are the only place in Beam where data is being
>>>>> > dropped without making a lot of very loud noise about it - a practice
>>>>> > for which the PTransform style guide uses the language: "never, ever,
>>>>> > ever do this".
>>>>> >
>>>>> > Continuation triggers are still worse. For context: a continuation
>>>>> > trigger is the trigger that's set on the output of a GBK and controls
>>>>> > further aggregation of the results of this aggregation by downstream
>>>>> > GBKs. The output shouldn't just use the same trigger as the input,
>>>>> > because e.g. if the input trigger said "wait for an hour before
>>>>> > emitting a pane", that doesn't mean that we should wait for another
>>>>> > hour before emitting a result of aggregating the result of the input
>>>>> > trigger. Continuation triggers try to simulate the behavior "as if a
>>>>> > pane of the input propagated through the entire pipeline", but the
>>>>> > implementation of individual continuation triggers doesn't do that.
>>>>> > E.g. the continuation of the "first N elements in pane" trigger is
>>>>> > "first 1 element in pane", and if the results of a first GBK are
>>>>> > further grouped by a second GBK onto a coarser key (e.g. if everything
>>>>> > is grouped onto the same key), that effectively means that, of the
>>>>> > keys of the first GBK, only one survives and all others are dropped
>>>>> > (which is what happened in the data loss bug).
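>>>>> >
>>>>> > To make that concrete, a rough sketch of the shape of such a pipeline
>>>>> > (names, types and counts invented; Count.perKey stands in for the
>>>>> > first aggregation):
>>>>> >
>>>>> >   // First GBK, under a "first 100 elements in pane" trigger.
>>>>> >   PCollection<KV<String, Long>> perUser = events
>>>>> >       .apply(Window.<KV<String, Event>>configure()
>>>>> >           .triggering(AfterPane.elementCountAtLeast(100))
>>>>> >           .withAllowedLateness(Duration.ZERO)
>>>>> >           .discardingFiredPanes())
>>>>> >       .apply(Count.perKey());
>>>>> >   // Second GBK onto a single key. Its continuation trigger is
>>>>> >   // "first 1 element in pane", so it fires after one upstream key's
>>>>> >   // count arrives and the other keys' counts are dropped.
>>>>> >   perUser.apply(WithKeys.of((Void) null)).apply(GroupByKey.create());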
>>>>> >
>>>>> > The ultimate fix to all of these things is
>>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>>>>> > change, and meanwhile we have to do something. The options are, in
>>>>> > order of increasing backward incompatibility (but incompatibility in a
>>>>> > "rejecting something that previously was accepted but extremely
>>>>> > dangerous" kind of way):
>>>>> >
>>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>>> > trigger. Seems that this should be the case for all triggers except
>>>>> > the watermark trigger. This will definitely increase safety, but lead
>>>>> > to more eager firing of downstream aggregations. It also will violate
>>>>> > a user's expectation that a fire-once trigger fires everything
>>>>> > downstream only once, but that expectation appears impossible to
>>>>> > satisfy safely.
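>>>>> >
>>>>> > (By "always-fire" I mean, roughly, the following - a sketch, not
>>>>> > necessarily the exact trigger we'd use:
>>>>> >
>>>>> >   // Fire a downstream pane every time an upstream pane arrives.
>>>>> >   Trigger alwaysFire = Repeatedly.forever(AfterPane.elementCountAtLeast(1));
>>>>> >
>>>>> > i.e. a downstream GBK emits a new pane whenever any upstream pane
>>>>> > reaches it.)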
>>>>>
>>>>> Note that firing more often for multiply stacked triggers, especially
>>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>>> data is slightly easier to detect than missing data, but automatically
>>>>> turning once triggers into more-than-once triggers (automatically, or
>>>>> manually by prohibiting the former) isn't a straightforward fix.
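>>>>>
>>>>> (A worked example of the overcounting, under an assumed per-element
>>>>> trigger: in accumulating mode, a window that sees 1, 2, 3 emits the
>>>>> panes {1}, {1, 2}, {1, 2, 3}. A downstream GBK + sum that consumes all
>>>>> three panes computes 1 + 3 + 6 = 10 instead of 6.)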
>>>>>
>>>>
>>>> I agree that the combination of accumulating mode and stacked GBKs is
>>>> very problematic.
>>>> I think that the way it's defined now is a semantic bug that produces
>>>> garbage (duplicated data) with any repeated trigger, and garbage (lost
>>>> data, per the current discussion) with any non-repeated trigger as well.
>>>>
>>>> I think we should prohibit the combination of accumulating mode and
>>>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>>>> per that document, I suppose, the accumulating mode would be set only on
>>>> the sink step, and upstream steps would get discarding mode [and downstream
>>>> steps would have to set something explicitly].
>>>>
>>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>>> should return a PCollection with an invalid trigger, where you have to
>>>> explicitly configure a trigger before you can GBK again". Does this sound
>>>> reasonable?
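>>>>
>>>> Concretely, a sketch of what the user would then have to write between
>>>> the two GBKs (types invented; Window.configure is the existing way to
>>>> change just the triggering of an already-windowed PCollection):
>>>>
>>>>   PCollection<KV<String, Iterable<Long>>> grouped =
>>>>       input.apply(GroupByKey.create());
>>>>   // Under the proposal, a second GBK on `grouped` would be rejected at
>>>>   // construction time until a trigger is set explicitly:
>>>>   grouped
>>>>       .apply(Window.<KV<String, Iterable<Long>>>configure()
>>>>           .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>           .withAllowedLateness(Duration.ZERO)
>>>>           .discardingFiredPanes())
>>>>       .apply(GroupByKey.create());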
>>>>
>>>> If we agree on that, then I think having the continuation of all
>>>> triggers be the "always-fire" trigger is also safe?
>>>>
>>>>
>>>>>
>>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>>> > trigger, i.e. require the user to set it explicitly: there's in
>>>>> > general no good and safe way to infer what a trigger on a second GBK
>>>>> > "truly" should be, based on the trigger of the PCollection input into
>>>>> > a first GBK. This is especially true for terminating triggers.
>>>>>
>>>>> I think this should be on the table until we come up with good
>>>>> semantics, which may not be until sink triggers. It's backwards
>>>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>>>> changing semantics/behavior/outputs with the first option). Note that
>>>>> sometimes for composite operations one does not have access to the
>>>>> second grouping operation, so we must at least provide an opt-out
>>>>> flag.
>>>>>
>>>> I would argue that the failure in those cases is Working As Intended:
>>>> the pipeline has undefined behavior and we're forcing the user to
>>>> remove the ambiguity - either by specifying a repeated trigger (which
>>>> they probably should have done in the first place) or by specifying the
>>>> next trigger explicitly (which, granted, may require changing the
>>>> composite transform, because the transform hasn't accounted for
>>>> potential usage in this ambiguous case).
>>>>
>>>>
>>>>>
>>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>>> > that the only data that ever gets dropped is "droppably late" data.
>>>>>
>>>>> AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>>> trigger that we should keep allowing, and it has perfectly reasonable
>>>>> continuation semantics. I suppose we could only allow it if allowed
>>>>> lateness is 0, but that makes things even more context sensitive.
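>>>>>
>>>>> For reference, a sketch of the trigger I mean (the delay is made up) -
>>>>> early panes are speculative and the on-time watermark pane is final:
>>>>>
>>>>>   AfterWatermark.pastEndOfWindow()
>>>>>       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>           .plusDelayOf(Duration.standardMinutes(1)))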
>>>>>
>>>>> > Do people think that these options are sensible?
>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of
>>>>> > our discussion?
>>>>>
>>>>> I think this is a fair summary of the discussions we've had.
>>>>>
>>>>
>>>> Do we have any sort of trigger test suite, or high-level properties
>>>> triggers must satisfy?
>>>>
>>>> I'd propose the following property:
>>>> Given a PCollection with a particular trigger that is being passed
>>>> through a GBK, processing through the downstream aggregations should by
>>>> default happen without dropping or introducing any more data [unless a
>>>> triggering strategy is explicitly set on a downstream aggregation to
>>>> something that violates this property].
>>>>
>>>> For example:
>>>> PCollection<V> pc = ...;  // e.g. produced by a TestStream
>>>>
>>>> static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>   return pc
>>>>       // Assign each element a random key in [0, n).
>>>>       .apply(WithKeys.of(v -> ThreadLocalRandom.current().nextInt(n))
>>>>           .withKeyType(TypeDescriptors.integers()))
>>>>       .apply(GroupByKey.create())
>>>>       .apply(Values.create())
>>>>       .apply(Flatten.iterables());
>>>> }
>>>>
>>>> Then regroup(regroup(pc, n), 1) == regroup(pc, n) (as multisets),
>>>> regardless of pc's triggering, windowing and accumulation mode.
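>>>>
>>>> Sketched as a test (assuming the regroup helper above, a TestPipeline p,
>>>> and PAssert; the interesting variants would set a non-default trigger on
>>>> pc):
>>>>
>>>>   PCollection<Integer> pc = p.apply(TestStream.create(VarIntCoder.of())
>>>>       .addElements(1, 2, 3)
>>>>       .advanceWatermarkToInfinity());
>>>>   // Both sides of the property should contain exactly the input.
>>>>   PAssert.that(regroup(pc, 3)).containsInAnyOrder(1, 2, 3);
>>>>   PAssert.that(regroup(regroup(pc, 3), 1)).containsInAnyOrder(1, 2, 3);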
>>>>
>>>> This property is currently definitely violated for some triggers; and
>>>> it's definitely violated for the accumulating mode; both aspects must be
>>>> fixed.
>>>>
>>>>
>>>>> - Robert
>>>>>
>>>>
>>
