Hi Robert,

> Here "sink" is really any observable outside effect, so I think "how often output should be written" and "how quickly output should react to the change of input" are the same.

The difference is in the input trigger. Let's imagine that I have two chained GBKs (A and B). If I trigger A every minute but B every second, I will output 60 records per minute, but 59 of them will be identical. That's why it seems to me that meaningful "sink" triggering has to start at the input and then propagate with each pane.
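To make the arithmetic concrete, here is a tiny Python simulation of the two chained GBKs (a toy model, not Beam code; the function and its parameters are mine for illustration):

```python
# Toy model of two chained GBKs: A fires every 60 s, B fires every second.
# B re-emits whatever A last produced, so within one minute B outputs
# 60 records, of which 59 carry the same (stale) value.

def downstream_panes(upstream_period_s, downstream_period_s, horizon_s=60):
    """Value visible to B at each of its firings over one minute."""
    panes = []
    for t in range(downstream_period_s, horizon_s + 1, downstream_period_s):
        # A's latest output is identified by the time of its most recent firing.
        last_a_firing = (t // upstream_period_s) * upstream_period_s
        panes.append(last_a_firing)
    return panes

panes = downstream_panes(upstream_period_s=60, downstream_period_s=1)
assert len(panes) == 60             # 60 records per minute out of B...
assert panes.count(panes[0]) == 59  # ...but 59 of them are identical
```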

> As an example, if I want, say, hourly output, triggering hourly at the source and then as quickly as possible from then on may be wasteful. It may also be desirable to arrange such that certain transforms only have a single pane per window, which is easier to propagate up than down. As another example, consider accumulating vs. discarding. If I have CombineValues(sum) followed by a re-keying and another CombineValues(sum), and I want the final output to be accumulating, the first must be discarding (or, better, retractions). Propagating upwards is possible in a way propagating downward is not.

I'm not sure I understand this. If I want hourly output, I cannot trigger the source with a lower frequency. If I trigger the source hourly but do not propagate the panes as fast as possible, I inevitably introduce additional latency (that is the definition of "not as fast as possible") into downstream processing. Therefore the final triggering cannot be "hourly output", at least not with regard to the rate of change of the inputs.
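For contrast, the sink-triggers design (linked later in this thread) would derive upstream trigger periods from per-sink latency requirements rather than starting at the input. A hypothetical sketch, where the graph, the node names, and the min-over-reachable-sinks rule are all my own illustration and not an actual Beam API:

```python
# Hypothetical sketch of "sink triggers": the user states a latency
# requirement per sink; each upstream node's trigger period is then the
# tightest (minimum) requirement over all sinks reachable from it.

def derive_trigger_periods(edges, sink_periods):
    """edges: node -> list of downstream nodes; sink_periods: sink -> seconds."""
    periods = dict(sink_periods)

    def period(node):
        if node not in periods:
            periods[node] = min(period(d) for d in edges[node])
        return periods[node]

    for node in edges:
        period(node)
    return periods

# source -> gbk1 -> {fast_sink (1 s), gbk2 -> slow_sink (hourly)}
edges = {"gbk1": ["fast_sink", "gbk2"], "gbk2": ["slow_sink"]}
periods = derive_trigger_periods(edges, {"fast_sink": 1, "slow_sink": 3600})
assert periods["gbk1"] == 1     # must satisfy the tightest downstream sink
assert periods["gbk2"] == 3600  # need not re-fire every second for an hourly sink
```

This is where the "wasteful" point comes from: with input-first triggering plus as-fast-as-possible propagation, gbk2 would fire every second even though its only sink needs hourly output.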

On 2/23/21 5:47 PM, Robert Bradshaw wrote:
On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:

    First, +1 to the conclusion of this thread.

    One note regarding composite transforms and triggers *inside*
    those transforms - I think that propagating the triggering from
    the input PCollection might even be dangerous: composite
    PTransforms that are sensitive to a change of triggering
    will (should!) override the input triggering, and therefore
    adjusting it upfront will not work. There is a clear option for
    a composite PTransform (which includes one or more GBKs) to
    expose an API for specifying the _input_ triggering of the
    composite as a whole, i.e.

     input.apply(MyComposite.create().triggering())

    which (consistently with how triggering works for a pure GBK)
    would change the input triggering (if we define a trigger as
    "buffer input in state, flush the buffer when the trigger
    fires") of the PTransform. The PTransform knows how it expands,
    and so it is quite easy to do the output triggering correctly.

When we originally explored this (for windowing, before triggering existed) we looked at the number of composite operations (combining, joining, cogbk, ...) that contained GBKs and realized it would add a lot of boilerplate to manually pass through the windowing information to each. Worse, this is a burden placed on every author of a composite operation (and omitting this argument, or hard coding a default, would be strictly worse). Triggering doesn't flow as nicely, but requiring it on every subtransform invocation during pipeline construction would have the same downsides of verbosity.
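The boilerplate concern can be illustrated with a toy Python model (not real Beam APIs): if triggering did not flow from the input, every composite wrapping a GBK would need its own `triggering` parameter threaded through to each inner GBK.

```python
# Toy model: a "PCollection" carrying a trigger, a GBK that inherits it,
# and two composites wrapping the GBK. With propagation, call sites stay
# clean; without it, every composite must grow an explicit parameter.

class PColl:
    def __init__(self, trigger="default"):
        self.trigger = trigger

def gbk(pcoll, triggering=None):
    # Inner GBK: inherits the input's trigger unless explicitly overridden.
    return PColl(triggering or pcoll.trigger)

def count_per_key(pcoll, triggering=None):   # composite containing a GBK
    return gbk(pcoll, triggering)

def top_largest(pcoll, triggering=None):     # another composite, same burden
    return gbk(pcoll, triggering)

# Propagation: the hourly trigger flows through with zero per-call boilerplate.
out = top_largest(count_per_key(PColl(trigger="hourly")))
assert out.trigger == "hourly"
# An explicit override at one composite still works.
assert count_per_key(PColl("hourly"), triggering="per-element").trigger == "per-element"
```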

    Regarding the sink triggering - out of curiosity, how does that
    differ from applying the triggering on the very first GBK(s) and
    then triggering all downstream GBKs using
    AfterPane.elementCountAtLeast(1)? It seems to me that from the
    user's perspective what I want to define is not "how often
    output should be written" but "how quickly output should react
    to a change of input" - therefore I *must* trigger with at least
    this frequency from the source and then propagate each pane as
    quickly as possible to the output. Am I missing something?

Here "sink" is really any observable outside effect, so I think "how often output should be written" and "how quickly output should react to the change of input" are the same.

As an example, if I want, say, hourly output, triggering hourly at the source and then as quickly as possible from then on may be wasteful. It may also be desirable to arrange such that certain transforms only have a single pane per window, which is easier to propagate up than down. As another example, consider accumulating vs. discarding. If I have CombineValues(sum) followed by a re-keying and another CombineValues(sum), and I want the final output to be accumulating, the first must be discarding (or, better, retractions). Propagating upwards is possible in a way propagating downward is not.
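The accumulating/discarding point can be checked with a plain-Python sketch (a toy model, not Beam; one pane per input element just for illustration):

```python
# First CombineValues(sum): emits one pane per element. In accumulating
# mode each pane is the running total; in discarding mode each pane holds
# only the new contribution. The second, accumulating CombineValues(sum)
# simply sums the panes it receives.

def first_stage_panes(inputs, accumulating):
    total, panes = 0, []
    for x in inputs:
        total += x
        panes.append(total if accumulating else x)
    return panes

def final_accumulating_sum(panes):
    return sum(panes)  # downstream accumulating sum over received panes

inputs = [1, 2, 3]
# Discarding upstream: the final accumulating total is correct.
assert final_accumulating_sum(first_stage_panes(inputs, accumulating=False)) == 6
# Accumulating upstream: running totals are re-summed and over-count.
assert final_accumulating_sum(first_stage_panes(inputs, accumulating=True)) == 10
```

Retractions would let the first stage withdraw its earlier pane before emitting the new total, which is why they are the better fix than discarding mode.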


     Jan


    On 2/22/21 9:53 PM, Reuven Lax wrote:
    I really wish that we had found the time to build sink triggers.
    Jan is right - specifying triggers up front and having them
    propagate down is confusing (it's also a bit confusing for
    windows, but with windows the propagation at least makes sense).
    The fact that users rarely have access to the actual GBK
    operation means that allowing them to specify triggers on their
    sinks is the best approach.

        On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw <rober...@google.com> wrote:

        On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:

            I agree completely: Triggers control the output of the GBK.

            The issue is composite transforms, where there will be a
            GBK deep inside some code and the user cannot adjust the
            triggering.

            What a user really wants is "sink triggers
            <https://s.apache.org/beam-sink-triggers>" [1], a purely
            hypothetical feature where they specify the latency
            requirements on each _output_ and everything else is
            figured out automatically. Unfortunately, sink triggers
            require retractions, so each PCollection can be a
            complete changelog. Otherwise transformations cannot be
            transparently correct throughout a pipeline and triggers
            cannot be decoupled from pipeline logic. Retractions
            themselves are not necessarily complex in some cases
            (Flink SQL has them - they are extra easy for "pure"
            code) but require a massive reworking of the library of
            transforms, particularly IOs. And backwards compatibility
            concerns for existing DoFns are somewhat tricky. We've
            had two prototypes [2] [3] and some important design
            investigations [4], but no time to really finish adding
            them, even as just an optional experiment. And once we
            have retractions, there is still a lot to figure out to
            finish sink triggers. They may not even really be possible!

            So for now, we do our best with the user setting up
            triggering at the beginning of the pipeline instead of
            the end of the pipeline. The very first GBK (which may be
            deep in library code) is controlled by the triggering
            they set up and all the rest get the "continuation
            trigger" which tries to just let the data flow. Unless
            they set up another bit of triggering. Some of our
            transforms do this for various reasons.

            I think the conclusion of this particular thread is:

             - make all the SDKs use AfterSynchronizedProcessingTime
            triggers
             - allow runners to do whatever they want when they see
            AfterSynchronizedProcessingTime trigger
             - remove TimeDomain.afterSynchronizedProcessingTime from
            the proto since it is only for timers and they should not
            use this
             - later, figure out if we want to add support for making
            downstream triggering optional (could be useful prep for
            sink triggers)


        +1

            [1] https://s.apache.org/beam-sink-triggers
            [2] https://github.com/apache/beam/pull/4742
            [3] https://github.com/apache/beam/pull/9199
            [4] https://s.apache.org/beam-retractions

            On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:

                The same holds true for pane accumulation mode.

                 Jan

                On 2/22/21 10:21 AM, Jan Lukavský wrote:

                Hi,

                I'm not sure if I got everything from this thread
                right, but from my point of view, triggers are a
                property of GBK. They are a property of neither
                windowing nor PCollection, but relate solely to
                GBK. This can be seen from the fact that, unlike
                windowFn, triggers are completely ignored in a
                stateful ParDo (there is no semantics for them,
                which is fine). It would be cool if the model could
                be adjusted for that - this would actually mean
                that the correct place to specify triggering is
                not the Window PTransform but the GBK, i.e.

                 input.apply(GroupByKey.create().triggering(...))

                That would imply we simply have a default trigger
                for all GBKs, unless explicitly changed, but for
                that particular instance only. I'm not sure what
                the impact on pipeline compatibility would be,
                though.

                 Jan

                On 2/19/21 12:09 AM, Robert Bradshaw wrote:
                On Wed, Feb 17, 2021 at 1:56 PM Kenneth Knowles <k...@apache.org> wrote:


                    On Wed, Feb 17, 2021 at 1:06 PM Robert Bradshaw <rober...@google.com> wrote:

                        I would prefer to leave
                        downstream triggering up to the runner (or,
                        better, leave upstream triggering up to the
                        runner, a la sink triggers), but one
                        problem is that without an
                        explicit AfterSynchronizedProcessingTime
                        one can't tell if the downstream
                        ProcessingTime between two groupings is due
                        to an explicit re-triggering between them
                        or inherited from one to the other.


                    I mean to propose that there should be no
                    triggering specified unless due to explicit
                    re-triggering.


                You're saying that we leave the trigger (and
                perhaps other) fields of the WindowingStrategy
                attached to PCollections downstream of the first GBK
                unset in the proto? And let runners walk over the
                graph to infer it? I could be OK with making this
                legal, though updating all SDKs and Runners to
                handle this doesn't seem high priority at the moment.


                    (and BTW yes I agree about sink triggers, but
                    we need retractions and probably some
                    theoretical work before we can aim for that)

                    Kenn

                        On Wed, Feb 17, 2021 at 12:37 PM Kenneth Knowles <k...@apache.org> wrote:

                            Just for the thread I want to comment
                            on another, more drastic approach:
                            eliminate continuation triggers from
                            the model, leaving downstream
                            triggering up to a runner. This
                            approach is not viable because
                            transforms may need to change their
                            behavior based on whether or not a
                            trigger will fire more than once.
                            Transforms can and do inspect the
                            windowing strategy to do things
                            differently.

                            Kenn

                            On Wed, Feb 17, 2021 at 11:47 AM Reuven Lax <re...@google.com> wrote:

                                I'll say that synchronized
                                processing time has confused users
                                before. Users sometimes use
                                processing-time triggers to
                                optimize latency, banking that this
                                will decouple stage latency from
                                the long-tail latency of previous
                                stages. However, continuation
                                triggers silently switching to
                                synchronized processing time has
                                defeated that, and it wasn't clear
                                to users why.

                                On Wed, Feb 17, 2021 at 11:12 AM Robert Bradshaw <rober...@google.com> wrote:

                                    On Fri, Feb 12, 2021 at 9:09 AM Kenneth Knowles <k...@apache.org> wrote:


                                        On Thu, Feb 11, 2021 at 9:38 PM Robert Bradshaw <rober...@google.com> wrote:

                                            Of course the
                                            right answer is to just
                                            implement sink triggers
                                            and sidestep the
                                            question altogether :).

                                            In the meantime, I
                                            think leaving
                                            AfterSynchronizedProcessingTime
                                            in the model makes the
                                            most sense, and runners
                                            can choose an
                                            implementation between
                                            firing eagerly and
                                            waiting some amount of
                                            time until they think
                                            all (most?)
                                            downstream results are
                                            in before firing,
                                            depending on how smart
                                            the runner wants to be.
                                            As you point out,
                                            they're all correct,
                                            and we'll have multiple
                                            firings due to the
                                            upstream trigger
                                            anyway, and this is
                                            safer than it used to
                                            be (though still
                                            possibly requires work).


                                        Just to clarify, as I got a
                                        little confused, is your
                                        suggestion: Leave
                                        AfterSynchronizedProcessingTime*
                                        triggers in the
                                        model/proto, let the SDK
                                        put them in where they
                                        want, and let runners
                                        decide how to interpret
                                        them? (this SGTM and
                                        requires the least/no changes)


                                    Yep. We may want to update
                                    Python/Go to produce
                                    AfterSynchronizedProcessingTime
                                    downstream of ProcessingTime
                                    triggers too, eventually, to
                                    better express intent.

                                        Kenn

                                        *noting that
                                        TimeDomain.SYNCHRONIZED_PROCESSING_TIME
                                        is not related to this,
                                        except in implementation,
                                        and should be removed
                                        either way.

                                            On Wed, Feb 10, 2021 at 1:37 PM Kenneth Knowles <k...@apache.org> wrote:

                                                Hi all,

                                                TL;DR:
                                                1. should we
                                                replace "after
                                                synchronized
                                                processing time"
                                                with "after count 1"?
                                                2. should we remove
                                                "continuation
                                                trigger" and leave
                                                this to runners?

                                                ----

                                                
                                                "AfterSynchronizedProcessingTime"
                                                triggers were
                                                invented to solve a
                                                specific problem.
                                                They are
                                                inconsistent across
                                                SDKs today.

                                                 - You have an
                                                aggregation/GBK
                                                with an aligned
                                                processing-time
                                                trigger (like
                                                "output every
                                                minute on the minute")
                                                 - You have a
                                                downstream
                                                aggregation/GBK
                                                between that and
                                                the sink
                                                 - You expect to
                                                have about one
                                                output every minute
                                                per key+window pair

                                                Any output of the
                                                upstream
                                                aggregation may
                                                contribute to any
                                                key+window of the
                                                downstream
                                                aggregation. The
                                                AfterSynchronizedProcessingTime
                                                trigger waits for
                                                all the processing
                                                time based triggers
                                                to fire and commit
                                                their outputs. The
                                                downstream
                                                aggregation will
                                                output as fast as
                                                possible in panes
                                                consistent with the
                                                upstream aggregation.

                                                 - The Java SDK
                                                behavior is as
                                                above, to output
                                                "as fast as
                                                reasonable".
                                                 - The Python SDK
                                                never uses
                                                
                                                "AfterSynchronizedProcessingTime"
                                                triggers but simply
                                                propagates the same
                                                trigger to the next
                                                GBK, creating
                                                additional delay.
                                                 - I don't know
                                                what the Go SDK may
                                                do, if it supports
                                                this at all.

                                                Any behavior could
                                                be defined as
                                                "correct". A simple
                                                option could be to
                                                have the downstream
                                                aggregation "fire
                                                always" aka "after
                                                element count 1".
                                                How would this
                                                change things? We
                                                would potentially
                                                see many more outputs.

                                                Why did we do this
                                                in the first place?
                                                There are (at
                                                least) these reasons:

                                                 - Previously,
                                                triggers could
                                                "finish" an
                                                aggregation thus
                                                dropping all
                                                further data. In
                                                this case, waiting
                                                for all outputs is
                                                critical or else
                                                you lose data. Now
                                                triggers cannot
                                                finish aggregations.
                                                 - Whenever there
                                                may be more than
                                                one pane, a user
                                                has to write logic
                                                to compensate and
                                                deal with it.
                                                Changing from
                                                guaranteed single
                                                pane to multi-pane
                                                would break things.
                                                So if the user
                                                configures a single
                                                firing, all
                                                downstream
                                                aggregations must
                                                respect it. Now
                                                that triggers
                                                cannot finish, I
                                                think processing
                                                time can only be
                                                used in multi-pane
                                                contexts anyhow.
                                                 - The above
                                                example illustrates
                                                how the behavior in
                                                Java maintains
                                                something that the
                                                user will expect.
                                                Or so we think.
                                                Maybe users don't care.

                                                How did we get into
                                                this inconsistent
                                                state? When the
                                                user specifies
                                                triggering it
                                                applies to the very
                                                nearest
                                                aggregation/GBK.
                                                The SDK decides
                                                what triggering to
                                                insert downstream.
                                                One possibility is
                                                to remove this and
                                                have it
                                                unspecified, left
                                                to runner behavior.

                                                I think maybe these
                                                pieces of
                                                complexity are both
                                                not helpful and
                                                also not
                                                (necessarily)
                                                breaking changes to
                                                alter, especially
                                                considering we have
                                                inconsistency in
                                                the model.

                                                WDYT? And I wonder
                                                what this means for
                                                xlang and
                                                portability... how
                                                does continuation
                                                triggering even
                                                work? (if at all)

                                                Kenn
