As I've mentioned before, I think OnceTriggers are generally a bad idea.
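
A minimal sketch of the kind of terminating top-level trigger under discussion, next to its repeated form. This is Beam Java; the window size, allowed lateness, and the input collection are made-up placeholders for illustration, not anything from the thread.

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class TriggerExamples {
      // Terminating top-level trigger: fires one pane after 10000 elements,
      // then finishes; later data for the window is silently dropped.
      static PCollection<String> terminating(PCollection<String> input) {
        return input.apply(
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(AfterPane.elementCountAtLeast(10000))
                .withAllowedLateness(Duration.standardHours(1))
                .discardingFiredPanes());
      }

      // Repeated form: keeps firing a pane per 10000 elements and never
      // finishes, so the trigger itself never drops data.
      static PCollection<String> repeated(PCollection<String> input) {
        return input.apply(
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
                .withAllowedLateness(Duration.standardHours(1))
                .discardingFiredPanes());
      }
    }
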
On Fri, Dec 8, 2017 at 1:05 PM, Eugene Kirpichov <[email protected]> wrote:

> The property that stacking more GBKs doesn't drop more data than a first one already drops, nor introduce duplicate data (see the previous email, starting with "I'd propose the following property:").

> On Fri, Dec 8, 2017 at 12:29 PM Reuven Lax <[email protected]> wrote:

>> Which intuitive property?

>> On Fri, Dec 8, 2017 at 12:11 PM, Eugene Kirpichov <[email protected]> wrote:

>>> I created a test demonstrating that our triggers violate this pretty intuitive property:
>>> https://github.com/apache/beam/pull/4239

>>> I think we should discuss this: it seems like a pretty important property to me, and it seems pretty bad that it's violated. Again, per the discussion below, some of it should be addressed by changing the semantics of individual triggers, and some by prohibiting certain combinations of triggers / accumulation modes / GBKs.

>>> I tried changing the continuation trigger of some triggers to be the "always fire" trigger; however, we have a lot of code assuming that it will be a OnceTrigger (e.g. that the continuation of a OnceTrigger has to be a OnceTrigger, and of course that the components of a OnceTrigger have to be OnceTriggers), and I haven't yet dug into whether that division is actually important or just baked in at compile time.

>>> On Thu, Dec 7, 2017 at 5:20 PM Eugene Kirpichov <[email protected]> wrote:

>>>> On Mon, Dec 4, 2017 at 4:24 PM Robert Bradshaw <[email protected]> wrote:

>>>>> On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov <[email protected]> wrote:

>>>>> > Hi,
>>>>> >
>>>>> > After a recent investigation of a data loss bug caused by unintuitive behavior of some kinds of triggers, we had a discussion about how we can protect against future issues like this, and I summarized it in https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>>>>> >
>>>>> > Current Beam trigger semantics are rather confusing and in some cases extremely unsafe, especially if the pipeline includes multiple chained GBKs. One example of that is https://issues.apache.org/jira/browse/BEAM-3169 .
>>>>> >
>>>>> > There are multiple issues:
>>>>> >
>>>>> > The API allows users to specify terminating top-level triggers (e.g. "trigger a pane after receiving 10000 elements in the window, and that's it"), but experience from user support shows that this is nearly always a mistake and the user did not intend to drop all further data.
>>>>> >
>>>>> > In general, triggers are the only place in Beam where data is being dropped without making a lot of very loud noise about it - a practice for which the PTransform style guide uses the language "never, ever, ever do this".
>>>>> >
>>>>> > Continuation triggers are still worse. For context: the continuation trigger is the trigger that's set on the output of a GBK and controls further aggregation of the results of this aggregation by downstream GBKs. The output shouldn't just use the same trigger as the input, because e.g. if the input trigger said "wait for an hour before emitting a pane", that doesn't mean we should wait for another hour before emitting the result of aggregating the result of the input trigger. Continuation triggers try to simulate the behavior "as if a pane of the input propagated through the entire pipeline", but the implementation of individual continuation triggers doesn't do that. E.g. the continuation of the "first N elements in pane" trigger is "first 1 element in pane", and if the results of a first GBK are further grouped by a second GBK onto a coarser key (e.g. if everything is grouped onto the same key), that effectively means that, of the keys of the first GBK, only one survives and all others are dropped (which is what happened in the data loss bug).

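To make that substitution concrete: the continuation trigger can be inspected in the Java SDK. A minimal sketch, assuming Trigger#getContinuationTrigger() is callable from user code (it is marked as an SDK-internal detail in some versions); the trigger parameters are invented for illustration.

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.Trigger;
    import org.joda.time.Duration;

    class ContinuationTriggerDemo {
      public static void main(String[] args) {
        // "First 100 elements in pane" continues downstream as
        // "first 1 element in pane" - which is what lets a second,
        // coarser GBK keep a single upstream pane per key and drop
        // the rest, as described above.
        Trigger countTrigger = AfterPane.elementCountAtLeast(100);
        System.out.println(countTrigger.getContinuationTrigger());

        // "Wait an hour after the first element" does not make the
        // downstream aggregation wait another hour; its continuation
        // fires once output from the upstream GBK is available.
        Trigger delayTrigger =
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardHours(1));
        System.out.println(delayTrigger.getContinuationTrigger());
      }
    }
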
>>>>> > The ultimate fix to all of these things is https://s.apache.org/beam-sink-triggers . However, it is a huge model change, and meanwhile we have to do something. The options are, in order of increasing backward incompatibility (but incompatibility in a "rejecting something that previously was accepted but extremely dangerous" kind of way):
>>>>> >
>>>>> > Make the continuation trigger of most triggers be the "always-fire" trigger. It seems that this should be the case for all triggers except the watermark trigger. This will definitely increase safety, but will lead to more eager firing of downstream aggregations. It will also violate a user's expectation that a fire-once trigger fires everything downstream only once, but that expectation appears impossible to satisfy safely.

>>>>> Note that firing more often for multiply stacked triggers, especially in the case of accumulating mode, easily leads to data corruption bugs where elements are duplicated and/or overcounted. I suppose that bad data is slightly easier to detect than missing data, but automatically turning once triggers into more-than-once triggers (automatically, or manually by prohibiting the former) isn't a straightforward fix.

>>>> I agree that the combination of accumulating mode and stacked GBKs is very problematic. I think that the way it's defined now is a semantic bug that produces garbage (duplicated data) with any repeated trigger, and garbage (lost data, per the current discussion) with any non-repeated trigger as well.

>>>> I think we should prohibit the combination of accumulating mode and stacked GBKs until we sort this out, which again is probably Sink Triggers: per that document, I suppose, the accumulating mode would be set only on the sink step, and upstream steps would get discarding mode [and downstream steps would have to set something explicitly].

>>>> By "prohibit" I mean "GBK applied to something with accumulating mode should return a PCollection with an invalid trigger, where you have to explicitly configure a trigger before you can GBK again". Does this sound reasonable?

>>>> If we agree on that, then I think having the continuation of all triggers be the "always-fire" trigger is also safe?

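A back-of-the-envelope illustration of the duplication described above - plain Java rather than a pipeline, with invented pane contents: in accumulating mode each new pane from the first GBK re-emits everything emitted so far, so a downstream GBK that combines all the panes it receives counts early elements more than once.

    import java.util.Arrays;
    import java.util.List;

    class AccumulatingOvercountDemo {
      public static void main(String[] args) {
        // Panes fired by a first GBK for one key in accumulating mode:
        // each pane contains everything seen so far, not just the new elements.
        List<List<String>> panes =
            Arrays.asList(
                Arrays.asList("a"),
                Arrays.asList("a", "b"),
                Arrays.asList("a", "b", "c"));

        // A second GBK that naively combines every incoming pane sees
        // "a" three times and "b" twice: 6 elements instead of 3.
        long downstreamCount = panes.stream().mapToLong(List::size).sum();
        System.out.println("distinct: 3, counted downstream: " + downstreamCount);
      }
    }
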
>>>>> > Make the continuation trigger of some triggers be the "invalid" trigger, i.e. require the user to set it explicitly: there's in general no good and safe way to infer what a trigger on a second GBK "truly" should be, based on the trigger of the PCollection input into a first GBK. This is especially true for terminating triggers.

>>>>> I think this should be on the table until we come up with good semantics, which may not be until sink triggers. It's backwards incompatible, but in a bug-fixing and verbose way (as opposed to silently changing semantics/behavior/outputs with the first option). Note that sometimes, for composite operations, one does not have access to the second grouping operation, so we must at least provide an opt-out flag.

>>>> I would argue that the failure in those cases is Working As Intended: the pipeline has undefined behavior and we're forcing the user to remove the ambiguity - either by specifying a repeated trigger (which they probably should have done in the first place) or by specifying the next trigger explicitly (which, granted, may require changing the composite transform, because the transform hasn't accounted for potential usage in this ambiguous case).

>>>>> > Prohibit top-level terminating triggers entirely. This will ensure that the only data that ever gets dropped is "droppably late" data.

>>>>> The AfterWatermark.withEarlyFirings(...) trigger is a top-level terminating trigger that we should keep allowing, and it has perfectly reasonable continuation semantics. I suppose we could allow it only if allowed lateness is 0, but that makes things even more context sensitive.

>>>>> > Do people think that these options are sensible?
>>>>> > +Kenn Knowles +Thomas Groh +Ben Chambers is this a fair summary of our discussion?

>>>>> I think this is a fair summary of the discussions we've had.

>>>> Do we have any sort of trigger test suite, or high-level properties that triggers must satisfy?

>>>> I'd propose the following property: given a PCollection with a particular trigger that is being passed through a GBK, processing through the downstream aggregations should by default happen without dropping or introducing any more data [unless a triggering strategy is explicitly set on a downstream aggregation to something that violates this property].

>>>> For example:

>>>> PCollection<V> pc = ... // e.g. TestStream
>>>>
>>>> PCollection<V> regroup(PCollection<V> pc, int n) {
>>>>   return pc.apply(WithKeys.of(/* random key in 0..n */))
>>>>       .apply(GroupByKey.create())
>>>>       .apply(Values.create())
>>>>       .apply(Flatten.iterables());
>>>> }

>>>> Then pc.regroup(n).regroup(1) == pc.regroup(n), regardless of pc's triggering, windowing and accumulation mode.

>>>> This property is currently definitely violated for some triggers, and it's definitely violated for the accumulating mode; both aspects must be fixed.

>>>>> - Robert

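For completeness, a sketch of how the regroup helper above could be written against the Java SDK. The key-assignment lambda is just one way to read the "random key in 0..n" placeholder, and the class and method names are illustrative; the actual test lives in the pull request linked earlier in the thread.

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    class Regroup {
      // Scatters pc onto n random keys, groups, and flattens back out.
      // The property under discussion: regroup(regroup(pc, n), 1) should
      // contain exactly the same elements as regroup(pc, n), whatever
      // triggering, windowing and accumulation mode pc carries.
      static <V> PCollection<V> regroup(PCollection<V> pc, int n) {
        return pc
            .apply(WithKeys.of((V v) -> ThreadLocalRandom.current().nextInt(n)) // "random key in 0..n"
                .withKeyType(TypeDescriptors.integers()))
            .apply(GroupByKey.create())
            .apply(Values.create())
            .apply(Flatten.iterables());
      }
    }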
