FWIW, things seem to work if I wrap the continuations of AfterPane and AfterProcessingTime in Repeatedly.forever() - then, in DISCARDING mode, they pass the property. They still fail it in ACCUMULATING mode. I think ACCUMULATING mode is currently just semantically broken for stacked aggregations and needs to be forbidden.
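To make that concrete, here's roughly what the experiment amounts to when written out at the pipeline level (a sketch only - the real change is to the continuation triggers inside the SDK, and the input, keys, window size and thresholds here are made up; imports omitted): the downstream aggregation is explicitly given Repeatedly.forever(AfterPane.elementCountAtLeast(1)) instead of the once-firing continuation it would otherwise infer, with both stages in DISCARDING mode.

  Pipeline p = Pipeline.create();

  // Illustrative input; in a real test this would come from a TestStream.
  PCollection<KV<String, Long>> events =
      p.apply(Create.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L)));

  // First aggregation: a once-firing element-count trigger, DISCARDING mode.
  PCollection<KV<String, Long>> first = events
      .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(AfterPane.elementCountAtLeast(10))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(Sum.longsPerKey());

  // Second aggregation onto a single, coarser key: instead of relying on the
  // inferred once-firing continuation trigger, fire on every pane that arrives.
  PCollection<KV<String, Long>> second = first
      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
        @Override
        public KV<String, Long> apply(KV<String, Long> kv) {
          return KV.of("all", kv.getValue());
        }
      }))
      .apply(Window.<KV<String, Long>>configure()
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(Sum.longsPerKey());

With the explicit Repeatedly wrapper the second aggregation keeps firing as panes from the first one arrive, rather than finishing after its first pane.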
On Fri, Dec 8, 2017 at 1:10 PM Reuven Lax <[email protected]> wrote:

> As I've mentioned before, I think OneTriggers are generally a bad idea.
>
> On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <[email protected]> wrote:
>
>> The property that stacking more GBKs doesn't drop more data than a first
>> one already drops, nor introduce duplicate data (see previous email,
>> starting with "I'd propose the following property:").
>>
>> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:
>>
>>> Which intuitive property?
>>>
>>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> I created a test demonstrating that our triggers violate this pretty
>>>> intuitive property.
>>>>
>>>> https://github.com/apache/beam/pull/4239
>>>>
>>>> I think we should discuss this: it seems like a pretty important
>>>> property to me, and it seems pretty bad that it's violated. Again, per
>>>> the discussion below, some of it should be addressed by changing the
>>>> semantics of individual triggers, and some by prohibiting certain
>>>> combinations of triggers / accumulation modes / GBKs.
>>>>
>>>> I tried changing the continuation trigger of some triggers to be the
>>>> "always fire" trigger, however we have a lot of code assuming that it
>>>> will be a OnceTrigger (e.g. that the continuation of a OnceTrigger has
>>>> to be a OnceTrigger, and of course that the components of a OnceTrigger
>>>> have to be OnceTriggers), and I didn't yet dig into whether that
>>>> division is at all important or just baked in at compile time.
>>>>
>>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > After a recent investigation of a data loss bug caused by
>>>>>> > unintuitive behavior of some kinds of triggers, we had a discussion
>>>>>> > about how we can protect against future issues like this, and I
>>>>>> > summarized it in https://issues.apache.org/jira/browse/BEAM-3288 .
>>>>>> > Copying here:
>>>>>> >
>>>>>> > Current Beam trigger semantics are rather confusing and in some
>>>>>> > cases extremely unsafe, especially if the pipeline includes
>>>>>> > multiple chained GBKs. One example of that is
>>>>>> > https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>>> >
>>>>>> > There are multiple issues:
>>>>>> >
>>>>>> > The API allows users to specify terminating top-level triggers
>>>>>> > (e.g. "trigger a pane after receiving 10000 elements in the window,
>>>>>> > and that's it"), but experience from user support shows that this
>>>>>> > is nearly always a mistake and the user did not intend to drop all
>>>>>> > further data.
>>>>>> >
>>>>>> > In general, triggers are the only place in Beam where data is
>>>>>> > dropped without making a lot of very loud noise about it - a
>>>>>> > practice for which the PTransform style guide uses the language:
>>>>>> > "never, ever, ever do this".
>>>>>> >
>>>>>> > Continuation triggers are worse still. For context: the
>>>>>> > continuation trigger is the trigger that's set on the output of a
>>>>>> > GBK and controls further aggregation of the results of this
>>>>>> > aggregation by downstream GBKs. The output shouldn't just use the
>>>>>> > same trigger as the input, because e.g.
>>>>>> > if the input trigger said "wait for an hour before emitting a
>>>>>> > pane", that doesn't mean that we should wait for another hour
>>>>>> > before emitting a result of aggregating the result of the input
>>>>>> > trigger. Continuation triggers try to simulate the behavior "as if
>>>>>> > a pane of the input propagated through the entire pipeline", but
>>>>>> > the implementation of individual continuation triggers doesn't do
>>>>>> > that. E.g. the continuation of the "first N elements in pane"
>>>>>> > trigger is "first 1 element in pane", and if the results of a
>>>>>> > first GBK are further grouped by a second GBK onto a coarser key
>>>>>> > (e.g. if everything is grouped onto the same key), that
>>>>>> > effectively means that, of the keys of the first GBK, only one
>>>>>> > survives and all others are dropped (what happened in the data
>>>>>> > loss bug).
>>>>>> >
>>>>>> > The ultimate fix to all of these things is
>>>>>> > https://s.apache.org/beam-sink-triggers . However, it is a huge
>>>>>> > model change, and meanwhile we have to do something. The options
>>>>>> > are, in order of increasing backward incompatibility (but
>>>>>> > incompatibility in a "rejecting something that previously was
>>>>>> > accepted but extremely dangerous" kind of way):
>>>>>> >
>>>>>> > Make the continuation trigger of most triggers be the
>>>>>> > "always-fire" trigger. It seems that this should be the case for
>>>>>> > all triggers except the watermark trigger. This will definitely
>>>>>> > increase safety, but lead to more eager firing of downstream
>>>>>> > aggregations. It will also violate a user's expectation that a
>>>>>> > fire-once trigger fires everything downstream only once, but that
>>>>>> > expectation appears impossible to satisfy safely.
>>>>>>
>>>>>> Note that firing more often for multiply stacked triggers,
>>>>>> especially in the case of accumulating mode, easily leads to data
>>>>>> corruption bugs where elements are duplicated and/or overcounted. I
>>>>>> suppose that bad data is slightly easier to detect than missing
>>>>>> data, but turning once triggers into more-than-once triggers
>>>>>> (automatically, or manually by prohibiting the former) isn't a
>>>>>> straightforward fix.
>>>>>
>>>>> I agree that the combination of accumulating mode and stacked GBKs is
>>>>> very problematic. I think that the way it's defined now is a semantic
>>>>> bug that produces garbage (duplicated data) with any repeated
>>>>> trigger, and garbage (lost data, per the current discussion) with any
>>>>> non-repeated trigger as well.
>>>>>
>>>>> I think we should prohibit the combination of accumulating mode and
>>>>> stacked GBKs until we sort this out, which is again probably Sink
>>>>> Triggers: per that document, I suppose, the accumulating mode would
>>>>> be set only on the sink step, and upstream steps would get discarding
>>>>> mode [and downstream steps would have to set something explicitly].
>>>>>
>>>>> By "prohibit" I mean "GBK applied to something with accumulating mode
>>>>> should return a PCollection with an invalid trigger, where you have
>>>>> to explicitly configure a trigger before you can GBK again". Does
>>>>> this sound reasonable?
>>>>>
>>>>> If we agree on that, then I think having the continuation of all
>>>>> triggers be the "always-fire" trigger is also safe?
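(To spell out why ACCUMULATING mode breaks down for stacked aggregations - a toy example with made-up numbers, assuming a trigger that fires after every element: if key "k" receives 1, 2, 3, the first GBK in accumulating mode emits the panes [1], [1, 2], [1, 2, 3], i.e. running sums 1, 3, 6. A downstream aggregation that sums everything it receives then computes 1 + 3 + 6 = 10 rather than 6, because each element is re-counted once for every later pane it appears in. In discarding mode the panes are [1], [2], [3] and the downstream total stays 6.)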
>>>>>>
>>>>>> > Make the continuation trigger of some triggers be the "invalid"
>>>>>> > trigger, i.e. require the user to set it explicitly: there's in
>>>>>> > general no good and safe way to infer what a trigger on a second
>>>>>> > GBK "truly" should be, based on the trigger of the PCollection
>>>>>> > input into a first GBK. This is especially true for terminating
>>>>>> > triggers.
>>>>>>
>>>>>> I think this should be on the table until we come up with good
>>>>>> semantics, which may not be until sink triggers. It's backwards
>>>>>> incompatible, but in a bug-fixing and verbose way (as opposed to
>>>>>> silently changing semantics/behavior/outputs with the first option).
>>>>>> Note that sometimes for composite operations one does not have
>>>>>> access to the second grouping operation, so we must at least provide
>>>>>> an opt-out flag.
>>>>>
>>>>> I would argue that the failure in those cases is Working As Intended:
>>>>> the pipeline has undefined behavior and we're forcing the user to
>>>>> remove the ambiguity - either by specifying a repeated trigger (which
>>>>> they probably should have done in the first place) or by specifying
>>>>> the next trigger explicitly (which, granted, may require changing the
>>>>> composite transform, because the transform hasn't accounted for
>>>>> potential usage in this ambiguous case).
>>>>>
>>>>>> > Prohibit top-level terminating triggers entirely. This will ensure
>>>>>> > that the only data that ever gets dropped is "droppably late"
>>>>>> > data.
>>>>>>
>>>>>> AfterWatermark.withEarlyFirings(...) is a top-level terminating
>>>>>> trigger that we should keep allowing, and it has perfectly
>>>>>> reasonable continuation semantics. I suppose we could only allow it
>>>>>> if allowed lateness is 0, but that makes things even more context
>>>>>> sensitive.
>>>>>>
>>>>>> > Do people think that these options are sensible?
>>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of
>>>>>> > our discussion?
>>>>>>
>>>>>> I think this is a fair summary of the discussions we've had.
>>>>>
>>>>> Do we have any sort of trigger test suite, or high-level properties
>>>>> triggers must satisfy?
>>>>>
>>>>> I'd propose the following property:
>>>>> Given a PCollection with a particular trigger that is passed through
>>>>> a GBK, processing through the downstream aggregations by default
>>>>> should happen without dropping or introducing any more data [unless a
>>>>> triggering strategy is explicitly set on a downstream aggregation to
>>>>> something that violates this property].
>>>>>
>>>>> For example:
>>>>> PCollection<V> pc = ... // e.g. TestStream
>>>>>
>>>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>>   return pc.apply(WithKeys.of(random key in 0..n))
>>>>>       .apply(GroupByKey.create())
>>>>>       .apply(Values.create())
>>>>>       .apply(Flatten.iterables());
>>>>> }
>>>>>
>>>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's
>>>>> triggering, windowing and accumulation mode.
>>>>>
>>>>> This property is currently definitely violated for some triggers; and
>>>>> it's definitely violated for the accumulating mode; both aspects must
>>>>> be fixed.
>>>>>
>>>>>> - Robert
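For reference, a compilable sketch of the regroup helper above (the random-key function and the Integer key type are just one possible way to realize "random key in 0..n"; imports omitted, and the actual test lives in the PR linked above):

  static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
    return pc
        // Assign each element a random key in [0, n).
        .apply(WithKeys.of((V value) -> ThreadLocalRandom.current().nextInt(n))
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupByKey.<Integer, V>create())
        .apply(Values.<Iterable<V>>create())
        // Un-group back into individual elements.
        .apply(Flatten.<V>iterables());
  }

The assertion is then, as stated above, that regroup(regroup(pc, n), 1) contains exactly the same elements as regroup(pc, n), whatever triggering, windowing and accumulation mode pc carries.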
