Interesting discussion. :-)

Answers inline.

On 3/8/22 22:00, Evan Galpin wrote:
Thanks Jan for confirming that the fix looks alright.  I also found a PR[1] that appears to be a good case study of the Timer watermark hold technique that you previously mentioned so I'll study that a bit for my own understanding and future use.  I was also previously missing the notion that a singular bundle could contain elements from many disparate Windows, so that's a great upgrade to my own mental model haha.

I have a few follow-up questions for my own understanding (hopefully not off topic).

 1. Am I understanding correctly that using ProcessContext#output to
    output a given element will preserve the element's input
    window(s)? Or is it the case that when using
    ProcessContext#output, any buffered elements will all be output to
    whatever window the most recent element processed
    by @ProcessElement belongs to?

There is no way Beam can distinguish *how* you produced the elements emitted from @ProcessElement (whether they are the result of some in-memory buffering or the direct result of a computation on the single input element currently being processed). It follows that the window(s) associated with the output will be equal to the window(s) of the input element currently being processed (generally, the model allows multiple applications of a window function, which is why window.maxTimestamp is shifted 1 ms backwards, so that it is still part of the window itself and window functions based on timestamps are idempotent).
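
To make the idempotence remark concrete, here is a minimal sketch that assumes fixed windows with a hypothetical size of 60 seconds. FixedWindowModel and its methods are illustrative names, not Beam classes. It shows that re-assigning a window's maximum timestamp lands in the same window, while the exclusive end would land in the next one.

```java
// Hypothetical, self-contained model of fixed event-time windows.
// This is NOT Beam's FixedWindows class; all names are assumptions.
final class FixedWindowModel {
    /** Inclusive start (millis) of the fixed window of the given size that contains tsMillis. */
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** Largest timestamp still inside the window: the exclusive end shifted back by 1 ms. */
    static long maxTimestamp(long windowStartMillis, long sizeMillis) {
        return windowStartMillis + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;   // assumed window size
        long ts = 125_000L;    // assumed element timestamp
        long start = windowStart(ts, size);
        long max = maxTimestamp(start, size);
        // Applying the window function to maxTimestamp yields the same window again.
        System.out.println("start=" + start + ", max=" + max
            + ", reassigned start=" + windowStart(max, size));
        // The exclusive end would be assigned to the following window instead.
        System.out.println("exclusive end maps to start=" + windowStart(start + size, size));
    }
}
```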

 2. Is it safe to emit outputs from @FinishBundle to windows which may
    be older than the watermark?  I believe this could still be
    happening given the current ElasticsearchIO implementation where
    any buffered elements are output in @FinishBundle using the same
    window they were associated with on input.  Intuitively, that
    sounds as though it could be a correctness bug if the watermark
    had progressed beyond those windows.

This is a great question that probably deserves deeper attention.

 a) First of all, the fact that you could output late data that previously was not late is _potentially_ a correctness bug if you have any downstream logic that performs additional grouping and, most notably, relies on causality being preserved (which is how our universe works, so this is a natural requirement). Changing an element from "on time" to "late" can result in downstream processing seeing an effect prior to its cause (in event time!, because the watermark move can cause downstream timers to fire). I wrote something about this in [1]. Generally, if there is no grouping (or your application logic accounts for the swapping of cause and effect), then this is not an issue. If you produce a PCollection for a user, then you don't know what the user does with it and therefore should pay attention to it.

 b) Second, this effect would not appear if mid-bundle output watermark updates were not possible. I emphasize that I don't know if this is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke, can you correct me if I'm wrong?). I have some doubts whether it is correct, though. It seems to cause issues in general with in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle, it seems that we should hold the output watermark between these two calls. But that would turn a stateless DoFn into a stateful one, which is ... unfortunate. :)
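
As a rough illustration of why a hold matters, the sketch below models the output watermark as a plain number and treats an element as late when its timestamp is behind that watermark. This is a simplification under stated assumptions: WatermarkHoldModel and its methods are hypothetical names, allowed lateness is ignored, and Beam itself expresses the hold differently (elsewhere in this thread, via a timer with an output timestamp).

```java
import java.util.List;

// Hypothetical model of an output watermark hold; names are illustrative, not Beam APIs.
final class WatermarkHoldModel {
    /** Output watermark when the earliest buffered timestamp holds it back. */
    static long heldOutputWatermark(long inputWatermark, List<Long> bufferedTimestamps) {
        long hold = Long.MAX_VALUE; // an empty buffer imposes no hold
        for (long ts : bufferedTimestamps) {
            hold = Math.min(hold, ts);
        }
        return Math.min(inputWatermark, hold);
    }

    /** In this model an element is late when its timestamp is behind the output watermark. */
    static boolean isLate(long elementTimestamp, long outputWatermark) {
        return elementTimestamp < outputWatermark;
    }

    public static void main(String[] args) {
        List<Long> buffer = List.of(1_000L, 4_000L); // assumed buffered element timestamps
        long inputWatermark = 3_000L;                // advanced while the elements sat in the buffer

        // Without a hold the output watermark follows the input watermark,
        // so the element buffered at 1000 would be emitted late.
        System.out.println(isLate(1_000L, inputWatermark)); // prints "true"

        // With a hold at the earliest buffered timestamp it stays on time.
        System.out.println(isLate(1_000L, heldOutputWatermark(inputWatermark, buffer))); // prints "false"
    }
}
```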

I hope I made things a little more clear rather than more obfuscated. :)

 Jan

[1] https://twitter.com/janl_apache/status/1478757956263071745


It's definitely interesting to think about the idea of enforcing watermark update only between bundles, but presumably that would mean quite extensive alterations.

- Evan

[1] https://github.com/apache/beam/pull/15249

On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Evan,

    the fix looks good to me, as long as the timestamp of the buffered
    data need to be preserved downstream. Generally I think it
    *should* be possible to output in-memory buffered data in
    @ProcessElement (and @FinishBundle), the case where you need
    timers is when your buffer needs to span multiple bundles. In that
    case it also must be stored in regular state and not in-memory.

    It seems to me that there is some logical gap in how we handle the
    per-bundle buffering. I see two possibilities:

     a) either we allow in the model to change the output watermark
    *while* processing a bundle, - in which case it is logical
    requirement to have output timestamp from @ProcessElement no
    earlier than timestamp of the current element (because that way we
    preserve the "on time", "late" status of the current element, we
    don't swap anything), or

     b) we enforce output watermark updates only between bundles, in
    which case the requirement could be relaxed so that the output
    timestamp from @ProcessElement only has to be no earlier than the
    minimum of the timestamps inside the bundle.

    I'm afraid that our current position is a). But in that case it is
    somewhat questionable if it is semantically correct to use
    outputWithTimestamp() in @ProcessElement of stateless DoFn at all.
    It can move timestamps only to a future instant (and inside the
    same window!), which has little semantic meaning to me. It looks
    more error-prone than useful.
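
    The two possibilities can be compared in a small sketch. It uses
    the two timestamps from the error reported at the start of this
    thread and assumes both elements were part of the same bundle.
    OutputTimestampRules and its methods are hypothetical names; this
    is a model of the two rules, not the SimpleDoFnRunner code.

    ```java
    import java.time.Instant;
    import java.util.List;

    // Hypothetical model of the two candidate rules; not Beam's implementation.
    final class OutputTimestampRules {
        /** Rule a): the output may be no earlier than the current element's timestamp. */
        static boolean allowedPerElement(Instant output, Instant currentElement) {
            return !output.isBefore(currentElement);
        }

        /** Rule b): the output may be no earlier than the minimum timestamp in the (non-empty) bundle. */
        static boolean allowedPerBundle(Instant output, List<Instant> bundleTimestamps) {
            Instant min = bundleTimestamps.get(0);
            for (Instant ts : bundleTimestamps) {
                if (ts.isBefore(min)) {
                    min = ts;
                }
            }
            return !output.isBefore(min);
        }

        public static void main(String[] args) {
            // Timestamps taken from the reported IllegalArgumentException.
            Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
            Instant current = Instant.parse("2022-03-07T10:43:43.562Z");

            // Rule a) rejects flushing the buffered element while processing the current one.
            System.out.println(allowedPerElement(buffered, current)); // prints "false"

            // Rule b) accepts it, assuming both elements belong to the same bundle.
            System.out.println(allowedPerBundle(buffered, List.of(buffered, current))); // prints "true"
        }
    }
    ```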

     Jan

    On 3/8/22 15:53, Evan Galpin wrote:
    Thanks Jan, it's interesting to read about the handling of
    timestamp in cases employing a buffering pattern.  In the case of
    the ES write transform, buffered data could be output from
    ProcessElement or FinishBundle.  It's the case where data is
    output from ProcessElement that the error reported at the start
    of this thread shows up.  Based on what you described, it sounds
    like the PR[1] I made to "fix" this issue is actually fixing it
    by transitioning data from being late to being on time and
    outputting all buffered data into a non-deterministic window
    where the output heuristic is later satisfied (number of elements
    buffered or max time since last output).  It seems there's 2
    issues as a result:

    1. The window to which input elements belong is not being
    preserved. Presumably we want IOs to leave an element's window
    unaltered?
    2. The watermark of the system might be incorrect given that
    buffered data is assumed processed and allows the watermark to pass?

    It would definitely add complexity to the IO to employ timers to
    address this, but if that's the preferred or only solution I'll
    put some thought into how to implement that solution.

    Thanks,
    Evan

    [1] https://github.com/apache/beam/pull/16744

    On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:

        Ah, sorry, the code flushes in @FinishBundle. Is it allowed
        to update output watermark while a bundle is being processed?
        It seems that could also cause the "watermark skip" problem,
        which is definitely an issue (and is probably the reason why
        the check fails?).

        On 3/8/22 09:35, Jan Lukavský wrote:

        The buffering seems incorrect to me. Whenever there is a
        buffer, we need to make sure we hold the output watermark,
        otherwise the watermark might "jump over" a buffered element
        transitioning it from "on-time" to "late", which would be a
        correctness bug (we can transition elements only from "late"
        to "on-time", never the other way around). The alternative
        is to use @FinishBundle to do the flushing, but that might not
        be appropriate here.

        Currently, the only way to limit the progress of output
        watermark is by setting a timer with output timestamp that
        has the timestamp of the earliest element in the buffer.
        There was a thread that discussed this in more detail [1].

         Jan

        [1]
        https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8

        On 3/7/22 19:54, Evan Galpin wrote:
        In general, I'd also really like to improve my
        understanding and learn more about how the employed
        buffering can cause this skew.  Is it because the call to
        "flush" is happening from a different "current window" than
        the elements were originally buffered from?  I'm actually
        thinking that the PR[1] to "fix" this would have had the
        side effect of outputting buffered elements into the window
        from which "flush" was called rather than the window from
        which the buffered data originated. I suppose that could be
        problematic, but should at least satisfy the validation code.

        [1] https://github.com/apache/beam/pull/16744


        On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin
        <evan.gal...@gmail.com> wrote:

            x-post from the associated Jira ticket[0]


            Fortunately/unfortunately this same issue struck me as
            well, and I opened a PR[1] to use
            `ProcessContext#output` rather than
            `ProcessContext#outputWithTimestamp`.  I believe that
            should resolve this issue; it has for me when running
            jobs with a vendored SDK with that change included. Do
            folks feel this change should be cherry-picked into 2.37.0?

            The change also prompted a question to the mailing
            list[2] about skew validation difference between
            ProcessContext vs FinishBundleContext (where there is
            no ability to compute skew as I understand it).

            [0] https://issues.apache.org/jira/browse/BEAM-14064

            [1] https://github.com/apache/beam/pull/16744

            [2]
            https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp


            On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik
            <lc...@google.com> wrote:

                This is specifically a case where the
                @ProcessElement saw window X for element X0 and
                buffered it into memory and then when processing
                window Y and element Y0 wanted to flush
                previously buffered element X0. This all occurred
                as part of the same bundle.

                In general, yes, outputting to an earlier window is
                problematic.

                On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax
                <re...@google.com> wrote:

                    Outputting to an earlier window is
                    problematic, as the watermark can never be
                    correct if a DoFn can move time backwards
                    arbitrarily.

                    On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
                    <lc...@google.com> wrote:

                        A good question would be: should I be able
                        to output to a different window from the
                        current @ProcessElement call, like what we
                        can do from @FinishBundle to handle these
                        buffering scenarios?

                        On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
                        <lc...@google.com> wrote:

                            The issue is that ElasticsearchIO is
                            collecting results from elements in
                            window X and then trying to output them
                            in window Y when flushing the batch.
                            This exposed a bug where elements that
                            were being buffered were being output
                            as part of a different window than the
                            window that produced them.

                            This became visible because
                            validation was added recently to ensure
                            that, when the pipeline is processing
                            elements in window X, any output with a
                            timestamp is valid for window X. Note
                            that this validation only occurs in
                            @ProcessElement, since output there is
                            associated with the current window of
                            the input element that is being processed.

                            It is ok to do this in @FinishBundle
                            since there is no existing windowing
                            context and, when you output, that
                            element is assigned to an appropriate
                            window.

                            Filed
                            https://issues.apache.org/jira/browse/BEAM-14064

                            On Mon, Mar 7, 2022 at 7:44 AM Emils
                            Solmanis <emils.solma...@rvu.co.uk> wrote:

                                Hi all,
                                I think we’re hitting a regression
                                in ElasticIO batch writing.

                                We’ve bisected it to being
                                introduced in 2.35.0, and I’m
                                reasonably certain it’s this PR
                                https://github.com/apache/beam/pull/15381

                                Our scenario is pretty trivial: we
                                read off Pubsub and write to
                                Elastic in a streaming job. The
                                config for the source and sink is,
                                respectively,

                                |pipeline.apply(
                                    PubsubIO.readStrings().fromSubscription(subscription)
                                ).apply(ParseJsons.of(OurObject::class.java))
                                    .setCoder(KryoCoder.of())|

                                and

                                |ElasticsearchIO.write()
                                    .withUseStatefulBatches(true)
                                    .withMaxParallelRequestsPerWindow(1)
                                    .withMaxBufferingDuration(Duration.standardSeconds(30))
                                    // 5 bytes -> KiB -> MiB, so 5 MiB
                                    .withMaxBatchSizeBytes(5L * 1024 * 1024)
                                    // # of docs
                                    .withMaxBatchSize(1000)
                                    .withConnectionConfiguration(
                                        ElasticsearchIO.ConnectionConfiguration.create(
                                            arrayOf(host), "fubar", "_doc"
                                        ).withConnectTimeout(5000)
                                            .withSocketTimeout(30000)
                                    )
                                    .withRetryConfiguration(
                                        ElasticsearchIO.RetryConfiguration.create(
                                            10,
                                            // the duration is wall clock, against the connection and socket timeouts specified
                                            // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
                                            // 10 socket timeouts in a row, this would ignore the "10" part and terminate
                                            // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
                                            // of different durations, and on average 10 x fails < 4m.
                                            // That said, 4m is arbitrary, so adjust as and when needed.
                                            Duration.standardMinutes(4)
                                        )
                                    )
                                    .withIdFn { f: JsonNode -> f["id"].asText() }
                                    .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
                                    .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }|

                                We recently tried upgrading 2.33 to
                                2.36 and immediately hit a bug in
                                the consumer, due to alleged time
                                skew, specifically

                                |2022-03-07 10:48:37.886 GMT Error
                                message from worker:
                                java.lang.IllegalArgumentException:
                                Cannot output with timestamp
                                2022-03-07T10:43:38.640Z. Output
                                timestamps must be no earlier than
                                the timestamp of the current input
                                (2022-03-07T10:43:43.562Z) minus
                                the allowed skew (0 milliseconds)
                                and no later than
                                294247-01-10T04:00:54.775Z. See the
                                DoFn#getAllowedTimestampSkew()
                                Javadoc for details on changing the
                                allowed skew.
                                org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
                                org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
                                org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
                                org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
                                org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
                                org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)|

                                I’ve bisected it and 2.34 works
                                fine, 2.35 is the first version
                                where this breaks, and it seems like
                                the code in the trace is largely
                                added by the PR linked above. The error
                                usually claims a skew of a few
                                seconds, but obviously I can’t
                                override
                                |getAllowedTimestampSkew()| on the
                                internal Elastic DoFn, and it’s
                                marked deprecated anyway.

                                I’m happy to raise a JIRA but I’m
                                not 100% sure what the code was
                                intending to fix, and additionally,
                                I’d also be happy if someone else
                                can reproduce this or knows of
                                similar reports. I feel like what
                                we’re doing is not *that* uncommon
                                a scenario, so I would have thought
                                someone else would have hit this by
                                now.

                                Best,
                                Emils
