On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]> wrote:

> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]>
> wrote:
> > Hi,
> >
> > After a recent investigation of a data loss bug caused by unintuitive
> > behavior of some kinds of triggers, we had a discussion about how we can
> > protect against future issues like this, and I summarized it in
> > https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
> >
> > Current Beam trigger semantics are rather confusing and in some cases
> > extremely unsafe, especially if the pipeline includes multiple chained
> > GBKs.
> > One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
> >
> > There are multiple issues:
> >
> > The API allows users to specify terminating top-level triggers (e.g.
> > "trigger a pane after receiving 10000 elements in the window, and that's
> > it"), but experience from user support shows that this is nearly always a
> > mistake and the user did not intend to drop all further data.
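
To make this concrete, here's a rough sketch in the Java SDK ("input", "T" and the
numbers are made up). The first configuration is the kind of terminating top-level
trigger in question; the second, wrapped in Repeatedly.forever(...), is what users
nearly always actually mean:

// Fires a single pane once the window has received 10000 elements; after that the
// trigger is finished and all further data for the window is dropped.
input.apply(Window.<T>configure()
    .triggering(AfterPane.elementCountAtLeast(10000))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardHours(1)));

// Keeps firing a pane for each further batch of 10000 elements; only droppably late
// data can then be dropped.
input.apply(Window.<T>configure()
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardHours(1)));
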
> >
> > In general, triggers are the only place in Beam where data is being
> > dropped without making a lot of very loud noise about it - a practice
> > for which the PTransform style guide uses the language: "never, ever,
> > ever do this".
> >
> > Continuation triggers are still worse. For context: the continuation
> > trigger is the trigger that's set on the output of a GBK and controls
> > further aggregation of the results of this aggregation by downstream
> > GBKs. The output shouldn't just use the same trigger as the input,
> > because e.g. if the input trigger said "wait for an hour before
> > emitting a pane", that doesn't mean that we should wait for another
> > hour before emitting a result of aggregating the result of the input
> > trigger. Continuation triggers try to simulate the behavior "as if a
> > pane of the input propagated through the entire pipeline", but the
> > implementation of individual continuation triggers doesn't do that.
> > E.g. the continuation of the "first N elements in pane" trigger is
> > "first 1 element in pane", and if the results of a first GBK are
> > further grouped by a second GBK onto a coarser key (e.g. if everything
> > is grouped onto the same key), that effectively means that, of the keys
> > of the first GBK, only one survives and all others are dropped (which
> > is what happened in the data loss bug).
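
A sketch of the shape of pipeline this bites (hypothetical names, Java SDK; "input" is
assumed to be a keyed PCollection):

// Stage 1: per-key sums with a terminating "first 100 elements in pane" trigger.
PCollection<KV<String, Long>> perKey = input
    .apply(Window.<KV<String, Long>>configure()
        .triggering(AfterPane.elementCountAtLeast(100))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(Sum.longsPerKey());

// Stage 2: regroup onto a single coarser key. The continuation trigger inferred for
// stage 1's output is "first 1 element in pane", so this GBK fires after seeing the
// result for one stage-1 key and drops the results for the other keys.
perKey
    .apply(WithKeys.of("all"))
    .apply(GroupByKey.create());
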
> >
> > The ultimate fix to all of these things is
> > https://s.apache.org/beam-sink-triggers . However, it is a huge model
> > change, and meanwhile we have to do something. The options are, in
> > order of increasing backward incompatibility (but incompatibility in a
> > "rejecting something that previously was accepted but extremely
> > dangerous" kind of way):
> >
> > Make the continuation trigger of most triggers be the "always-fire"
> > trigger. Seems that this should be the case for all triggers except
> > the watermark trigger. This will definitely increase safety, but lead
> > to more eager firing of downstream aggregations. It also will violate
> > a user's expectation that a fire-once trigger fires everything
> > downstream only once, but that expectation appears impossible to
> > satisfy safely.
>
> Note that firing more often for multiply stacked triggers, especially
> in the case of accumulating mode, easily leads to data corruption bugs
> where elements are duplicated and/or overcounted. I suppose that bad
> data is slightly easier to detect than missing data, but turning
> once triggers into more-than-once triggers (automatically, or
> manually by prohibiting the former) isn't a straightforward fix.
>

I agree that the combination of accumulating mode and stacked GBKs is very
problematic.
I think that the way it's defined now is a semantic bug that produces
garbage (duplicated data) with any repeated trigger, and garbage (lost
data, per the current discussion) with any non-repeated trigger as well.
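
For instance (hypothetical names, Java sketch): with accumulating mode, every
successive pane of the first aggregation re-contains the values already emitted in its
earlier panes, so a downstream aggregation that consumes all panes counts the same
input elements more than once:

// Stage 1: accumulating per-key counts; each fired pane holds the running count so far.
PCollection<KV<String, Long>> counts = events
    .apply(Window.<KV<String, String>>configure()
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10)))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(Count.perKey());

// Stage 2: summing across the downstream panes adds overlapping running totals
// together, i.e. the same input elements are counted multiple times.
counts.apply(Values.create()).apply(Sum.longsGlobally().withoutDefaults());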

I think we should prohibit the combination of accumulating mode and stacked
GBKs until we sort this out, which is again probably Sink Triggers: per
that document, I suppose, the accumulating mode would be set only on the
sink step, and upstream steps would get discarding mode [and downstream
steps would have to set something explicitly].

By "prohibit" I mean "GBK applied to something with accumulating mode
should return a PCollection with an invalid trigger, where you have to
explicitly configure a trigger before you can GBK again". Does this sound
reasonable?

If we agree on that, then I think having the continuation of all triggers
be the "always-fire" trigger is also safe?


>
> > Make the continuation trigger of some triggers be the "invalid"
> > trigger, i.e. require the user to set it explicitly: there's in
> > general no good and safe way to infer what a trigger on a second GBK
> > "truly" should be, based on the trigger of the PCollection input into
> > a first GBK. This is especially true for terminating triggers.
>
> I think this should be on the table until we come up with good
> semantics, which may not be until sink triggers. It's backwards
> incompatible, but in a bug-fixing and verbose way (as opposed to silently
> changing semantics/behavior/outputs with the first option). Note that
> sometimes for composite operations one does not have access to the
> second grouping operation, so we must at least provide an opt-out
> flag.
>
I would argue that the failure in those cases is Working As Intended: the
pipeline has undefined behavior and we're forcing the user to remove the
ambiguity - either by specifying a repeated trigger (which they probably
should have done in the first place) or by specifying the next trigger
explicitly (which, granted, may require changing the composite transform,
because the transform hasn't accounted for potential usage in this
ambiguous case).
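
Concretely, removing the ambiguity could look something like this (a sketch; the
trigger and names are arbitrary):

// Explicitly (re)configure triggering before the second grouping instead of relying
// on the inferred continuation trigger.
firstGbkOutput
    .apply(Window.<KV<String, Long>>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(GroupByKey.create());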


>
> > Prohibit top-level terminating triggers entirely. This will ensure
> > that the only data that ever gets dropped is "droppably late" data.
>
> The AfterWatermark.withEarlyFirings(...) is a top-level terminating
> trigger that we should keep allowing, and has perfectly reasonable
> continuation semantics. I suppose we could only allow it if allowed
> lateness is 0, but that makes things even more context sensitive.
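
For reference, that's a trigger of this shape (sketch only; the early-firing trigger
and the delay are arbitrary):

// Early panes until the watermark passes the end of the window, then a final on-time
// pane, after which the trigger finishes.
Window.<T>configure()
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.ZERO);
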
>
> > Do people think that these options are sensible?
> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our
> > discussion?
>
> I think this is a fair summary of the discussions we've had.
>

Do we have any sort of trigger test suite, or high-level properties
triggers must satisfy?

I'd propose the following property:
Given a PCollection with a particular trigger that is passed through a
GBK, by default the downstream aggregations should process it without
dropping or introducing any data [unless a triggering strategy that
violates this property is explicitly set on a downstream aggregation].

For example:
PCollection<V> pc = ...;  // e.g. from a TestStream

static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
  // Assign each element a random key in [0, n), group, and drop the keys again.
  return pc
      .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
          .withKeyType(TypeDescriptors.integers()))
      .apply(GroupByKey.create())
      .apply(Values.create())
      .apply(Flatten.iterables());
}

Then regroup(regroup(pc, n), 1) should contain exactly the same elements
as regroup(pc, n), regardless of pc's triggering, windowing and
accumulation mode.

This property is currently definitely violated for some triggers, and it's
definitely violated for accumulating mode; both aspects must be fixed.
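
Something like this seems checkable with TestStream + PAssert, using the regroup
helper above (a rough sketch assuming the direct runner; the element values, windowing
and the trigger under test are arbitrary, and this kind of test would currently fail
for some triggers - which is the point):

Pipeline p = Pipeline.create();

PCollection<Long> pc = p
    .apply(TestStream.create(VarLongCoder.of())
        .addElements(1L, 2L, 3L)
        .advanceWatermarkTo(new Instant(100))
        .addElements(4L, 5L)
        .advanceWatermarkToInfinity())
    .apply(Window.<Long>into(FixedWindows.of(Duration.millis(50)))
        .triggering(AfterPane.elementCountAtLeast(2))  // trigger under test
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO));

// Both regroup(pc, 3) and regroup(regroup(pc, 3), 1) should contain exactly the
// original elements - nothing dropped, nothing duplicated.
PAssert.that(regroup(pc, 3)).containsInAnyOrder(1L, 2L, 3L, 4L, 5L);
PAssert.that(regroup(regroup(pc, 3), 1)).containsInAnyOrder(1L, 2L, 3L, 4L, 5L);
p.run();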


> - Robert
>
