Thanks Luke for all the info; I wasn't previously aware of the Reify
utilities.  You raise a great point about how much simpler this IO would be
without bypassing GroupIntoBatches.  When I originally wrote the Stateful
BulkIO adapter in 2018 (which sadly sat in a walled garden for a while
before it was allowed to be contributed to the Beam OSS repo), it seemed to me
that stateful processing was somewhat new and not universally supported by
runners.  I didn't want to alienate users that were using ElasticsearchIO
on a runner without stateful processing.
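
For anyone following along, here is a minimal plain-Java sketch of the
"reify timestamps, batch, then reapply the WindowFn" idea from the quoted
discussion below. It is not Beam code: Timestamped, fixedWindowStart and
rewindow are hypothetical names, and fixed one-minute windows are an
assumption made purely for illustration. The point is only that if each
record carries its own timestamp through the batching step, its original
window can be recomputed afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReifyAndRewindow {
    // Hypothetical stand-in for a value whose event timestamp was reified into the record.
    static final class Timestamped {
        final String value;
        final long timestampMillis;

        Timestamped(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Assumption for illustration: fixed one-minute windows.
    static final long WINDOW_SIZE_MILLIS = 60_000L;

    // Start of the fixed window containing the timestamp (non-negative timestamps assumed).
    static long fixedWindowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_SIZE_MILLIS);
    }

    // Reapply the window function to every element of a batch, grouping values by window start.
    static Map<Long, List<String>> rewindow(List<Timestamped> batch) {
        Map<Long, List<String>> byWindow = new TreeMap<>();
        for (Timestamped t : batch) {
            byWindow
                .computeIfAbsent(fixedWindowStart(t.timestampMillis), k -> new ArrayList<>())
                .add(t.value);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        // One batch mixing elements from two different windows.
        List<Timestamped> batch = List.of(
            new Timestamped("x0", 10_000L),
            new Timestamped("y0", 65_000L),
            new Timestamped("x1", 59_999L));
        System.out.println(rewindow(batch)); // prints {0=[x0, x1], 60000=[y0]}
    }
}
```

In a real pipeline the window function would be whatever the original
windowing strategy used; the sketch only covers the fixed-window case.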

I think stateful processing is widely supported at this point.  I would be
happy to remove bundle-based batching from ElasticsearchIO if it's
acceptable that an IO cannot be used on a runner without state support.
For what it's worth, bundle-based batching created such small batches in my
experience that it was almost unusable at scale with Elasticsearch anyway.
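
To illustrate why bundle-scoped batching tends to produce small batches,
here is a toy plain-Java simulation. It is a sketch under stated
assumptions, not ElasticsearchIO's actual logic: the bundle sizes are made
up, and bundleScopedBatchSizes / crossBundleBatchSizes are hypothetical
helper names. The first function must flush whenever a bundle ends; the
second lets the buffer survive bundle boundaries, which is what state
makes possible.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingComparison {
    // Bundle-scoped batching: whatever is buffered must be flushed when the bundle ends.
    static List<Integer> bundleScopedBatchSizes(int[] bundleSizes, int maxBatchSize) {
        List<Integer> batches = new ArrayList<>();
        for (int bundleSize : bundleSizes) {
            int buffered = 0;
            for (int i = 0; i < bundleSize; i++) {
                buffered++;
                if (buffered == maxBatchSize) {
                    batches.add(buffered);
                    buffered = 0;
                }
            }
            if (buffered > 0) {
                batches.add(buffered); // forced flush at the end of the bundle
            }
        }
        return batches;
    }

    // Cross-bundle batching: the buffer is carried over from one bundle to the next.
    static List<Integer> crossBundleBatchSizes(int[] bundleSizes, int maxBatchSize) {
        List<Integer> batches = new ArrayList<>();
        int buffered = 0;
        for (int bundleSize : bundleSizes) {
            for (int i = 0; i < bundleSize; i++) {
                buffered++;
                if (buffered == maxBatchSize) {
                    batches.add(buffered);
                    buffered = 0;
                }
            }
        }
        if (buffered > 0) {
            batches.add(buffered); // final flush, e.g. when a buffering timeout expires
        }
        return batches;
    }

    public static void main(String[] args) {
        int[] bundles = {3, 2, 4, 1, 2}; // made-up bundle sizes, for illustration only
        System.out.println(bundleScopedBatchSizes(bundles, 5)); // prints [3, 2, 4, 1, 2]
        System.out.println(crossBundleBatchSizes(bundles, 5));  // prints [5, 5, 2]
    }
}
```

With small bundles, the bundle-scoped variant never reaches the configured
batch size, so the bulk requests stay small no matter how the maximum is
tuned.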

Thanks,
Evan

On Thu, Mar 10, 2022 at 4:02 PM Luke Cwik <lc...@google.com> wrote:

> The ElasticsearchIO implementation seems to only buffer elements within a
> single bundle. Within the lifetime of a single bundle, you don't need to
> worry about holding the watermark back as the runners do that for you. Only
> once you cross bundle boundaries, via a stateful DoFn processing the same
> key multiple times or a splittable DoFn processing another part of the
> restriction, do you have to handle the watermark by either setting the
> output time for a timer or controlling the watermark with the watermark
> estimator.
>
> The API for @ProcessElement doesn't allow you to output buffered elements
> with windowing information cleanly since there are use cases like the one
> that you have built where you want to buffer in memory and want to produce
> the entire record with windowing information at some point of time in the
> future.
>
> You could do as Reuven suggested where you structure the pipeline like:
> ... -> Reify.timestamps[1] -> Window.into(Default GlobalWindow strategy)
> -> (optional) GroupIntoBatches -> ParDo(BulkIOFn/StatefulBulkIOFn) ->
> Window.into(OriginalWindowingStrategy) -> ...
>
> Why do we opt-out of using the GroupIntoBatches route in ElasticsearchIO
> code?
>
> If we didn't need to opt out you could do something like:
> GroupIntoBatches -> ParDo(StatefulBulkIOFn)
> and the StatefulBulkIOFn would process the iterable as a batch that is
> sent to ElasticsearchIO.
>
> 1:
> https://github.com/apache/beam/blob/c76a1c8361b83dd85b6edf0eddd5ebe03873e0ce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java#L220
>
>
>
>
> On Thu, Mar 10, 2022 at 11:50 AM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Thu, Mar 10, 2022 at 11:47 AM Evan Galpin <evan.gal...@gmail.com>
>> wrote:
>>
>>>> BigQuery...implementation could have preserved the windowing information
>>>> and restored it later before outputting those records into the
>>>> successful/failed write PCollections.
>>>
>>>
>>> @Luke Cwik <lc...@google.com> By "preserve" do you mean creating an
>>> in-memory data structure to hold windowing info and make use of that info
>>> later?  Would that also imply holding the watermark so that any windows
>>> in-memory could not be "passed over" before being restored?  I'd be keen to
>>> hear more about how one would ideally preserve and restore window
>>> information.
>>>
>>
>> One way is to preserve timestamps and simply reapply the WindowFn
>>
>>
>>>
>>> I agree that outputting to arbitrary windows is Bad™.  It's worth noting
>>> that RedisIO[1] also has this pattern of arbitrary window output via the
>>> same mechanism that ElasticsearchIO employs.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455
>>>
>>> On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> IMO the fact that BigQueryIO doesn't preserve windowing when outputting
>>>> successful or failed elements was a bug.
>>>>
>>>> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I believe preserving the input window and element timestamp makes the
>>>>> most sense. Outputting in the global window seems like a distant second.
>>>>> Outputting in an arbitrary different window or with an arbitrary timestamp
>>>>> is incorrect.
>>>>>
>>>>> BigQuery outputs successful and failed writes in the global window but
>>>>> the implementation could have preserved the windowing information and
>>>>> restored it later before outputting those records into the
>>>>> successful/failed write PCollections. Based upon the code it seems like
>>>>> the global window was used to get around an implementation detail that
>>>>> is sub-optimal within Dataflow's GroupIntoBatches implementation.
>>>>>
>>>>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin <evan.gal...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>> But I'm not sure if an IO throwing away windows (in favour of
>>>>>>>> Global) would be acceptable either.  BigQueryIO#write (streaming) also
>>>>>>>> has a notion of "getFailedWrites" with the same intent as what's in
>>>>>>>> ElasticsearchIO;  I wonder how that's implemented?
>>>>>>>>
>>>>>>>
>>>>>>> It appears that BigQueryIO[1] just re-windows inputs into the global
>>>>>>> window and does not concern itself with preserving input windows of
>>>>>>> elements nor holding the watermark.  Is that a good precedent to follow?
>>>>>>> It would certainly simplify the implementation of 
>>>>>>> ElasticsearchIO#bulkIO.
>>>>>>>
>>>>>>
>>>>>> Big picture answer: Windowing into the global window happens whether
>>>>>> you remember to do it or not. The world outside of Beam doesn't relate to
>>>>>> Beam's windows. Doing the re-windowing explicitly might make you more
>>>>>> likely to author the best code and/or offer the users the needed API to
>>>>>> reify windowing information and/or document that they need to do it
>>>>>> themselves before applying the write transform.
>>>>>>
>>>>>> Even more pedantically big picture answer: holding the watermark is
>>>>>> exactly and only "reserving the right to output an element with that
>>>>>> timestamp later" so you can guide your thinking with this. If you don't
>>>>>> need to output an element at a timestamp, don't hold it.
>>>>>>
>>>>>> BUT write transforms should almost always output elements describing
>>>>>> what was written. You want to think about the timestamps and windowing on
>>>>>> these elements. I'm not sure what the best practice is here. TBH I could
>>>>>> see this actually functioning as a new "source" where everything is in the
>>>>>> global window and timestamps describe the moment the data was committed,
>>>>>> but I'm not sure this is easily expressed. What do other IOs do about
>>>>>> windowing these outputs?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>>>>
>>>>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <evan.gal...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>>>>
>>>>>>>> Thanks Jan for the thoughtful responses!  It definitely sounds as
>>>>>>>> though it would be safer for the ElasticsearchIO#write transform to
>>>>>>>> either hold the watermark via machinery similar to that employed by
>>>>>>>> GroupIntoBatches or possibly output everything into the global
>>>>>>>> window; either sounds safer in terms of data correctness than
>>>>>>>> potentially outputting "on time" data as "late" data.  But I'm not
>>>>>>>> sure if an IO throwing away windows (in favour of Global) would be
>>>>>>>> acceptable either.  BigQueryIO#write (streaming) also has a notion of
>>>>>>>> "getFailedWrites" with the same intent as what's in
>>>>>>>> ElasticsearchIO;  I wonder how that's implemented?
>>>>>>>>
>>>>>>>> I'm keen to follow any further discussion on watermark holds for
>>>>>>>> DoFn's that implement both @StartBundle and @FinishBundle!
>>>>>>>>
>>>>>>>> - Evan
>>>>>>>>
>>>>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <evan.gal...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Got it.  So ProcessContext#output has the semantics of "using the
>>>>>>>>> window associated with the current @Element, output the data being
>>>>>>>>> passed in into that window."  Makes complete sense.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting discussion. :-)
>>>>>>>>>>
>>>>>>>>>> Answers inline.
>>>>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Jan for confirming that the fix looks alright.  I also
>>>>>>>>>> found a PR[1] that appears to be a good case study of the Timer
>>>>>>>>>> watermark hold technique that you previously mentioned so I'll
>>>>>>>>>> study that a bit for my own understanding and future use.  I was
>>>>>>>>>> also previously missing the notion that a single bundle could
>>>>>>>>>> contain elements from many disparate Windows, so that's a great
>>>>>>>>>> upgrade to my own mental model haha.
>>>>>>>>>>
>>>>>>>>>> I have a few follow-up questions for my own understanding
>>>>>>>>>> (hopefully not off topic).
>>>>>>>>>>
>>>>>>>>>>    1. Am I understanding correctly that using
>>>>>>>>>>    ProcessContext#output to output a given element will preserve the
>>>>>>>>>>    element's input window(s)? Or is it the case that when using
>>>>>>>>>>    ProcessContext#output, any buffered elements will all be output
>>>>>>>>>>    to whatever window the most recent element processed by
>>>>>>>>>>    @ProcessElement belongs to?
>>>>>>>>>>
>>>>>>>>>> There is no way Beam can distinguish *how* you produced elements
>>>>>>>>>> emitted from @ProcessElement (whether they are the result of some
>>>>>>>>>> in-memory buffering or the direct result of computation on the
>>>>>>>>>> single input element currently being processed). From that it
>>>>>>>>>> follows that the window(s) associated with the output will be equal
>>>>>>>>>> to the window(s) of the input element currently being processed
>>>>>>>>>> (generally, the model allows multiple applications of a window
>>>>>>>>>> function, which is why window.maxTimestamp is shifted 1 ms
>>>>>>>>>> backwards, to still be part of the window itself, so that window
>>>>>>>>>> functions based on timestamps are idempotent).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    2. Is it safe to emit outputs from @FinishBundle to windows
>>>>>>>>>>    which may be older than the watermark?  I believe this could
>>>>>>>>>>    still be happening given the current ElasticsearchIO
>>>>>>>>>>    implementation where any buffered elements are output in
>>>>>>>>>>    @FinishBundle using the same window they were associated with on
>>>>>>>>>>    input.  Intuitively, that sounds as though it could be a
>>>>>>>>>>    correctness bug if the watermark had progressed beyond those
>>>>>>>>>>    windows.
>>>>>>>>>>
>>>>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>>>>
>>>>>>>>>>  a) first of all - the fact that you could output late data that
>>>>>>>>>> previously was not late is _potentially_ a correctness bug, if you
>>>>>>>>>> have any downstream logic that performs additional grouping and -
>>>>>>>>>> most notably - relies on causality being preserved (which is how our
>>>>>>>>>> universe works, so this is a natural requirement). Changing an
>>>>>>>>>> element from "on time" to "late" can result in downstream processing
>>>>>>>>>> seeing an effect prior to its cause (in event time!, because the
>>>>>>>>>> watermark move can cause downstream timers to fire). I wrote
>>>>>>>>>> something about this in [1]. Generally, if there is no grouping (or
>>>>>>>>>> your application logic accounts for the swapping of cause and
>>>>>>>>>> effect), then this is not an issue. If you produce a PCollection for
>>>>>>>>>> the user, then you don't know what the user does with it and
>>>>>>>>>> therefore should pay attention to it.
>>>>>>>>>>
>>>>>>>>>>  b) second, this effect would not appear if mid-bundle output
>>>>>>>>>> watermark updates were not possible. I emphasize that I don't know
>>>>>>>>>> if this is allowed in the model, but I *suppose* it is (@kenn,
>>>>>>>>>> @reuven or @luke can you correct me if I'm wrong?). I have some
>>>>>>>>>> doubts whether it is correct, though. It seems that it generally
>>>>>>>>>> causes issues with in-memory buffering and 'outputWithTimestamp' in
>>>>>>>>>> stateless DoFns. If a DoFn implements @StartBundle and @FinishBundle
>>>>>>>>>> it seems that we should hold the output watermark between these two
>>>>>>>>>> calls. But that would turn a stateless DoFn into a stateful one,
>>>>>>>>>> which is ... unfortunate. :)
>>>>>>>>>>
>>>>>>>>>> I hope I made things a little more clear rather than more
>>>>>>>>>> obfuscated. :)
>>>>>>>>>>
>>>>>>>>>>  Jan
>>>>>>>>>>
>>>>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It's definitely interesting to think about the idea of enforcing
>>>>>>>>>> watermark update only between bundles, but presumably that would 
>>>>>>>>>> mean quite
>>>>>>>>>> extensive alterations.
>>>>>>>>>>
>>>>>>>>>> - Evan
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Evan,
>>>>>>>>>>>
>>>>>>>>>>> the fix looks good to me, as long as the timestamps of the
>>>>>>>>>>> buffered data need to be preserved downstream. Generally I think it
>>>>>>>>>>> *should* be possible to output in-memory buffered data in
>>>>>>>>>>> @ProcessElement (and @FinishBundle); the case where you need timers
>>>>>>>>>>> is when your buffer needs to span multiple bundles. In that case it
>>>>>>>>>>> also must be stored in regular state and not in-memory.
>>>>>>>>>>>
>>>>>>>>>>> It seems to me that there is some logical gap in how we handle
>>>>>>>>>>> the per-bundle buffering. I see two possibilities:
>>>>>>>>>>>
>>>>>>>>>>>  a) either we allow in the model to change the output watermark
>>>>>>>>>>> *while* processing a bundle - in which case it is a logical
>>>>>>>>>>> requirement to have the output timestamp from @ProcessElement no
>>>>>>>>>>> earlier than the timestamp of the current element (because that way
>>>>>>>>>>> we preserve the "on time"/"late" status of the current element, we
>>>>>>>>>>> don't swap anything), or
>>>>>>>>>>>
>>>>>>>>>>>  b) we enforce output watermark updates only in between bundles -
>>>>>>>>>>> in that case the requirement could be relaxed so that the output
>>>>>>>>>>> timestamp from @ProcessElement might be no earlier than the minimum
>>>>>>>>>>> of the timestamps inside the bundle
>>>>>>>>>>>
>>>>>>>>>>> I'm afraid that our current position is a). But in that case it
>>>>>>>>>>> is somewhat questionable whether it is semantically correct to use
>>>>>>>>>>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at
>>>>>>>>>>> all. It can only move timestamps to a future instant (and inside
>>>>>>>>>>> the same window!), which has little semantic meaning to me. It
>>>>>>>>>>> looks more error-prone than useful.
>>>>>>>>>>>
>>>>>>>>>>>  Jan
>>>>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks Jan, it's interesting to read about the handling of
>>>>>>>>>>> timestamps in cases employing a buffering pattern.  In the case of
>>>>>>>>>>> the ES write transform, buffered data could be output from
>>>>>>>>>>> ProcessElement or FinishBundle.  It's the case where data is output
>>>>>>>>>>> from ProcessElement that the error reported at the start of this
>>>>>>>>>>> thread shows up.  Based on what you described, it sounds like the
>>>>>>>>>>> PR[1] I made to "fix" this issue is actually fixing it by
>>>>>>>>>>> transitioning data from being late to being on time and outputting
>>>>>>>>>>> all buffered data into a non-deterministic window where the output
>>>>>>>>>>> heuristic is later satisfied (number of elements buffered or max
>>>>>>>>>>> time since last output).  It seems there are 2 issues as a result:
>>>>>>>>>>>
>>>>>>>>>>> 1. The window to which input elements belong is not being
>>>>>>>>>>> preserved. Presumably we want IOs to leave an element's window 
>>>>>>>>>>> unaltered?
>>>>>>>>>>> 2. The watermark of the system might be incorrect given that
>>>>>>>>>>> buffered data is assumed processed and allows the watermark to pass?
>>>>>>>>>>>
>>>>>>>>>>> It would definitely add complexity to the IO to employ timers to
>>>>>>>>>>> address this, but if that's the preferred or only solution I'll put 
>>>>>>>>>>> some
>>>>>>>>>>> thought into how to implement that solution.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Evan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to
>>>>>>>>>>>> update the output watermark while a bundle is being processed? It
>>>>>>>>>>>> seems that could also cause the "watermark skip" problem, which is
>>>>>>>>>>>> definitely an issue (and is probably the reason why the check
>>>>>>>>>>>> fails?).
>>>>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> The buffering seems incorrect to me. Whenever there is a
>>>>>>>>>>>> buffer, we need to make sure we hold the output watermark,
>>>>>>>>>>>> otherwise the watermark might "jump over" a buffered element,
>>>>>>>>>>>> transitioning it from "on-time" to "late", which would be a
>>>>>>>>>>>> correctness bug (we can transition elements only from "late" to
>>>>>>>>>>>> "on-time", never the other way around). The alternative is to use
>>>>>>>>>>>> @FinishBundle to do the flushing, but that might not be
>>>>>>>>>>>> appropriate here.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently, the only way to limit the progress of the output
>>>>>>>>>>>> watermark is by setting a timer with an output timestamp equal to
>>>>>>>>>>>> the timestamp of the earliest element in the buffer. There was a
>>>>>>>>>>>> thread that discussed this in more detail [1].
>>>>>>>>>>>>
>>>>>>>>>>>>  Jan
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> In general, I'd also really like to improve my understanding
>>>>>>>>>>>> and learn more about how the employed buffering can cause this
>>>>>>>>>>>> skew.  Is it because the call to "flush" is happening from a
>>>>>>>>>>>> different "current window" than the elements were originally
>>>>>>>>>>>> buffered from?  I'm actually thinking that the PR[1] to "fix" this
>>>>>>>>>>>> would have had the side effect of outputting buffered elements
>>>>>>>>>>>> into the window from which "flush" was called rather than the
>>>>>>>>>>>> window from which the buffered data originated. I suppose that
>>>>>>>>>>>> could be problematic, but should at least satisfy the validation
>>>>>>>>>>>> code.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <
>>>>>>>>>>>> evan.gal...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well,
>>>>>>>>>>>>> and I opened a PR[1] to use `ProcessContext#output` rather
>>>>>>>>>>>>> than `ProcessContext#outputWithTimestamp`.  I believe that should
>>>>>>>>>>>>> resolve this issue; it has for me when running jobs with a
>>>>>>>>>>>>> vendored SDK with that change included.  Do folks feel this
>>>>>>>>>>>>> change should be cherry-picked into 2.37.0?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The change also prompted a question to the mailing list[2]
>>>>>>>>>>>>> about the skew validation difference between ProcessContext and
>>>>>>>>>>>>> FinishBundleContext (where there is no ability to compute skew as
>>>>>>>>>>>>> I understand it).
>>>>>>>>>>>>>
>>>>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is specifically a case where the @ProcessElement saw
>>>>>>>>>>>>>> window X for element X0 and buffered it into memory and then,
>>>>>>>>>>>>>> when processing window Y and element Y0, wanted to flush the
>>>>>>>>>>>>>> previously buffered element X0. This all occurred as part of the
>>>>>>>>>>>>>> same bundle.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In general, yes, outputting to an earlier window is
>>>>>>>>>>>>>> problematic.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Outputting to an earlier window is problematic, as the
>>>>>>>>>>>>>>> watermark can never be correct if a DoFn can move time backwards
>>>>>>>>>>>>>>> arbitrarily.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A good question would be: should I be able to output to a
>>>>>>>>>>>>>>>> different window from the current @ProcessElement call, like
>>>>>>>>>>>>>>>> what we can do from @FinishBundle, to handle these buffering
>>>>>>>>>>>>>>>> scenarios?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results
>>>>>>>>>>>>>>>>> from elements in window X and then trying to output them in
>>>>>>>>>>>>>>>>> window Y when flushing the batch. This exposed a bug where
>>>>>>>>>>>>>>>>> elements that were being buffered were being output as part
>>>>>>>>>>>>>>>>> of a different window than the one that produced them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This became visible because validation was added recently
>>>>>>>>>>>>>>>>> to ensure that when the pipeline is processing elements in
>>>>>>>>>>>>>>>>> window X, any output with a timestamp is valid for window X.
>>>>>>>>>>>>>>>>> Note that this validation only occurs in @ProcessElement
>>>>>>>>>>>>>>>>> since output is associated with the current window of the
>>>>>>>>>>>>>>>>> input element that is being processed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is ok to do this in @FinishBundle since there is no
>>>>>>>>>>>>>>>>> existing windowing context and, when you output, that element
>>>>>>>>>>>>>>>>> is assigned to an appropriate window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>>>>>>>>>>> emils.solma...@rvu.co.uk> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch
>>>>>>>>>>>>>>>>>> writing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>>>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and
>>>>>>>>>>>>>>>>>> write to Elastic in a streaming job, the config for the 
>>>>>>>>>>>>>>>>>> source and sink is
>>>>>>>>>>>>>>>>>> respectively
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>>>>             // # of docs
>>>>>>>>>>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>>>>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>>>>                     arrayOf(host),
>>>>>>>>>>>>>>>>>>                     "fubar",
>>>>>>>>>>>>>>>>>>                     "_doc"
>>>>>>>>>>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>>>>>>>                     10,
>>>>>>>>>>>>>>>>>>                     // the duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>>>>>>>>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>>>>>>>>>>                     // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>>>>>>>>                     // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>>>>>>>>                     // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>>>>>>>>                     // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>>>>>>>>>>                 )
>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>>>>>>>>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately
>>>>>>>>>>>>>>>>>> hit a bug in the consumer, due to alleged time skew,
>>>>>>>>>>>>>>>>>> specifically
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>>>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with
>>>>>>>>>>>>>>>>>> timestamp 2022-03-07T10:43:38.640Z. Output timestamps must
>>>>>>>>>>>>>>>>>> be no earlier than the timestamp of the current input
>>>>>>>>>>>>>>>>>> (2022-03-07T10:43:43.562Z) minus the allowed skew
>>>>>>>>>>>>>>>>>> (0 milliseconds) and no later than
>>>>>>>>>>>>>>>>>> 294247-01-10T04:00:54.775Z. See the
>>>>>>>>>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on
>>>>>>>>>>>>>>>>>> changing the allowed skew.
>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first
>>>>>>>>>>>>>>>>>> version where this breaks, and it seems like the code in the
>>>>>>>>>>>>>>>>>> trace is largely added by the PR linked above. The error
>>>>>>>>>>>>>>>>>> usually claims a skew of a few seconds, but obviously I
>>>>>>>>>>>>>>>>>> can’t override getAllowedTimestampSkew() on the internal
>>>>>>>>>>>>>>>>>> Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the
>>>>>>>>>>>>>>>>>> code was intending to fix, and additionally, I’d also be
>>>>>>>>>>>>>>>>>> happy if someone else can reproduce this or knows of similar
>>>>>>>>>>>>>>>>>> reports. I feel like what we’re doing is not *that* uncommon
>>>>>>>>>>>>>>>>>> a scenario, so I would have thought someone else would have
>>>>>>>>>>>>>>>>>> hit this by now.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Emils
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
