Hi Kenn,

On 12/4/19 5:38 AM, Kenneth Knowles wrote:
Jan - let's try to defrag the threads on your time sorting proposal. This thread may have useful ideas but I want to focus on helping Aaron in this thread. You can link to this thread from other threads or from a design doc. Does this seem OK to you?

sure. :-)

I actually think the best thread to continue the discussion would be [1]. The reason this discussion got fragmented is probably that the other threads seemed to die out without any conclusion. :-(

Jan

[1] https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E


Aaron - do you have the information you need to implement your sink? My impression is that you have quite a good grasp of the issues even before you asked.

Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <[email protected]> wrote:

    > Trigger firings can have decreasing event timestamps w/ the
    minimum timestamp combiner*. I do think the issue at hand is best
    analyzed in terms of the explicit ordering on panes. And I do
    think we need to have an explicit guarantee or annotation strong
    enough to describe a correct-under-all-allowed runners sink. Today
    an antagonistic runner could probably break a lot of things.

    Thanks for this insight. I didn't know about the relation between
    trigger firing (event) time - which is always non-decreasing - and
    the resulting timestamp of the output pane - which can be affected
    by the timestamp combiner and decrease in the cases you describe.
    What actually correlates with the pane index at all times is the
    processing time of the trigger firings. Would you say that the
    "annotation that would guarantee ordering of panes" could be
    viewed as a time ordering annotation with an additional time
    domain (event time, processing time)? Could these two then be
    viewed as a single annotation with some distinguishing parameter?

    @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)

    ?

    Event time should probably be made the default, because that
    information is accessible with every WindowedValue, while the pane
    index is available only after a GBK (or, more generally, might be
    available after any keyed sequential operation, but is missing
    right after a source, for instance).
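
    To make this concrete, here is a purely hypothetical sketch of
    what such a parameterized annotation might look like on a user
    DoFn - neither the Domain parameter nor the annotation itself
    exists in Beam today, both are only part of the proposal being
    discussed:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class OrderedSinkFn extends DoFn<KV<String, Long>, Void> {
      // Hypothetical: Domain.EVENT_TIME would be the default ordering,
      // Domain.PANE_INDEX would request ordering by pane index instead.
      // @RequiresTimeSortedInput(Domain.PANE_INDEX)
      @ProcessElement
      public void process(@Element KV<String, Long> element) {
        // elements for a given key would arrive here ordered in the
        // requested time domain
      }
    }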

    Jan

    On 11/27/19 1:32 AM, Kenneth Knowles wrote:


    On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:

        > I will not try to formalize this notion in this email. But
        I will note that since it is universally assured, it would be
        zero cost and significantly safer to formalize it and add an
        annotation noting it was required. It has nothing to do with
        event time ordering, only trigger firing ordering.

        I cannot agree with the last sentence (and I'm really not
        doing this on purpose :-)). Panes generally arrive out of
        order, as mentioned several times in the discussions linked
        from this thread. If we want to ensure "trigger firing
        ordering", we can use the pane index, that is correct. But -
        that is actually equivalent to sorting by event time, because
        pane index order will be (nearly) the same as event time
        order. This is due to the fact that pane index and event
        time correlate (both are monotonic).

    Trigger firings can have decreasing event timestamps w/ the
    minimum timestamp combiner*. I do think the issue at hand is best
    analyzed in terms of the explicit ordering on panes. And I do
    think we need to have an explicit guarantee or annotation strong
    enough to describe a correct-under-all-allowed runners sink.
    Today an antagonistic runner could probably break a lot of things.

    Kenn

    *In fact, they can decrease even via the "maximum" timestamp
    combiner, because timestamp combiners actually only apply to the
    elements in that particular pane. For example, if one pane contains
    only an element at 12:05 and a later pane contains only an element
    at 12:01, the later pane's output timestamp is the earlier one even
    under the maximum combiner. This is weird, and maybe a design bug,
    but good to know about.

        The pane index "only" solves the issue of preserving ordering
        even in cases where there are multiple firings with the same
        timestamp (regardless of granularity). This was mentioned in
        the initial discussion about event time ordering, and is part
        of the design doc - users should be allowed to provide a UDF
        for extracting a time-correlated ordering field (which means
        the ability to choose a preferred, or authoritative, observer
        which assigns unambiguous ordering to events). Examples of
        this might include Kafka offsets, or any queue index for that
        matter. This is not yet implemented, but could (should) be in
        the future.

        The only case where these two things are (somewhat) different
        is the case mentioned by @Steve - if the output is a stateless
        ParDo, which will get fused. But that is only because the
        processing is single-threaded per key, and therefore the
        ordering is implied by timer ordering (and be careful here:
        many runners don't have this ordering 100% correct as of now -
        luckily this problem appears only when there are multiple
        timers per key). Moreover, if there is a failure, the output
        might (would) go back in time anyway. If there were a shuffle
        operation after the GBK/Combine, the ordering would no longer
        be guaranteed and would have to be taken care of explicitly.

        One last note: I must agree with @Rui that all these
        discussions are very much related to retractions (more
        precisely, to the ability to implement them).

        Jan

        On 11/26/19 7:34 AM, Kenneth Knowles wrote:
        Hi Aaron,

        Another insightful observation.

        Whenever an aggregation (GBK / Combine per key) has a
        trigger firing, there is a per-key sequence number attached.
        It is included in metadata known as "PaneInfo" [1]. The
        value of PaneInfo.getIndex() is colloquially referred to as
        the "pane index". You can also make use of the "on time
        index" if you like. The best way to access this metadata is
        to add a parameter of type PaneInfo to your
        DoFn's @ProcessElement method. This works for both stateful
        and stateless DoFns.
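
        As a minimal sketch (the sink logic in the comments is
        hypothetical), reading the pane index inside a DoFn looks
        roughly like this:

        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.windowing.PaneInfo;
        import org.apache.beam.sdk.values.KV;

        class PaneAwareSinkFn extends DoFn<KV<String, Long>, Void> {
          @ProcessElement
          public void process(@Element KV<String, Long> element, PaneInfo pane) {
            long paneIndex = pane.getIndex();  // per-key sequence number of this firing
            long onTimeIndex = pane.getNonSpeculativeIndex();  // "on time index"; -1 for early panes
            // e.g. write the element together with paneIndex so the target
            // store can reject panes older than what it has already seen
          }
        }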

        Most of Beam's IO connectors do not explicitly enforce that
        outputs occur in pane index order but instead rely on the
        hope that the runner delivers panes in order to the sink.
        IMO this is dangerous but it has not yet caused a known
        issue. In practice, each "input key to output key" path
        through a pipeline's logic does preserve order for all
        existing runners AFAIK, and it is the formalization that is
        missing. It is related to an observation by +Rui Wang that
        processing retractions requires the same key-to-key ordering.

        I will not try to formalize this notion in this email. But I
        will note that since it is universally assured, it would be
        zero cost and significantly safer to formalize it and add an
        annotation noting it was required. It has nothing to do with
        event time ordering, only trigger firing ordering.

        Kenn

        [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
        [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557


        On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:

            The blog posts on stateful and timely computation with
            Beam should help clarify a lot about how to use state
            and timers to do this:
            https://beam.apache.org/blog/2017/02/13/stateful-processing.html
            https://beam.apache.org/blog/2017/08/28/timely-processing.html

            You'll see there how there's an implicit
            per-single-element grouping for each key, so state and
            timers should support your use case very well.

            Best
            -P.

            On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]> wrote:

                If you have a pipeline that looks like Input ->
                GroupByKey -> ParDo, while it is not guaranteed, in
                practice the sink will observe the trigger firings
                in order (per key), since it'll be fused to the
                output of the GBK operation (in all runners I know of).
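
                As a rough sketch of that shape (the transforms and
                names here are just illustrative, standard Beam SDK
                imports assumed):

                PCollection<KV<String, Long>> perKeyCounts =
                    events  // some PCollection<KV<String, Long>>
                        .apply(Window.<KV<String, Long>>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                            .accumulatingFiredPanes()
                            .withAllowedLateness(Duration.ZERO))
                        .apply(Count.perKey());

                // The stateless sink ParDo typically gets fused onto the output of
                // the GBK/Combine, which is why in practice it observes each key's
                // firings in order.
                perKeyCounts.apply(ParDo.of(new WriteToCacheFn()));  // WriteToCacheFn is hypothetical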

                There have been a couple threads about trigger
                ordering as well on the list recently that might
                have more information:
                https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
                https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E


                On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:

                    @Jan @Pablo Thank you

                    @Pablo In this case it's a single global
                    windowed Combine/perKey, triggered per element.
                    Keys are few (client accounts) so they can live
                    forever.

                    It looks like just by virtue of using a stateful
                    ParDo I could get this final execution to be
                    "serialized" per key. (Then I could simply do
                    the compare-and-swap using Beam's state
                    mechanism to keep track of the "latest trigger
                    timestamp" instead of having to orchestrate
                    compare-and-swap in the target store :thinking:.)
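
                    A minimal sketch of what I mean (the actual write
                    is left as a hypothetical comment):

                    import org.apache.beam.sdk.coders.VarLongCoder;
                    import org.apache.beam.sdk.state.StateSpec;
                    import org.apache.beam.sdk.state.StateSpecs;
                    import org.apache.beam.sdk.state.ValueState;
                    import org.apache.beam.sdk.transforms.DoFn;
                    import org.apache.beam.sdk.values.KV;
                    import org.joda.time.Instant;

                    class CompareAndSwapSinkFn extends DoFn<KV<String, Long>, Void> {

                      @StateId("latestTimestamp")
                      private final StateSpec<ValueState<Long>> latestTimestampSpec =
                          StateSpecs.value(VarLongCoder.of());

                      @ProcessElement
                      public void process(
                          @Element KV<String, Long> element,
                          @Timestamp Instant timestamp,
                          @StateId("latestTimestamp") ValueState<Long> latestTimestamp) {
                        // state is per key (and window), and a stateful ParDo processes
                        // each key serially, so this check-then-write is safe
                        Long latest = latestTimestamp.read();
                        if (latest == null || timestamp.getMillis() > latest) {
                          latestTimestamp.write(timestamp.getMillis());
                          // writeToCache(element);  // only now touch the external store
                        }
                        // otherwise drop the stale firing instead of regressing the value
                      }
                    }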



                    On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:

                        One addition, to make the list of options
                        exhaustive, there is probably one more option:

                          c) create a ParDo keyed by the primary key
                        of your sink, cache the last write in there
                        and compare against it locally, without the
                        need to query the database

                        It would still need some timer to clear the
                        cached values after watermark + allowed
                        lateness, because otherwise you would have to
                        cache your whole database on workers. But
                        because you don't need actual ordering - you
                        just need the most recent value (if I got it
                        right) - this might be an option.
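
                        A minimal sketch of option c), with
                        illustrative names (the cleanup timer here
                        fires at the end of the window; adding the
                        allowed lateness on top of that would need
                        the value configured on your windowing
                        strategy):

                        import org.apache.beam.sdk.coders.VarLongCoder;
                        import org.apache.beam.sdk.state.StateSpec;
                        import org.apache.beam.sdk.state.StateSpecs;
                        import org.apache.beam.sdk.state.TimeDomain;
                        import org.apache.beam.sdk.state.Timer;
                        import org.apache.beam.sdk.state.TimerSpec;
                        import org.apache.beam.sdk.state.TimerSpecs;
                        import org.apache.beam.sdk.state.ValueState;
                        import org.apache.beam.sdk.transforms.DoFn;
                        import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
                        import org.apache.beam.sdk.values.KV;
                        import org.joda.time.Instant;

                        class LocalCacheSinkFn extends DoFn<KV<String, Long>, Void> {

                          @StateId("lastWritten")
                          private final StateSpec<ValueState<Long>> lastWrittenSpec =
                              StateSpecs.value(VarLongCoder.of());

                          @TimerId("cleanup")
                          private final TimerSpec cleanupSpec =
                              TimerSpecs.timer(TimeDomain.EVENT_TIME);

                          @ProcessElement
                          public void process(
                              @Element KV<String, Long> element,
                              @Timestamp Instant timestamp,
                              BoundedWindow window,
                              @StateId("lastWritten") ValueState<Long> lastWritten,
                              @TimerId("cleanup") Timer cleanup) {
                            // drop the cached value once the window is complete, so the
                            // worker does not end up caching the whole database
                            cleanup.set(window.maxTimestamp());

                            Long latest = lastWritten.read();
                            if (latest == null || timestamp.getMillis() > latest) {
                              lastWritten.write(timestamp.getMillis());
                              // writeToDatabase(element);  // hypothetical; no
                              // read-modify-write against the database needed
                            }
                          }

                          @OnTimer("cleanup")
                          public void onCleanup(
                              @StateId("lastWritten") ValueState<Long> lastWritten) {
                            lastWritten.clear();
                          }
                        }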

                        Jan

                        On 11/25/19 10:53 PM, Jan Lukavský wrote:
                        > Hi Aaron,
                        >
                        > maybe someone else will give another
                        option, but if I understand
                        > correctly what you want to solve, then you
                        essentially have to do either:
                        >
                        >  a) use the compare & swap mechanism in
                        the sink you described
                        >
                        >  b) use a buffer to buffer elements inside
                        the outputting ParDo and
                        > only output them when watermark passes
                        (using a timer).
                        >
                        > There is actually an ongoing discussion
                        about how to make option b)
                        > user-friendly and part of Beam itself, but
                        currently there is no
                        > out-of-the-box solution for that.
                        >
                        > Jan
                        >
                        > On 11/25/19 10:27 PM, Aaron Dixon wrote:
                        >> Suppose I trigger a Combine per-element
                        (in a high-volume stream) and
                        >> use a ParDo as a sink.
                        >>
                        >> I assume there is no guarantee about the
                        order that my ParDo will see
                        >> these triggers, especially as it
                        processes in parallel, anyway.
                        >>
                        >> That said, my sink writes to a db or
                        cache and I would not like the
                        >> cache to ever regress its value to
                        something "before" what it has
                        >> already written.
                        >>
                        >> Is the best way to solve this problem to
                        always write the event-time
                        >> in the cache and do a compare-and-swap
                        only updating the sink if the
                        >> triggered value in-hand is later than the
                        target value?
                        >>
                        >> Or is there a better way to guarantee
                        that my ParDo sink will process
                        >> elements in-order? (Eg, if I can give up
                        per-event/real-time, then a
                        >> delay-based trigger would probably be
                        sufficient I imagine.)
                        >>
                        >> Thanks for advice!
