On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:

>
> On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> I would prefer to leave downstream triggering up to the runner (or,
>> better, leave upstream triggering up to the runner, a la sink triggers),
>> but one problem is that without an explicit AfterSynchronizedProcessingTime
>> one can't tell if the downstream ProcessingTime between two groupings is
>> due to an explicit re-triggering between them or inherited from one to the
>> other.
>>
>
> I mean to propose that there should be no triggering specified unless due
> to explicit re-triggering.
>

You're saying that we leave the trigger (and perhaps other) fields of the
WindowingStrategy attached to PCollections downstream of the first GBK unset
in the proto, and let runners walk over the graph to infer them? I could be
OK with making this legal, though updating all SDKs and Runners to handle
this doesn't seem high priority at the moment.
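
To make the "walk over the graph" idea concrete, here is a rough sketch in
plain Python. It is a toy model, not Beam code: Grouping, continuation_of and
infer_triggers are hypothetical names, the trigger names are just strings,
and the continuation rule is only the Java SDK behavior as described in this
thread. It assumes a linear chain of groupings where an unset trigger means
"inherit from upstream".

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Grouping:
    """Hypothetical stand-in for a GBK and its windowing strategy."""
    name: str
    # None models "trigger field left unset in the proto".
    explicit_trigger: Optional[str] = None


def continuation_of(trigger: str) -> str:
    """Assumed rule: a processing-time trigger continues as the synchronized one."""
    if trigger == "AfterProcessingTime":
        return "AfterSynchronizedProcessingTime"
    return trigger


def infer_triggers(chain: List[Grouping]) -> Dict[str, str]:
    """Walk the chain upstream-to-downstream, filling in unset triggers."""
    effective: Dict[str, str] = {}
    upstream = "Default"
    for grouping in chain:
        if grouping.explicit_trigger is not None:
            # Explicit re-triggering: take the user's trigger as written.
            current = grouping.explicit_trigger
        else:
            # Unset: derive it from whatever is upstream.
            current = continuation_of(upstream)
        effective[grouping.name] = current
        upstream = current
    return effective


chain = [
    Grouping("gbk1", "AfterProcessingTime"),
    Grouping("gbk2"),                         # unset, inherited
    Grouping("gbk3", "AfterProcessingTime"),  # explicit re-triggering
]
effective = infer_triggers(chain)
print(effective)
```

In this model gbk2 and gbk3 are distinguishable because only gbk3 carries an
explicit trigger, which is the ambiguity I was pointing at above.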


>
> (and BTW yes I agree about sink triggers, but we need retractions and
> probably some theoretical work before we can aim for that)
>
> Kenn
>
>
>> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Just for the thread I want to comment on another, more drastic approach:
>>> eliminate continuation triggers from the model, leaving downstream
>>> triggering up to a runner. This approach is not viable because transforms
>>> may need to change their behavior based on whether or not a trigger will
>>> fire more than once. Transforms can and do inspect the windowing strategy
>>> to do things differently.
>>>
>>> Kenn
>>>
>>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'll say that synchronized processing time has confused users before.
>>>> Users sometimes use processing-time triggers to optimize latency, banking
>>>> that that will decouple stage latency from the long-tail latency of
>>>> previous stages. However continuation triggers silently switching to
>>>> synchronized processing time has defeated that, and it wasn't clear to
>>>> users why.
>>>>
>>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Of course the right answer is to just implement sink triggers and
>>>>>>> sidestep the question altogether :).
>>>>>>>
>>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime in
>>>>>>> the model makes the most sense, and runners can choose an implementation
>>>>>>> between firing eagerly and waiting some amount of time until they think
>>>>>>> all (most?) downstream results are in before firing, depending on how
>>>>>>> smart the runner wants to be. As you point out, they're all correct, and
>>>>>>> we'll have multiple firings due to the upstream trigger anyway, and this
>>>>>>> is safer than it used to be (though still possibly requires work).
>>>>>>>
>>>>>>
>>>>>> Just to clarify, as I got a little confused, is your suggestion:
>>>>>> Leave AfterSynchronizedProcessingTime* triggers in the model/proto, let
>>>>>> the SDK put them in where they want, and let runners decide how to
>>>>>> interpret them? (this SGTM and requires the least/no changes)
>>>>>>
>>>>>
>>>>> Yep. We may want to update Python/Go to produce
>>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers too,
>>>>> eventually, to better express intent.
>>>>>
>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not related
>>>>>> to this, except in implementation, and should be removed either way.
>>>>>>
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> TL;DR:
>>>>>>>> 1. should we replace "after synchronized processing time" with
>>>>>>>> "after count 1"?
>>>>>>>> 2. should we remove "continuation trigger" and leave this to
>>>>>>>> runners?
>>>>>>>>
>>>>>>>> ----
>>>>>>>>
>>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to solve a
>>>>>>>> specific problem. They are inconsistent across SDKs today.
>>>>>>>>
>>>>>>>>  - You have an aggregation/GBK with an aligned processing time trigger
>>>>>>>> (like "output every minute on the minute")
>>>>>>>>  - You have a downstream aggregation/GBK between that and the sink
>>>>>>>>  - You expect to have about one output every minute per key+window
>>>>>>>> pair
>>>>>>>>
>>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>>> key+window of the downstream aggregation. The
>>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the processing
>>>>>>>> time based triggers to fire and commit their outputs. The downstream
>>>>>>>> aggregation will output as fast as possible in panes consistent with
>>>>>>>> the upstream aggregation.
>>>>>>>>
>>>>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>>>>> reasonable".
>>>>>>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>>> triggers but simply propagates the same trigger to the next GBK,
>>>>>>>> creating additional delay.
>>>>>>>>  - I don't know what the Go SDK may do, if it supports this at all.
>>>>>>>>
>>>>>>>> Any behavior could be defined as "correct". A simple option could
>>>>>>>> be to have the downstream aggregation "fire always" aka "after element
>>>>>>>> count 1". How would this change things? We would potentially see many
>>>>>>>> more outputs.
>>>>>>>>
>>>>>>>> Why did we do this in the first place? There are (at least) these
>>>>>>>> reasons:
>>>>>>>>
>>>>>>>>  - Previously, triggers could "finish" an aggregation thus dropping
>>>>>>>> all further data. In this case, waiting for all outputs is critical or
>>>>>>>> else you lose data. Now triggers cannot finish aggregations.
>>>>>>>>  - Whenever there may be more than one pane, a user has to write
>>>>>>>> logic to compensate and deal with it. Changing from guaranteed single
>>>>>>>> pane to multi-pane would break things. So if the user configures a
>>>>>>>> single firing, all downstream aggregations must respect it. Now that
>>>>>>>> triggers cannot finish, I think processing time can only be used in
>>>>>>>> multi-pane contexts anyhow.
>>>>>>>>  - The above example illustrates how the behavior in Java maintains
>>>>>>>> something that the user will expect. Or so we think. Maybe users don't
>>>>>>>> care.
>>>>>>>>
>>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>>> specifies triggering it applies to the very nearest aggregation/GBK.
>>>>>>>> The SDK decides what triggering to insert downstream. One possibility is
>>>>>>>> to remove this and have it unspecified, left to runner behavior.
>>>>>>>>
>>>>>>>> I think maybe these pieces of complexity are both not helpful and
>>>>>>>> also not (necessarily) breaking changes to alter, especially
>>>>>>>> considering we have inconsistency in the model.
>>>>>>>>
>>>>>>>> WDYT? And I wonder what this means for xlang and portability... how
>>>>>>>> does continuation triggering even work? (if at all)
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>
