Which intuitive property?

On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]>
wrote:

> I created a test demonstrating that our triggers violate this pretty
> intuitive property.
>
> https://github.com/apache/beam/pull/4239
>
> I think we should discuss this: seems like a pretty important property to
> me and seems pretty bad that it's violated. Again, per discussion below,
> some of it should be addressed with changing semantics of individual
> triggers, and some with prohibiting certain combinations of triggers /
> accumulation modes / GBKs.
>
> I tried changing the continuation trigger of some triggers to be the
> "always fire" trigger, however we have a lot of code assuming that it will
> be a OnceTrigger (e.g. that continuation of a OnceTrigger has to be a
> OnceTrigger, and of course that components of a OnceTrigger have to be a
> OnceTrigger) and I didn't yet dig into whether that division is at all
> important or just baked in at compile time.
>
> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]>
>>> wrote:
>>> > Hi,
>>> >
>>> > After a recent investigation of a data loss bug caused by unintuitive
>>> > behavior of some kinds of triggers, we had a discussion about how we can
>>> > protect against future issues like this, and I summarized it in
>>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>> >
>>> > Current Beam trigger semantics are rather confusing and in some cases
>>> > extremely unsafe, especially if the pipeline includes multiple chained
>>> > GBKs. One example of that is
>>> > https://issues.apache.org/jira/browse/BEAM-3169 .
>>> >
>>> > There are multiple issues:
>>> >
>>> > The API allows users to specify terminating top-level triggers (e.g.
>>> > "trigger a pane after receiving 10000 elements in the window, and that's
>>> > it"), but experience from user support shows that this is nearly always a
>>> > mistake and the user did not intend to drop all further data.
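>>> >
>>> > For concreteness, the difference in the Java SDK looks roughly like this
>>> > (windowing and numbers are placeholders; 'events' is a hypothetical
>>> > PCollection<String>):
>>> >
>>> > // Terminating: fires once after ~10000 elements, then all further data
>>> > // for the window is dropped.
>>> > events.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
>>> >     .triggering(AfterPane.elementCountAtLeast(10000))
>>> >     .withAllowedLateness(Duration.ZERO)
>>> >     .discardingFiredPanes());
>>> >
>>> > // What users almost always actually mean: keep firing as data arrives.
>>> > events.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
>>> >     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
>>> >     .withAllowedLateness(Duration.ZERO)
>>> >     .discardingFiredPanes());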
>>> >
>>> > In general, triggers are the only place in Beam where data is being dropped
>>> > without making a lot of very loud noise about it - a practice for which the
>>> > PTransform style guide uses the language: "never, ever, ever do this".
>>> >
>>> > Continuation triggers are still worse. For context: the continuation
>>> > trigger is the trigger that's set on the output of a GBK and controls
>>> > further aggregation of the results of this aggregation by downstream GBKs.
>>> > The output shouldn't just use the same trigger as the input, because e.g.
>>> > if the input trigger said "wait for an hour before emitting a pane", that
>>> > doesn't mean that we should wait for another hour before emitting a result
>>> > of aggregating the result of the input trigger. Continuation triggers try
>>> > to simulate the behavior "as if a pane of the input propagated through the
>>> > entire pipeline", but the implementation of individual continuation
>>> > triggers doesn't do that. E.g. the continuation of the "first N elements
>>> > in pane" trigger is "first 1 element in pane", and if the results of a
>>> > first GBK are further grouped by a second GBK onto a coarser key (e.g. if
>>> > everything is grouped onto the same key), that effectively means that, of
>>> > the keys of the first GBK, only one survives and all others are dropped
>>> > (which is what happened in the data loss bug).
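>>> >
>>> > A sketch of that shape (hypothetical types and numbers):
>>> >
>>> > PCollection<KV<String, Integer>> events = ...;
>>> >
>>> > // First aggregation, with a "first N elements in pane" trigger per key.
>>> > PCollection<KV<String, Iterable<Integer>>> perKey =
>>> >     events
>>> >         .apply(Window.<KV<String, Integer>>configure()
>>> >             .triggering(AfterPane.elementCountAtLeast(1000))
>>> >             .withAllowedLateness(Duration.ZERO)
>>> >             .discardingFiredPanes())
>>> >         .apply(GroupByKey.create());
>>> >
>>> > // Second aggregation: everything regrouped onto a single coarse key. The
>>> > // output of the first GBK carries the continuation trigger "first 1
>>> > // element in pane", so this GBK can fire and finish after seeing the
>>> > // result of just one upstream key - the results for all other keys of the
>>> > // first GBK are dropped.
>>> > perKey.apply(WithKeys.of(0)).apply(GroupByKey.create());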
>>> >
>>> > The ultimate fix to all of these things is
>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>>> > change, and meanwhile we have to do something. The options are, in order
>>> > of increasing backward incompatibility (but incompatibility in a
>>> > "rejecting something that previously was accepted but extremely dangerous"
>>> > kind of way):
>>> >
>>> > Make the continuation trigger of most triggers be the "always-fire"
>>> > trigger. Seems that this should be the case for all triggers except the
>>> > watermark trigger. This will definitely increase safety, but lead to more
>>> > eager firing of downstream aggregations. It also will violate a user's
>>> > expectation that a fire-once trigger fires everything downstream only
>>> > once, but that expectation appears impossible to satisfy safely.
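>>> >
>>> > In code, roughly (a sketch; 't' and the numbers are just for illustration):
>>> >
>>> > Trigger t = AfterPane.elementCountAtLeast(100);
>>> > // Today this is effectively AfterPane.elementCountAtLeast(1) - a
>>> > // once-trigger (the "first 1 element in pane" behavior described above).
>>> > Trigger continuation = t.getContinuationTrigger();
>>> > // The proposal: make the continuation always-firing instead, e.g.
>>> > // something equivalent to Repeatedly.forever(AfterPane.elementCountAtLeast(1)).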
>>>
>>> Note that firing more often for multiply stacked triggers, especially
>>> in the case of accumulating mode, easily leads to data corruption bugs
>>> where elements are duplicated and/or overcounted. I suppose that bad
>>> data is slightly easier to detect than missing data, but turning once
>>> triggers into more-than-once triggers (automatically, or manually by
>>> prohibiting the former) isn't a straightforward fix.
>>>
>>
>> I agree that the combination of accumulating mode and stacked GBKs is
>> very problematic.
>> I think that the way it's defined now is a semantic bug that produces
>> garbage (duplicated data) with any repeated trigger - each accumulating
>> pane of the first GBK re-emits the elements of earlier panes, so a
>> downstream GBK sees them multiple times - and garbage (lost data, per
>> the current discussion) with any non-repeated trigger as well.
>>
>> I think we should prohibit the combination of accumulating mode and
>> stacked GBKs until we sort this out, which is again probably Sink Triggers:
>> per that document, I suppose, the accumulating mode would be set only on
>> the sink step, and upstream steps would get discarding mode [and downstream
>> steps would have to set something explicitly].
>>
>> By "prohibit" I mean "GBK applied to something with accumulating mode
>> should return a PCollection with an invalid trigger, where you have to
>> explicitly configure a trigger before you can GBK again". Does this sound
>> reasonable?
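>>
>> Concretely, I'd expect the user-facing fix to look something like this
>> sketch (hypothetical 'input'; the trigger and lateness values are arbitrary):
>>
>> PCollection<KV<String, Integer>> input = ...;  // accumulatingFiredPanes upstream
>>
>> PCollection<KV<String, Iterable<Integer>>> first = input.apply(GroupByKey.create());
>>
>> // Under the proposal, 'first' would carry an invalid trigger, so a second
>> // GBK would be rejected until the triggering is set explicitly:
>> first
>>     .apply(Window.<KV<String, Iterable<Integer>>>configure()
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes())
>>     .apply(GroupByKey.create());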
>>
>> If we agree on that, then I think having the continuation of all triggers
>> be the "always-fire" trigger is also safe?
>>
>>
>>>
>>> > Make the continuation trigger of some triggers be the "invalid" trigger,
>>> > i.e. require the user to set it explicitly: there's in general no good and
>>> > safe way to infer what a trigger on a second GBK "truly" should be, based
>>> > on the trigger of the PCollection input into a first GBK. This is
>>> > especially true for terminating triggers.
>>>
>>> I think this should be on the table until we come up with good
>>> semantics, which may not be until sink triggers. It's backwards
>>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>>> changing semantics/behavior/outputs with the first option). Note that
>>> sometimes for composite operations one does not have access to the
>>> second grouping operation, so we must at least provide an opt-out
>>> flag.
>>>
>> I would argue that the failure in those cases is Working As Intended: the
>> pipeline has undefined behavior and we're forcing the user to remove the
>> ambiguity - either by specifying a repeated trigger (which they probably
>> should have done in the first place) or by specifying the next trigger
>> explicitly (which, granted, may require changing the composite transform,
>> because the transform hasn't accounted for potential usage in this
>> ambiguous case).
>>
>>
>>>
>>> > Prohibit top-level terminating triggers entirely. This will ensure that
>>> > the only data that ever gets dropped is "droppably late" data.
>>>
>>> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>> trigger that we should keep allowing, and has perfectly reasonable
>>> continuation semantics. I suppose we could only allow it if allowed
>>> lateness is 0, but that makes things even more context sensitive.
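>>>
>>> For reference, a sketch of that shape ('input' is a hypothetical
>>> PCollection<String>; the durations are placeholders):
>>>
>>> input.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>             .plusDelayOf(Duration.standardMinutes(1))))
>>>     .withAllowedLateness(Duration.ZERO)  // the "allowed lateness is 0" case
>>>     .discardingFiredPanes());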
>>>
>>> > Do people think that these options are sensible?
>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>>> > discussion?
>>>
>>> I think this is a fair summary of the discussions we've had.
>>>
>>
>> Do we have any sort of trigger test suite, or high-level properties
>> triggers must satisfy?
>>
>> I'd propose the following property:
>> Given a PCollection with a particular trigger that is passed through a
>> GBK, processing through the downstream aggregations should, by default,
>> happen without dropping or introducing any more data [unless a triggering
>> strategy is explicitly set on a downstream aggregation to something that
>> violates this property].
>>
>> For example:
>> PCollection<V> pc = ...  // e.g. from a TestStream
>>
>> static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
>>   return pc
>>       // Assign each element a random key in [0, n).
>>       .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
>>           .withKeyType(TypeDescriptors.integers()))
>>       .apply(GroupByKey.create())
>>       .apply(Values.create())
>>       .apply(Flatten.iterables());
>> }
>>
>> Then regroup(regroup(pc, n), 1) should contain exactly the same elements as
>> regroup(pc, n), regardless of pc's triggering, windowing and accumulation
>> mode.
>>
>> This property is currently definitely violated for some triggers, and it's
>> violated for the accumulating mode as well; both aspects must be fixed.
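>>
>> (A rough sketch of how a check of this could look with TestStream/PAssert,
>> assuming a JUnit-rule TestPipeline 'p' and the regroup() above; the
>> windowing and trigger here are arbitrary:)
>>
>> PCollection<Integer> pc =
>>     p.apply(TestStream.create(VarIntCoder.of())
>>             .addElements(1, 2, 3, 4, 5)
>>             .advanceWatermarkToInfinity())
>>      .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(1)))
>>          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
>>          .withAllowedLateness(Duration.ZERO)
>>          .discardingFiredPanes());
>>
>> // The property: regrouping again must neither drop nor duplicate elements.
>> // Per the above, assertions like this currently fail for some
>> // trigger/accumulation-mode combinations.
>> PAssert.that(regroup(regroup(pc, 3), 1)).containsInAnyOrder(1, 2, 3, 4, 5);
>> p.run();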
>>
>>
>>> - Robert
>>>
>>
