On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:

> First, +1 to the conclusion of this thread.
>
> One note regarding the composite transforms and triggers *inside* those
> transforms - I think that propagating the triggering from input PCollection
> might be even dangerous and composite PTransforms that would be sensitive
> to the change of triggering will (should!) override the input triggering,
> and therefore adjusting it upfront will not work. There is clear option for
> composite PTransform (which includes one or more GBKs) to create API to
> specify the _input_ triggering of the composite as a whole, i.e.
>
>  input.apply(MyComposite.create().triggering())
>
> which (consistently with how triggering works for pure GBK) would change
> the input triggering (if we define trigger as "buffer input in state, flush
> buffer when trigger fires") of the PTransform. The PTransform knows how it
> expands and so it is quite easy to do the output triggering correctly.
>
When we originally explored this (for windowing, before triggering existed)
we looked at the number of composite operations (combining, joining, cogbk,
...) that contained GBKs and realized it would add a lot of boilerplate to
manually pass through the windowing information to each. Worse, this is a
burden placed on every author of a composite operation (and omitting this
argument, or hard coding a default, would be strictly worse). Triggering
doesn't flow as nicely, but requiring it on every subtransform invocation
during pipeline construction would have the same downsides of verbosity.

> Regarding the sink triggering - out of curiosity, how does that differ
> from applying the triggering on the very first GBK(s) and the subsequently
> trigger all downstream GBKs using AfterPane.elementCountAtLeast(1)? It
> seems to me, that from user perspective what I will want to define is not
> "how often output should be written", but "how quickly output should react
> to the change of input" - therefore I *must* trigger with at least this
> frequency from the source and then propagate each pane as quickly as
> possible to the output. Am I missing something?
>
Here "sink" is really any observable outside effect, so I think "how often
output should be written" and "how quickly output should react to the
change of input" are the same.

As an example, if I want, say, hourly output, triggering hourly at the
source and then as quickly as possible from then on may be wasteful. It may
also be desirable to arrange such that certain transforms only have a
single pane per window, which is easier to propagate up than down. As
another example, consider accumulating vs. discarding. If I have
CombineValues(sum) followed by a re-keying and another CombineValues(sum),
and I want the final output to be accumulating, the first must be
discarding (or, better, retractions). Propagating upwards is possible in a
way propagating downward is not.



>
>  Jan
>
>
> On 2/22/21 9:53 PM, Reuven Lax wrote:
>
> I really wish that we had found the time to build sink triggers. Jan is
> right - specifying triggers up front and having them propagate down is
> confusing (it's also a bit confusing for Windows, but with Windows the
> propagation at least makes sense). The fact that users rarely have access
> to the actual GBK operation means that allowing them to specify triggers on
> their sinks is the best approach.
>
> On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I agree completely: Triggers control the output of the GBK.
>>>
>>> The issue is composite transforms, where there will be a GBK deep inside
>>> some code and the user cannot adjust the triggering.
>>>
>>> What a user really wants is "sink triggers
>>> <https://s.apache.org/beam-sink-triggers>" [1], a purely hypothetical
>>> feature where they specify the latency requirements on each _output_ and
>>> everything else is figured out automatically. Unfortunately, sink triggers
>>> require retractions, so each PCollection can be a complete changelog.
>>> Otherwise transformations cannot be transparently correct throughout a
>>> pipeline and triggers cannot be decoupled from pipeline logic. Retractions
>>> themselves are not necessarily complex in some cases (Flink SQL has them -
>>> they are extra easy for "pure" code) but require a massive working of the
>>> library of transforms, particularly IOs. And backwards compatibility
>>> concerns for existing DoFns are somewhat tricky. We've had two prototypes
>>> [2] [3] and some important design investigations [4], but no time to really
>>> finish adding them, even as just an optional experiment. And once we have
>>> retractions, there is still a lot to figure out to finish sink triggers.
>>> They may not even really be possible!
>>>
>>> So for now, we do our best with the user setting up triggering at the
>>> beginning of the pipeline instead of the end of the pipeline. The very
>>> first GBK (which may be deep in library code) is controlled by the
>>> triggering they set up and all the rest get the "continuation trigger"
>>> which tries to just let the data flow. Unless they set up another bit of
>>> triggering. Some of our transforms do this for various reasons.
>>>
>>> I think the conclusion of this particular thread is:
>>>
>>>  - make all the SDKs use AfterSynchronizedProcessingTime triggers
>>>  - allow runners to do whatever they want when they see
>>> AfterSynchronizedProcessingTime trigger
>>>  - remove TimeDomain.afterSynchronizedProcessingTime from the proto
>>> since it is only for timers and they should not use this
>>>  - later, figure out if we want to add support for making downstream
>>> triggering optional (could be useful prep for sink triggers)
>>>
>>
>> +1
>>
>>
>>> [1] https://s.apache.org/beam-sink-triggers
>>> [2] https://github.com/apache/beam/pull/4742
>>> [3] https://github.com/apache/beam/pull/9199
>>> [4] https://s.apache.org/beam-retractions
>>>
>>> On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> The same holds true for pane accumulation mode.
>>>>
>>>>  Jan
>>>> On 2/22/21 10:21 AM, Jan Lukavský wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm not sure if I got everything from this thread right, but from my
>>>> point of view, triggers are property of GBK. They are property of neither
>>>> windowing, nor PCollection, but relate solely to GBK. This can be seen from
>>>> the fact, that unlike windowFn, triggers are completely ignored in stateful
>>>> ParDo (there is no semantics for them, which is fine). It would be cool if
>>>> the model could be adjusted for that - this would actually mean, that the
>>>> correct place, where to specify triggering is not Window PTransform, but
>>>> the GBK, i.e.
>>>>
>>>>  input.apply(GroupByKey.create().triggering(...))
>>>>
>>>> That would imply we simply have default trigger for all GBKs, unless
>>>> explicitly changed, but for that particular instance only. I'm not sure
>>>> what the impacts on pipeline compatibility would be, though.
>>>>
>>>>  Jan
>>>> On 2/19/21 12:09 AM, Robert Bradshaw wrote:
>>>>
>>>> On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>> On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I would prefer to leave downstream triggering up to the runner (or,
>>>>>> better, leave upstream triggering up to the runner, a la sink triggers),
>>>>>> but one problem is that without an explicit 
>>>>>> AfterSynchronizedProcessingTime
>>>>>> one can't tell if the downstream ProcessingTime between two groupings is
>>>>>> due to an explicit re-triggering between them or inherited from one to 
>>>>>> the
>>>>>> other.
>>>>>>
>>>>>
>>>>> I mean to propose that there should be no triggering specified unless
>>>>> due to explicit re-triggering.
>>>>>
>>>>
>>>> You're saying that we leave the trigger (and perhaps other) fields of
>>>> the WindowingStrategy attached to PCollections downstream the first GBK
>>>> unset in the proto? And let runners walk over the graph to infer it? I
>>>> could be OK with making this legal, though updating all SDKs and Runners to
>>>> handle this doesn't seem high priority at the moment.
>>>>
>>>>
>>>>>
>>>>> (and BTW yes I agree about sink triggers, but we need retractions and
>>>>> probably some theoretical work before we can aim for that)
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>> On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Just for the thread I want to comment on another, more drastic
>>>>>>> approach: eliminate continuation triggers from the model, leaving
>>>>>>> downstream triggering up to a runner. This approach is not viable 
>>>>>>> because
>>>>>>> transforms may need to change their behavior based on whether or not a
>>>>>>> trigger will fire more than once. Transforms can and do inspect the
>>>>>>> windowing strategy to do things differently.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'll say that synchronized processing time has confused users
>>>>>>>> before. Users sometimes use processing-time triggers to optimize 
>>>>>>>> latency,
>>>>>>>> banking that that will decouple stage latency from the long-tail 
>>>>>>>> latency of
>>>>>>>> previous stages. However continuation triggers silently switching to
>>>>>>>> synchronized processing time has defeated that, and it wasn't clear to
>>>>>>>> users why.
>>>>>>>>
>>>>>>>> On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Of course the right answer is to just implement sink triggers
>>>>>>>>>>> and sidestep the question altogether :).
>>>>>>>>>>>
>>>>>>>>>>> In the meantime, I think leaving AfterSynchronizedProcessingTime
>>>>>>>>>>> in the model makes the most sense, and runners can choose an 
>>>>>>>>>>> implementation
>>>>>>>>>>> between firing eagerly and waiting some amount of time until they 
>>>>>>>>>>> think all
>>>>>>>>>>> (most?) downstream results are in before firing, depending on how 
>>>>>>>>>>> smart the
>>>>>>>>>>> runner wants to be. As you point out, they're all correct, and 
>>>>>>>>>>> we'll have
>>>>>>>>>>> multiple firings due to the upstream trigger anyway, and this is 
>>>>>>>>>>> safer than
>>>>>>>>>>> it used to be (though still possibly requires work).
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Just to clarify, as I got a little confused, is your suggestion:
>>>>>>>>>> Leave AfterSynchronizedProcessingTime* triggers in the model/proto, 
>>>>>>>>>> let the
>>>>>>>>>> SDK put them in where they want, and let runners decide how to 
>>>>>>>>>> interpret
>>>>>>>>>> them? (this SGTM and requires the least/no changes)
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yep. We may want to update Python/Go to produce
>>>>>>>>> AfterSynchronizedProcessingTime downstream of ProcessingTime triggers 
>>>>>>>>> too,
>>>>>>>>> eventually, to better express intent.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> *noting that TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
>>>>>>>>>> related to this, except in implementation, and should be removed 
>>>>>>>>>> either way.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> TL;DR:
>>>>>>>>>>>> 1. should we replace "after synchronized processing time" with
>>>>>>>>>>>> "after count 1"?
>>>>>>>>>>>> 2. should we remove "continuation trigger" and leave this to
>>>>>>>>>>>> runners?
>>>>>>>>>>>>
>>>>>>>>>>>> ----
>>>>>>>>>>>>
>>>>>>>>>>>> "AfterSynchronizedProcessingTime" triggers were invented to
>>>>>>>>>>>> solve a specific problem. They are inconsistent across SDKs today.
>>>>>>>>>>>>
>>>>>>>>>>>>  - You have an aggregation/GBK with aligned processing time
>>>>>>>>>>>> trigger like ("output every minute on the minute")
>>>>>>>>>>>>  - You have a downstream aggregation/GBK between that and the
>>>>>>>>>>>> sink
>>>>>>>>>>>>  - You expect to have about one output every minute per
>>>>>>>>>>>> key+window pair
>>>>>>>>>>>>
>>>>>>>>>>>> Any output of the upstream aggregation may contribute to any
>>>>>>>>>>>> key+window of the downstream aggregation. The
>>>>>>>>>>>> AfterSynchronizedProcessingTime trigger waits for all the 
>>>>>>>>>>>> processing time
>>>>>>>>>>>> based triggers to fire and commit their outputs. The downstream 
>>>>>>>>>>>> aggregation
>>>>>>>>>>>> will output as fast as possible in panes consistent with the 
>>>>>>>>>>>> upstream
>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>
>>>>>>>>>>>>  - The Java SDK behavior is as above, to output "as fast as
>>>>>>>>>>>> reasonable".
>>>>>>>>>>>>  - The Python SDK never uses "AfterSynchronizedProcessingTime"
>>>>>>>>>>>> triggers but simply propagates the same trigger to the next GBK, 
>>>>>>>>>>>> creating
>>>>>>>>>>>> additional delay.
>>>>>>>>>>>>  - I don't know what the Go SDK may do, if it supports this at
>>>>>>>>>>>> all.
>>>>>>>>>>>>
>>>>>>>>>>>> Any behavior could be defined as "correct". A simple option
>>>>>>>>>>>> could be to have the downstream aggregation "fire always" aka 
>>>>>>>>>>>> "after
>>>>>>>>>>>> element count 1". How would this change things? We would 
>>>>>>>>>>>> potentially see
>>>>>>>>>>>> many more outputs.
>>>>>>>>>>>>
>>>>>>>>>>>> Why did we do this in the first place? There are (at least)
>>>>>>>>>>>> these reasons:
>>>>>>>>>>>>
>>>>>>>>>>>>  - Previously, triggers could "finish" an aggregation thus
>>>>>>>>>>>> dropping all further data. In this case, waiting for all outputs is
>>>>>>>>>>>> critical or else you lose data. Now triggers cannot finish 
>>>>>>>>>>>> aggregations.
>>>>>>>>>>>>  - Whenever there may be more than one pane, a user has to
>>>>>>>>>>>> write logic to compensate and deal with it. Changing from 
>>>>>>>>>>>> guaranteed single
>>>>>>>>>>>> pane to multi-pane would break things. So if the user configures a 
>>>>>>>>>>>> single
>>>>>>>>>>>> firing, all downstream aggregations must respect it. Now that 
>>>>>>>>>>>> triggers
>>>>>>>>>>>> cannot finish, I think processing time can only be used in 
>>>>>>>>>>>> multi-pane
>>>>>>>>>>>> contexts anyhow.
>>>>>>>>>>>>  - The above example illustrates how the behavior in Java
>>>>>>>>>>>> maintains something that the user will expect. Or so we think. 
>>>>>>>>>>>> Maybe users
>>>>>>>>>>>> don't care.
>>>>>>>>>>>>
>>>>>>>>>>>> How did we get into this inconsistent state? When the user
>>>>>>>>>>>> specifies triggering it applies to the very nearest 
>>>>>>>>>>>> aggregation/GBK. The
>>>>>>>>>>>> SDK decides what triggering to insert downstream. One possibility 
>>>>>>>>>>>> is to
>>>>>>>>>>>> remove this and have it unspecified, left to runner behavior.
>>>>>>>>>>>>
>>>>>>>>>>>> I think maybe these pieces of complexity are both not helpful
>>>>>>>>>>>> and also not (necessarily) breaking changes to alter, especially
>>>>>>>>>>>> considering we have inconsistency in the model.
>>>>>>>>>>>>
>>>>>>>>>>>> WDYT? And I wonder what this means for xlang and portability...
>>>>>>>>>>>> how does continuation triggering even work? (if at all)
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to