> > BigQuery...implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections.
@Luke Cwik By "preserve", do you mean creating an in-memory data structure to hold windowing info and making use of that info later? Would that also imply holding the watermark so that any windows held in memory could not be "passed over" before being restored? I'd be keen to hear more about how one would ideally preserve and restore window information.

I agree that outputting to arbitrary windows is Bad™. It's worth noting that RedisIO[1] also has this pattern of arbitrary window output via the same mechanism that ElasticsearchIO employs.

[1] https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455

On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax wrote:

> IMO the fact that BigQueryIO doesn't preserve windowing when outputting successful or failed elements was a bug.
>
> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik wrote:
>
>> I believe preserving the input window and element timestamp makes the most sense. Outputting in the global window seems like a distant second. Outputting in an arbitrary different window or with an arbitrary timestamp is incorrect.
>>
>> BigQuery outputs successful and failed writes in the global window, but the implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections. Based upon the code, it seems like the global window was used to get around an implementation detail that is sub-optimal within Dataflow's GroupIntoBatches implementation.
>>
>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles wrote:
>>
>>>
>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin wrote:
>>>
>>>>> But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either.
>>>>> BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>
>>>>
>>>> It appears that BigQueryIO[1] just re-windows inputs into the global window and does not concern itself with preserving input windows of elements or holding the watermark. Is that a good precedent to follow? It would certainly simplify the implementation of ElasticsearchIO#bulkIO.
>>>>
>>>
>>> Big picture answer: windowing into the global window happens whether you remember to do it or not. The world outside of Beam doesn't relate to Beam's windows. Doing the re-windowing explicitly might make you more likely to author the best code and/or offer users the needed API to reify windowing information and/or document that they need to do it themselves before applying the write transform.
>>>
>>> Even more pedantic big-picture answer: holding the watermark is exactly and only "reserving the right to output an element with that timestamp later", so you can guide your thinking with this. If you don't need to output an element at a timestamp, don't hold it.
>>>
>>> BUT write transforms should almost always output elements describing what was written. You want to think about the timestamps and windowing on these elements. I'm not sure what the best practice is here. TBH I could see this actually functioning as a new "source" where everything is in the global window and timestamps describe the moment the data was committed, but I'm not sure this is easily expressed. What do other IOs do about windowing these outputs?
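Kenn's point about reifying windowing information, and Luke's idea of preserving it and restoring it later, can be pictured with a toy model in plain Kotlin (the language of the pipeline snippet further down). It assumes windows are simple [start, end) intervals in epoch millis; `Window`, `Element`, `Reified`, `reify`, `toGlobal`, and `restore` are hypothetical names, not Beam's API. In Beam itself the closest built-in is, I believe, the `Reify` transform.

```kotlin
// Toy model, not Beam's API: windows are half-open [start, end) in epoch millis.
data class Window(val start: Long, val end: Long)

// Stand-in for the global window in this toy model.
val GLOBAL = Window(Long.MIN_VALUE, Long.MAX_VALUE)

// An element plus the metadata a runner would track for it.
data class Element<T>(val value: T, val timestamp: Long, val window: Window)

// The value carries its own copy of the original timestamp and window.
data class Reified<T>(val value: T, val originalTimestamp: Long, val originalWindow: Window)

// Copy the metadata into the value before any step that re-windows.
fun <T> reify(e: Element<T>): Element<Reified<T>> =
    Element(Reified(e.value, e.timestamp, e.window), e.timestamp, e.window)

// Re-windowing into the global window drops the original window from the metadata.
fun <T> toGlobal(e: Element<T>): Element<T> = e.copy(window = GLOBAL)

// Rebuild the element with the timestamp and window it originally had.
fun <T> restore(e: Element<Reified<T>>): Element<T> =
    Element(e.value.value, e.value.originalTimestamp, e.value.originalWindow)
```

The restored element carries its original window again, but restoring says nothing about the watermark: if the watermark has already passed that window by the time the element is re-emitted, the element is late, which is the watermark-hold question Evan raises.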
>>>
>>> Kenn
>>>
>>>>
>>>> [1] https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>
>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin wrote:
>>>>
>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>
>>>>> Thanks Jan for the thoughtful responses! It definitely sounds as though the ElasticsearchIO#write transform would be safer to either hold the watermark via similar machinery employed by GroupIntoBatches or possibly output everything into a global window; either sounds safer in terms of data correctness than potentially outputting "on time" data as "late" data. But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>
>>>>> I'm keen to follow any further discussion on watermark holds for DoFns that implement both @StartBundle and @FinishBundle!
>>>>>
>>>>> - Evan
>>>>>
>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin wrote:
>>>>>
>>>>>> Got it. So ProcessContext#output has the semantics of "using the window associated with the current @Element, output the data being passed in into that window." Makes complete sense.
>>>>>>
>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský wrote:
>>>>>>
>>>>>>> Interesting discussion. :-)
>>>>>>>
>>>>>>> Answers inline.
>>>>>>>
>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>
>>>>>>> Thanks Jan for confirming that the fix looks alright.
>>>>>>> I also found a PR[1] that appears to be a good case study of the Timer watermark hold technique that you previously mentioned, so I'll study that a bit for my own understanding and future use. I was also previously missing the notion that a single bundle could contain elements from many disparate windows, so that's a great upgrade to my own mental model haha.
>>>>>>>
>>>>>>> I have a few follow-up questions for my own understanding (hopefully not off topic).
>>>>>>>
>>>>>>> 1. Am I understanding correctly that using ProcessContext#output to output a given element will preserve the element's input window(s)? Or is it the case that when using ProcessContext#output, any buffered elements will all be output to whatever window the most recent element processed by @ProcessElement belongs to?
>>>>>>>
>>>>>>> There is no way Beam can distinguish *how* you produced elements emitted from @ProcessElement (whether they are the result of some in-memory buffering or the direct result of computation on the single input element currently being processed). From that it follows that the window(s) associated with the output will be equal to the window(s) of the input element currently being processed. (Generally, the model allows multiple applications of the window function, which is why window.maxTimestamp is shifted 1 ms backwards: so that it is still part of the window itself and window functions based on timestamps are idempotent.)
>>>>>>>
>>>>>>> 2. Is it safe to emit outputs from @FinishBundle to windows which may be older than the watermark? I believe this could still be happening given the current ElasticsearchIO implementation, where any buffered elements are output in @FinishBundle using the same window they were associated with on input.
>>>>>>> Intuitively, that sounds as though it could be a correctness bug if the watermark had progressed beyond those windows.
>>>>>>>
>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>
>>>>>>> a) First of all, the fact that you could potentially output late data that previously was not late is _potentially_ a correctness bug if you have any downstream logic that performs additional grouping and, most notably, relies on causality being preserved (which is how our universe works, so this is a natural requirement). Changing an element from "on time" to "late" can result in downstream processing seeing the effect prior to the cause (in event time!, because the watermark move can cause downstream timers to fire). I wrote something about this in [1]. Generally, if there is no grouping (or your application logic accounts for the swapping of cause and effect), then this is not an issue. If you produce a PCollection for the user, then you don't know what the user does with it and therefore should pay attention to it.
>>>>>>>
>>>>>>> b) Second, this effect would not appear if mid-bundle output watermark updates were not possible. I emphasize that I don't know if this is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke, can you correct me if I'm wrong?). I have some doubts about whether it is correct, though. It seems to cause issues in general with in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle, it seems that we should hold the output watermark between these two calls. But that would turn a stateless DoFn into a stateful one, which is ... unfortunate.
>>>>>>> :)
>>>>>>>
>>>>>>> I hope I made things a little more clear rather than more obfuscated. :)
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>
>>>>>>>
>>>>>>> It's definitely interesting to think about the idea of enforcing watermark updates only between bundles, but presumably that would mean quite extensive alterations.
>>>>>>>
>>>>>>> - Evan
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>
>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský wrote:
>>>>>>>
>>>>>>>> Hi Evan,
>>>>>>>>
>>>>>>>> the fix looks good to me, as long as the timestamp of the buffered data need to be preserved downstream. Generally I think it *should* be possible to output in-memory buffered data in @ProcessElement (and @FinishBundle); the case where you need timers is when your buffer needs to span multiple bundles. In that case it also must be stored in regular state and not in memory.
>>>>>>>>
>>>>>>>> It seems to me that there is some logical gap in how we handle per-bundle buffering. I see two possibilities:
>>>>>>>>
>>>>>>>> a) either we allow the model to change the output watermark *while* processing a bundle, in which case it is a logical requirement to have the output timestamp from @ProcessElement be no earlier than the timestamp of the current element (because that way we preserve the "on time"/"late" status of the current element; we don't swap anything), or
>>>>>>>>
>>>>>>>> b) we enforce output watermark updates only in between bundles, in which case the requirement could be relaxed so that the output timestamp from @ProcessElement need only be no earlier than the minimum of the timestamps inside the bundle.
>>>>>>>>
>>>>>>>> I'm afraid that our current position is a).
>>>>>>>> But in that case it is somewhat questionable whether it is semantically correct to use outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can move timestamps only to a future instant (and inside the same window!), which has little semantic meaning to me. It looks more error-prone than useful.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>
>>>>>>>> Thanks Jan, it's interesting to read about the handling of timestamps in cases employing a buffering pattern. In the case of the ES write transform, buffered data could be output from ProcessElement or FinishBundle. It's the case where data is output from ProcessElement that the error reported at the start of this thread shows up. Based on what you described, it sounds like the PR[1] I made to "fix" this issue is actually fixing it by transitioning data from being late to being on time and outputting all buffered data into a non-deterministic window where the output heuristic is later satisfied (number of elements buffered or max time since last output). It seems there are 2 issues as a result:
>>>>>>>>
>>>>>>>> 1. The window to which input elements belong is not being preserved. Presumably we want IOs to leave an element's window unaltered?
>>>>>>>> 2. The watermark of the system might be incorrect, given that buffered data is assumed processed and allows the watermark to pass?
>>>>>>>>
>>>>>>>> It would definitely add complexity to the IO to employ timers to address this, but if that's the preferred or only solution, I'll put some thought into how to implement it.
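Evan's second issue, and the timer idea, can be illustrated with a toy model in plain Kotlin. It assumes the mechanism described elsewhere in this thread: while elements sit in the buffer, a timer whose output timestamp is the earliest buffered timestamp holds the step's output watermark there, so the watermark cannot jump over a buffered element and make it late. `HeldBuffer`, fixed windows aligned to 0, and millisecond timestamps are illustrative assumptions; in Beam itself the hold would come from a timer's output timestamp (I believe via `Timer#withOutputTimestamp`), not from a class like this.

```kotlin
// Toy model, not Beam's API: timestamps and watermarks are epoch millis, and
// windows are fixed windows of `windowSize` ms aligned to 0.
class HeldBuffer<T>(private val windowSize: Long) {
    private val items = mutableListOf<Pair<T, Long>>()

    fun add(value: T, timestamp: Long) {
        items.add(value to timestamp)
    }

    // The hold a timer would carry if its output timestamp were the earliest
    // buffered timestamp; null when the buffer is empty (nothing to hold).
    fun hold(): Long? = items.minOfOrNull { it.second }

    // What this step reports downstream: the input watermark, capped by the hold.
    fun outputWatermark(inputWatermark: Long): Long =
        hold()?.let { minOf(it, inputWatermark) } ?: inputWatermark

    // Last instant of an element's window (end - 1, like BoundedWindow#maxTimestamp).
    private fun windowMaxTimestamp(timestamp: Long): Long =
        Math.floorDiv(timestamp, windowSize) * windowSize + windowSize - 1

    // Buffered values that would already be late if emitted under `watermark`.
    fun lateUnder(watermark: Long): List<T> =
        items.filter { watermark > windowMaxTimestamp(it.second) }.map { it.first }

    // Flushing emits everything and releases the hold.
    fun flush(): List<Pair<T, Long>> = items.toList().also { items.clear() }
}
```

With elements at 5 and 7 in the window [0, 10), an input watermark of 12 would make both late if nothing held it, while the held output watermark stays at 5 until `flush()` releases it.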
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Evan
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>
>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský wrote:
>>>>>>>>
>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update the output watermark while a bundle is being processed? That seems like it could also cause the "watermark skip" problem, which is definitely an issue (and is probably the reason why the check fails?).
>>>>>>>>>
>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>
>>>>>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we need to make sure we hold the output watermark; otherwise the watermark might "jump over" a buffered element, transitioning it from "on-time" to "late", which would be a correctness bug (we can transition elements only from "late" to "on-time", never the other way around). The alternative is to use @FinishBundle to do the flushing, but that might not be appropriate here.
>>>>>>>>>
>>>>>>>>> Currently, the only way to limit the progress of the output watermark is by setting a timer with an output timestamp equal to the timestamp of the earliest element in the buffer. There was a thread discussing this in more detail [1].
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>>
>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>
>>>>>>>>> In general, I'd also really like to improve my understanding and learn more about how the employed buffering can cause this skew. Is it because the call to "flush" is happening from a different "current window" than the elements were originally buffered from?
>>>>>>>>> I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called rather than the window from which the buffered data originated. I suppose that could be problematic, but it should at least satisfy the validation code.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>
>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin wrote:
>>>>>>>>>
>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>
>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue; it has for me when running jobs with a vendored SDK with that change included. Do folks feel this change should be cherry-picked into 2.37.0?
>>>>>>>>>>
>>>>>>>>>> The change also prompted a question to the mailing list[2] about the difference in skew validation between ProcessContext and FinishBundleContext (where there is no ability to compute skew, as I understand it).
>>>>>>>>>>
>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>
>>>>>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik wrote:
>>>>>>>>>>
>>>>>>>>>>> This is specifically a case where @ProcessElement saw window X for element X0 and buffered it into memory, and then, when processing window Y and element Y0, wanted to flush the previously buffered element X0.
>>>>>>>>>>> This all occurred as part of the same bundle.
>>>>>>>>>>>
>>>>>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Outputting to an earlier window is problematic, as the watermark can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> A good question would be: should I be able to output to a different window from the current @ProcessElement call, like what we can do from @FinishBundle to handle these buffering scenarios?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where buffered elements were being output as part of a different window than the window that produced them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This became visible because validation was added recently to ensure that, when the pipeline is processing elements in window X, output with a timestamp is valid for window X. Note that this validation only occurs in @ProcessElement, since output there is associated with the current window of the input element being processed.
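Luke's X0/Y0 case can be reproduced with a toy model in plain Kotlin. It assumes, as described in this thread, that output from @ProcessElement takes the window and timestamp of the element currently being processed (roughly what `ProcessContext#output` does, and what the PR swapping `outputWithTimestamp` for `output` relies on), while output from @FinishBundle names its window explicitly (mirroring, I believe, `FinishBundleContext#output(output, timestamp, window)`). `ToyBufferingFn`, `Win`, and `Elem` are hypothetical names, not Beam classes.

```kotlin
// Toy model, not Beam's API. Timestamps are epoch millis.
data class Win(val start: Long, val end: Long)
data class Elem(val id: String, val timestamp: Long, val window: Win)

class ToyBufferingFn(private val batchSize: Int, private val flushInProcess: Boolean) {
    private val buffer = mutableListOf<Elem>()
    val emitted = mutableListOf<Elem>()

    fun processElement(current: Elem) {
        buffer.add(current)
        if (flushInProcess && buffer.size >= batchSize) {
            // Like ProcessContext#output: each output takes the timestamp and window
            // of the element currently being processed, not its own.
            buffer.forEach { emitted.add(it.copy(timestamp = current.timestamp, window = current.window)) }
            buffer.clear()
        }
    }

    fun finishBundle() {
        // With no current element, the caller names the window explicitly,
        // so each buffered element can keep the metadata it arrived with.
        emitted.addAll(buffer)
        buffer.clear()
    }
}
```

Feeding X0 (window [0, 10)) and then Y0 (window [10, 20)) with a batch size of 2, the in-process flush emits X0 with Y0's window and timestamp, which is the side effect Evan anticipates for his PR; flushing in `finishBundle()` instead leaves each element in its own window.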
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It is OK to do this in @FinishBundle, since there is no existing windowing context there, and when you output, the element is assigned to an appropriate window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticsearchIO batch writing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain it’s this PR: https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a streaming job. The config for the source and sink is, respectively,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>     .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>     .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>     // 5L * 1024 * 1024 bytes = 5 MiB
>>>>>>>>>>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>     // # of docs
>>>>>>>>>>>>>>>     .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>     .withConnectionConfiguration(
>>>>>>>>>>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>             arrayOf(host),
>>>>>>>>>>>>>>>             "fubar",
>>>>>>>>>>>>>>>             "_doc"
>>>>>>>>>>>>>>>         ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>             .withSocketTimeout(30000)
>>>>>>>>>>>>>>>     )
.withRetryConfiguration( >>>>>>>>>>>>>>> ElasticsearchIO.RetryConfiguration.create( >>>>>>>>>>>>>>> 10, >>>>>>>>>>>>>>> // the duration is wall clock, against the >>>>>>>>>>>>>>> connection and socket timeouts specified >>>>>>>>>>>>>>> // above. I.e., 10 x 30s is gonna be more >>>>>>>>>>>>>>> than 3 minutes, so if we're getting >>>>>>>>>>>>>>> // 10 socket timeouts in a row, this would >>>>>>>>>>>>>>> ignore the "10" part and terminate >>>>>>>>>>>>>>> // after 6. The idea is that in a mixed >>>>>>>>>>>>>>> failure mode, you'd get different timeouts >>>>>>>>>>>>>>> // of different durations, and on average >>>>>>>>>>>>>>> 10 x fails < 4m. >>>>>>>>>>>>>>> // That said, 4m is arbitrary, so adjust as >>>>>>>>>>>>>>> and when needed. >>>>>>>>>>>>>>> Duration.standardMinutes(4) >>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>> .withIdFn { f: JsonNode -> f["id"].asText() } >>>>>>>>>>>>>>> .withIndexFn { f: JsonNode -> >>>>>>>>>>>>>>> f["schema_name"].asText() } >>>>>>>>>>>>>>> .withIsDeleteFn { f: JsonNode -> >>>>>>>>>>>>>>> f["_action"].asText("noop") == "delete" } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit >>>>>>>>>>>>>>> a bug in the consumer, due to alleged time skew, specifically >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMTError message from worker: >>>>>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with >>>>>>>>>>>>>>> timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be >>>>>>>>>>>>>>> no earlier than the timestamp of the >>>>>>>>>>>>>>> current input (2022-03-07T10:43:43.562Z) minus the allowed skew >>>>>>>>>>>>>>> (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. >>>>>>>>>>>>>>> See the DoFn#getAllowedTimestampSkew() Javadoc >>>>>>>>>>>>>>> for details on changing the allowed skew. 
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’ve bisected it: 2.34 works fine, 2.35 is the first version where this breaks, and it seems like the code in the trace was largely added by the PR linked above. The error usually claims a skew of a few seconds, but obviously I can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’m happy to raise a JIRA, but I’m not 100% sure what the code was intending to fix; additionally, I’d be happy if someone else can reproduce this or knows of similar reports. I feel like what we’re doing is not *that* uncommon a scenario, so I would have thought someone else would have hit this by now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Emils
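The lower-bound rule stated in the error message above can be checked against the two timestamps it reports with a small Kotlin sketch using only `java.time`. `outputTimestampAllowed` is a hypothetical helper, not Beam's `checkTimestamp`, and it deliberately ignores the upper bound.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper mirroring only the lower bound in the error message:
// the output timestamp must not be earlier than (current input timestamp - allowed skew).
fun outputTimestampAllowed(output: Instant, currentInput: Instant, allowedSkew: Duration): Boolean =
    !output.isBefore(currentInput.minus(allowedSkew))
```

For the reported pair (output 2022-03-07T10:43:38.640Z, current input 2022-03-07T10:43:43.562Z) the gap is 4,922 ms, so with the default zero skew the check fails; only a skew of at least that gap would let the buffered element through.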
