I also wanted to clarify, for my own understanding, a potential discrepancy between two descriptions of intra-bundle watermark handling in this thread. Which is correct? Or are both correct and not at odds in the way I understand them to be?
The watermark can and will be updated between invocations of @ProcessElement for elements within a single bundle[1] (emphasis mine):

> a) either we allow in the model to *change the output watermark *while* processing a bundle*, in which case it is a logical requirement to have *output timestamp from @ProcessElement no earlier than timestamp of the current element* (because that way we preserve the "on time", "late" status of the current element, we don't swap anything), or
>
> ...
>
> I'm afraid that our current position is a).

The watermark is held by runners until all elements of a single bundle are processed[2]:

> Within the lifetime of a single bundle, you don't need to worry about holding the watermark back as the runners do that for you.

[1] https://lists.apache.org/thread/7m1gx1o7br7jfblhz4yl7czwhhgc7ml3
[2] https://lists.apache.org/thread/fnrlw7rkcg4of5xyc7s43mwz00dgqf72

On Fri, Mar 11, 2022 at 11:00 AM Evan Galpin wrote:

> Thanks Luke for all the info; I wasn't previously aware of the Reify utilities. You raise a great point about how much simpler this IO would be without bypassing GroupIntoBatches. When I originally wrote the Stateful BulkIO adapter in 2018 (which sadly sat in a walled garden for a while before it was allowed to be contributed to the Beam OSS repo), it seemed to me that stateful processing was somewhat new and not universally supported by runners. I didn't want to alienate users who were using ElasticsearchIO on a runner without stateful processing.
>
> I think stateful processing is widely supported at this point. I would be happy to remove bundle-based batching from ElasticsearchIO if it's acceptable that an IO cannot be used on a runner without state support. For what it's worth, in my experience bundle-based batching created such small batches that it was almost unusable at scale with Elasticsearch anyway.
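The two readings can be made concrete with a minimal model. It is hypothetical plain Java, not Beam API: `IntraBundleRules`, `allowedUnderA`, and `allowedUnderB` are illustrative names, and timestamps are epoch millis.

The model encodes two rules for flushing a buffered element X0 while processing a later element Y0 in the same bundle:

- **Rule (a)**, quoted above: the flush is rejected.
- **Rule (b)**, Jan's alternative from his 3/8/22 message further down: the flush is allowed.

Luke's description, where the runner holds the watermark for the whole bundle, would seem to correspond to (b). The per-element timestamp check in the error at the bottom of the thread appears to behave like (a).

```java
/**
 * Hypothetical model, not Beam API. Timestamps are epoch millis.
 * Encodes the two candidate rules for emitting buffered data from @ProcessElement.
 */
final class IntraBundleRules {
    private IntraBundleRules() {}

    /**
     * Rule (a): the output watermark may advance mid-bundle, so an output is only safe
     * if it is no earlier than the element currently being processed.
     */
    static boolean allowedUnderA(long outputTs, long currentElementTs) {
        return outputTs >= currentElementTs;
    }

    /**
     * Rule (b): the output watermark only advances between bundles, so an output is safe
     * if it is no earlier than the minimum timestamp seen in the bundle.
     */
    static boolean allowedUnderB(long outputTs, long... bundleTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : bundleTimestamps) {
            min = Math.min(min, ts);
        }
        return outputTs >= min;
    }

    public static void main(String[] args) {
        // X0 (t=1000) was buffered; it is flushed while processing Y0 (t=5000).
        long x0 = 1_000L;
        long y0 = 5_000L;
        System.out.println("(a) allows flushing X0: " + allowedUnderA(x0, y0));     // false
        System.out.println("(b) allows flushing X0: " + allowedUnderB(x0, x0, y0)); // true
    }
}
```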
>
> Thanks,
> Evan
>
> On Thu, Mar 10, 2022 at 4:02 PM Luke Cwik wrote:
>
>> The ElasticsearchIO implementation seems to only buffer elements within a single bundle. Within the lifetime of a single bundle, you don't need to worry about holding the watermark back as the runners do that for you. Only once you cross bundle boundaries, via a stateful DoFn processing the same key multiple times or a splittable DoFn processing another part of the restriction, do you have to handle the watermark, either by setting the output time for a timer or by controlling the watermark with the watermark estimator.
>>
>> The API for @ProcessElement doesn't allow you to cleanly output buffered elements with windowing information, which is what use cases like the one that you have built need: you want to buffer in memory and produce the entire record with windowing information at some point in the future.
>>
>> You could do as Reuven suggested and structure the pipeline like:
>> ... -> Reify.timestamps[1] -> Window.into(Default GlobalWindow strategy) -> (optional) GroupIntoBatches -> ParDo(BulkIOFn/StatefulBulkIOFn) -> Window.into(OriginalWindowingStrategy) -> ...
>>
>> Why do we opt out of using the GroupIntoBatches route in the ElasticsearchIO code?
>>
>> If we didn't need to opt out, you could do something like:
>> GroupIntoBatches -> ParDo(StatefulBulkIOFn)
>> and the StatefulBulkIOFn would process the iterable as a batch that is sent to ElasticsearchIO.
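Luke's point about crossing bundle boundaries can be sketched the same way. The class below is a hypothetical model, not Beam API. It is a buffer that holds the output watermark at its earliest buffered timestamp.

Jan later describes the same effect for a timer whose output timestamp is the earliest buffered element; in the Beam Java SDK that is `Timer#withOutputTimestamp`. The model assumes the output watermark is the minimum of the input watermark and the hold, and that flushing releases the hold.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model, not Beam API: a buffer that holds the output watermark at its
 * earliest buffered timestamp, as a timer with an output timestamp would. Epoch millis.
 */
final class HeldBuffer {
    private final List<Long> buffered = new ArrayList<>();
    private long earliest = Long.MAX_VALUE; // Long.MAX_VALUE means "no hold"

    void add(long timestamp) {
        buffered.add(timestamp);
        earliest = Math.min(earliest, timestamp);
    }

    /** The output watermark never passes the hold, whatever the input watermark is. */
    long outputWatermark(long inputWatermark) {
        return Math.min(inputWatermark, earliest);
    }

    /** Emits the buffered batch and releases the hold. */
    List<Long> flush() {
        List<Long> batch = new ArrayList<>(buffered);
        buffered.clear();
        earliest = Long.MAX_VALUE;
        return batch;
    }

    public static void main(String[] args) {
        HeldBuffer buffer = new HeldBuffer();
        buffer.add(3_000L);
        buffer.add(1_000L);
        System.out.println(buffer.outputWatermark(5_000L)); // 1000: held back
        System.out.println(buffer.flush().size());          // 2
        System.out.println(buffer.outputWatermark(5_000L)); // 5000: hold released
    }
}
```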
>>
>> 1: https://github.com/apache/beam/blob/c76a1c8361b83dd85b6edf0eddd5ebe03873e0ce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java#L220
>>
>> On Thu, Mar 10, 2022 at 11:50 AM Reuven Lax wrote:
>>
>>> On Thu, Mar 10, 2022 at 11:47 AM Evan Galpin wrote:
>>>
>>>>> BigQuery...implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections.
>>>>
>>>> @Luke Cwik By "preserve", do you mean creating an in-memory data structure to hold windowing info and make use of that info later? Would that also imply holding the watermark so that any windows in memory could not be "passed over" before being restored? I'd be keen to hear more about how one would ideally preserve and restore window information.
>>>
>>> One way is to preserve timestamps and simply reapply the WindowFn.
>>>
>>>> I agree that outputting to arbitrary windows is Bad™. It's worth noting that RedisIO[1] also has this pattern of arbitrary window output via the same mechanism that ElasticsearchIO employs.
>>>>
>>>> [1] https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455
>>>>
>>>> On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax wrote:
>>>>
>>>>> IMO the fact that BigQueryIO doesn't preserve windowing when outputting successful or failed elements was a bug.
>>>>>
>>>>> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik wrote:
>>>>>
>>>>>> I believe preserving the input window and element timestamp makes the most sense. Outputting in the global window seems like a distant second. Outputting in an arbitrary different window or with an arbitrary timestamp is incorrect.
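Two suggestions in this exchange rely on window assignment being a pure function of the element timestamp:

- Reuven's idea of preserving timestamps and reapplying the WindowFn.
- Luke's `Reify.timestamps -> Window.into(...)` pipeline.

The sketch below illustrates this under one assumption: the original strategy is fixed windows with no offset. `ReapplyWindowFn` and `assign` are hypothetical names, not Beam's `FixedWindows`. Elements lose their windows while batched in the global window, and reassigning from the preserved timestamps recovers them.

The sketch also checks Jan's later remark that `maxTimestamp` is the window end minus 1 ms. Because of that, reassigning at `maxTimestamp` lands in the same window.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model, not Beam API: fixed windows of sizeMillis, assigned as
 * start = ts - floorMod(ts, size). If the timestamp survives a detour through the
 * global window, reapplying the same assignment recovers the original window.
 */
final class ReapplyWindowFn {
    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive
        Window(long start, long end) { this.start = start; this.end = end; }
        long maxTimestamp() { return end - 1; } // last instant still inside the window
        @Override public boolean equals(Object o) {
            return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
        }
        @Override public int hashCode() { return Long.hashCode(start) * 31 + Long.hashCode(end); }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    static Window assign(long timestamp, long sizeMillis) {
        long start = timestamp - Math.floorMod(timestamp, sizeMillis);
        return new Window(start, start + sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L;
        // "Reified" timestamps kept with each element while windows are dropped
        // for batching in the global window.
        List<Long> batch = new ArrayList<>();
        for (long ts : new long[] {0L, 59_999L, 60_000L}) {
            batch.add(ts);
        }
        // After the write, reapply the original WindowFn to the preserved timestamps.
        for (long ts : batch) {
            System.out.println(ts + " -> " + assign(ts, size));
        }
        // maxTimestamp is end - 1 ms, so reassigning it is idempotent.
        Window w = assign(0L, size);
        System.out.println(assign(w.maxTimestamp(), size).equals(w)); // true
    }
}
```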
>>>>>>
>>>>>> BigQuery outputs successful and failed writes in the global window, but the implementation could have preserved the windowing information and restored it later before outputting those records into the successful/failed write PCollections. Based upon the code, it seems like the global window was used to get around an implementation detail that is sub-optimal within Dataflow's GroupIntoBatches implementation.
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles wrote:
>>>>>>
>>>>>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin wrote:
>>>>>>>
>>>>>>>>> But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>>>>
>>>>>>>> It appears that BigQueryIO[1] just re-windows inputs into the global window and does not concern itself with preserving the input windows of elements or holding the watermark. Is that a good precedent to follow? It would certainly simplify the implementation of ElasticsearchIO#bulkIO.
>>>>>>>
>>>>>>> Big picture answer: Windowing into the global window happens whether you remember to do it or not. The world outside of Beam doesn't relate to Beam's windows. Doing the re-windowing explicitly might make you more likely to author the best code and/or offer users the needed API to reify windowing information and/or document that they need to do it themselves before applying the write transform.
>>>>>>>
>>>>>>> Even more pedantically big picture answer: holding the watermark is exactly and only "reserving the right to output an element with that timestamp later", so you can guide your thinking with this. If you don't need to output an element at a timestamp, don't hold it.
>>>>>>>
>>>>>>> BUT write transforms should almost always output elements describing what was written. You want to think about the timestamps and windowing on these elements. I'm not sure what the best practice is here. TBH I could see this actually functioning as a new "source" where everything is in the global window and timestamps describe the moment the data was committed, but I'm not sure this is easily expressed. What do other IOs do about windowing these outputs?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>>>>>
>>>>>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin wrote:
>>>>>>>>
>>>>>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>>>>>
>>>>>>>>> Thanks Jan for the thoughtful responses! It definitely sounds as though the ElasticsearchIO#write transform would be safer if it either held the watermark via machinery similar to that employed by GroupIntoBatches or possibly output everything into a global window; either sounds safer in terms of data correctness than potentially outputting "on time" data as "late" data. But I'm not sure if an IO throwing away windows (in favour of Global) would be acceptable either. BigQueryIO#write (streaming) also has a notion of "getFailedWrites" with the same intent as what's in ElasticsearchIO; I wonder how that's implemented?
>>>>>>>>>
>>>>>>>>> I'm keen to follow any further discussion on watermark holds for DoFns that implement both @StartBundle and @FinishBundle!
>>>>>>>>>
>>>>>>>>> - Evan
>>>>>>>>>
>>>>>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin wrote:
>>>>>>>>>
>>>>>>>>>> Got it. So ProcessContext#output has the semantics of "using the window associated with the current @Element, output the data being passed in into that window." Makes complete sense.
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský wrote:
>>>>>>>>>>
>>>>>>>>>>> Interesting discussion. :-)
>>>>>>>>>>>
>>>>>>>>>>> Answers inline.
>>>>>>>>>>>
>>>>>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks Jan for confirming that the fix looks alright. I also found a PR[1] that appears to be a good case study of the Timer watermark hold technique that you previously mentioned, so I'll study that a bit for my own understanding and future use. I was also previously missing the notion that a single bundle could contain elements from many disparate windows, so that's a great upgrade to my own mental model haha.
>>>>>>>>>>>
>>>>>>>>>>> I have a few follow-up questions for my own understanding (hopefully not off topic).
>>>>>>>>>>>
>>>>>>>>>>> 1. Am I understanding correctly that using ProcessContext#output to output a given element will preserve the element's input window(s)? Or is it the case that when using ProcessContext#output, any buffered elements will all be output to whatever window the most recent element processed by @ProcessElement belongs to?
>>>>>>>>>>>
>>>>>>>>>>> There is no way Beam can distinguish *how* you produced elements emitted from @ProcessElement (whether they are the result of some in-memory buffering or the direct result of computation on the single input element currently being processed). From that it follows that the window(s) associated with the output will be equal to the window(s) of the input element currently being processed. (Generally, the model allows multiple applications of the window function, which is why window.maxTimestamp is shifted 1 ms backwards, so that it is still part of the window itself and window functions based on timestamps are idempotent.)
>>>>>>>>>>>
>>>>>>>>>>> 2. Is it safe to emit outputs from @FinishBundle to windows which may be older than the watermark? I believe this could still be happening given the current ElasticsearchIO implementation, where any buffered elements are output in @FinishBundle using the same window they were associated with on input. Intuitively, that sounds as though it could be a correctness bug if the watermark had progressed beyond those windows.
>>>>>>>>>>>
>>>>>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>>>>>
>>>>>>>>>>> a) First of all, the fact that you could potentially output late data that previously was not late is _potentially_ a correctness bug, if you have any downstream logic that performs additional grouping and, most notably, relies on causality being preserved (which is how our universe works, so this is a natural requirement). Changing an element from "on time" to "late" can result in downstream processing seeing the effect prior to the cause (in event time!, because the watermark move can cause downstream timers to fire). I wrote something about this in [1]. Generally, if there is no grouping (or your application logic accounts for the swapping of cause and effect), then this is not an issue. If you produce a PCollection for the user, then you don't know what the user does with it and therefore should pay attention to it.
>>>>>>>>>>>
>>>>>>>>>>> b) Second, this effect would not appear if mid-bundle output watermark updates were not possible. I emphasize that I don't know if this is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke, can you correct me if I'm wrong?). I have some doubts whether it is correct, though. It seems to cause issues in general with in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle, it seems that we should hold the output watermark between these two calls. But that would turn a stateless DoFn into a stateful one, which is ... unfortunate. :)
>>>>>>>>>>>
>>>>>>>>>>> I hope I made things a little more clear rather than more obfuscated. :)
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>>>>>
>>>>>>>>>>> It's definitely interesting to think about the idea of enforcing watermark updates only between bundles, but presumably that would mean quite extensive alterations.
>>>>>>>>>>>
>>>>>>>>>>> - Evan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Evan,
>>>>>>>>>>>>
>>>>>>>>>>>> the fix looks good to me, as long as the timestamps of the buffered data need to be preserved downstream. Generally I think it *should* be possible to output in-memory buffered data in @ProcessElement (and @FinishBundle); the case where you need timers is when your buffer needs to span multiple bundles. In that case it also must be stored in regular state and not in memory.
>>>>>>>>>>>>
>>>>>>>>>>>> It seems to me that there is some logical gap in how we handle the per-bundle buffering. I see two possibilities:
>>>>>>>>>>>>
>>>>>>>>>>>> a) either we allow in the model to change the output watermark *while* processing a bundle, in which case it is a logical requirement to have the output timestamp from @ProcessElement no earlier than the timestamp of the current element (because that way we preserve the "on time"/"late" status of the current element; we don't swap anything), or
>>>>>>>>>>>>
>>>>>>>>>>>> b) we enforce output watermark updates only in between bundles; in that case the requirement could be relaxed so that the output timestamp from @ProcessElement is no earlier than the minimum of the timestamps inside the bundle.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm afraid that our current position is a). But in that case it is somewhat questionable whether it is semantically correct to use outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can move timestamps only to a future instant (and inside the same window!), which has little semantic meaning to me. It looks more error-prone than useful.
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>>
>>>>>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks Jan, it's interesting to read about the handling of timestamps in cases employing a buffering pattern. In the case of the ES write transform, buffered data could be output from ProcessElement or FinishBundle. It's the case where data is output from ProcessElement that the error reported at the start of this thread shows up. Based on what you described, it sounds like the PR[1] I made to "fix" this issue is actually fixing it by transitioning data from being late to being on time, and by outputting all buffered data into a non-deterministic window where the output heuristic is later satisfied (number of elements buffered or max time since last output). It seems there are two issues as a result:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The window to which input elements belong is not being preserved. Presumably we want IOs to leave an element's window unaltered?
>>>>>>>>>>>> 2. The watermark of the system might be incorrect, given that buffered data is assumed processed and allows the watermark to pass?
>>>>>>>>>>>>
>>>>>>>>>>>> It would definitely add complexity to the IO to employ timers to address this, but if that's the preferred or only solution, I'll put some thought into how to implement it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Evan
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update the output watermark while a bundle is being processed? It seems that could also cause the "watermark skip" problem, which is definitely an issue (and is probably the reason why the check fails?).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we need to make sure we hold the output watermark; otherwise the watermark might "jump over" a buffered element, transitioning it from "on-time" to "late", which would be a correctness bug (we can transition elements only from "late" to "on-time", never the other way around). The alternative is to use @FinishBundle to do the flushing, but that might not be appropriate here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, the only way to limit the progress of the output watermark is by setting a timer whose output timestamp is the timestamp of the earliest element in the buffer. There was a thread that discussed this in more detail [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> In general, I'd also really like to improve my understanding and learn more about how the employed buffering can cause this skew. Is it because the call to "flush" is happening from a different "current window" than the elements were originally buffered from? I'm actually thinking that the PR[1] to "fix" this would have had the side effect of outputting buffered elements into the window from which "flush" was called rather than the window from which the buffered data originated. I suppose that could be problematic, but it should at least satisfy the validation code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well, and I opened a PR[1] to use `ProcessContext#output` rather than `ProcessContext#outputWithTimestamp`. I believe that should resolve this issue; it has for me when running jobs with a vendored SDK with that change included. Do folks feel this change should be cherry-picked into 2.37.0?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The change also prompted a question to the mailing list[2] about the difference in skew validation between ProcessContext and FinishBundleContext (where there is no ability to compute skew, as I understand it).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is specifically a case where the @ProcessElement saw window X for element X0 and buffered it into memory, and then, when processing window Y and element Y0, wanted to flush the previously buffered element X0. This all occurred as part of the same bundle.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In general, yes, outputting to an earlier window is problematic.
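Luke's X0/Y0 description can be reproduced with a toy model. It is hypothetical, not Beam API: a buffering function that flushes from its process method, where output implicitly takes the window of the element currently being processed.

The flag `preserveWindows` selects between two behaviours:

- **`false`:** X0 is emitted into window Y. This is the mis-windowing Evan expected PR 16744 to produce.
- **`true`:** each element keeps its own window. That is roughly what `FinishBundleContext#output(output, timestamp, window)` permits, and what Luke asks whether @ProcessElement should allow.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model, not Beam API: flushing a buffer from "@ProcessElement", where
 * output implicitly inherits the window of the element currently being processed.
 */
final class FlushWindowDemo {
    static final class Element {
        final String value;
        final String window;
        Element(String value, String window) { this.value = value; this.window = window; }
    }

    private final List<Element> buffer = new ArrayList<>();
    final List<String> emitted = new ArrayList<>(); // outputs as "value@window"

    /** Buffers the element and flushes once the buffer holds batchSize elements. */
    void processElement(Element current, int batchSize, boolean preserveWindows) {
        buffer.add(current);
        if (buffer.size() < batchSize) {
            return;
        }
        for (Element e : buffer) {
            // ProcessContext-style output uses current.window; the alternative keeps
            // each buffered element's own window, as an explicit-window output would.
            String window = preserveWindows ? e.window : current.window;
            emitted.add(e.value + "@" + window);
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        FlushWindowDemo implicit = new FlushWindowDemo();
        implicit.processElement(new Element("X0", "X"), 2, false);
        implicit.processElement(new Element("Y0", "Y"), 2, false);
        System.out.println(implicit.emitted); // [X0@Y, Y0@Y]

        FlushWindowDemo explicit = new FlushWindowDemo();
        explicit.processElement(new Element("X0", "X"), 2, true);
        explicit.processElement(new Element("Y0", "Y"), 2, true);
        System.out.println(explicit.emitted); // [X0@X, Y0@Y]
    }
}
```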
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Outputting to an earlier window is problematic, as the watermark can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A good question would be: should I be able to output to a different window from the current @ProcessElement call, like what we can do from @FinishBundle, to handle these buffering scenarios?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where elements that were being buffered were being output as part of a different window than the window that produced them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This became visible because validation was added recently to ensure that, when the pipeline is processing elements in window X, output with a timestamp is valid for window X. Note that this validation only occurs in @ProcessElement, since output is associated with the current window of the input element that is being processed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It is OK to do this in @FinishBundle since there is no existing windowing context, and when you output, that element is assigned to an appropriate window.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticsearchIO batch writing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain it’s this PR: https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a streaming job. The config for the source and sink is, respectively,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>     PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>>>>>> ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>>>>>     .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>>>>>     .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>>>>>     .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>>>>>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>>>>>     // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>>>>>>>>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>>>>>     // # of docs
>>>>>>>>>>>>>>>>>>>     .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>>>>>     .withConnectionConfiguration(
>>>>>>>>>>>>>>>>>>>         ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>>>>>             arrayOf(host),
>>>>>>>>>>>>>>>>>>>             "fubar",
>>>>>>>>>>>>>>>>>>>             "_doc"
>>>>>>>>>>>>>>>>>>>         ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>>>>>             .withSocketTimeout(30000)
>>>>>>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>>>>>>     .withRetryConfiguration(
>>>>>>>>>>>>>>>>>>>         ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>>>>>>>>             10,
>>>>>>>>>>>>>>>>>>>             // the duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>>>>>>>>>             // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>>>>>>>>>>>             // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>>>>>>>>>             // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>>>>>>>>>             // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>>>>>>>>>             // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>>>>>>>>>             Duration.standardMinutes(4)
>>>>>>>>>>>>>>>>>>>         )
>>>>>>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>>>>>>     .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>>>>>>>>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>>>>>>>>>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the consumer due to alleged time skew, specifically:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’ve bisected it: 2.34 works fine, 2.35 is the first version where this breaks, and it seems like the code in the trace was largely added by the PR linked above. The error usually claims a skew of a few seconds, but obviously I can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s marked deprecated anyway.
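The rule in the error message can be checked against its own numbers. The sketch below is a hypothetical re-creation of just the lower bound, not the SimpleDoFnRunner source; the upper bound at the maximum timestamp is omitted.

It shows that the buffered element is 4,922 ms older than the element being processed, which is why a zero allowed skew rejects it. This presumably also explains why the reported skew is usually a few seconds: it is the gap between a buffered element and the element whose @ProcessElement call triggered the flush.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical re-creation of the lower bound stated in the error message: an output
 * timestamp must be no earlier than the current input's timestamp minus the allowed skew.
 */
final class SkewCheck {
    static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        return !output.isBefore(currentInput.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant output = Instant.parse("2022-03-07T10:43:38.640Z");       // buffered element
        Instant currentInput = Instant.parse("2022-03-07T10:43:43.562Z"); // element being processed
        System.out.println(isAllowed(output, currentInput, Duration.ZERO));             // false
        System.out.println(Duration.between(output, currentInput).toMillis() + " ms"); // 4922 ms
    }
}
```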
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’m happy to raise a JIRA, but I’m not 100% sure what the code was intending to fix; additionally, I’d be happy if someone else can reproduce this or knows of similar reports. I feel like what we’re doing is not *that* uncommon a scenario, so I would have thought someone else would have hit this by now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Emils
>>>>>>>>>>>>>>>>>>
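The arithmetic in the retry-configuration comment above can be sanity-checked with a small helper. It is a hypothetical back-of-the-envelope calculation, not ElasticsearchIO's retry logic. It assumes every failed attempt costs its full timeout and ignores any backoff between attempts.

Under those assumptions:

- 8 attempts of 30 s fit in the 4-minute budget.
- 6 attempts fit if the 5 s connect timeout is added to each, which may be where the comment's "after 6" comes from.

```java
/**
 * Hypothetical back-of-the-envelope helper, not ElasticsearchIO code: how many worst-case
 * attempts fit in a retry time budget, capped by the attempt limit. Assumes each failed
 * attempt costs its full timeout and ignores backoff sleeps between attempts.
 */
final class RetryBudget {
    static long attemptsWithinBudget(long maxAttempts, long budgetMillis, long perAttemptMillis) {
        return Math.min(maxAttempts, budgetMillis / perAttemptMillis);
    }

    public static void main(String[] args) {
        long budget = 4 * 60_000L; // Duration.standardMinutes(4)
        // Socket timeout alone (30 s) per attempt.
        System.out.println(attemptsWithinBudget(10, budget, 30_000L)); // 8
        // Connect timeout (5 s) plus socket timeout (30 s) per attempt.
        System.out.println(attemptsWithinBudget(10, budget, 35_000L)); // 6
    }
}
```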
