Thanks Luke for all the info; I wasn't previously aware of the Reify utilities. You raise a great point about how much simpler this IO would be without bypassing GroupIntoBatches. When I originally wrote the Stateful BulkIO adapter in 2018 (which sadly sat in a walled garden for a while before it was allowed to be contributed to the Beam OSS repo), it seemed to me that stateful processing was somewhat new and not universally supported by runners. I didn't want to alienate users who were running ElasticsearchIO on a runner without stateful processing.
I think stateful processing is widely supported at this point. I would be happy to remove bundle-based batching from ElasticsearchIO if it's acceptable that an IO cannot be used on a runner without state support. For what it's worth, bundle-based batching created such small batches in my experience that it was almost unusable at scale with Elasticsearch anyway.

Thanks,
Evan

On Thu, Mar 10, 2022 at 4:02 PM Luke Cwik <lc...@google.com> wrote:

> The ElasticsearchIO implementation seems to only buffer elements within a single bundle. Within the lifetime of a single bundle, you don't need to worry about holding the watermark back as the runners do that for you. Only once you cross bundle boundaries, via a stateful DoFn processing the same key multiple times or a splittable DoFn processing another part of the restriction, do you have to handle the watermark by either setting the output time for a timer or controlling the watermark with the watermark estimator.
>
> The API for @ProcessElement doesn't allow you to output buffered elements with windowing information cleanly since there are use cases like the one that you have built where you want to buffer in memory and want to produce the entire record with windowing information at some point of time in the future.
>
> You could do as Reuven suggested where you structure the pipeline like:
> ... -> Reify.timestamps[1] -> Window.into(Default GlobalWindow strategy)
> -> (optional) GroupIntoBatches -> ParDo(BulkIOFn/StatefulBulkIOFn) ->
> Window.into(OriginalWindowingStrategy) -> ...
>
> Why do we opt-out of using the GroupIntoBatches route in ElasticsearchIO code?
>
> If we didn't need to opt out you could do something like:
> GroupIntoBatches -> ParDo(StatefulBulkIOFn)
> and the StatefulBulkIOFn would process the iterable as a batch that is sent to ElasticsearchIO.
>
> 1: https://github.com/apache/beam/blob/c76a1c8361b83dd85b6edf0eddd5ebe03873e0ce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java#L220
>
> On Thu, Mar 10, 2022 at 11:50 AM Reuven Lax <re...@google.com> wrote:
>
>> On Thu, Mar 10, 2022 at 11:47 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>>
>>>> BigQuery...implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections.
>>>
>>> @Luke Cwik <lc...@google.com> By "preserve" do you mean creating an in-memory data structure to hold windowing info and make use of that info later? Would that also imply holding the watermark so that any windows in-memory could not be "passed over" before being restored? I'd be keen to hear more about how one would ideally preserve and restore window information.
>>
>> One way is to preserve timestamps and simply reapply the WindowFn
>>
>>> I agree that outputting to arbitrary windows is Bad™. It's worth noting that RedisIO[1] also has this pattern of arbitrary window output via the same mechanism that ElasticsearchIO employs.
>>>
>>> [1] https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455
>>>
>>> On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> IMO the fact that BigQueryIO doesn't preserve windowing when outputting successful or failed elements was a bug.
>>>>
>>>> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I believe preserving the input window and element timestamp makes the most sense. Outputting in the global window seems like a distant second. Outputting in an arbitrary different window or with an arbitrary timestamp is incorrect.
>>>>>
>>>>> BigQuery outputs successful and failed writes in the global window but the implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections. Based upon the code it seems like global windows were used to get around an implementation detail that is sub-optimal within Dataflow's GroupIntoBatches implementation.
>>>>>
>>>>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>>>>
>>>>>>>> But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>>>
>>>>>>> It appears that BigQueryIO[1] just re-windows inputs into the global window and does not concern itself with preserving input windows of elements nor holding the watermark. Is that a good precedent to follow? It would certainly simplify the implementation of ElasticsearchIO#bulkIO.
>>>>>>
>>>>>> Big picture answer: Windowing into the global window happens whether you remember to do it or not. The world outside of Beam doesn't relate to Beam's windows. Doing the re-windowing explicitly might make you more likely to author the best code and/or offer the users the needed API to reify windowing information and/or document that they need to do it themselves before applying the write transform.
>>>>>>
>>>>>> Even more pedantically big picture answer: holding the watermark is exactly and only "reserving the right to output an element with that timestamp later" so you can guide your thinking with this.
>>>>>> If you don't need to output an element at a timestamp, don't hold it.
>>>>>>
>>>>>> BUT write transforms should almost always output elements describing what was written. You want to think about the timestamps and windowing on these elements. I'm not sure what the best practice is here. TBH I could see this actually functioning as a new "source" where everything is global window and timestamps describe the moment the data was committed, but not sure this is easily expressed. What do other IOs do about windowing these outputs?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>>>>
>>>>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>>>>
>>>>>>>> Thanks Jan for the thoughtful responses! It definitely sounds as though the ElasticsearchIO#write transform would be safer to either hold the watermark via similar machinery employed by GroupIntoBatches or possibly output everything into a global window; either sounds more safe in terms of data correctness than potentially outputting "on time" data as "late" data. But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>>>>
>>>>>>>> I'm keen to follow any further discussion on watermark holds for DoFn's that implement both @StartBundle and @FinishBundle!
>>>>>>>>
>>>>>>>> - Evan
>>>>>>>>
>>>>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Got it. So ProcessContext#output has the semantics of "using the window associated with the current @Element, output the data being passed in into that window." Makes complete sense.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting discussion. :-)
>>>>>>>>>>
>>>>>>>>>> Answers inline.
>>>>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Jan for confirming that the fix looks alright. I also found a PR[1] that appears to be a good case study of the Timer watermark hold technique that you previously mentioned so I'll study that a bit for my own understanding and future use. I was also previously missing the notion that a singular bundle could contain elements from many disparate Windows, so that's a great upgrade to my own mental model haha.
>>>>>>>>>>
>>>>>>>>>> I have a few follow-up questions for my own understanding (hopefully not off topic).
>>>>>>>>>>
>>>>>>>>>> 1. Am I understanding correctly that using ProcessContext#output to output a given element will preserve the element's input window(s)? Or is it the case that when using ProcessContext#output, any buffered elements will all be output to whatever window the most recent element processed by @ProcessElement belongs to?
>>>>>>>>>>
>>>>>>>>>> There is no way Beam can distinguish *how* you produced elements emitted from @ProcessElement (if those are result of some in-memory buffering or direct result of computation of the single input element currently being processed).
>>>>>>>>>> From that it follows, that window(s) associated with the output will be equal to window(s) of the input element currently processed (generally, the model allows multiple applications of window function, which is why window.maxTimestamp is 1 ms shifted backwards, to be still part of the window itself, so that the window functions based on timestamps are idempotent).
>>>>>>>>>>
>>>>>>>>>> 2. Is it safe to emit outputs from @FinishBundle to windows which may be older than the watermark? I believe this could still be happening given the current ElasticsearchIO implementation where any buffered elements are output in @FinishBundle using the same window they were associated with on input. Intuitively, that sounds as though it could be a correctness bug if the watermark had progressed beyond those windows.
>>>>>>>>>>
>>>>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>>>>
>>>>>>>>>> a) first of all - the fact, that you potentially could output late data, that previously were not late is _potentially_ a correctness bug, if you have any downstream logic, that performs any additional grouping and - most notably - relies on causality being preserved (which is how our universe works, so this is a natural requirement). Changing an element from "on time" to "late" can result in downstream processing seeing the effect prior to the cause (in event time!, because the watermark move can cause downstream timers to fire). I wrote something about this in [1].
>>>>>>>>>> Generally, if there is no grouping (or your application logic accounts for the swapping of cause and effect), then this is not an issue. If you produce a PCollection for the user, then you don't know what the user does with it and therefore should pay attention to it.
>>>>>>>>>>
>>>>>>>>>> b) second, this effect would not appear if mid-bundle output watermark updates were not possible. I emphasize that I don't know if this is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke can you correct me if I'm wrong?). I have some doubts if it is correct, though. It seems that it generally causes issues with in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle it seems, that we should hold output watermark between these two calls. But that would turn stateless DoFn into stateful, which is ... unfortunate. :)
>>>>>>>>>>
>>>>>>>>>> I hope I made things a little more clear rather than more obfuscated. :)
>>>>>>>>>>
>>>>>>>>>> Jan
>>>>>>>>>>
>>>>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>>>>
>>>>>>>>>> It's definitely interesting to think about the idea of enforcing watermark update only between bundles, but presumably that would mean quite extensive alterations.
>>>>>>>>>>
>>>>>>>>>> - Evan
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Evan,
>>>>>>>>>>>
>>>>>>>>>>> the fix looks good to me, as long as the timestamp of the buffered data need to be preserved downstream.
>>>>>>>>>>> Generally I think it *should* be possible to output in-memory buffered data in @ProcessElement (and @FinishBundle), the case where you need timers is when your buffer needs to span multiple bundles. In that case it also must be stored in regular state and not in-memory.
>>>>>>>>>>>
>>>>>>>>>>> It seems to me that there is some logical gap in how we handle the per-bundle buffering. I see two possibilities:
>>>>>>>>>>>
>>>>>>>>>>> a) either we allow in the model to change the output watermark *while* processing a bundle, - in which case it is logical requirement to have output timestamp from @ProcessElement no earlier than timestamp of the current element (because that way we preserve the "on time", "late" status of the current element, we don't swap anything), or
>>>>>>>>>>>
>>>>>>>>>>> b) we enforce output watermark update only in-between of bundles - in that case the requirement could be relaxed that the output timestamp from @ProcessElement might be no earlier than the minimum of timestamps inside the bundle
>>>>>>>>>>>
>>>>>>>>>>> I'm afraid that our current position is a). But in that case it is somewhat questionable if it is semantically correct to use outputWithTimestamp() in @ProcessElement of stateless DoFn at all. It can move timestamps only to future instant (and inside same window!), which has little semantic meaning to me. Looks more error prone than useful.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks Jan, it's interesting to read about the handling of timestamp in cases employing a buffering pattern.
>>>>>>>>>>> In the case of the ES write transform, buffered data could be output from ProcessElement or FinishBundle. It's the case where data is output from ProcessElement that the error reported at the start of this thread shows up. Based on what you described, it sounds like the PR[1] I made to "fix" this issue is actually fixing it by transitioning data from being late to being on time and outputting all buffered data into a non-deterministic window where the output heuristic is later satisfied (number of elements buffered or max time since last output). It seems there's 2 issues as a result:
>>>>>>>>>>>
>>>>>>>>>>> 1. The window to which input elements belong is not being preserved. Presumably we want IOs to leave an element's window unaltered?
>>>>>>>>>>> 2. The watermark of the system might be incorrect given that buffered data is assumed processed and allows the watermark to pass?
>>>>>>>>>>>
>>>>>>>>>>> It would definitely add complexity to the IO to employ timers to address this, but if that's the preferred or only solution I'll put some thought into how to implement that solution.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Evan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update output watermark while a bundle is being processed? That seems it could also cause the "watermark skip" problem, which is definitely an issue (and is probably the reason why the check fails?).
>>>>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we need to make sure we hold the output watermark, otherwise the watermark might "jump over" a buffered element transitioning it from "on-time" to "late", which would be a correctness bug (we can transition elements only from "late" to "on-time", never the other way around). The alternative is to use @FinishBundle to do the flushing, but that might not be appropriate here.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently, the only way to limit the progress of output watermark is by setting a timer with output timestamp that has the timestamp of the earliest element in the buffer. There was a thread that was discussing this in more detail [1].
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> In general, I'd also really like to improve my understanding and learn more about how the employed buffering can cause this skew. Is it because the call to "flush" is happening from a different "current window" than the elements were originally buffered from? I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called rather than the window from which the buffered data originated. I suppose that could be problematic, but should at least satisfy the validation code.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue, it has for me when running jobs with a vendored SDK with that change included. Do folks feel this change should be cherry-picked into 2.37.0?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The change also prompted a question to the mailing list[2] about skew validation difference between ProcessContext vs FinishBundleContext (where there is no ability to compute skew as I understand it).
>>>>>>>>>>>>>
>>>>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>
>>>>>>>>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is specifically a case where the @ProcessElement saw window X for element X0 and buffered it into memory and then when processing window Y and element Y0 wanted to flush previously buffered element X0. This all occurred as part of the same bundle.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Outputting to an earlier window is problematic, as the watermark can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A good question would be should I be able to output to a different window from the current @ProcessElement call, like what we can do from @FinishBundle to handle these buffering scenarios.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where elements that were being buffered were being output in a different window than the one that produced them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This became visible because validation was added recently to ensure that when the pipeline is processing elements in window X that output with a timestamp is valid for window X. Note that this validation only occurs in @ProcessElement since output is associated with the current window with the input element that is being processed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is ok to do this in @FinishBundle since there is no existing windowing context and when you output that element is assigned to an appropriate window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <emils.solma...@rvu.co.uk> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain it’s this PR https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and write to Elastic in a streaming job, the config for the source and sink is respectively
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>>>>     .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>>>>     .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>>>>     // # of docs
>>>>>>>>>>>>>>>>>>     .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>>>>     .withConnectionConfiguration(
>>>>>>>>>>>>>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>>>>             arrayOf(host),
>>>>>>>>>>>>>>>>>>             "fubar",
>>>>>>>>>>>>>>>>>>             "_doc"
>>>>>>>>>>>>>>>>>>         ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>>>>             .withSocketTimeout(30000)
>>>>>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>>>>>     .withRetryConfiguration(
>>>>>>>>>>>>>>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>>>>>>>             10,
>>>>>>>>>>>>>>>>>>             // the duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>>>>>>>>             // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>>>>>>>>>>             // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>>>>>>>>             // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>>>>>>>>             // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>>>>>>>>             // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>>>>>>>>             Duration.standardMinutes(4)
>>>>>>>>>>>>>>>>>>         )
>>>>>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>>>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>>>>>>>>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug in the consumer, due to alleged time skew, specifically
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z.
>>>>>>>>>>>>>>>>>> See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>>>>>>>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>>>>>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>>>>>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>>>>>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>>>>>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>>>>>     org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first version this breaks, and it seems like the code in the trace is largely added by the PR linked above. The error usually claims a skew of a few seconds, but obviously I can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the code was intending to fix, and additionally, I’d also be happy if someone else can reproduce this or knows of similar reports.
>>>>>>>>>>>>>>>>>> I feel like what we’re doing is not *that* uncommon a scenario, so I would have thought someone else would have hit this by now.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Emils
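
For anyone trying to read the error at the bottom of the thread: the rule it states (output timestamp no earlier than the current input's timestamp minus the allowed skew) can be modelled in a few lines of plain Java. This is only a sketch of the rule as worded in the message, not Beam's SimpleDoFnRunner code; the class and method names are hypothetical, and the upper bound mentioned in the message is left out. The two instants are the ones from the reported error, playing the roles of the buffered element X0 and the current element Y0 in Luke's description.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Plain-Java model of the lower-bound timestamp check quoted in the error.
 * Hypothetical names; this is NOT Beam's implementation.
 */
final class SkewCheckSketch {

    /** Earliest output timestamp the check would accept for the current input. */
    static Instant earliestAllowed(Instant currentInput, Duration allowedSkew) {
        return currentInput.minus(allowedSkew);
    }

    /** True if outputTs is not earlier than (currentInput - allowedSkew). */
    static boolean isAllowed(Instant outputTs, Instant currentInput, Duration allowedSkew) {
        return !outputTs.isBefore(earliestAllowed(currentInput, allowedSkew));
    }

    /** How far the output timestamp lags the current input (zero if it does not lag). */
    static Duration shortfall(Instant outputTs, Instant currentInput) {
        Duration lag = Duration.between(outputTs, currentInput);
        return lag.isNegative() ? Duration.ZERO : lag;
    }

    public static void main(String[] args) {
        // Timestamps copied from the error message in the original report.
        Instant bufferedX0 = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant currentY0 = Instant.parse("2022-03-07T10:43:43.562Z");

        // Flushing X0 with its own timestamp while processing Y0: rejected at zero skew.
        System.out.println(isAllowed(bufferedX0, currentY0, Duration.ZERO)); // false
        // Outputting with the current element's timestamp: accepted.
        System.out.println(isAllowed(currentY0, currentY0, Duration.ZERO));  // true
        // The "skew of a few seconds" from the report, in milliseconds.
        System.out.println(shortfall(bufferedX0, currentY0).toMillis());     // 4922
    }
}
```

The sketch only shows why an element buffered earlier in a bundle fails the check when flushed from a later @ProcessElement call. It says nothing about which window the output lands in, which is the separate correctness concern discussed above.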