The property that stacking more GBKs doesn't drop more data than the first one already drops, nor introduce duplicate data (see the previous email, starting with "I'd propose the following property:").
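For concreteness, the property could be phrased as a test along these lines. This is only a rough sketch using a hypothetical regroup helper modeled on the pseudocode quoted below - not the actual code from https://github.com/apache/beam/pull/4239 - and the class and transform names are made up for illustration:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    class RegroupProperty {
      // Pass pc through a GBK keyed on a random key in [0, n).
      static <V> PCollection<V> regroup(PCollection<V> pc, final int n) {
        return pc
            .apply("AssignRandomKey",
                WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
                    .withKeyType(TypeDescriptors.integers()))
            .apply(GroupByKey.create())
            .apply(Values.create())        // PCollection<Iterable<V>>
            .apply(Flatten.iterables());   // back to PCollection<V>
      }
      // The property: regroup(regroup(pc, n), 1) should contain exactly the same
      // elements as regroup(pc, n), for any triggering, windowing and accumulation
      // mode on pc - i.e. the second GBK neither drops nor duplicates data.
    }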
On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:

> Which intuitive property?
>
> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]> wrote:
>
>> I created a test demonstrating that our triggers violate this pretty intuitive property.
>>
>> https://github.com/apache/beam/pull/4239
>>
>> I think we should discuss this: it seems like a pretty important property to me, and it seems pretty bad that it's violated. Again, per the discussion below, some of it should be addressed by changing the semantics of individual triggers, and some by prohibiting certain combinations of triggers / accumulation modes / GBKs.
>>
>> I tried changing the continuation trigger of some triggers to be the "always fire" trigger; however, we have a lot of code assuming that it will be a OnceTrigger (e.g. that the continuation of a OnceTrigger has to be a OnceTrigger, and of course that the components of a OnceTrigger have to be OnceTriggers), and I didn't yet dig into whether that division is at all important or just baked in at compile time.
>>
>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]> wrote:
>>
>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]> wrote:
>>>> > Hi,
>>>> >
>>>> > After a recent investigation of a data loss bug caused by unintuitive behavior of some kinds of triggers, we had a discussion about how we can protect against future issues like this, and I summarized it in https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>> >
>>>> > Current Beam trigger semantics are rather confusing and in some cases extremely unsafe, especially if the pipeline includes multiple chained GBKs. One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
>>>> >
>>>> > There are multiple issues:
>>>> >
>>>> > The API allows users to specify terminating top-level triggers (e.g. "trigger a pane after receiving 10000 elements in the window, and that's it"), but experience from user support shows that this is nearly always a mistake and the user did not intend to drop all further data.
>>>> >
>>>> > In general, triggers are the only place in Beam where data is dropped without making a lot of very loud noise about it - a practice for which the PTransform style guide uses the language "never, ever, ever do this".
>>>> >
>>>> > Continuation triggers are worse still. For context: the continuation trigger is the trigger that's set on the output of a GBK and controls further aggregation of the results of this aggregation by downstream GBKs. The output shouldn't just use the same trigger as the input, because e.g. if the input trigger said "wait for an hour before emitting a pane", that doesn't mean that we should wait for another hour before emitting a result of aggregating the result of the input trigger. Continuation triggers try to simulate the behavior "as if a pane of the input propagated through the entire pipeline", but the implementation of individual continuation triggers doesn't do that. E.g. the continuation of the "first N elements in pane" trigger is "first 1 element in pane", and if the results of a first GBK are further grouped by a second GBK onto a coarser key (e.g. if everything is grouped onto the same key), that effectively means that, of the keys of the first GBK, only one survives and all others are dropped (which is what happened in the data loss bug).
>>>> >
>>>> > The ultimate fix to all of these things is https://s.apache.org/beam-sink-triggers . However, it is a huge model change, and meanwhile we have to do something. The options are, in order of increasing backward incompatibility (but incompatibility in a "rejecting something that previously was accepted but extremely dangerous" kind of way):
>>>> >
>>>> > Make the continuation trigger of most triggers be the "always-fire" trigger. It seems that this should be the case for all triggers except the watermark trigger. This will definitely increase safety, but lead to more eager firing of downstream aggregations. It will also violate a user's expectation that a fire-once trigger fires everything downstream only once, but that expectation appears impossible to satisfy safely.
>>>>
>>>> Note that firing more often for multiply stacked triggers, especially in the case of accumulating mode, easily leads to data corruption bugs where elements are duplicated and/or overcounted. I suppose that bad data is slightly easier to detect than missing data, but turning once triggers into more-than-once triggers (automatically, or manually by prohibiting the former) isn't a straightforward fix.
>>>
>>> I agree that the combination of accumulating mode and stacked GBKs is very problematic.
>>> I think that the way it's defined now is a semantic bug that produces garbage (duplicated data) with any repeated trigger, and garbage (lost data, per the current discussion) with any non-repeated trigger as well.
>>>
>>> I think we should prohibit the combination of accumulating mode and stacked GBKs until we sort this out, which is again probably Sink Triggers: per that document, I suppose, the accumulating mode would be set only on the sink step, and upstream steps would get discarding mode [and downstream steps would have to set something explicitly].
>>>
>>> By "prohibit" I mean "GBK applied to something with accumulating mode should return a PCollection with an invalid trigger, where you have to explicitly configure a trigger before you can GBK again". Does this sound reasonable?
>>>
>>> If we agree on that, then I think having the continuation of all triggers be the "always-fire" trigger is also safe?
>>>
>>>> > Make the continuation trigger of some triggers be the "invalid" trigger, i.e. require the user to set it explicitly: there's in general no good and safe way to infer what a trigger on a second GBK "truly" should be, based on the trigger of the PCollection input into a first GBK. This is especially true for terminating triggers.
>>>>
>>>> I think this should be on the table until we come up with good semantics, which may not be until sink triggers. It's backwards incompatible, but in a bug-fixing and verbose way (as opposed to silently changing semantics/behavior/outputs with the first option). Note that sometimes for composite operations one does not have access to the second grouping operation, so we must at least provide an opt-out flag.
>>>
>>> I would argue that the failure in those cases is Working As Intended: the pipeline has undefined behavior, and we're forcing the user to remove the ambiguity - either by specifying a repeated trigger (which they probably should have done in the first place) or by specifying the next trigger explicitly (which, granted, may require changing the composite transform, because the transform hasn't accounted for potential usage in this ambiguous case).
>>>
>>>> > Prohibit top-level terminating triggers entirely. This will ensure that the only data that ever gets dropped is "droppably late" data.
>>>>
>>>> AfterWatermark.withEarlyFirings(...) is a top-level terminating trigger that we should keep allowing, and it has perfectly reasonable continuation semantics. I suppose we could only allow it if allowed lateness is 0, but that makes things even more context sensitive.
>>>>
>>>> > Do people think that these options are sensible?
>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our discussion?
>>>>
>>>> I think this is a fair summary of the discussions we've had.
>>>
>>> Do we have any sort of trigger test suite, or high-level properties triggers must satisfy?
>>>
>>> I'd propose the following property:
>>> Given a PCollection with a particular trigger that is passed through a GBK, processing through the downstream aggregations should by default happen without dropping or introducing any more data [unless a triggering strategy is explicitly set on a downstream aggregation to something that violates this property].
>>>
>>> For example:
>>>
>>> PCollection<V> pc = ... // e.g. TestStream
>>>
>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>   return pc.apply(WithKeys.of(random key in 0..n))
>>>       .apply(GroupByKey.create())
>>>       .apply(Values.create())
>>>       .apply(Flatten.iterables());
>>> }
>>>
>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's triggering, windowing and accumulation mode.
>>>
>>> This property is currently definitely violated for some triggers, and it's definitely violated for the accumulating mode; both aspects must be fixed.
>>>
>>>> - Robert
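For reference, here is a hedged sketch of the kind of terminating top-level trigger the thread is about, using standard Beam Java classes (the pipeline shape, class name, and the 10000-element threshold are invented for illustration), along with the repeated form that avoids the terminating behavior:

    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class TerminatingTriggerSketch {
      static PCollection<KV<String, Iterable<Long>>> group(PCollection<KV<String, Long>> events) {
        return events
            .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
                // Terminating: fires at most once per window, then further data is dropped.
                .triggering(AfterPane.elementCountAtLeast(10000))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
            .apply(GroupByKey.create());
        // Per the thread, the continuation trigger on the output is effectively
        // AfterPane.elementCountAtLeast(1) - also a once-trigger - which is why a
        // second, coarser-keyed GBK keeps only one upstream key's result. A repeated
        // trigger avoids the terminating behavior:
        //   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
      }
    }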
