> I will not try to formalize this notion in this email. But I will note that since it is universally assured, it would be zero cost and significantly safer to formalize it and add an annotation noting it was required. It has nothing to do with event time ordering, only trigger firing ordering.

I cannot agree with the last sentence (and I'm really not doing this on purpose :-)). Panes generally arrive out of order, as mentioned several times in the discussions linked from this thread. If we want to ensure "trigger firing ordering", we can use the pane index, that is correct. But that is actually equivalent to sorting by event time, because pane index order will be (nearly) the same as event time order. This is because pane index and event time correlate (both are monotonic). The pane index "only" solves the issue of preserving ordering when there are multiple firings within the same timestamp (regardless of granularity). This was mentioned in the initial discussion about event time ordering, and is part of the design doc: users should be allowed to provide a UDF for extracting a time-correlated ordering field (which means the ability to choose a preferred, or authoritative, observer which assigns unambiguous ordering to events). Examples of this might include Kafka offsets, or any queue index for that matter. This is not yet implemented, but could (should) be in the future.
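
A minimal sketch of the ordering described above, in plain Java without any Beam dependency: sort by event time first and use the pane index only to break ties within one timestamp. The `Firing` and `FiringOrder` classes and their field names are hypothetical and exist only for illustration; in a real pipeline the timestamp and pane index would come from the element's metadata.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical value type: one trigger firing for a single key.
final class Firing {
    final long eventTimeMillis; // timestamp of the fired pane
    final long paneIndex;       // per-key sequence number of the firing
    final String value;

    Firing(long eventTimeMillis, long paneIndex, String value) {
        this.eventTimeMillis = eventTimeMillis;
        this.paneIndex = paneIndex;
        this.value = value;
    }
}

final class FiringOrder {
    // Event time first; the pane index only decides between firings
    // that share the same timestamp.
    static final Comparator<Firing> BY_TIME_THEN_PANE =
        Comparator.comparingLong((Firing f) -> f.eventTimeMillis)
                  .thenComparingLong(f -> f.paneIndex);

    // Returns the values of the given firings in sorted order,
    // leaving the input list untouched.
    static List<String> sortedValues(List<Firing> firings) {
        List<Firing> copy = new ArrayList<>(firings);
        copy.sort(BY_TIME_THEN_PANE);
        List<String> out = new ArrayList<>();
        for (Firing f : copy) {
            out.add(f.value);
        }
        return out;
    }
}
```

Two firings with the same timestamp come out in pane index order; everything else comes out in event time order.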

The only case where these two things are (somewhat) different is the case mentioned by @Steve: the output is a stateless ParDo, which will get fused. But that is only because the processing is single-threaded per key, and therefore the ordering is implied by timer ordering (and careful here, many runners don't have this ordering 100% correct as of now; luckily this problem appears only when there are multiple timers per key). Moreover, if there is a failure, then the output might (would) go back in time anyway. If there is a shuffle operation after GBK/Combine, then the ordering is no longer guaranteed and must be explicitly taken care of.

Last note, I must agree with @Rui that all these discussions are very much related to retractions (precisely the ability to implement them).

Jan

On 11/26/19 7:34 AM, Kenneth Knowles wrote:
Hi Aaron,

Another insightful observation.

Whenever an aggregation (GBK / Combine per key) has a trigger firing, there is a per-key sequence number attached. It is included in metadata known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially referred to as the "pane index". You can also make use of the "on time index" if you like. The best way to access this metadata is to add a parameter of type PaneInfo to your DoFn's @ProcessElement method. This works for stateful or stateless DoFn.

Most of Beam's IO connectors do not explicitly enforce that outputs occur in pane index order but instead rely on the hope that the runner delivers panes in order to the sink. IMO this is dangerous but it has not yet caused a known issue. In practice, each "input key to output key" path through a pipeline's logic does preserve order for all existing runners AFAIK and it is the formalization that is missing. It is related to an observation by +Rui Wang that processing retractions requires the same key-to-key ordering.
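
As an illustration of what explicit enforcement in a sink could look like, here is a minimal sketch in plain Java. The `PaneIndexGuard` class is hypothetical and not part of Beam. It assumes the caller passes in the value of PaneInfo.getIndex() for each firing, that there is a single window per key (for example the global window), and that calls for a given key are serialized; in a real pipeline the map would more likely be per-key state in a stateful DoFn.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers the highest pane index written per key
// and rejects any pane that is not newer than that.
final class PaneIndexGuard {
    private final Map<String, Long> lastWritten = new HashMap<>();

    // Returns true if the pane should be written to the sink,
    // false if a pane with the same or a higher index was already written.
    boolean accept(String key, long paneIndex) {
        Long last = lastWritten.get(key);
        if (last != null && paneIndex <= last) {
            return false; // stale pane, writing it would regress the sink
        }
        lastWritten.put(key, paneIndex);
        return true;
    }
}
```

A sink would call `accept` before each write and skip the write when it returns false, so an out-of-order pane can never overwrite a newer one.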

I will not try to formalize this notion in this email. But I will note that since it is universally assured, it would be zero cost and significantly safer to formalize it and add an annotation noting it was required. It has nothing to do with event time ordering, only trigger firing ordering.

Kenn

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557


On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pabl...@google.com> wrote:

    The blog posts on stateful and timely computation with Beam should
    help clarify a lot about how to use state and timers to do this:
    https://beam.apache.org/blog/2017/02/13/stateful-processing.html
    https://beam.apache.org/blog/2017/08/28/timely-processing.html

    You'll see there how there's an implicit per-single-element
    grouping for each key, so state and timers should support your use
    case very well.

    Best
    -P.

    On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniem...@apache.org> wrote:

        If you have a pipeline that looks like Input -> GroupByKey ->
        ParDo, while it is not guaranteed, in practice the sink will
        observe the trigger firings in order (per key), since it'll be
        fused to the output of the GBK operation (in all runners I
        know of).

        There have been a couple threads about trigger ordering as
        well on the list recently that might have more information:
        
        https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
        https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E


        On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdi...@gmail.com> wrote:

            @Jan @Pablo Thank you

            @Pablo In this case it's a single global windowed
            Combine/perKey, triggered per element. Keys are few
            (client accounts) so they can live forever.

            It looks like just by virtue of using a stateful ParDo I
            could get this final execution to be "serialized" per key.
            (Then I could simply do the compare-and-swap using Beam's
            state mechanism to keep track of the "latest trigger
            timestamp" instead of having to orchestrate
            compare-and-swap in the target store :thinking:.)



            On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:

                One addition, to make the list of options exhaustive,
                there is probably one more option:

                  c) create a ParDo keyed by the primary key of your
                     sink, cache the last write in there and compare it
                     locally, without the need to query the database

                It would still need some timer to clear values after
                watermark + allowed lateness, because otherwise you
                would have to cache your whole database on workers. But
                because you don't need actual ordering, you just need
                the most recent value (if I got it right), this might
                be an option.

                Jan
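
Option c) above can be sketched roughly as follows, in plain Java without Beam. The `LastWriteCache` class and its method names are hypothetical. The sketch assumes one instance per worker, serialized calls per key, and an explicit `expire` call standing in for the timer that would clear state once the watermark passes event time + allowed lateness.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical local cache of the last written event time per sink key.
final class LastWriteCache {
    private final Map<String, Long> lastEventTime = new HashMap<>();
    private final long allowedLatenessMillis;

    LastWriteCache(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    // Returns true if the value is strictly newer than the cached write
    // for the key. Equal timestamps are treated as stale in this sketch.
    boolean shouldWrite(String key, long eventTimeMillis) {
        Long last = lastEventTime.get(key);
        if (last != null && eventTimeMillis <= last) {
            return false;
        }
        lastEventTime.put(key, eventTimeMillis);
        return true;
    }

    // Stands in for the cleanup timer: drops entries whose event time is
    // older than watermark - allowed lateness. Returns how many were dropped.
    int expire(long watermarkMillis) {
        long cutoff = watermarkMillis - allowedLatenessMillis;
        int before = lastEventTime.size();
        lastEventTime.values().removeIf(t -> t < cutoff);
        return before - lastEventTime.size();
    }

    int size() {
        return lastEventTime.size();
    }
}
```

The cleanup keeps the cache bounded: once no element for a key can arrive late anymore, its entry is no longer needed to protect the sink.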

                On 11/25/19 10:53 PM, Jan Lukavský wrote:
                > Hi Aaron,
                >
                > maybe someone else will give another option, but if I
                > understand correctly what you want to solve, then you
                > essentially have to do either:
                >
                >  a) use the compare & swap mechanism in the sink you
                >     described
                >
                >  b) use a buffer to buffer elements inside the
                >     outputting ParDo and only output them when the
                >     watermark passes (using a timer).
                >
                > There is actually an ongoing discussion about how to
                > make option b) user-friendly and part of Beam itself,
                > but currently there is no out-of-the-box solution for
                > that.
                >
                > Jan
                >
                > On 11/25/19 10:27 PM, Aaron Dixon wrote:
                >> Suppose I trigger a Combine per-element (in a
                >> high-volume stream) and use a ParDo as a sink.
                >>
                >> I assume there is no guarantee about the order that
                >> my ParDo will see these triggers, especially as it
                >> processes in parallel, anyway.
                >>
                >> That said, my sink writes to a db or cache and I
                >> would not like the cache to ever regress its value to
                >> something "before" what it has already written.
                >>
                >> Is the best way to solve this problem to always write
                >> the event-time in the cache and do a compare-and-swap
                >> only updating the sink if the triggered value in-hand
                >> is later than the target value?
                >>
                >> Or is there a better way to guarantee that my ParDo
                >> sink will process elements in-order? (Eg, if I can
                >> give up per-event/real-time, then a delay-based
                >> trigger would probably be sufficient I imagine.)
                >>
                >> Thanks for advice!
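
Option b) in the quoted message above (buffer elements and release them only once the watermark has passed) can be sketched in plain Java as follows. The `WatermarkBuffer` class is hypothetical and holds the buffer for a single key; in a Beam pipeline the buffer would presumably live in per-key state and `advanceWatermark` would correspond to an event-time timer firing. The sketch assumes that a watermark of W means no further element with a timestamp below W will arrive.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical per-key buffer that releases values in event time order.
final class WatermarkBuffer {
    private static final class Item {
        final long eventTimeMillis;
        final String value;

        Item(long eventTimeMillis, String value) {
            this.eventTimeMillis = eventTimeMillis;
            this.value = value;
        }
    }

    // Min-heap on event time, so the oldest buffered element is on top.
    private final PriorityQueue<Item> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Item i) -> i.eventTimeMillis));

    void add(long eventTimeMillis, String value) {
        buffer.add(new Item(eventTimeMillis, value));
    }

    // Releases, oldest first, every buffered value whose timestamp is
    // strictly below the watermark; later values stay buffered.
    List<String> advanceWatermark(long watermarkMillis) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().eventTimeMillis < watermarkMillis) {
            out.add(buffer.poll().value);
        }
        return out;
    }
}
```

The cost of this approach is latency: nothing reaches the sink until the watermark has moved past it, which is the trade-off against the per-element compare-and-swap of option a).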
