Which intuitive property?

On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]> wrote:
> I created a test demonstrating that our triggers violate this pretty intuitive property:
> https://github.com/apache/beam/pull/4239
>
> I think we should discuss this: it seems like a pretty important property to me, and it seems pretty bad that it's violated. Again, per the discussion below, some of it should be addressed by changing the semantics of individual triggers, and some by prohibiting certain combinations of triggers / accumulation modes / GBKs.
>
> I tried changing the continuation trigger of some triggers to be the "always fire" trigger; however, we have a lot of code assuming that it will be a OnceTrigger (e.g. that the continuation of a OnceTrigger has to be a OnceTrigger, and of course that the components of a OnceTrigger have to be OnceTriggers), and I haven't yet dug into whether that division is actually important or just baked in at compile time.
>
> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]> wrote:
>
>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]> wrote:
>>> > Hi,
>>> >
>>> > After a recent investigation of a data loss bug caused by unintuitive behavior of some kinds of triggers, we had a discussion about how we can protect against future issues like this, and I summarized it in https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>> >
>>> > Current Beam trigger semantics are rather confusing and in some cases extremely unsafe, especially if the pipeline includes multiple chained GBKs. One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
>>> >
>>> > There are multiple issues:
>>> >
>>> > The API allows users to specify terminating top-level triggers (e.g. "trigger a pane after receiving 10000 elements in the window, and that's it"), but experience from user support shows that this is nearly always a mistake and the user did not intend to drop all further data.
>>> >
>>> > In general, triggers are the only place in Beam where data is dropped without making a lot of very loud noise about it - a practice for which the PTransform style guide uses the language "never, ever, ever do this".
>>> >
>>> > Continuation triggers are worse still. For context: the continuation trigger is the trigger that's set on the output of a GBK and controls further aggregation of the results of this aggregation by downstream GBKs. The output shouldn't just use the same trigger as the input, because e.g. if the input trigger said "wait for an hour before emitting a pane", that doesn't mean we should wait for another hour before emitting a result of aggregating the result of the input trigger. Continuation triggers try to simulate the behavior "as if a pane of the input propagated through the entire pipeline", but the implementations of individual continuation triggers don't do that. E.g. the continuation of a "first N elements in pane" trigger is "first 1 element in pane", and if the results of a first GBK are further grouped by a second GBK onto a coarser key (e.g. if everything is grouped onto the same key), that effectively means that, of the keys of the first GBK, only one survives and all the others are dropped (which is what happened in the data loss bug).
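For concreteness, the stacked-GBK shape described above might look roughly like the sketch below, written against the Beam 2.x Java SDK (org.apache.beam.sdk.transforms.* and org.apache.beam.sdk.transforms.windowing.*, plus org.joda.time.Duration). The window size, element count, and single re-grouping key are illustrative assumptions, not details taken from the actual bug.

    // Illustrative sketch: a terminating element-count trigger on the first
    // aggregation; its continuation trigger ("first 1 element in pane") then
    // governs the second aggregation, which re-groups everything onto one key.
    PCollection<String> events = ...;  // some unbounded source

    PCollection<KV<String, Long>> perKeyCounts =
        events
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(AfterPane.elementCountAtLeast(10000))  // terminating, non-repeated
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            .apply(Count.perElement());

    // The second GBK runs under the inferred continuation trigger; once that
    // trigger fires its single pane and finishes, results arriving from the
    // remaining upstream keys can be dropped.
    PCollection<KV<Integer, Iterable<KV<String, Long>>>> regrouped =
        perKeyCounts
            .apply(WithKeys.<Integer, KV<String, Long>>of(0))
            .apply(GroupByKey.<Integer, KV<String, Long>>create());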
>>> > The ultimate fix to all of these things is https://s.apache.org/beam-sink-triggers . However, that is a huge model change, and meanwhile we have to do something. The options are, in order of increasing backward incompatibility (but incompatibility in a "rejecting something that previously was accepted but extremely dangerous" kind of way):
>>> >
>>> > Make the continuation trigger of most triggers be the "always-fire" trigger. It seems this should be the case for all triggers except the watermark trigger. This will definitely increase safety, but it will lead to more eager firing of downstream aggregations. It will also violate a user's expectation that a fire-once trigger fires everything downstream only once, but that expectation appears impossible to satisfy safely.
>>>
>>> Note that firing more often for multiply stacked triggers, especially in the case of accumulating mode, easily leads to data corruption bugs where elements are duplicated and/or overcounted. I suppose that bad data is slightly easier to detect than missing data, but turning once triggers into more-than-once triggers (automatically, or manually by prohibiting the former) isn't a straightforward fix.
>>
>> I agree that the combination of accumulating mode and stacked GBKs is very problematic. I think that the way it's defined now is a semantic bug that produces garbage (duplicated data) with any repeated trigger, and garbage (lost data, per the current discussion) with any non-repeated trigger as well.
>>
>> I think we should prohibit the combination of accumulating mode and stacked GBKs until we sort this out, which again probably means Sink Triggers: per that document, I suppose, the accumulating mode would be set only on the sink step, and upstream steps would get discarding mode [and downstream steps would have to set something explicitly].
>>
>> By "prohibit" I mean "GBK applied to something with accumulating mode should return a PCollection with an invalid trigger, where you have to explicitly configure a trigger before you can GBK again". Does this sound reasonable?
>>
>> If we agree on that, then I think having the continuation of all triggers be the "always-fire" trigger is also safe?
>>
>>> > Make the continuation trigger of some triggers be the "invalid" trigger, i.e. require the user to set it explicitly: there's in general no good and safe way to infer what a trigger on a second GBK "truly" should be, based on the trigger of the PCollection input into a first GBK. This is especially true for terminating triggers.
>>>
>>> I think this should be on the table until we come up with good semantics, which may not be until sink triggers. It's backwards incompatible, but in a bug-fixing and verbose way (as opposed to silently changing semantics/behavior/outputs, as the first option does). Note that sometimes for composite operations one does not have access to the second grouping operation, so we must at least provide an opt-out flag.
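As a rough illustration of the "explicitly configure a trigger before you can GBK again" idea, a pipeline author could re-set the triggering strategy between the two aggregations instead of relying on the inferred continuation trigger. The sketch below assumes the Beam 2.x Java SDK; the particular trigger, delay, and re-keying are arbitrary choices for illustration, not a proposal from the thread.

    // Hedged sketch: explicitly re-window the output of the first aggregation
    // with a repeated trigger before re-keying and grouping again.
    firstAggregation  // PCollection<KV<String, Long>>, output of the first GBK
        .apply(Window.<KV<String, Long>>configure()
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(WithKeys.<Integer, KV<String, Long>>of(0))
        .apply(GroupByKey.<Integer, KV<String, Long>>create());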
>> I would argue that the failure in those cases is Working As Intended: the pipeline has undefined behavior, and we're forcing the user to remove the ambiguity - either by specifying a repeated trigger (which they probably should have done in the first place) or by specifying the next trigger explicitly (which, granted, may require changing the composite transform, because the transform hasn't accounted for potential usage in this ambiguous case).
>>
>>> > Prohibit top-level terminating triggers entirely. This will ensure that the only data that ever gets dropped is "droppably late" data.
>>>
>>> AfterWatermark.withEarlyFirings(...) is a top-level terminating trigger that we should keep allowing, and it has perfectly reasonable continuation semantics. I suppose we could only allow it if allowed lateness is 0, but that makes things even more context sensitive.
>>>
>>> > Do people think that these options are sensible?
>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our discussion?
>>>
>>> I think this is a fair summary of the discussions we've had.
>>
>> Do we have any sort of trigger test suite, or high-level properties that triggers must satisfy?
>>
>> I'd propose the following property: given a PCollection with a particular trigger that is passed through a GBK, processing through the downstream aggregations should by default happen without dropping or introducing any data [unless a triggering strategy is explicitly set on a downstream aggregation to something that violates this property].
>>
>> For example:
>>
>> PCollection<V> pc = ... // e.g. TestStream
>>
>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>   return pc
>>       .apply(WithKeys.of(/* random key in 0..n */))
>>       .apply(GroupByKey.create())
>>       .apply(Values.create())
>>       .apply(Flatten.iterables());
>> }
>>
>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's triggering, windowing, and accumulation mode.
>>
>> This property is currently definitely violated by some triggers, and it's definitely violated by the accumulating mode; both aspects must be fixed.
>
>>> - Robert
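For reference, a compilable version of the regroup helper sketched above might look like this against the Beam 2.x Java SDK; the ThreadLocalRandom-based key function is an assumption standing in for "random key in 0..n".

    // Minimal sketch of the regroup helper. Assumes org.apache.beam.sdk.transforms.*,
    // org.apache.beam.sdk.values.*, and java.util.concurrent.ThreadLocalRandom.
    static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
      return pc
          // Assign each element a pseudo-random key in [0, n).
          .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n))
              .withKeyType(TypeDescriptors.integers()))
          // Group by that key, then discard the keys and re-flatten the groups.
          .apply(GroupByKey.<Integer, V>create())
          .apply(Values.<Iterable<V>>create())
          .apply(Flatten.<V>iterables());
    }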
