In general, I'd also really like to improve my understanding of how the buffering employed here can cause this skew. Is it because the call to "flush" happens from a different "current window" than the one the elements were originally buffered in? I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called, rather than the window from which the buffered data originated. I suppose that could be problematic, but it should at least satisfy the validation code.
[1] https://github.com/apache/beam/pull/16744

On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <[email protected]> wrote:

> x-post from the associated Jira ticket[0]
>
> Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue; it has for me when running jobs with a vendored SDK that includes the change. Do folks feel this change should be cherry-picked into 2.37.0?
>
> The change also prompted a question to the mailing list[2] about the difference in skew validation between ProcessContext and FinishBundleContext (where, as I understand it, there is no ability to compute skew).
>
> [0] https://issues.apache.org/jira/browse/BEAM-14064
> [1] https://github.com/apache/beam/pull/16744
> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>
> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]> wrote:
>
>> This is specifically a case where the @ProcessElement saw window X for element X0 and buffered it into memory, and then, while processing window Y and element Y0, wanted to flush the previously buffered element X0. This all occurred as part of the same bundle.
>>
>> In general, yes, outputting to an earlier window is problematic.
>>
>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]> wrote:
>>
>>> Outputting to an earlier window is problematic, as the watermark can never be correct if a DoFn can move time backwards arbitrarily.
>>>
>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> A good question would be: should I be able to output to a different window from the current @ProcessElement call, like we can do from @FinishBundle, to handle these buffering scenarios?
>>>>
>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The issue is that ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where buffered elements were being output as part of a different window than the one that produced them.
>>>>>
>>>>> This became visible because validation was added recently to ensure that when the pipeline is processing elements in window X, output with a timestamp is valid for window X. Note that this validation only occurs in @ProcessElement, since there output is associated with the current window of the input element being processed.
>>>>>
>>>>> It is OK to do this in @FinishBundle, since there is no existing windowing context and the element you output is assigned to an appropriate window.
>>>>>
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>
>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <[email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I think we're hitting a regression in ElasticsearchIO batch writing.
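Luke's @FinishBundle point can be sketched as follows. This is a hypothetical, simplified model (not Beam runner code) of the buffering pattern: each element is buffered together with its original timestamp and window, so a bundle-finish flush, like `FinishBundleContext#output(value, timestamp, window)`, can re-emit each element into the window it came from, and no skew check applies because there is no "current element" at that point.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a buffering DoFn (NOT Beam code).
public class BufferedFlushDemo {
    // Each element is buffered with its original timestamp and window.
    public static final class Buffered {
        final String value;
        final Instant timestamp;
        final String window;
        Buffered(String value, Instant timestamp, String window) {
            this.value = value;
            this.timestamp = timestamp;
            this.window = window;
        }
    }

    private static final List<Buffered> buffer = new ArrayList<>();

    // Stand-in for @ProcessElement: just buffer, don't output yet.
    public static void processElement(String value, Instant ts, String window) {
        buffer.add(new Buffered(value, ts, window));
    }

    // Stand-in for @FinishBundle, mimicking
    // FinishBundleContext#output(value, timestamp, window): each element is
    // emitted back into its own window with its own timestamp.
    public static List<String> finishBundle() {
        List<String> out = new ArrayList<>();
        for (Buffered b : buffer) {
            out.add(b.value + " @ " + b.timestamp + " in " + b.window);
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        processElement("X0", Instant.parse("2022-03-07T10:43:38.640Z"), "windowX");
        processElement("Y0", Instant.parse("2022-03-07T10:43:43.562Z"), "windowY");
        finishBundle().forEach(System.out::println);
    }
}
```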
>>>>>>
>>>>>> We've bisected it to being introduced in 2.35.0, and I'm reasonably certain it's this PR: https://github.com/apache/beam/pull/15381
>>>>>>
>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a streaming job. The config for the source and sink, respectively, is
>>>>>>
>>>>>> pipeline.apply(
>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>     .setCoder(KryoCoder.of())
>>>>>>
>>>>>> and
>>>>>>
>>>>>> ElasticsearchIO.write()
>>>>>>     .withUseStatefulBatches(true)
>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>     // # of docs
>>>>>>     .withMaxBatchSize(1000)
>>>>>>     .withConnectionConfiguration(
>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>             arrayOf(host),
>>>>>>             "fubar",
>>>>>>             "_doc"
>>>>>>         ).withConnectTimeout(5000)
>>>>>>             .withSocketTimeout(30000)
>>>>>>     )
>>>>>>     .withRetryConfiguration(
>>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>>             10,
>>>>>>             // The duration is wall clock, against the connection and socket
>>>>>>             // timeouts specified above. I.e., 10 x 30s is gonna be more than
>>>>>>             // 3 minutes, so if we're getting 10 socket timeouts in a row, this
>>>>>>             // would ignore the "10" part and terminate after 6. The idea is
>>>>>>             // that in a mixed failure mode, you'd get different timeouts of
>>>>>>             // different durations, and on average 10 x fails < 4m.
>>>>>>             // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>             Duration.standardMinutes(4)
>>>>>>         )
>>>>>>     )
>>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>
>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug in the consumer due to alleged time skew, specifically:
>>>>>>
>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>
>>>>>> I've bisected it: 2.34 works fine, 2.35 is the first version that breaks, and it seems the code in the trace was largely added by the PR linked above.
>>>>>> The error usually claims a skew of a few seconds, but obviously I can't override getAllowedTimestampSkew() on the internal Elastic DoFn, and it's marked deprecated anyway.
>>>>>>
>>>>>> I'm happy to raise a JIRA, but I'm not 100% sure what the code was intending to fix. Additionally, I'd be happy if someone else can reproduce this or knows of similar reports. What we're doing doesn't feel like *that* uncommon a scenario, so I would have thought someone else would have hit this by now.
>>>>>>
>>>>>> Best,
>>>>>> Emils
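Purely to illustrate the numbers in the error message above (this is not Beam code): the output timestamp trails the current input by just under five seconds, so the check fails with the default allowed skew of 0 ms. A hypothetical getAllowedTimestampSkew() override would need to return at least this gap to silence the check, though as noted the DoFn is internal to ElasticsearchIO and the method is deprecated.

```java
import java.time.Duration;
import java.time.Instant;

// Computes the skew implied by the worker error: how far the attempted
// output timestamp lags behind the current input's timestamp.
public class SkewGapDemo {
    public static long skewGapMillis(String outputTs, String currentInputTs) {
        return Duration.between(Instant.parse(outputTs), Instant.parse(currentInputTs)).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps taken verbatim from the error in the thread above.
        long gap = skewGapMillis("2022-03-07T10:43:38.640Z", "2022-03-07T10:43:43.562Z");
        System.out.println(gap + " ms"); // 4922 ms
    }
}
```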
