The property that stacking more GBKs doesn't drop any more data than the first
one already drops, and doesn't introduce duplicate data (see the previous email,
starting with "I'd propose the following property:").

On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:

> Which intuitive property?
>
> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
>> I created a test demonstrating that our triggers violate this pretty
>> intuitive property.
>>
>> https://github.com/apache/beam/pull/4239
>>
>> I think we should discuss this: it seems like a pretty important property to
>> me, and it seems pretty bad that it's violated. Again, per the discussion
>> below, some of it should be addressed by changing the semantics of individual
>> triggers, and some by prohibiting certain combinations of triggers /
>> accumulation modes / GBKs.
>>
>> I tried changing the continuation trigger of some triggers to be the
>> "always fire" trigger; however, we have a lot of code assuming that it will
>> be a OnceTrigger (e.g. that the continuation of a OnceTrigger has to be a
>> OnceTrigger, and of course that the components of a OnceTrigger have to be
>> OnceTriggers), and I haven't yet dug into whether that division is actually
>> important or just baked in at compile time.
>>
>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > After a recent investigation of a data loss bug caused by unintuitive
>>>> > behavior of some kinds of triggers, we had a discussion about how we can
>>>> > protect against future issues like this, and I summarized it in
>>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>> >
>>>> > Current Beam trigger semantics are rather confusing and in some cases
>>>> > extremely unsafe, especially if the pipeline includes multiple chained
>>>> > GBKs. One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
>>>> >
>>>> > There are multiple issues:
>>>> >
>>>> > The API allows users to specify terminating top-level triggers (e.g.
>>>> > "trigger a pane after receiving 10000 elements in the window, and that's
>>>> > it"), but experience from user support shows that this is nearly always a
>>>> > mistake and the user did not intend to drop all further data.
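>>>> >
>>>> > For concreteness, such a terminating top-level trigger looks roughly like
>>>> > the sketch below ("input" stands for some keyed PCollection; the window
>>>> > size and element count are just placeholders):
>>>> >
>>>> > PCollection<KV<String, String>> windowed = input
>>>> >     .apply(Window.<KV<String, String>>into(
>>>> >             FixedWindows.of(Duration.standardHours(1)))
>>>> >         // Fires a single pane once 10000 elements have arrived in the
>>>> >         // window, then finishes; further data for that window is dropped.
>>>> >         .triggering(AfterPane.elementCountAtLeast(10000))
>>>> >         .withAllowedLateness(Duration.ZERO)
>>>> >         .discardingFiredPanes());
>>>> > PCollection<KV<String, Long>> counts = windowed.apply(Count.perKey());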
>>>> >
>>>> > In general, triggers are the only place in Beam where data is being
>>>> > dropped without making a lot of very loud noise about it - a practice for
>>>> > which the PTransform style guide uses the language: "never, ever, ever do
>>>> > this".
>>>> >
>>>> > Continuation triggers are even worse. For context: the continuation
>>>> > trigger is the trigger that's set on the output of a GBK and controls how
>>>> > the results of that aggregation are further aggregated by downstream
>>>> > GBKs. The output shouldn't just use the same trigger as the input,
>>>> > because e.g. if the input trigger said "wait for an hour before emitting
>>>> > a pane", that doesn't mean we should wait for another hour before
>>>> > emitting the result of aggregating the input's output. Continuation
>>>> > triggers try to simulate the behavior "as if a pane of the input
>>>> > propagated through the entire pipeline", but the implementations of
>>>> > individual continuation triggers don't do that. E.g. the continuation of
>>>> > the "first N elements in pane" trigger is "first 1 element in pane", and
>>>> > if the results of a first GBK are further grouped by a second GBK onto a
>>>> > coarser key (e.g. if everything is grouped onto the same key), that
>>>> > effectively means that, of the keys of the first GBK, only one survives
>>>> > and all the others are dropped (which is what happened in the data loss
>>>> > bug).
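>>>> >
>>>> > For concreteness, the dangerous shape is roughly the sketch below
>>>> > ("events" and the transforms are placeholders, not the actual pipeline
>>>> > from the bug):
>>>> >
>>>> > // First GBK: per-key counts, under a "first N elements in pane" trigger.
>>>> > PCollection<KV<String, Long>> perKey = events
>>>> >     .apply(Window.<KV<String, String>>into(
>>>> >             FixedWindows.of(Duration.standardMinutes(10)))
>>>> >         .triggering(AfterPane.elementCountAtLeast(1000))
>>>> >         .withAllowedLateness(Duration.ZERO)
>>>> >         .discardingFiredPanes())
>>>> >     .apply(Count.perKey());
>>>> >
>>>> > // Second GBK: everything re-keyed onto a single coarse key. Its
>>>> > // continuation trigger is "first 1 element in pane", so it can fire
>>>> > // after seeing the result of just one upstream key and drop the rest.
>>>> > PCollection<KV<String, Long>> total = perKey
>>>> >     .apply(Values.create())
>>>> >     .apply(WithKeys.of("total"))
>>>> >     .apply(Sum.longsPerKey());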
>>>> >
>>>> > The ultimate fix to all of these things is
>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>>>> > change, and meanwhile we have to do something. The options are, in order
>>>> > of increasing backward incompatibility (but incompatibility in a
>>>> > "rejecting something that previously was accepted but extremely
>>>> > dangerous" kind of way):
>>>> >
>>>> > Make the continuation trigger of most triggers be the "always-fire"
>>>> > trigger. It seems that this should be the case for all triggers except
>>>> > the watermark trigger. This will definitely increase safety, but will
>>>> > lead to more eager firing of downstream aggregations. It will also
>>>> > violate a user's expectation that a fire-once trigger fires everything
>>>> > downstream only once, but that expectation appears impossible to satisfy
>>>> > safely.
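>>>> >
>>>> > (By the "always-fire" trigger I mean, roughly, what you'd write today as
>>>> > Repeatedly.forever(AfterPane.elementCountAtLeast(1)) - fire on every
>>>> > incoming pane of data and never finish.)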
>>>>
>>>> Note that firing more often for multiply stacked triggers, especially
>>>> in the case of accumulating mode, easily leads to data corruption bugs
>>>> where elements are duplicated and/or overcounted. I suppose that bad
>>>> data is slightly easier to detect than missing data, but turning once
>>>> triggers into more-than-once triggers (automatically, or manually by
>>>> prohibiting the former) isn't a straightforward fix.
>>>>
>>>
>>> I agree that the combination of accumulating mode and stacked GBKs is
>>> very problematic.
>>> I think that the way it's defined now is a semantic bug that produces
>>> garbage (duplicated data) with any repeated trigger, and garbage (lost
>>> data, per the current discussion) with any non-repeated trigger as well.
>>>
>>> I think we should prohibit the combination of accumulating mode and
>>> stacked GBKs until we sort this out - and sorting it out is again probably
>>> Sink Triggers: per that document, I suppose, the accumulating mode would be
>>> set only on the sink step, and upstream steps would get discarding mode
>>> [and downstream steps would have to set something explicitly].
>>>
>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>> should return a PCollection with an invalid trigger, where you have to
>>> explicitly configure a trigger before you can GBK again". Does this sound
>>> reasonable?
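>>>
>>> Concretely, with today's API the explicit configuration between the two
>>> GBKs would look something like the sketch below ("firstAggregation" is a
>>> placeholder, and the trigger and mode are just examples):
>>>
>>> firstAggregation  // e.g. a PCollection<KV<String, Long>> out of the first GBK
>>>     .apply(Window.<KV<String, Long>>configure()
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .withAllowedLateness(Duration.ZERO)
>>>         .discardingFiredPanes())
>>>     .apply(Sum.longsPerKey());  // second GBK, now with an explicitly chosen trigger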
>>>
>>> If we agree on that, then I think having the continuation of all
>>> triggers be the "always-fire" trigger is also safe?
>>>
>>>
>>>>
>>>> > Make the continuation trigger of some triggers be the "invalid" trigger,
>>>> > i.e. require the user to set it explicitly: in general there's no good
>>>> > and safe way to infer what a trigger on a second GBK "truly" should be,
>>>> > based on the trigger of the PCollection that was input to the first GBK.
>>>> > This is especially true for terminating triggers.
>>>>
>>>> I think this should be on the table until we come up with good
>>>> semantics, which may not be until sink triggers. It's backwards
>>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>>> changing semantics/behavior/outputs with the first option). Note that
>>>> sometimes for composite operations one does not have access to the
>>>> second grouping operation, so we must at least provide an opt-out
>>>> flag.
>>>>
>>> I would argue that the failure in those cases is Working As Intended:
>>> the pipeline has undefined behavior and we're forcing the user to remove
>>> the ambiguity - either by specifying a repeated trigger (which they
>>> probably should have done in the first place) or by specifying the next
>>> trigger explicitly (which, granted, may require changing the composite
>>> transform, because the transform hasn't accounted for potential usage in
>>> this ambiguous case).
>>>
>>>
>>>>
>>>> > Prohibit top-level terminating triggers entirely. This will ensure that
>>>> > the only data that ever gets dropped is "droppably late" data.
>>>>
>>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>> trigger that we should keep allowing, and has perfectly reasonable
>>>> continuation semantics. I suppose we could only allow it if allowed
>>>> lateness is 0, but that makes things even more context sensitive.
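>>>>
>>>> (For reference, the pattern in question is roughly the following sketch;
>>>> the element type and durations are just placeholders:
>>>>
>>>> input.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .plusDelayOf(Duration.standardMinutes(1))))
>>>>     .withAllowedLateness(Duration.ZERO)
>>>>     .discardingFiredPanes());
>>>>
>>>> This fires speculative early panes, then a single on-time pane at the end
>>>> of the window, and then finishes - so with allowed lateness 0 nothing
>>>> except droppably-late data gets dropped.)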
>>>>
>>>> > Do people think that these options are sensible?
>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>>>> > discussion?
>>>>
>>>> I think this is a fair summary of the discussions we've had.
>>>>
>>>
>>> Do we have any sort of trigger test suite, or high-level properties
>>> triggers must satisfy?
>>>
>>> I'd propose the following property:
>>> Given a PCollection with a particular trigger that is passed through a
>>> GBK, processing by the downstream aggregations should by default happen
>>> without dropping or introducing any more data [unless a triggering
>>> strategy is explicitly set on a downstream aggregation to something that
>>> violates this property].
>>>
>>> For example:
>>> PCollection<V> pc = ...;  // e.g. built from a TestStream
>>>
>>> static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>   return pc
>>>       // Assign each element a random key in [0, n).
>>>       .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
>>>           .withKeyType(TypeDescriptors.integers()))
>>>       .apply(GroupByKey.create())
>>>       .apply(Values.create())
>>>       .apply(Flatten.iterables());
>>> }
>>>
>>> Then regroup(regroup(pc, n), 1) should contain exactly the same elements
>>> as regroup(pc, n), regardless of pc's triggering, windowing and
>>> accumulation mode.
>>>
>>> This property is currently definitely violated for some triggers; and
>>> it's definitely violated for the accumulating mode; both aspects must be
>>> fixed.
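>>>
>>> For concreteness, a check of this property could look roughly like the
>>> sketch below (assuming p is a TestPipeline and regroup is the helper
>>> above; the values and trigger are placeholders, and this is just the
>>> shape, not the actual code in the PR):
>>>
>>> TestStream<Long> events = TestStream.create(VarLongCoder.of())
>>>     .addElements(1L, 2L, 3L, 4L, 5L)
>>>     .advanceWatermarkToInfinity();
>>>
>>> PCollection<Long> pc = p.apply(events)
>>>     .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>         // With this repeated trigger and discarding mode the property
>>>         // should hold; swapping in e.g. a bare AfterPane.elementCountAtLeast(2)
>>>         // (non-repeated), or accumulatingFiredPanes(), is where things
>>>         // currently break down.
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .withAllowedLateness(Duration.ZERO)
>>>         .discardingFiredPanes());
>>>
>>> // regroup(regroup(pc, 3), 1) should contain exactly what regroup(pc, 3) does.
>>> PAssert.that(regroup(regroup(pc, 3), 1)).containsInAnyOrder(1L, 2L, 3L, 4L, 5L);
>>> p.run();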
>>>
>>>
>>>> - Robert
>>>>
>>>
>
