> Trigger firings can have decreasing event timestamps w/ the minimum timestamp combiner*. I do think the issue at hand is best analyzed in terms of the explicit ordering on panes. And I do think we need to have an explicit guarantee or annotation strong enough to describe a correct-under-all-allowed runners sink. Today an antagonistic runner could probably break a lot of things.

Thanks for this insight. I didn't know about the relation between trigger firing (event) time - which is always non-decreasing - and the resulting timestamp of the output pane - which can be affected by the timestamp combiner and decrease in the cases you describe. What actually correlates with the pane index at all times is the processing time of the trigger firings. Would you say that the "annotation that would guarantee ordering of panes" could be viewed as a time ordering annotation with an additional time domain (event time, processing time)? Could these two then be viewed as a single annotation with some distinguishing parameter?

@RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)

?

Event time should probably be made the default, because that information is accessible with every WindowedValue, while the pane index is available only after a GBK (or generally might be available after every keyed sequential operation, but is missing after a source, for instance).
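
Purely as an illustration, a sketch of how that might look on a DoFn - note that Domain and the parameterized form of @RequiresTimeSortedInput are hypothetical, nothing like this exists in the SDK today:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class OrderedSinkFn extends DoFn<KV<String, Long>, Void> {

  // Hypothetical: Domain and the parameterized annotation are not real
  // API; they only illustrate folding both orderings into one annotation.
  @RequiresTimeSortedInput(Domain.PANE_INDEX)
  @ProcessElement
  public void process(@Element KV<String, Long> element) {
    // With Domain.PANE_INDEX the runner would deliver each key's elements
    // sorted by pane index; with Domain.EVENT_TIME, by event timestamp.
  }
}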

Jan

On 11/27/19 1:32 AM, Kenneth Knowles wrote:


On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:

    > I will not try to formalize this notion in this email. But I
    > will note that since it is universally assured, it would be
    > zero cost and significantly safer to formalize it and add an
    > annotation noting it was required. It has nothing to do with
    > event time ordering, only trigger firing ordering.

    I cannot agree with the last sentence (and I'm really not doing
    this on purpose :-)). Panes generally arrive out of order, as
    mentioned several times in the discussions linked from this
    thread. If we want to ensure "trigger firing ordering", we can
    use the pane index; that is correct. But that is actually
    equivalent to sorting by event time, because the pane index
    order will be (nearly) the same as the event time order. This
    is due to the fact that pane index and event time correlate
    (both are monotonic).

Trigger firings can have decreasing event timestamps w/ the minimum timestamp combiner*. I do think the issue at hand is best analyzed in terms of the explicit ordering on panes. And I do think we need to have an explicit guarantee or annotation strong enough to describe a correct-under-all-allowed runners sink. Today an antagonistic runner could probably break a lot of things.

Kenn

*In fact, they can decrease via the "maximum" timestamp combiner, because timestamp combiners actually only apply to the elements in that particular pane. This is weird, and maybe a design bug, but good to know about.
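
For concreteness, a minimal sketch (Beam Java SDK) of a windowing configuration under which this can happen; "input" is an assumed PCollection<KV<String, Long>>:

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<KV<String, Long>> sums =
    input
        .apply(
            Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardMinutes(30))
                // The pane timestamp is the minimum over the elements in
                // *that pane only*, so a later pane containing only an
                // earlier element gets an earlier timestamp than the
                // pane before it.
                .withTimestampCombiner(TimestampCombiner.EARLIEST)
                .accumulatingFiredPanes())
        .apply(Sum.longsPerKey());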

    The pane index "only" solves the issue of preserving ordering
    even in cases where there are multiple firings with the same
    timestamp (regardless of granularity). This was mentioned in
    the initial discussion about event time ordering, and is part
    of the design doc - users should be allowed to provide a UDF
    for extracting a time-correlated ordering field (which means
    the ability to choose a preferred, or authoritative, observer
    which assigns an unambiguous ordering to events). Examples of
    this might include Kafka offsets, or any queue index for that
    matter. This is not yet implemented, but could (should) be in
    the future.

    The only case where these two things are (somewhat) different
    is the case mentioned by @Steve - if the output is a stateless
    ParDo, which will get fused. But that is only because the
    processing is single-threaded per key, and therefore the
    ordering is implied by timer ordering (and be careful here:
    many runners don't have this ordering 100% correct as of now -
    this problem luckily appears only when there are multiple
    timers per key). Moreover, if there were a failure, the output
    might (would) go back in time anyway. If there were a shuffle
    operation after the GBK/Combine, the ordering would no longer
    be guaranteed and would have to be explicitly taken care of.

    One last note: I must agree with @Rui that all these
    discussions are very much related to retractions (precisely,
    to the ability to implement them).

    Jan

    On 11/26/19 7:34 AM, Kenneth Knowles wrote:
    Hi Aaron,

    Another insightful observation.

    Whenever an aggregation (GBK / Combine per key) has a trigger
    firing, there is a per-key sequence number attached. It is
    included in metadata known as "PaneInfo" [1]. The value of
    PaneInfo.getIndex() is colloquially referred to as the "pane
    index". You can also make use of the "on time index" if you like.
    The best way to access this metadata is to add a parameter of
    type PaneInfo to your DoFn's @ProcessElement method [2]. This
    works for stateful and stateless DoFns.
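
    For example, a minimal sketch against the Beam Java SDK (the
    DoFn name is made up):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;

    class ObservePanesFn extends DoFn<KV<String, Long>, String> {
      @ProcessElement
      public void process(
          @Element KV<String, Long> e, PaneInfo pane, OutputReceiver<String> out) {
        // pane.getIndex() is the per-key-and-window sequence number of
        // this firing; getNonSpeculativeIndex() is the "on time index".
        out.output(
            e.getKey()
                + " paneIndex=" + pane.getIndex()
                + " onTimeIndex=" + pane.getNonSpeculativeIndex());
      }
    }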

    Most of Beam's IO connectors do not explicitly enforce that
    outputs occur in pane index order but instead rely on the hope
    that the runner delivers panes in order to the sink. IMO this is
    dangerous but it has not yet caused a known issue. In practice,
    each "input key to output key 'path' " through a pipeline's logic
    does preserve order for all existing runners AFAIK and it is the
    formalization that is missing. It is related to an observation by
    +Rui Wang <mailto:[email protected]> that processing retractions
    requires the same key-to-key ordering.

    I will not try to formalize this notion in this email. But I will
    note that since it is universally assured, it would be zero cost
    and significantly safer to formalize it and add an annotation
    noting it was required. It has nothing to do with event time
    ordering, only trigger firing ordering.

    Kenn

    [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
    [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557


    On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:

        The blog posts on stateful and timely computation with Beam
        should help clarify a lot about how to use state and timers
        to do this:
        https://beam.apache.org/blog/2017/02/13/stateful-processing.html
        https://beam.apache.org/blog/2017/08/28/timely-processing.html

        You'll see there how there's an implicit per-single-element
        grouping for each key, so state and timers should support
        your use case very well.

        Best
        -P.

        On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]> wrote:

            If you have a pipeline that looks like Input ->
            GroupByKey -> ParDo, while it is not guaranteed, in
            practice the sink will observe the trigger firings in
            order (per key), since it'll be fused to the output of
            the GBK operation (in all runners I know of).
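
            For reference, a sketch of that shape (assuming a windowed,
            triggered PCollection<KV<String, Long>> named "input"; the
            sink body is elided):

            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.transforms.GroupByKey;
            import org.apache.beam.sdk.transforms.ParDo;
            import org.apache.beam.sdk.values.KV;

            input
                .apply(GroupByKey.<String, Long>create())
                // Stateless sink ParDo: typically fused onto the GBK
                // output, so in practice (not by guarantee) it observes
                // each key's firings in order.
                .apply(ParDo.of(
                    new DoFn<KV<String, Iterable<Long>>, Void>() {
                      @ProcessElement
                      public void process(
                          @Element KV<String, Iterable<Long>> e) {
                        // write e to the external store here
                      }
                    }));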

            There have been a couple threads about trigger ordering
            as well on the list recently that might have more
            information:
            https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
            https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E


            On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:

                @Jan @Pablo Thank you

                @Pablo In this case it's a single globally-windowed
                Combine.perKey, triggered per element. Keys are few
                (client accounts), so they can live forever.

                It looks like just by virtue of using a stateful
                ParDo I could get this final execution to be
                "serialized" per key. (Then I could simply do the
                compare-and-swap using Beam's state mechanism to keep
                track of the "latest trigger timestamp" instead of
                having to orchestrate compare-and-swap in the target
                store :thinking:.)
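
                Something like this minimal sketch (Beam Java; the DoFn
                and state names are invented) - keep the latest timestamp
                in per-key state and drop older firings:

                import org.apache.beam.sdk.coders.VarLongCoder;
                import org.apache.beam.sdk.state.StateSpec;
                import org.apache.beam.sdk.state.StateSpecs;
                import org.apache.beam.sdk.state.ValueState;
                import org.apache.beam.sdk.transforms.DoFn;
                import org.apache.beam.sdk.values.KV;
                import org.joda.time.Instant;

                class LatestWinsSinkFn extends DoFn<KV<String, Long>, Void> {

                  @StateId("latestTs")
                  private final StateSpec<ValueState<Long>> latestTsSpec =
                      StateSpecs.value(VarLongCoder.of());

                  @ProcessElement
                  public void process(
                      @Element KV<String, Long> e,
                      @Timestamp Instant ts,
                      @StateId("latestTs") ValueState<Long> latestTs) {
                    Long seen = latestTs.read();
                    // Compare-and-swap on the latest timestamp seen for
                    // this key: drop any firing older than what has
                    // already been written.
                    if (seen == null || ts.getMillis() >= seen) {
                      latestTs.write(ts.getMillis());
                      // write e to the target store here
                    }
                  }
                }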



                On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:

                    One addition, to make the list of options exhaustive,
                    there is probably one more option:

                      c) create a ParDo keyed by the primary key of your
                    sink, cache the last write in there and compare it
                    locally, without the need to query the database.

                    It would still need some timer to clear values after
                    watermark + allowed lateness, because otherwise you
                    would have to cache your whole database on workers.
                    But because you don't need actual ordering - you just
                    need the most recent value (if I got it right) - this
                    might be an option.
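
                    A rough sketch of option c) along those lines (Beam
                    Java; the class name, state/timer names and the
                    allowed-lateness constant are assumptions):

                    import org.apache.beam.sdk.coders.VarLongCoder;
                    import org.apache.beam.sdk.state.StateSpec;
                    import org.apache.beam.sdk.state.StateSpecs;
                    import org.apache.beam.sdk.state.TimeDomain;
                    import org.apache.beam.sdk.state.Timer;
                    import org.apache.beam.sdk.state.TimerSpec;
                    import org.apache.beam.sdk.state.TimerSpecs;
                    import org.apache.beam.sdk.state.ValueState;
                    import org.apache.beam.sdk.transforms.DoFn;
                    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
                    import org.apache.beam.sdk.values.KV;
                    import org.joda.time.Duration;

                    class LocalCacheFn
                        extends DoFn<KV<String, Long>, KV<String, Long>> {

                      // Assumed to match the pipeline's allowed lateness.
                      private static final Duration ALLOWED_LATENESS =
                          Duration.standardHours(1);

                      @StateId("lastWritten")
                      private final StateSpec<ValueState<Long>> lastWrittenSpec =
                          StateSpecs.value(VarLongCoder.of());

                      @TimerId("cleanup")
                      private final TimerSpec cleanupSpec =
                          TimerSpecs.timer(TimeDomain.EVENT_TIME);

                      @ProcessElement
                      public void process(
                          @Element KV<String, Long> e,
                          BoundedWindow window,
                          @StateId("lastWritten") ValueState<Long> lastWritten,
                          @TimerId("cleanup") Timer cleanup,
                          OutputReceiver<KV<String, Long>> out) {
                        Long cached = lastWritten.read();
                        // Compare locally against the cached last write;
                        // only newer values are passed on to the sink.
                        if (cached == null || e.getValue() > cached) {
                          lastWritten.write(e.getValue());
                          out.output(e);
                        }
                        // Clear the cached value once the window is
                        // finally done, so workers do not end up caching
                        // the whole database.
                        cleanup.set(window.maxTimestamp().plus(ALLOWED_LATENESS));
                      }

                      @OnTimer("cleanup")
                      public void onCleanup(
                          @StateId("lastWritten") ValueState<Long> lastWritten) {
                        lastWritten.clear();
                      }
                    }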

                    Jan

                    On 11/25/19 10:53 PM, Jan Lukavský wrote:
                    > Hi Aaron,
                    >
                    > maybe someone else will give another option, but if
                    > I understand correctly what you want to solve, then
                    > you essentially have to do either:
                    >
                    >  a) use the compare & swap mechanism in the sink
                    > you described
                    >
                    >  b) use a buffer to buffer elements inside the
                    > outputting ParDo and only output them when watermark
                    > passes (using a timer).
                    >
                    > There is actually an ongoing discussion about how to
                    > make option b) user-friendly and part of Beam
                    > itself, but currently there is no out-of-the-box
                    > solution for that.
                    >
                    > Jan
                    >
                    > On 11/25/19 10:27 PM, Aaron Dixon wrote:
                    >> Suppose I trigger a Combine per-element (in a
                    >> high-volume stream) and use a ParDo as a sink.
                    >>
                    >> I assume there is no guarantee about the order that
                    >> my ParDo will see these triggers, especially as it
                    >> processes in parallel, anyway.
                    >>
                    >> That said, my sink writes to a db or cache and I
                    >> would not like the cache to ever regress its value
                    >> to something "before" what it has already written.
                    >>
                    >> Is the best way to solve this problem to always
                    >> write the event-time in the cache and do a
                    >> compare-and-swap, only updating the sink if the
                    >> triggered value in-hand is later than the target
                    >> value?
                    >>
                    >> Or is there a better way to guarantee that my ParDo
                    >> sink will process elements in order? (E.g., if I
                    >> can give up per-event/real-time, then a delay-based
                    >> trigger would probably be sufficient, I imagine.)
                    >>
                    >> Thanks for advice!
