Of course the right answer is to just implement sink triggers and sidestep
the question altogether :).

In the meantime, I think leaving AfterSynchronizedProcessingTime in the
model makes the most sense. Runners can then choose an implementation
anywhere between firing eagerly and waiting some amount of time until they
think all (most?) of the upstream outputs have arrived, depending on how
smart the runner wants to be. As you point out, all of these behaviors are
correct, we'll have multiple firings due to the upstream trigger anyway, and
this is safer than it used to be (though handling the extra panes may still
require work).


On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:

> Hi all,
>
> TL;DR:
> 1. should we replace "after synchronized processing time" with "after
> count 1"?
> 2. should we remove "continuation trigger" and leave this to runners?
>
> ----
>
> "AfterSynchronizedProcessingTime" triggers were invented to solve a
> specific problem. They are inconsistent across SDKs today.
>
>  - You have an aggregation/GBK with an aligned processing-time trigger
> (e.g. "output every minute on the minute")
>  - You have a downstream aggregation/GBK between that and the sink
>  - You expect to have about one output every minute per key+window pair
>
> Any output of the upstream aggregation may contribute to any key+window of
> the downstream aggregation. The AfterSynchronizedProcessingTime trigger
> waits for all of the processing-time-based triggers to fire and commit
> their outputs. The downstream aggregation then outputs as fast as possible
> in panes consistent with the upstream aggregation.
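>
> For concreteness, a rough sketch of that shape in the Java SDK (the
> windowing, keys, and transform names are purely illustrative, and input is
> assumed to be a PCollection<KV<String, Long>>):
>
>     PCollection<KV<String, Long>> upstream =
>         input
>             .apply(
>                 Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardHours(1)))
>                     // "output every minute on the minute"
>                     .triggering(
>                         Repeatedly.forever(
>                             AfterProcessingTime.pastFirstElementInPane()
>                                 .alignedTo(Duration.standardMinutes(1))))
>                     .withAllowedLateness(Duration.ZERO)
>                     .accumulatingFiredPanes())
>             .apply("UpstreamAggregation", Sum.longsPerKey());
>
>     PCollection<KV<String, Long>> downstream =
>         upstream
>             // Re-key so any upstream output can contribute to any downstream
>             // key+window pair.
>             .apply(
>                 MapElements
>                     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
>                     .via((KV<String, Long> kv) -> KV.of("all", kv.getValue())))
>             // No triggering specified here: the SDK inserts the continuation
>             // trigger (AfterSynchronizedProcessingTime in Java today).
>             .apply("DownstreamAggregation", Sum.longsPerKey());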
>
>  - The Java SDK behavior is as above, to output "as fast as reasonable".
>  - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers
> but simply propagates the same trigger to the next GBK, creating additional
> delay.
>  - I don't know what the Go SDK may do, if it supports this at all.
>
> Any behavior could be defined as "correct". A simple option could be to
> have the downstream aggregation "fire always" aka "after element count 1".
> How would this change things? We would potentially see many more outputs.
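>
> Concretely, that would mean the continuation trigger the SDK inserts for
> the downstream aggregation is (roughly) equivalent to
>
>     Repeatedly.forever(AfterPane.elementCountAtLeast(1))
>
> i.e. the downstream aggregation emits a new pane as soon as any new input
> arrives, rather than waiting for the upstream firings to settle.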
>
> Why did we do this in the first place? There are (at least) these reasons:
>
>  - Previously, triggers could "finish" an aggregation thus dropping all
> further data. In this case, waiting for all outputs is critical or else you
> lose data. Now triggers cannot finish aggregations.
>  - Whenever there may be more than one pane, a user has to write logic to
> compensate and deal with it. Changing from guaranteed single pane to
> multi-pane would break things. So if the user configures a single firing,
> all downstream aggregations must respect it. Now that triggers cannot
> finish, I think processing time can only be used in multi-pane contexts
> anyhow.
>  - The above example illustrates how the behavior in Java maintains
> something that the user will expect. Or so we think. Maybe users don't care.
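>
> For the second point, "write logic to compensate" in the Java SDK usually
> means inspecting PaneInfo in whatever DoFn consumes the multi-pane output;
> a minimal sketch:
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       PaneInfo pane = c.pane();
>       if (pane.getTiming() == PaneInfo.Timing.EARLY) {
>         // Speculative pane; a sink may need to overwrite this result later.
>       }
>       if (pane.isLast()) {
>         // Last pane for this key+window pair; safe to finalize.
>       }
>     }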
>
> How did we get into this inconsistent state? When the user specifies
> triggering, it applies only to the very nearest aggregation/GBK; the SDK
> then decides what triggering to insert downstream. One possibility is to
> remove this and leave the downstream triggering unspecified, up to the
> runner.
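>
> In the Java SDK this derivation is what Trigger.getContinuationTrigger()
> computes; a very rough sketch:
>
>     Trigger upstream =
>         Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane()
>                 .alignedTo(Duration.standardMinutes(1)));
>     // In Java this yields a Repeatedly over AfterSynchronizedProcessingTime;
>     // in Python the SDK instead reuses the upstream trigger as-is.
>     Trigger downstream = upstream.getContinuationTrigger();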
>
> I think these two pieces of complexity are both not very helpful and also
> not (necessarily) breaking to alter, especially considering we already have
> inconsistency in the model.
>
> WDYT? And I wonder what this means for xlang and portability... how does
> continuation triggering even work? (if at all)
>
> Kenn
>
