On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:

>
> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Of course the right answer is to just implement sink triggers and
>> sidestep the question altogether :).
>>
>> In the meantime, I think leaving AfterSynchronizedProcessingTime in the
>> model makes the most sense, and runners can choose an implementation
>> between firing eagerly and waiting some amount of time until they think all
>> (most?) downstream results are in before firing, depending on how smart the
>> runner wants to be. As you point out, they're all correct, and we'll have
>> multiple firings due to the upstream trigger anyway, and this is safer than
>> it used to be (though still possibly requires work).
>>
>
> Just to clarify, as I got a little confused, is your suggestion: Leave
> AfterSynchronizedProcessingTime* triggers in the model/proto, let the SDK
> put them in where they want, and let runners decide how to interpret them?
> (this SGTM and requires the least/no changes)
>

Yep. We may want to update Python/Go to produce
AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
eventually, to better express intent.


> Kenn
>
> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related to
> this, except in implementation, and should be removed either way.
>
>
>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> TL;DR:
>>> 1. should we replace "after synchronized processing time" with "after
>>> count 1"?
>>> 2. should we remove "continuation trigger" and leave this to runners?
>>>
>>> ----
>>>
>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>> specific problem. They are inconsistent across SDKs today.
>>>
>>>  - You have an aggregation/GBK with aligned processing time trigger like
>>> ("output every minute on the minute")
>>>  - You have a downstream aggregation/GBK between that and the sink
>>>  - You expect to have about one output every minute per key+window pair
>>>
>>> Any output of the upstream aggregation may contribute to any key+window
>>> of the downstream aggregation. The AfterSynchronizedProcessingTime trigger
>>> waits for all the processing time based triggers to fire and commit their
>>> outputs. The downstream aggregation will output as fast as possible in
>>> panes consistent with the upstream aggregation.
>>>
>>>  - The Java SDK behavior is as above, to output "as fast as reasonable".
>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime" triggers
>>> but simply propagates the same trigger to the next GBK, creating additional
>>> delay.
>>>  - I don't know what the Go SDK may do, if it supports this at all.
>>>
>>> Any behavior could be defined as "correct". A simple option could be to
>>> have the downstream aggregation "fire always" aka "after element count 1".
>>> How would this change things? We would potentially see many more outputs.
>>>
>>> Why did we do this in the first place? There are (at least) these
>>> reasons:
>>>
>>>  - Previously, triggers could "finish" an aggregation thus dropping all
>>> further data. In this case, waiting for all outputs is critical or else you
>>> lose data. Now triggers cannot finish aggregations.
>>>  - Whenever there may be more than one pane, a user has to write logic
>>> to compensate and deal with it. Changing from guaranteed single pane to
>>> multi-pane would break things. So if the user configures a single firing,
>>> all downstream aggregations must respect it. Now that triggers cannot
>>> finish, I think processing time can only be used in multi-pane contexts
>>> anyhow.
>>>  - The above example illustrates how the behavior in Java maintains
>>> something that the user will expect. Or so we think. Maybe users don't care.
>>>
>>> How did we get into this inconsistent state? When the user specifies
>>> triggering it applies to the very nearest aggregation/GBK. The SDK decides
>>> what triggering to insert downstream. One possibility is to remove this and
>>> have it unspecified, left to runner behavior.
>>>
>>> I think maybe these pieces of complexity are both not helpful and also
>>> not (necessarily) breaking changes to alter, especially considering we have
>>> inconsistency in the model.
>>>
>>> WDYT? And I wonder what this means for xlang and portability... how does
>>> continuation triggering even work? (if at all)
>>>
>>> Kenn
>>>
>>

Reply via email to