I created a test demonstrating that our triggers violate this pretty
intuitive property.

https://github.com/apache/beam/pull/4239

I think we should discuss this: it seems like a pretty important property to
me, and it seems pretty bad that it's violated. Again, per the discussion
below, some of this should be addressed by changing the semantics of
individual triggers, and some by prohibiting certain combinations of
triggers / accumulation modes / GBKs.

I tried changing the continuation trigger of some triggers to be the
"always fire" trigger; however, we have a lot of code assuming that the
continuation will be a OnceTrigger (e.g. that the continuation of a
OnceTrigger has to be a OnceTrigger, and of course that the components of a
OnceTrigger have to be OnceTriggers), and I haven't yet dug into whether
that division is actually important or just baked in at compile time.
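
For concreteness, this is roughly the kind of change I was experimenting
with - a sketch only, not the actual patch, and treat the exact method
names as approximate:

// Today: the continuation of "first N elements in pane" is itself a OnceTrigger,
// equivalent to AfterPane.elementCountAtLeast(1) - it fires once and is then done.
Trigger current = AfterPane.elementCountAtLeast(10000).getContinuationTrigger();

// What I tried instead: make the continuation "always fire", i.e. repeated.
Trigger proposed = Repeatedly.forever(AfterPane.elementCountAtLeast(1));

// Repeatedly.forever(...) is not a OnceTrigger, which is exactly where the
// OnceTrigger-shaped assumptions in the codebase start to break.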

On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>> > Hi,
>> >
>> > After a recent investigation of a data loss bug caused by unintuitive
>> > behavior of some kinds of triggers, we had a discussion about how we can
>> > protect against future issues like this, and I summarized it in
>> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>> >
>> > Current Beam trigger semantics are rather confusing and in some cases
>> > extremely unsafe, especially if the pipeline includes multiple chained
>> > GBKs. One example of that is
>> > https://issues.apache.org/jira/browse/BEAM-3169 .
>> >
>> > There are multiple issues:
>> >
>> > The API allows users to specify terminating top-level triggers (e.g.
>> > "trigger a pane after receiving 10000 elements in the window, and
>> > that's it"), but experience from user support shows that this is
>> > nearly always a mistake and the user did not intend to drop all
>> > further data.
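
(To make that concrete: the pattern in question looks roughly like the
following sketch; "input" stands for some keyed PCollection, and the window
size and element count are made up for illustration.)

input
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
        // Terminating top-level trigger: fires one pane per window after
        // 10000 elements, then drops everything else that arrives.
        .triggering(AfterPane.elementCountAtLeast(10000))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupByKey.create());

// What the user almost always meant instead:
//   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))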
>> >
>> > In general, triggers are the only place in Beam where data is being
>> > dropped without making a lot of very loud noise about it - a practice
>> > for which the PTransform style guide uses the language: "never, ever,
>> > ever do this".
>> >
>> > Continuation triggers are still worse. For context: a continuation
>> > trigger is the trigger that's set on the output of a GBK and controls
>> > further aggregation of the results of this aggregation by downstream
>> > GBKs. The output shouldn't just use the same trigger as the input,
>> > because e.g. if the input trigger said "wait for an hour before
>> > emitting a pane", that doesn't mean that we should wait for another
>> > hour before emitting a result of aggregating the result of the input
>> > trigger. Continuation triggers try to simulate the behavior "as if a
>> > pane of the input propagated through the entire pipeline", but the
>> > implementation of individual continuation triggers doesn't do that.
>> > E.g. the continuation of the "first N elements in pane" trigger is
>> > "first 1 element in pane", and if the results of a first GBK are
>> > further grouped by a second GBK onto a coarser key (e.g. if everything
>> > is grouped onto the same key), that effectively means that, of the
>> > keys of the first GBK, only one survives and all others are dropped
>> > (which is what happened in the data loss bug).
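
(A minimal sketch of that failure shape; the names, window size and counts
are made up for illustration, and "events" stands for some
PCollection<KV<String, String>> keyed by user.)

// First aggregation: per-user counts, firing a pane per 1000 elements.
PCollection<KV<String, Long>> perUser = events
    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterPane.elementCountAtLeast(1000))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(Count.perKey());

// Second aggregation: regroup everything onto a single key. The inherited
// continuation trigger behaves like "first 1 element in pane": it fires on
// whichever per-user pane happens to arrive first and then finishes, so the
// panes for all the other users are dropped.
PCollection<KV<String, Iterable<KV<String, Long>>>> collapsed = perUser
    .apply(WithKeys.of("all"))
    .apply(GroupByKey.create());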
>> >
>> > The ultimate fix to all of these things is
>> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
>> > change, and meanwhile we have to do something. The options are, in
>> > order of increasing backward incompatibility (but incompatibility in a
>> > "rejecting something that previously was accepted but extremely
>> > dangerous" kind of way):
>> >
>> > Make the continuation trigger of most triggers be the "always-fire"
>> > trigger. It seems that this should be the case for all triggers except
>> > the watermark trigger. This will definitely increase safety, but lead
>> > to more eager firing of downstream aggregations. It will also violate
>> > a user's expectation that a fire-once trigger fires everything
>> > downstream only once, but that expectation appears impossible to
>> > satisfy safely.
>>
>> Note that firing more often for multiply stacked triggers, especially
>> in the case of accumulating mode, easily leads to data corruption bugs
>> where elements are duplicated and/or overcounted. I suppose that bad
>> data is slightly easier to detect than missing data, but turning once
>> triggers into more-than-once triggers (whether automatically, or
>> manually by prohibiting the former) isn't a straightforward fix.
>>
>
> I agree that the combination of accumulating mode and stacked GBKs is very
> problematic.
> I think that the way it's defined now is a semantic bug that produces
> garbage (duplicated data) with any repeated trigger, and garbage (lost
> data, per the current discussion) with any non-repeated trigger.
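
(A toy example of the duplication: if the first GBK is accumulating and
fires on every element, a key that receives 1, 2, 3 emits the panes {1},
{1, 2}, {1, 2, 3}; a downstream Sum over those panes computes
1 + (1 + 2) + (1 + 2 + 3) = 10 instead of 6.)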
>
> I think we should prohibit the combination of accumulating mode and
> stacked GBKs until we sort this out - and sorting it out is again probably
> Sink Triggers: per that document, I suppose, the accumulating mode would
> be set only on the sink step, and upstream steps would get discarding mode
> [and downstream steps would have to set something explicitly].
>
> By "prohibit" I mean "GBK applied to something with accumulating mode
> should return a PCollection with an invalid trigger, where you have to
> explicitly configure a trigger before you can GBK again". Does this sound
> reasonable?
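
(At the user level that would force something roughly like the following
before the second GBK - a sketch; "firstGbkOutput" and the particular
trigger chosen here are just illustrative.)

firstGbkOutput
    // Under this proposal the trigger on the first GBK's output is
    // "invalid", so the user must re-state triggering (and the
    // accumulation mode) explicitly before grouping again.
    .apply(Window.<KV<String, Long>>configure()
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupByKey.create());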
>
> If we agree on that, then I think having the continuation of all triggers
> be the "always-fire" trigger is also safe?
>
>
>>
>> > Make the continuation trigger of some triggers be the "invalid"
>> > trigger, i.e. require the user to set it explicitly: there's in
>> > general no good and safe way to infer what a trigger on a second GBK
>> > "truly" should be, based on the trigger of the PCollection input into
>> > a first GBK. This is especially true for terminating triggers.
>>
>> I think this should be on the table until we come up with good
>> semantics, which may not be until sink triggers. It's backwards
>> incompatible, but in a bug-fixing and verbose way (as opposed to silently
>> changing semantics/behavior/outputs with the first option). Note that
>> sometimes for composite operations one does not have access to the
>> second grouping operation, so we must at least provide an opt-out
>> flag.
>>
> I would argue that the failure in those cases is Working As Intended: the
> pipeline has undefined behavior and we're forcing the user to remove the
> ambiguity - either by specifying a repeated trigger (which they probably
> should have done in the first place) or by specifying the next trigger
> explicitly (which, granted, may require changing the composite transform,
> because the transform hasn't accounted for potential usage in this
> ambiguous case).
>
>
>>
>> > Prohibit top-level terminating triggers entirely. This will ensure
>> > that the only data that ever gets dropped is "droppably late" data.
>>
>> AfterWatermark.withEarlyFirings(...) is a top-level terminating trigger
>> that we should keep allowing, and it has perfectly reasonable
>> continuation semantics. I suppose we could only allow it if the allowed
>> lateness is 0, but that makes things even more context-sensitive.
>>
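
(For reference, the construction being discussed is roughly the following;
the particular early-firing delay is just an example.)

// Fires speculative early panes, then a final pane when the watermark
// passes the end of the window, and then finishes.
AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
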
>> > Do people think that these options are sensible?
>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
>> > discussion?
>>
>> I think this is a fair summary of the discussions we've had.
>>
>
> Do we have any sort of trigger test suite, or high-level properties
> triggers must satisfy?
>
> I'd propose the following property:
> Given a PCollection with a particular trigger that is passed through a
> GBK, processing by downstream aggregations should by default happen
> without dropping or introducing any data [unless a triggering strategy is
> explicitly set on a downstream aggregation to something that violates
> this property].
>
> For example:
> PCollection<V> pc = ...;  // e.g. from a TestStream
>
> static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
>   // Key each element by a random key in 0..n, group, then throw the
>   // grouping away again.
>   return pc
>       .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
>           .withKeyType(TypeDescriptors.integers()))
>       .apply(GroupByKey.create())
>       .apply(Values.create())
>       .apply(Flatten.iterables());
> }
>
> Then regroup(regroup(pc, n), 1) should contain exactly the same elements
> as regroup(pc, n), regardless of pc's triggering, windowing and
> accumulation mode.
>
> This property is currently definitely violated for some triggers, and it's
> definitely violated for the accumulating mode; both aspects must be fixed.
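
(This is not the test from the PR above - just a rough sketch of how the
property could be phrased as a test, reusing the regroup helper; the
trigger, window and elements are arbitrary.)

@Rule public final transient TestPipeline p = TestPipeline.create();

@Test
public void regroupingAgainShouldNotLoseOrDuplicateData() {
  List<Long> elements =
      LongStream.range(0, 100).boxed().collect(Collectors.toList());

  PCollection<Long> input = p
      .apply(Create.of(elements))
      .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(AfterPane.elementCountAtLeast(10))  // a terminating trigger
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());

  // The property: regrouping once or twice should preserve the contents.
  PAssert.that(regroup(input, 5)).containsInAnyOrder(elements);
  PAssert.that(regroup(regroup(input, 5), 1)).containsInAnyOrder(elements);

  p.run();
}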
>
>
>> - Robert
>>
>
