On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:
> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> Of course the right answer is to just implement sink triggers and
>> sidestep the question altogether :).
>>
>> In the meantime, I think leaving AfterSynchronizedProcessingTime in the
>> model makes the most sense, and runners can choose an implementation
>> between firing eagerly and waiting some amount of time until they think all
>> (most?) downstream results are in before firing, depending on how smart the
>> runner wants to be. As you point out, they're all correct, and we'll have
>> multiple firings due to the upstream trigger anyway, and this is safer than
>> it used to be (though still possibly requires work).
>
> Just to clarify, as I got a little confused, is your suggestion: Leave
> AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK
> put them in where they want, and let runners decide how to interpret them?
> (this SGTM and requires the least/no changes)

Yep. We may want to update Python/Go to produce
AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
eventually, to better express intent.

> Kenn
>
> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to
> this, except in implementation, and should be removed either way.
>
>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> TL;DR:
>>> 1. should we replace "after synchronized processing time" with "after
>>> count 1"?
>>> 2. should we remove "continuation trigger" and leave this to runners?
>>>
>>> ----
>>>
>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>> specific problem. They are inconsistent across SDKs today.
>>>
>>> - You have an aggregation/GBK with an aligned processing time trigger,
>>> like "output every minute on the minute"
>>> - You have a downstream aggregation/GBK between that and the sink
>>> - You expect to have about one output every minute per key+window pair
>>>
>>> Any output of the upstream aggregation may contribute to any key+window
>>> of the downstream aggregation. The AfterSynchronizedProcessingTime trigger
>>> waits for all the processing-time-based triggers to fire and commit their
>>> outputs. The downstream aggregation will output as fast as possible in
>>> panes consistent with the upstream aggregation.
>>>
>>> - The Java SDK behavior is as above: output "as fast as reasonable".
>>> - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers
>>> but simply propagates the same trigger to the next GBK, creating
>>> additional delay.
>>> - I don't know what the Go SDK may do, if it supports this at all.
>>>
>>> Any behavior could be defined as "correct". A simple option would be to
>>> have the downstream aggregation "fire always", aka "after element count 1".
>>> How would this change things? We would potentially see many more outputs.
>>>
>>> Why did we do this in the first place? There are (at least) these
>>> reasons:
>>>
>>> - Previously, triggers could "finish" an aggregation, thus dropping all
>>> further data. In this case, waiting for all outputs is critical or else
>>> you lose data. Now triggers cannot finish aggregations.
>>> - Whenever there may be more than one pane, a user has to write logic
>>> to compensate and deal with it. Changing from a guaranteed single pane to
>>> multi-pane would break things. So if the user configures a single firing,
>>> all downstream aggregations must respect it. Now that triggers cannot
>>> finish, I think processing time can only be used in multi-pane contexts
>>> anyhow.
>>> - The above example illustrates how the behavior in Java maintains
>>> something that the user will expect.
>>> Or so we think. Maybe users don't care.
>>>
>>> How did we get into this inconsistent state? When the user specifies
>>> triggering, it applies to the very nearest aggregation/GBK. The SDK
>>> decides what triggering to insert downstream. One possibility is to
>>> remove this and have it unspecified, left to runner behavior.
>>>
>>> I think maybe these pieces of complexity are both not helpful and also
>>> not (necessarily) breaking changes to alter, especially considering we
>>> have inconsistency in the model.
>>>
>>> WDYT? And I wonder what this means for xlang and portability... how does
>>> continuation triggering even work? (if at all)
>>>
>>> Kenn
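To put rough numbers on Kenn's example, here is a toy pane-count sketch of the two candidate downstream policies. The function name, the tuple encoding of upstream firings, and the policy strings are all made up for illustration; this is not Beam API, just the counting argument: with N upstream keys each firing once per processing-time round, a synchronized downstream trigger yields about one pane per round, while "after count 1" yields about N.

```python
def downstream_pane_count(upstream_firings, policy):
    """Count panes a downstream GBK emits for one key+window.

    upstream_firings: (round_index, key) pairs, one per upstream pane.
    Each upstream key fires one pane per processing-time "round"
    (e.g. once a minute). Names and encodings here are illustrative.
    """
    if policy == "after_synchronized_processing_time":
        # Wait until every upstream pane from a given processing-time
        # firing has committed, then emit one consistent downstream
        # pane per round.
        return len({rnd for rnd, _ in upstream_firings})
    if policy == "after_count_1":
        # "Fire always": one downstream pane per incoming upstream pane.
        return len(upstream_firings)
    raise ValueError(policy)

# 3 upstream keys, each firing once a minute for 2 minutes -> 6 upstream panes.
firings = [(rnd, key) for rnd in range(2) for key in ("a", "b", "c")]

print(downstream_pane_count(firings, "after_synchronized_processing_time"))  # 2
print(downstream_pane_count(firings, "after_count_1"))  # 6
```

Either count is "correct" per the thread; the sketch only shows how much chattier the downstream aggregation gets as the number of upstream keys grows.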