Got it. So ProcessContext#output has the semantics of "using the window associated with the current @Element, output the data being passed in into that window." Makes complete sense.
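To make that concrete, a minimal sketch (a hypothetical DoFn, just to illustrate the semantics): ctx.output() takes no timestamp or window, so the runner attaches the current element's timestamp and window(s) to whatever is emitted.

    import org.apache.beam.sdk.transforms.DoFn;

    // Hypothetical pass-through DoFn illustrating the semantics above.
    class UppercaseFn extends DoFn<String, String> {
      @ProcessElement
      public void process(ProcessContext ctx) {
        // Emitted with ctx.timestamp() and into ctx.element()'s window(s);
        // no timestamp or window is specified explicitly.
        ctx.output(ctx.element().toUpperCase());
      }
    }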
On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <[email protected]> wrote:

> Interesting discussion. :-)
>
> Answers inline.
>
> On 3/8/22 22:00, Evan Galpin wrote:
>
> Thanks Jan for confirming that the fix looks alright. I also found a PR [1]
> that appears to be a good case study of the Timer watermark hold technique
> that you previously mentioned, so I'll study that a bit for my own
> understanding and future use. I was also previously missing the notion
> that a single bundle could contain elements from many disparate windows,
> so that's a great upgrade to my own mental model haha.
>
> I have a few follow-up questions for my own understanding (hopefully not
> off topic).
>
> 1. Am I understanding correctly that using ProcessContext#output to
> output a given element will preserve the element's input window(s)? Or is
> it the case that when using ProcessContext#output, any buffered elements
> will all be output to whatever window the most recent element processed
> by @ProcessElement belongs to?
>
> There is no way Beam can distinguish *how* you produced the elements
> emitted from @ProcessElement (whether they are the result of some
> in-memory buffering or the direct result of computing the single input
> element currently being processed). From that it follows that the
> window(s) associated with the output will be equal to the window(s) of the
> input element currently being processed. (Generally, the model allows
> multiple applications of a window function, which is why
> window.maxTimestamp is shifted backwards by 1 ms, so that it is still part
> of the window itself and window functions based on timestamps are
> idempotent.)
>
> 2. Is it safe to emit outputs from @FinishBundle to windows which may
> be older than the watermark? I believe this could still be happening given
> the current ElasticsearchIO implementation, where any buffered elements
> are output in @FinishBundle using the same window they were associated
> with on input. Intuitively, that sounds as though it could be a
> correctness bug if the watermark had progressed beyond those windows.
>
> This is a great question that probably deserves deeper attention.
>
> a) First of all, the fact that you could potentially output late data that
> previously was not late is _potentially_ a correctness bug, if you have
> any downstream logic that performs additional grouping and - most
> notably - relies on causality being preserved (which is how our universe
> works, so this is a natural requirement). Changing an element from "on
> time" to "late" can result in downstream processing seeing an effect prior
> to its cause (in event time!), because the watermark move can cause
> downstream timers to fire. I wrote something about this in [1]. Generally,
> if there is no grouping (or your application logic accounts for the
> swapping of cause and effect), then this is not an issue. If you produce a
> PCollection for a user, then you don't know what the user will do with it
> and therefore should pay attention to this.
>
> b) Second, this effect would not appear if mid-bundle output watermark
> updates were not possible. I emphasize that I don't know if this is
> allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke, can
> you correct me if I'm wrong?). I have some doubts about whether it is
> correct, though. It seems that it generally causes issues with in-memory
> buffering and outputWithTimestamp in stateless DoFns. If a DoFn implements
> @StartBundle and @FinishBundle, it seems that we should hold the output
> watermark between these two calls. But that would turn a stateless DoFn
> into a stateful one, which is ... unfortunate. :)
>
> I hope I made things a little more clear rather than more obfuscated. :)
>
> Jan
>
> [1] https://twitter.com/janl_apache/status/1478757956263071745
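For reference, a minimal sketch of the timer-based watermark hold technique discussed above (hypothetical names; assumes a keyed input, since state and timers require KV elements, and an arbitrary 30 s flush delay):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Hypothetical stateful DoFn that buffers across bundles while holding
    // the output watermark via the timer's output timestamp.
    class HoldingBufferFn extends DoFn<KV<String, String>, String> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext ctx,
          @StateId("buffer") BagState<String> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(ctx.element().getValue());
        // The timer's output timestamp holds the output watermark, so a
        // buffered element cannot flip from "on time" to "late" while it
        // sits in state. A real implementation would track the minimum
        // timestamp of all buffered elements rather than the latest one.
        flush
            .withOutputTimestamp(ctx.timestamp())
            .offset(Duration.standardSeconds(30))
            .setRelative();
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext ctx, @StateId("buffer") BagState<String> buffer) {
        for (String value : buffer.read()) {
          ctx.output(value); // emitted at the timer's output timestamp
        }
        buffer.clear();
      }
    }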
> It's definitely interesting to think about the idea of enforcing watermark
> updates only between bundles, but presumably that would mean quite
> extensive alterations.
>
> - Evan
>
> [1] https://github.com/apache/beam/pull/15249
>
> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Evan,
>>
>> the fix looks good to me, as long as the timestamps of the buffered data
>> need to be preserved downstream. Generally I think it *should* be
>> possible to output in-memory buffered data in @ProcessElement (and
>> @FinishBundle); the case where you need timers is when your buffer needs
>> to span multiple bundles. In that case it also must be stored in regular
>> state, not in-memory.
>>
>> It seems to me that there is some logical gap in how we handle the
>> per-bundle buffering. I see two possibilities:
>>
>> a) either we allow the model to change the output watermark *while* a
>> bundle is being processed, in which case it is a logical requirement that
>> the output timestamp from @ProcessElement be no earlier than the
>> timestamp of the current element (because that way we preserve the "on
>> time" or "late" status of the current element; we don't swap anything), or
>>
>> b) we enforce output watermark updates only in between bundles, in which
>> case the requirement could be relaxed so that the output timestamp from
>> @ProcessElement need be no earlier than the minimum of the timestamps
>> inside the bundle.
>>
>> I'm afraid that our current position is a). But in that case it is
>> somewhat questionable whether it is semantically correct to use
>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It
>> can move timestamps only to a future instant (and inside the same
>> window!), which has little semantic meaning to me. Looks more error prone
>> than useful.
>>
>> Jan
>>
>> On 3/8/22 15:53, Evan Galpin wrote:
>>
>> Thanks Jan, it's interesting to read about the handling of timestamps in
>> cases employing a buffering pattern. In the case of the ES write
>> transform, buffered data could be output from @ProcessElement or
>> @FinishBundle. It's in the case where data is output from @ProcessElement
>> that the error reported at the start of this thread shows up. Based on
>> what you described, it sounds like the PR [1] I made to "fix" this issue
>> is actually fixing it by transitioning data from being late to being on
>> time, and by outputting all buffered data into a non-deterministic window
>> where the output heuristic happens to be satisfied (number of elements
>> buffered or max time since last output). It seems there are 2 issues as a
>> result:
>>
>> 1. The window to which input elements belong is not being preserved.
>> Presumably we want IOs to leave an element's window unaltered?
>> 2. The watermark of the system might be incorrect, given that buffered
>> data is assumed to be processed, allowing the watermark to pass?
>>
>> It would definitely add complexity to the IO to employ timers to address
>> this, but if that's the preferred or only solution I'll put some thought
>> into how to implement it.
>>
>> Thanks,
>> Evan
>>
>> [1] https://github.com/apache/beam/pull/16744
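For context, a minimal sketch of the per-bundle buffering pattern under discussion (a hypothetical BundleBufferingFn, not the ElasticsearchIO code): FinishBundleContext.output takes an explicit timestamp and window, so each buffered element can keep the window it arrived in.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Hypothetical DoFn that buffers within a single bundle, remembering each
    // element's own timestamp and window, and flushes in @FinishBundle.
    class BundleBufferingFn extends DoFn<String, String> {

      private static class Buffered {
        final String value;
        final Instant timestamp;
        final BoundedWindow window;

        Buffered(String value, Instant timestamp, BoundedWindow window) {
          this.value = value;
          this.timestamp = timestamp;
          this.window = window;
        }
      }

      private transient List<Buffered> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void process(ProcessContext ctx, BoundedWindow window) {
        // Remember the element together with its own timestamp and window;
        // a single bundle may mix elements from many disparate windows.
        buffer.add(new Buffered(ctx.element(), ctx.timestamp(), window));
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext ctx) {
        // FinishBundleContext.output takes an explicit timestamp and window,
        // so each buffered element keeps the window it was buffered from.
        for (Buffered b : buffer) {
          ctx.output(b.value, b.timestamp, b.window);
        }
        buffer.clear();
      }
    }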
>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update
>>> the output watermark while a bundle is being processed? It seems that
>>> could also cause the "watermark skip" problem, which is definitely an
>>> issue (and is probably the reason why the check fails?).
>>>
>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>
>>> The buffering seems incorrect to me. Whenever there is a buffer, we need
>>> to make sure we hold the output watermark; otherwise the watermark might
>>> "jump over" a buffered element, transitioning it from "on time" to
>>> "late", which would be a correctness bug (we can transition elements
>>> only from "late" to "on time", never the other way around). The
>>> alternative is to use @FinishBundle to do the flushing, but that might
>>> not be appropriate here.
>>>
>>> Currently, the only way to limit the progress of the output watermark is
>>> to set a timer whose output timestamp is the timestamp of the earliest
>>> element in the buffer. There was a thread that discussed this in more
>>> detail [1].
>>>
>>> Jan
>>>
>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>
>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>
>>> In general, I'd also really like to improve my understanding and learn
>>> more about how the employed buffering can cause this skew. Is it because
>>> the call to "flush" is happening from a different "current window" than
>>> the one the elements were originally buffered from? I'm actually
>>> thinking that the PR [1] to "fix" this would have had the side effect of
>>> outputting buffered elements into the window from which "flush" was
>>> called, rather than the window from which the buffered data originated.
>>> I suppose that could be problematic, but it should at least satisfy the
>>> validation code.
>>>
>>> [1] https://github.com/apache/beam/pull/16744
>>>
>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <[email protected]> wrote:
>>>
>>>> x-post from the associated Jira ticket [0]
>>>>
>>>> Fortunately/unfortunately this same issue struck me as well, and I
>>>> opened a PR [1] to use `ProcessContext#output` rather than
>>>> `ProcessContext#outputWithTimestamp`. I believe that should resolve
>>>> this issue; it has for me when running jobs with a vendored SDK with
>>>> that change included. Do folks feel this change should be
>>>> cherry-picked into 2.37.0?
>>>>
>>>> The change also prompted a question to the mailing list [2] about the
>>>> skew validation difference between ProcessContext and
>>>> FinishBundleContext (where there is no ability to compute skew, as I
>>>> understand it).
>>>>
>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>> [1] https://github.com/apache/beam/pull/16744
>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>
>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> This is specifically a case where @ProcessElement saw window X for
>>>>> element X0 and buffered it into memory, and then, when processing
>>>>> window Y and element Y0, wanted to flush the previously buffered
>>>>> element X0. This all occurred as part of the same bundle.
>>>>>
>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>
>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Outputting to an earlier window is problematic, as the watermark can
>>>>>> never be correct if a DoFn can move time backwards arbitrarily.
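A hypothetical reduction of the failure mode Luke describes (illustrative only, with an arbitrary flush threshold; it mirrors the pattern, not the actual ElasticsearchIO code): buffered elements are flushed through the *current* element's context, so the runner's skew check compares the buffered (older) timestamp against the current element's timestamp and throws.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.TimestampedValue;

    // Hypothetical sketch of the problematic pattern: buffer elements from
    // window X, then flush them while processing an element from window Y.
    class BrokenBufferFn extends DoFn<String, String> {
      private transient List<TimestampedValue<String>> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void process(ProcessContext ctx) {
        buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
        if (buffer.size() >= 1000) { // arbitrary flush heuristic
          for (TimestampedValue<String> tv : buffer) {
            // Throws IllegalArgumentException ("Cannot output with
            // timestamp ...") whenever tv's timestamp is earlier than
            // ctx.timestamp() minus getAllowedTimestampSkew() (0 by default).
            ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
          }
          buffer.clear();
        }
      }
    }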
>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> A good question would be: should I be able to output to a different
>>>>>>> window from the current @ProcessElement call, like we can from
>>>>>>> @FinishBundle, to handle these buffering scenarios?
>>>>>>>
>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> The issue is that ElasticsearchIO is collecting results from
>>>>>>>> elements in window X and then trying to output them in window Y
>>>>>>>> when flushing the batch. This exposed a bug where buffered elements
>>>>>>>> were being output as part of a different window than the one that
>>>>>>>> produced them.
>>>>>>>>
>>>>>>>> This became visible because validation was added recently to ensure
>>>>>>>> that when the pipeline is processing elements in window X, any
>>>>>>>> output with a timestamp is valid for window X. Note that this
>>>>>>>> validation only occurs in @ProcessElement, since output there is
>>>>>>>> associated with the current window of the input element being
>>>>>>>> processed.
>>>>>>>>
>>>>>>>> It is OK to do this in @FinishBundle, since there is no existing
>>>>>>>> windowing context and the element you output is assigned to an
>>>>>>>> appropriate window.
>>>>>>>>
>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>
>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I think we're hitting a regression in ElasticIO batch writing.
>>>>>>>>>
>>>>>>>>> We've bisected it to being introduced in 2.35.0, and I'm
>>>>>>>>> reasonably certain it's this PR:
>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>
>>>>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to
>>>>>>>>> Elastic in a streaming job. The config for the source and sink,
>>>>>>>>> respectively, is
>>>>>>>>>
>>>>>>>>> pipeline.apply(
>>>>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>     .setCoder(KryoCoder.of())
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>     .withUseStatefulBatches(true)
>>>>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>     // # of docs
>>>>>>>>>     .withMaxBatchSize(1000)
>>>>>>>>>     .withConnectionConfiguration(
>>>>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>             arrayOf(host),
>>>>>>>>>             "fubar",
>>>>>>>>>             "_doc"
>>>>>>>>>         ).withConnectTimeout(5000)
>>>>>>>>>             .withSocketTimeout(30000)
>>>>>>>>>     )
>>>>>>>>>     .withRetryConfiguration(
>>>>>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>             10,
>>>>>>>>>             // the duration is wall clock, against the connection
>>>>>>>>>             // and socket timeouts specified above. I.e., 10 x 30s
>>>>>>>>>             // is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>             // 10 socket timeouts in a row, this would ignore the
>>>>>>>>>             // "10" part and terminate after 6. The idea is that in
>>>>>>>>>             // a mixed failure mode, you'd get different timeouts
>>>>>>>>>             // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>             // That said, 4m is arbitrary, so adjust as and when
>>>>>>>>>             // needed.
>>>>>>>>>             Duration.standardMinutes(4)
>>>>>>>>>         )
>>>>>>>>>     )
>>>>>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>     .withIsDeleteFn { f: JsonNode ->
>>>>>>>>>         f["_action"].asText("noop") == "delete" }
>>>>>>>>>
>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug
>>>>>>>>> in the consumer due to alleged time skew, specifically:
>>>>>>>>>
>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier
>>>>>>>>> than the timestamp of the current input (2022-03-07T10:43:43.562Z)
>>>>>>>>> minus the allowed skew (0 milliseconds) and no later than
>>>>>>>>> 294247-01-10T04:00:54.775Z. See the
>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the
>>>>>>>>> allowed skew.
>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>
>>>>>>>>> I've bisected it: 2.34 works fine, 2.35 is the first version where
>>>>>>>>> this breaks, and the code in the trace was largely added by the PR
>>>>>>>>> linked above. The error usually claims a skew of a few seconds,
>>>>>>>>> but obviously I can't override getAllowedTimestampSkew() on the
>>>>>>>>> internal Elastic DoFn, and it's marked deprecated anyway.
>>>>>>>>>
>>>>>>>>> I'm happy to raise a JIRA, but I'm not 100% sure what the code was
>>>>>>>>> intending to fix, and additionally, I'd also be happy if someone
>>>>>>>>> else can reproduce this or knows of similar reports. I feel like
>>>>>>>>> what we're doing is not *that* uncommon a scenario, so I would
>>>>>>>>> have thought someone else would have hit this by now.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Emils
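For completeness, the deprecated escape hatch Emils refers to is only available on a DoFn you own, not on ElasticsearchIO's internal one. A minimal sketch (hypothetical names and an arbitrary tolerance; the mechanism is deprecated precisely because it can produce late data):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Duration;

    // Hypothetical user-owned DoFn relaxing the skew check.
    class SkewTolerantFn extends DoFn<String, String> {
      @Override
      public Duration getAllowedTimestampSkew() {
        return Duration.standardMinutes(5); // arbitrary tolerance
      }

      @ProcessElement
      public void process(ProcessContext ctx) {
        // May move the timestamp up to 5 minutes backwards without tripping
        // the checkTimestamp validation (the element may then be late).
        ctx.outputWithTimestamp(
            ctx.element(), ctx.timestamp().minus(Duration.standardMinutes(1)));
      }
    }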
