Hi,

I'm not sure I got everything in this thread right, but from my point of view, triggers are a property of GBK. They are a property of neither windowing nor the PCollection; they relate solely to GBK. This can be seen from the fact that, unlike the WindowFn, triggers are completely ignored in a stateful ParDo (they have no semantics there, which is fine). It would be cool if the model could be adjusted for that. This would mean that the correct place to specify triggering is not the Window PTransform but the GBK, i.e.

 input.apply(GroupByKey.create().triggering(...))

That would imply that every GBK simply has a default trigger unless one is set explicitly, and an explicit trigger applies to that particular GBK instance only. I'm not sure what the impact on pipeline compatibility would be, though.
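
A minimal sketch of the semantics proposed above, written as a toy model in plain Java rather than against Beam itself. Everything in it is an assumption made for illustration: the `GroupByKey` class is a hypothetical stand-in, `triggering(...)` does not exist on Beam's `GroupByKey` today, and triggers are modelled as plain strings.

```java
// Toy model only: these classes are hypothetical stand-ins, not Beam's API.
class GbkTriggerSketch {

    // Name of the trigger every GBK would use unless told otherwise (assumed).
    static final String DEFAULT_TRIGGER = "DefaultTrigger";

    // Hypothetical GroupByKey that carries its own trigger.
    static final class GroupByKey {
        private final String trigger;

        private GroupByKey(String trigger) {
            this.trigger = trigger;
        }

        // Every new GBK starts with the default trigger.
        static GroupByKey create() {
            return new GroupByKey(DEFAULT_TRIGGER);
        }

        // Returns a copy with a different trigger; other GBKs are unaffected.
        GroupByKey triggering(String trigger) {
            return new GroupByKey(trigger);
        }

        String trigger() {
            return trigger;
        }
    }

    public static void main(String[] args) {
        // The explicit trigger applies to this GBK instance only.
        GroupByKey first = GroupByKey.create()
                .triggering("Repeatedly(AfterProcessingTime(1 min))");
        // A downstream GBK inherits nothing and falls back to the default.
        GroupByKey second = GroupByKey.create();

        System.out.println("first GBK:  " + first.trigger());
        System.out.println("second GBK: " + second.trigger());
    }
}
```

In this model there is nothing for a continuation trigger to do, because no trigger travels with the PCollection from one GBK to the next.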

 Jan

On 2/19/21 12:09 AM, Robert Bradshaw wrote:
On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:


    On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw
    <rober...@google.com> wrote:

        I would prefer to leave downstream triggering up to the runner
        (or, better, leave upstream triggering up to the runner, a la
        sink triggers), but one problem is that without an
        explicit AfterSynchronizedProcessingTime one can't tell if the
        downstream ProcessingTime between two groupings is due to an
        explicit re-triggering between them or inherited from one to
        the other.


    I mean to propose that there should be no triggering specified
    unless due to explicit re-triggering.


You're saying that we leave the trigger (and perhaps other) fields of the WindowingStrategy attached to PCollections downstream of the first GBK unset in the proto, and let runners walk over the graph to infer them? I could be OK with making this legal, though updating all SDKs and runners to handle this doesn't seem high priority at the moment.


    (and BTW yes I agree about sink triggers, but we need retractions
    and probably some theoretical work before we can aim for that)

    Kenn

        On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles
        <k...@apache.org> wrote:

            Just for the thread I want to comment on another, more
            drastic approach: eliminate continuation triggers from the
            model, leaving downstream triggering up to a runner. This
            approach is not viable because transforms may need to
            change their behavior based on whether or not a trigger
            will fire more than once. Transforms can and do inspect
            the windowing strategy to do things differently.

            Kenn

            On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax
            <re...@google.com> wrote:

                I'll say that synchronized processing time has
                confused users before. Users sometimes use
                processing-time triggers to optimize latency, banking
                on that to decouple stage latency from the long-tail
                latency of previous stages. However, continuation
                triggers silently switching to synchronized processing
                time has defeated that, and it wasn't clear to users
                why.

                On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw
                <rober...@google.com> wrote:

                    On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles
                    <k...@apache.org> wrote:


                        On Thu, Feb 11, 2021 at 9:38 PM Robert
                        Bradshaw <rober...@google.com> wrote:

                            Of course the right answer is to just
                            implement sink triggers and sidestep the
                            question altogether :).

                            In the meantime, I think leaving
                            AfterSynchronizedProcessingTime in the
                            model makes the most sense, and runners
                            can choose an implementation between
                            firing eagerly and waiting some amount of
                            time until they think all (most?)
                            downstream results are in before firing,
                            depending on how smart the runner wants to
                            be. As you point out, they're all correct,
                            and we'll have multiple firings due to the
                            upstream trigger anyway, and this is safer
                            than it used to be (though still possibly
                            requires work).


                        Just to clarify, as I got a little confused,
                        is your suggestion: Leave
                        AfterSynchronizedProcessingTime* triggers in
                        the model/proto, let the SDK put them in where
                        they want, and let runners decide how to
                        interpret them? (this SGTM and requires the
                        least/no changes)


                    Yep. We may want to update Python/Go to produce
                    AfterSynchronizedProcessingTime downstream of
                    ProcessingTime triggers too, eventually, to better
                    express intent.

                        Kenn

                        *noting that
                        TimeDomain.SYNCHRONIZED_PROCESSING_TIME is not
                        related to this, except in implementation, and
                        should be removed either way.

                            On Wed, Feb 10, 2021 at 1:37 PM Kenneth
                            Knowles <k...@apache.org> wrote:

                                Hi all,

                                TL;DR:
                                1. should we replace "after
                                synchronized processing time" with
                                "after count 1"?
                                2. should we remove "continuation
                                trigger" and leave this to runners?

                                ----

                                "AfterSynchronizedProcessingTime"
                                triggers were invented to solve a
                                specific problem. They are
                                inconsistent across SDKs today.

                                 - You have an aggregation/GBK with an
                                aligned processing-time trigger (e.g.
                                "output every minute on the minute")
                                 - You have a downstream
                                aggregation/GBK between that and the sink
                                 - You expect to have about one output
                                every minute per key+window pair

                                Any output of the upstream aggregation
                                may contribute to any key+window of
                                the downstream aggregation. The
                                AfterSynchronizedProcessingTime
                                trigger waits for all the processing
                                time based triggers to fire and commit
                                their outputs. The downstream
                                aggregation will output as fast as
                                possible in panes consistent with the
                                upstream aggregation.

                                 - The Java SDK behavior is as above,
                                to output "as fast as reasonable".
                                 - The Python SDK never uses
                                "AfterSynchronizedProcessingTime"
                                triggers but simply propagates the
                                same trigger to the next GBK, creating
                                additional delay.
                                 - I don't know what the Go SDK may
                                do, if it supports this at all.

                                Any behavior could be defined as
                                "correct". A simple option could be to
                                have the downstream aggregation "fire
                                always" aka "after element count 1".
                                How would this change things? We would
                                potentially see many more outputs.

                                Why did we do this in the first place?
                                There are (at least) these reasons:

                                 - Previously, triggers could "finish"
                                an aggregation thus dropping all
                                further data. In this case, waiting
                                for all outputs is critical or else
                                you lose data. Now triggers cannot
                                finish aggregations.
                                 - Whenever there may be more than one
                                pane, a user has to write logic to
                                compensate and deal with it. Changing
                                from guaranteed single pane to
                                multi-pane would break things. So if
                                the user configures a single firing,
                                all downstream aggregations must
                                respect it. Now that triggers cannot
                                finish, I think processing time can
                                only be used in multi-pane contexts
                                anyhow.
                                 - The above example illustrates how
                                the behavior in Java maintains
                                something that the user will expect.
                                Or so we think. Maybe users don't care.

                                How did we get into this inconsistent
                                state? When the user specifies
                                triggering it applies to the very
                                nearest aggregation/GBK. The SDK
                                decides what triggering to insert
                                downstream. One possibility is to
                                remove this and have it unspecified,
                                left to runner behavior.

                                I think maybe these pieces of
                                complexity are both not helpful and
                                also not (necessarily) breaking
                                changes to alter, especially
                                considering we have inconsistency in
                                the model.

                                WDYT? And I wonder what this means for
                                xlang and portability... how does
                                continuation triggering even work? (if
                                at all)

                                Kenn
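
The three downstream behaviours described in the original message (the Java SDK's "AfterSynchronizedProcessingTime", the Python SDK's propagation of the same trigger, and the "after element count 1" alternative) can be sketched as a simple lookup. This is a toy model in plain Java, not Beam code: the `Policy` enum, the `downstreamOfProcessingTime` helper, and the string trigger names are all hypothetical and only cover the case where the upstream trigger is processing-time based.

```java
// Toy model only: trigger names are plain strings, not Beam objects.
class ContinuationPolicySketch {

    // The three downstream behaviours discussed in the thread.
    enum Policy { JAVA_SDK, PYTHON_SDK, FIRE_ALWAYS }

    // Picks the trigger for the GBK downstream of a processing-time trigger.
    static String downstreamOfProcessingTime(String upstreamTrigger, Policy policy) {
        switch (policy) {
            case JAVA_SDK:
                // Wait for upstream processing-time firings, then fire.
                return "AfterSynchronizedProcessingTime";
            case PYTHON_SDK:
                // Propagate the same trigger, which adds a second delay.
                return upstreamTrigger;
            case FIRE_ALWAYS:
                // Fire on every input, potentially producing many more panes.
                return "AfterCount(1)";
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        String upstream = "Repeatedly(AfterProcessingTime(aligned to 1 min))";
        for (Policy p : Policy.values()) {
            System.out.println(p + " -> " + downstreamOfProcessingTime(upstream, p));
        }
    }
}
```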
