I also wanted to clarify, for my own understanding, a potential discrepancy
between two descriptions in this thread of intra-bundle watermark handling.
Which is correct? Or are both correct, and not actually at odds in the way I
understand them to be?

The watermark can and will be updated between invocations of @ProcessElement
for elements within a single bundle[1] (emphasis mine):

> a) either we allow in the model to *change the output watermark while
> processing a bundle*, in which case it is a logical requirement to have the
> *output timestamp from @ProcessElement no earlier than the timestamp of the
> current element* (because that way we preserve the "on time"/"late" status
> of the current element, we don't swap anything), or
>
> ...
>
> I'm afraid that our current position is a).
>

The watermark is held by runners until all elements of a single bundle are
processed[2]:

> Within the lifetime of a single bundle, you don't need to worry about
> holding the watermark back as the runners do that for you.


[1] https://lists.apache.org/thread/7m1gx1o7br7jfblhz4yl7czwhhgc7ml3
[2] https://lists.apache.org/thread/fnrlw7rkcg4of5xyc7s43mwz00dgqf72

On Fri, Mar 11, 2022 at 11:00 AM Evan Galpin <[email protected]> wrote:

> Thanks Luke for all the info; I wasn't previously aware of the Reify
> utilities.  You raise a great point about how much simpler this IO would be
> without bypassing GroupIntoBatches.  When I originally wrote the Stateful
> BulkIO adapter in 2018 (which sadly sat in a walled garden for a while
> before it was allowed to be contributed to Beam OSS repo), it seemed to me
> that stateful processing was somewhat new and not universally supported by
> runners.  I didn't want to alienate users that were using ElasticsearchIO
> on a runner without stateful processing.
>
> I think stateful processing is widely supported at this point.  I would be
> happy to remove bundle-based batching from ElasticsearchIO if it's
> acceptable that an IO cannot be used on a runner without state support.
> For what it's worth, bundle-based batching created such small batches in my
> experience that it was almost unusable at scale with Elasticsearch anyway.
>
> Thanks,
> Evan
>
> On Thu, Mar 10, 2022 at 4:02 PM Luke Cwik <[email protected]> wrote:
>
>> The ElasticsearchIO implementation seems to only buffer elements within a
>> single bundle. Within the lifetime of a single bundle, you don't need to
>> worry about holding the watermark back as the runners do that for you. Only
>> once you cross bundle boundaries, via a stateful DoFn processing the same
>> key multiple times or a splittable DoFn processing another part of the
>> restriction, do you have to handle the watermark yourself, by either
>> setting the output time for a timer or controlling the watermark with the
>> watermark estimator.
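>>
>> For concreteness, here is a rough sketch of the timer-based approach
>> (Kotlin; StatefulBufferFn and the 30s flush interval are invented for
>> illustration, not the actual ElasticsearchIO code):
>>
>> import org.apache.beam.sdk.state.*
>> import org.apache.beam.sdk.transforms.DoFn
>> import org.apache.beam.sdk.transforms.DoFn.OnTimer
>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement
>> import org.apache.beam.sdk.transforms.DoFn.StateId
>> import org.apache.beam.sdk.transforms.DoFn.TimerId
>> import org.apache.beam.sdk.values.KV
>> import org.joda.time.Duration
>>
>> class StatefulBufferFn : DoFn<KV<String, String>, String>() {
>>   @StateId("buffer")
>>   private val bufferSpec: StateSpec<BagState<String>> = StateSpecs.bag()
>>
>>   @TimerId("flush")
>>   private val flushSpec: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
>>
>>   @ProcessElement
>>   fun process(
>>       context: ProcessContext,
>>       @StateId("buffer") buffer: BagState<String>,
>>       @TimerId("flush") flush: Timer
>>   ) {
>>     buffer.add(context.element().value)
>>     // The timer's output timestamp is the watermark hold: it "reserves the
>>     // right" to output at that timestamp when the timer fires. A real
>>     // implementation would track the minimum buffered timestamp in state
>>     // instead of using the current element's timestamp.
>>     flush.withOutputTimestamp(context.timestamp())
>>         .offset(Duration.standardSeconds(30))
>>         .setRelative()
>>   }
>>
>>   @OnTimer("flush")
>>   fun onFlush(
>>       context: OnTimerContext,
>>       @StateId("buffer") buffer: BagState<String>
>>   ) {
>>     // Safe to emit here: the held watermark guarantees these outputs are
>>     // not late relative to the timer's output timestamp.
>>     buffer.read().forEach { context.output(it) }
>>     buffer.clear()
>>   }
>> }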
>>
>> The API for @ProcessElement doesn't allow you to output buffered elements
>> with windowing information cleanly, since there are use cases like the one
>> that you have built where you want to buffer in memory and produce the
>> entire record with windowing information at some point in the future.
>>
>> You could do as Reuven suggested where you structure the pipeline like:
>> ... -> Reify.timestamps[1] -> Window.into(Default GlobalWindow strategy)
>> -> (optional) GroupIntoBatches -> ParDo(BulkIOFn/StatefulBulkIOFn) ->
>> Window.into(OriginalWindowingStrategy) -> ...
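>>
>> In code that might look roughly like the following (a sketch; BulkFn is a
>> hypothetical stand-in for the bulk-writing DoFn, originalWindowFn for
>> whatever WindowFn the pipeline applied upstream, and the transforms come
>> from org.apache.beam.sdk.transforms and its windowing subpackage):
>>
>> val results = docs // PCollection<String>
>>     // Capture each element's timestamp into the value itself.
>>     .apply(Reify.timestamps<String>())
>>     // Batch freely under the default trigger in the global window.
>>     .apply(Window.into<TimestampedValue<String>>(GlobalWindows()))
>>     .apply(ParDo.of(BulkFn())) // passes through TimestampedValue results
>>     // Restore the captured timestamps (withAllowedTimestampSkew is
>>     // deprecated, but it is currently what lets timestamps move backwards)...
>>     .apply(
>>         WithTimestamps.of { tv: TimestampedValue<String> -> tv.timestamp }
>>             .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE)))
>>     // ...and let the original WindowFn reassign the original windows.
>>     .apply(Window.into(originalWindowFn))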
>>
>> Why do we opt out of using the GroupIntoBatches route in the
>> ElasticsearchIO code?
>>
>> If we didn't need to opt out you could do something like:
>> GroupIntoBatches -> ParDo(StatefulBulkIOFn)
>> and the StatefulBulkIOFn would process the iterable as a batch that is
>> sent to ElasticsearchIO.
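>>
>> A sketch of that shape (assumes keyed input; sendBulkRequest is a
>> hypothetical helper, not an actual ElasticsearchIO method):
>>
>> keyedDocs // PCollection<KV<String, String>>
>>     .apply(GroupIntoBatches.ofSize<String, String>(1000))
>>     .apply(ParDo.of(object : DoFn<KV<String, Iterable<String>>, String>() {
>>       @ProcessElement
>>       fun process(c: ProcessContext) {
>>         // Each element is already a complete batch; GroupIntoBatches has
>>         // done the buffering, windowing, and watermark holds for us.
>>         sendBulkRequest(c.element().value) // hypothetical bulk call
>>         c.element().value.forEach { c.output(it) }
>>       }
>>     }))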
>>
>> 1:
>> https://github.com/apache/beam/blob/c76a1c8361b83dd85b6edf0eddd5ebe03873e0ce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java#L220
>>
>>
>>
>>
>> On Thu, Mar 10, 2022 at 11:50 AM Reuven Lax <[email protected]> wrote:
>>
>>>
>>>
>>> On Thu, Mar 10, 2022 at 11:47 AM Evan Galpin <[email protected]>
>>> wrote:
>>>
>>>> BigQuery...implementation could have preserved the windowing
>>>>> information and restored it later before outputting those records into the
>>>>> successful/failed write PCollections.
>>>>
>>>>
>>>> @Luke Cwik <[email protected]> By "preserve" do you mean creating an
>>>> in-memory data structure to hold windowing info and make use of that info
>>>> later?  Would that also imply holding the watermark so that any windows
>>>> in-memory could not be "passed over" before being restored?  I'd be keen to
>>>> hear more about how one would ideally preserve and restore window
>>>> information.
>>>>
>>>
>>> One way is to preserve timestamps and simply reapply the WindowFn.
>>>
>>>
>>>>
>>>> I agree that outputting to arbitrary windows is Bad™.  It's worth
>>>> noting that RedisIO[1] also has this pattern of arbitrary window output via
>>>> the same mechanism that ElasticsearchIO employs.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455
>>>>
>>>> On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> IMO the fact that BigQueryIO doesn't preserve windowing when
>>>>> outputting successful or failed elements was a bug.
>>>>>
>>>>> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> I believe preserving the input window and element timestamp makes the
>>>>>> most sense. Outputting in the global window seems like a distant second.
>>>>>> Outputting in an arbitrary different window or with an arbitrary 
>>>>>> timestamp
>>>>>> is incorrect.
>>>>>>
>>>>>> BigQuery outputs successful and failed writes in the global window
>>>>>> but the implementation could have preserved the windowing information and
>>>>>> restored it later before outputting those records into the
>>>>>> successful/failed write PCollections. Based upon the code, it seems like
>>>>>> the global window was used to get around an implementation detail that is
>>>>>> sub-optimal within Dataflow's GroupIntoBatches implementation.
>>>>>>
>>>>>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> But I'm not sure if an IO throwing away windows (in favour of
>>>>>>>>> Global) would be acceptable either.  BigQueryIO#write (streaming) 
>>>>>>>>> also has
>>>>>>>>> a notion of "getFailedWrites" with the same intent as what's in
>>>>>>>>> ElasticsearchIO;  I wonder how that's implemented?
>>>>>>>>>
>>>>>>>>
>>>>>>>> It appears that BigQueryIO[1] just re-windows inputs into the
>>>>>>>> global window and does not concern itself with preserving input 
>>>>>>>> windows of
>>>>>>>> elements nor holding the watermark.  Is that a good precedent to 
>>>>>>>> follow?
>>>>>>>> It would certainly simplify the implementation of 
>>>>>>>> ElasticsearchIO#bulkIO.
>>>>>>>>
>>>>>>>
>>>>>>> Big picture answer: Windowing into the global window happens whether
>>>>>>> you remember to do it or not. The world outside of Beam doesn't relate 
>>>>>>> to
>>>>>>> Beam's windows. Doing the re-windowing explicitly might make you more
>>>>>>> likely to author the best code and/or offer the users the needed API to
>>>>>>> reify windowing information and/or document that they need to do it
>>>>>>> themselves before applying the write transform.
>>>>>>>
>>>>>>> Even more pedantically big picture answer: holding the watermark is
>>>>>>> exactly and only "reserving the right to output an element with that
>>>>>>> timestamp later", so you can guide your thinking with this. If you don't
>>>>>>> need to output an element at a timestamp, don't hold it.
>>>>>>>
>>>>>>> BUT write transforms should almost always output elements describing
>>>>>>> what was written. You want to think about the timestamps and windowing 
>>>>>>> on
>>>>>>> these elements. I'm not sure what the best practice is here. TBH I could
>>>>>>> see this actually functioning as a new "source" where everything is 
>>>>>>> global
>>>>>>> window and timestamps describe the moment the data was committed, but 
>>>>>>> not
>>>>>>> sure this is easily expressed. What do other IOs do about windowing 
>>>>>>> these
>>>>>>> outputs?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>>>>>
>>>>>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>>>>>
>>>>>>>>> Thanks Jan for the thoughtful responses!  It definitely sounds as
>>>>>>>>> though the ElasticsearchIO#write transform would be safer to either hold
>>>>>>>>> the watermark, via machinery similar to that employed by GroupIntoBatches,
>>>>>>>>> or possibly output everything into a global window; either sounds safer in
>>>>>>>>> terms of data correctness than potentially outputting "on time" data as
>>>>>>>>> "late" data.  But I'm not sure if an IO throwing away windows (in favour of
>>>>>>>>> Global) would be acceptable either.  BigQueryIO#write (streaming) also has
>>>>>>>>> a notion of "getFailedWrites" with the same intent as what's in
>>>>>>>>> ElasticsearchIO; I wonder how that's implemented?
>>>>>>>>>
>>>>>>>>> I'm keen to follow any further discussion on watermark holds for
>>>>>>>>> DoFns that implement both @StartBundle and @FinishBundle!
>>>>>>>>>
>>>>>>>>> - Evan
>>>>>>>>>
>>>>>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Got it.  So ProcessContext#output has the semantics of "using the
>>>>>>>>>> window associated with the current @Element, output the data being passed
>>>>>>>>>> in to that window."  Makes complete sense.
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Interesting discussion. :-)
>>>>>>>>>>>
>>>>>>>>>>> Answers inline.
>>>>>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks Jan for confirming that the fix looks alright.  I also
>>>>>>>>>>> found a PR[1] that appears to be a good case study of the Timer 
>>>>>>>>>>> watermark
>>>>>>>>>>> hold technique that you previously mentioned so I'll study that a 
>>>>>>>>>>> bit for
>>>>>>>>>>> my own understanding and future use.  I was also previously missing 
>>>>>>>>>>> the
>>>>>>>>>>> notion that a single bundle could contain elements from many disparate
>>>>>>>>>>> disparate
>>>>>>>>>>> Windows, so that's a great upgrade to my own mental model haha.
>>>>>>>>>>>
>>>>>>>>>>> I have a few follow-up questions for my own understanding
>>>>>>>>>>> (hopefully not off topic).
>>>>>>>>>>>
>>>>>>>>>>>    1. Am I understanding correctly that using
>>>>>>>>>>>    ProcessContext#output to output a given element will preserve 
>>>>>>>>>>> the element's
>>>>>>>>>>>    input window(s)? Or is it the case that when using 
>>>>>>>>>>> ProcessContext#output,
>>>>>>>>>>>    any buffered elements will all be output to whatever window the 
>>>>>>>>>>> most recent
>>>>>>>>>>>    element processed by @ProcessElement belongs to?
>>>>>>>>>>>
>>>>>>>>>>> There is no way Beam can distinguish *how* you produced elements
>>>>>>>>>>> emitted from @ProcessElement (whether they are the result of some
>>>>>>>>>>> in-memory buffering or the direct result of computation on the single
>>>>>>>>>>> input element currently being processed). From that it follows that the
>>>>>>>>>>> window(s) associated with the output will be equal to the window(s) of the
>>>>>>>>>>> input element currently being processed (generally, the model allows
>>>>>>>>>>> multiple applications of the window function, which is why
>>>>>>>>>>> window.maxTimestamp is shifted backwards by 1 ms, so that it is still part
>>>>>>>>>>> of the window itself and window functions based on timestamps are
>>>>>>>>>>> idempotent).
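>>>>>>>>>>>
>>>>>>>>>>> A tiny illustration of the maxTimestamp point (IntervalWindow from
>>>>>>>>>>> org.apache.beam.sdk.transforms.windowing, Instant from joda-time):
>>>>>>>>>>>
>>>>>>>>>>> val window = IntervalWindow(Instant(0), Instant(60_000))
>>>>>>>>>>> // window.maxTimestamp() == Instant(59_999): assigning fixed one-minute
>>>>>>>>>>> // windows again from that timestamp lands back in [0, 60000), so the
>>>>>>>>>>> // window function is idempotent. Had maxTimestamp been 60000, a second
>>>>>>>>>>> // application would push the element into the next window.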
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    2. Is it safe to emit outputs from @FinishBundle to windows
>>>>>>>>>>>    which may be older than the watermark?  I believe this could 
>>>>>>>>>>> still be
>>>>>>>>>>>    happening given the current ElasticsearchIO implementation where 
>>>>>>>>>>> any
>>>>>>>>>>>    buffered elements are output in @FinishBundle using the same
>>>>>>>>>>>    window they were associated with on input.  Intuitively, that 
>>>>>>>>>>> sounds as
>>>>>>>>>>>    though it could be a correctness bug if the watermark had 
>>>>>>>>>>> progressed beyond
>>>>>>>>>>>    those windows.
>>>>>>>>>>>
>>>>>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>>>>>
>>>>>>>>>>>  a) first of all, the fact that you could output late data that
>>>>>>>>>>> previously was not late is _potentially_ a correctness bug, if you have
>>>>>>>>>>> any downstream logic that performs any additional grouping and, most
>>>>>>>>>>> notably, relies on causality being preserved (which is how our universe
>>>>>>>>>>> works, so this is a natural requirement). Changing an element from "on
>>>>>>>>>>> time" to "late" can result in downstream processing seeing an effect
>>>>>>>>>>> prior to its cause (in event time!, because the watermark move can cause
>>>>>>>>>>> downstream timers to fire). I wrote something about this in [1]. Generally,
>>>>>>>>>>> if there is no grouping (or your application logic accounts for the
>>>>>>>>>>> swapping of cause and effect), then this is not an issue. If you produce a
>>>>>>>>>>> PCollection for a user, then you don't know what the user does with it and
>>>>>>>>>>> therefore should pay attention to it.
>>>>>>>>>>>
>>>>>>>>>>>  b) second, this effect would not appear if mid-bundle output
>>>>>>>>>>> watermark updates were not possible. I emphasize that I don't know if this
>>>>>>>>>>> is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke can
>>>>>>>>>>> you correct me if I'm wrong?). I have some doubts if it is correct, though.
>>>>>>>>>>> It seems that it generally causes issues with in-memory buffering and
>>>>>>>>>>> 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle
>>>>>>>>>>> and @FinishBundle, it seems that we should hold the output watermark
>>>>>>>>>>> between these two calls. But that would turn a stateless DoFn into a
>>>>>>>>>>> stateful one, which is ... unfortunate. :)
>>>>>>>>>>>
>>>>>>>>>>> I hope I made things a little more clear rather than more
>>>>>>>>>>> obfuscated. :)
>>>>>>>>>>>
>>>>>>>>>>>  Jan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> It's definitely interesting to think about the idea of enforcing
>>>>>>>>>>> watermark updates only between bundles, but presumably that would mean
>>>>>>>>>>> quite extensive alterations.
>>>>>>>>>>>
>>>>>>>>>>> - Evan
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Evan,
>>>>>>>>>>>>
>>>>>>>>>>>> the fix looks good to me, as long as the timestamp of the
>>>>>>>>>>>> buffered data needs to be preserved downstream. Generally I think it
>>>>>>>>>>>> *should* be possible to output in-memory buffered data in @ProcessElement
>>>>>>>>>>>> (and @FinishBundle); the case where you need timers is when your buffer
>>>>>>>>>>>> needs to span multiple bundles. In that case it also must be stored in
>>>>>>>>>>>> regular state and not in memory.
>>>>>>>>>>>>
>>>>>>>>>>>> It seems to me that there is some logical gap in how we handle
>>>>>>>>>>>> the per-bundle buffering. I see two possibilities:
>>>>>>>>>>>>
>>>>>>>>>>>>  a) either we allow in the model to change the output watermark
>>>>>>>>>>>> *while* processing a bundle, in which case it is a logical requirement to
>>>>>>>>>>>> have the output timestamp from @ProcessElement no earlier than the
>>>>>>>>>>>> timestamp of the current element (because that way we preserve the "on
>>>>>>>>>>>> time"/"late" status of the current element, we don't swap anything), or
>>>>>>>>>>>>
>>>>>>>>>>>>  b) we enforce output watermark updates only in between bundles -
>>>>>>>>>>>> in that case the requirement could be relaxed so that the output
>>>>>>>>>>>> timestamp from @ProcessElement might be no earlier than the minimum of
>>>>>>>>>>>> the timestamps inside the bundle
>>>>>>>>>>>>
>>>>>>>>>>>> I'm afraid that our current position is a). But in that case it
>>>>>>>>>>>> is somewhat questionable whether it is semantically correct to use
>>>>>>>>>>>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can
>>>>>>>>>>>> move timestamps only to a future instant (and inside the same window!),
>>>>>>>>>>>> which has little semantic meaning to me. Looks more error-prone than
>>>>>>>>>>>> useful.
>>>>>>>>>>>>
>>>>>>>>>>>>  Jan
>>>>>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks Jan, it's interesting to read about the handling of
>>>>>>>>>>>> timestamps in cases employing a buffering pattern.  In the case of the ES
>>>>>>>>>>>> write transform, buffered data could be output from ProcessElement or
>>>>>>>>>>>> FinishBundle.  It's in the case where data is output from ProcessElement
>>>>>>>>>>>> that the error reported at the start of this thread shows up.  Based on
>>>>>>>>>>>> what you described, it sounds like the PR[1] I made to "fix" this issue is
>>>>>>>>>>>> actually fixing it by transitioning data from being late to being on time
>>>>>>>>>>>> and outputting all buffered data into a non-deterministic window, namely
>>>>>>>>>>>> whichever window the output heuristic is later satisfied in (number of
>>>>>>>>>>>> elements buffered or max time since last output).  It seems there are two
>>>>>>>>>>>> issues as a result:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The window to which input elements belong is not being
>>>>>>>>>>>> preserved. Presumably we want IOs to leave an element's window 
>>>>>>>>>>>> unaltered?
>>>>>>>>>>>> 2. The watermark of the system might be incorrect, given that
>>>>>>>>>>>> buffered data is assumed to have been processed, which allows the
>>>>>>>>>>>> watermark to pass?
>>>>>>>>>>>>
>>>>>>>>>>>> It would definitely add complexity to the IO to employ timers
>>>>>>>>>>>> to address this, but if that's the preferred or only solution I'll 
>>>>>>>>>>>> put some
>>>>>>>>>>>> thought into how to implement that solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Evan
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to
>>>>>>>>>>>>> update the output watermark while a bundle is being processed? It seems
>>>>>>>>>>>>> that could also cause the "watermark skip" problem, which is definitely an
>>>>>>>>>>>>> issue (and is probably the reason why the check fails?).
>>>>>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The buffering seems incorrect to me. Whenever there is a
>>>>>>>>>>>>> buffer, we need to make sure we hold the output watermark, 
>>>>>>>>>>>>> otherwise the
>>>>>>>>>>>>> watermark might "jump over" a buffered element transitioning it 
>>>>>>>>>>>>> from
>>>>>>>>>>>>> "on-time" to "late", which would be a correctness bug (we can 
>>>>>>>>>>>>> transition
>>>>>>>>>>>>> elements only from "late" to "on-time", never the other way 
>>>>>>>>>>>>> around). The
>>>>>>>>>>>>> alternative is to use @FinishBundle to do the flushing, but that might
>>>>>>>>>>>>> not be appropriate here.
>>>>>>>>>>>>> appropriate here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, the only way to limit the progress of the output
>>>>>>>>>>>>> watermark is by setting a timer whose output timestamp is the timestamp
>>>>>>>>>>>>> of the earliest element in the buffer. There was a thread that discussed
>>>>>>>>>>>>> this in more detail [1].
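>>>>>>>>>>>>>
>>>>>>>>>>>>> In code, roughly (a sketch; flushTimer, earliestBuffered and
>>>>>>>>>>>>> maxBufferingDuration are hypothetical names, with earliestBuffered
>>>>>>>>>>>>> tracked in state alongside the buffer):
>>>>>>>>>>>>>
>>>>>>>>>>>>> // The output timestamp holds the output watermark at the earliest
>>>>>>>>>>>>> // buffered element until the timer fires.
>>>>>>>>>>>>> flushTimer.withOutputTimestamp(earliestBuffered)
>>>>>>>>>>>>>     .offset(maxBufferingDuration)
>>>>>>>>>>>>>     .setRelative()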
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> In general, I'd also really like to improve my understanding
>>>>>>>>>>>>> and learn more about how the employed buffering can cause this 
>>>>>>>>>>>>> skew.  Is it
>>>>>>>>>>>>> because the call to "flush" is happening from a different 
>>>>>>>>>>>>> "current window"
>>>>>>>>>>>>> than the elements were originally buffered from?  I'm actually 
>>>>>>>>>>>>> thinking
>>>>>>>>>>>>> that the PR[1] to "fix" this would have had the side effect of 
>>>>>>>>>>>>> outputting
>>>>>>>>>>>>> buffered elements into the window from which "flush" was called 
>>>>>>>>>>>>> rather than
>>>>>>>>>>>>> the window from which the buffered data originated. I suppose 
>>>>>>>>>>>>> that could be
>>>>>>>>>>>>> problematic, but should at least satisfy the validation code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well,
>>>>>>>>>>>>>> and I opened a PR[1] to use `ProcessContext#output` rather
>>>>>>>>>>>>>> than `ProcessContext#outputWithTimestamp`.  I believe that should resolve
>>>>>>>>>>>>>> this issue; it has for me when running jobs with a vendored SDK that
>>>>>>>>>>>>>> includes the change.  Do folks feel this change should be cherry-picked
>>>>>>>>>>>>>> into 2.37.0?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The change also prompted a question to the mailing list[2]
>>>>>>>>>>>>>> about the difference in skew validation between ProcessContext and
>>>>>>>>>>>>>> FinishBundleContext (where there is no ability to compute skew, as I
>>>>>>>>>>>>>> understand it).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is specifically a case where the @ProcessElement saw
>>>>>>>>>>>>>>> window X for element X0 and buffered it into memory and then 
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>> processing window Y and element Y0 wanted to flush previously 
>>>>>>>>>>>>>>> buffered
>>>>>>>>>>>>>>> element X0. This all occurred as part of the same bundle.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In general, yes, outputting to an earlier window is
>>>>>>>>>>>>>>> problematic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Outputting to an earlier window is problematic, as the
>>>>>>>>>>>>>>>> watermark can never be correct if a DoFn can move time backwards
>>>>>>>>>>>>>>>> arbitrarily.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A good question would be: should I be able to output to a
>>>>>>>>>>>>>>>>> different window from the current @ProcessElement call, like what we can
>>>>>>>>>>>>>>>>> do from @FinishBundle, to handle these buffering scenarios?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results
>>>>>>>>>>>>>>>>>> from elements in window X and then trying to output them in window Y when
>>>>>>>>>>>>>>>>>> flushing the batch. This exposed a bug where elements that were being
>>>>>>>>>>>>>>>>>> buffered were being output as part of a different window than the window
>>>>>>>>>>>>>>>>>> that produced them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This became visible because validation was added recently
>>>>>>>>>>>>>>>>>> to ensure that when the pipeline is processing elements in window X, any
>>>>>>>>>>>>>>>>>> output with a timestamp is valid for window X. Note that this validation
>>>>>>>>>>>>>>>>>> only occurs in @ProcessElement, since output is associated with the window
>>>>>>>>>>>>>>>>>> of the input element that is being processed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It is ok to do this in @FinishBundle since there is no
>>>>>>>>>>>>>>>>>> existing windowing context, and when you output, the element is assigned
>>>>>>>>>>>>>>>>>> to an appropriate window.
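>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> A minimal sketch of that @FinishBundle shape (assuming a
>>>>>>>>>>>>>>>>>> hypothetical in-memory buffer of (value, timestamp, window) triples):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @FinishBundle
>>>>>>>>>>>>>>>>>> fun finishBundle(context: FinishBundleContext) {
>>>>>>>>>>>>>>>>>>   buffer.forEach { (value, timestamp, window) ->
>>>>>>>>>>>>>>>>>>     // FinishBundleContext.output takes the window explicitly, so
>>>>>>>>>>>>>>>>>>     // each buffered element goes back to the window it came from.
>>>>>>>>>>>>>>>>>>     context.output(value, timestamp, window)
>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>   buffer.clear()
>>>>>>>>>>>>>>>>>> }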
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch
>>>>>>>>>>>>>>>>>>> writing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>>>>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and
>>>>>>>>>>>>>>>>>>> write to Elastic in a streaming job, the config for the 
>>>>>>>>>>>>>>>>>>> source and sink is
>>>>>>>>>>>>>>>>>>> respectively
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>             
>>>>>>>>>>>>>>>>>>> PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>>>>>             
>>>>>>>>>>>>>>>>>>> .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>>>>>             // # of docs
>>>>>>>>>>>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>>>>>>>>>>>                 
>>>>>>>>>>>>>>>>>>> ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>>>>>                     arrayOf(host),
>>>>>>>>>>>>>>>>>>>                     "fubar",
>>>>>>>>>>>>>>>>>>>                     "_doc"
>>>>>>>>>>>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>>>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>>>>>>>>                     10,
>>>>>>>>>>>>>>>>>>>                     // the duration is wall clock, against 
>>>>>>>>>>>>>>>>>>> the connection and socket timeouts specified
>>>>>>>>>>>>>>>>>>>                     // above. I.e., 10 x 30s is gonna be 
>>>>>>>>>>>>>>>>>>> more than 3 minutes, so if we're getting
>>>>>>>>>>>>>>>>>>>                     // 10 socket timeouts in a row, this 
>>>>>>>>>>>>>>>>>>> would ignore the "10" part and terminate
>>>>>>>>>>>>>>>>>>>                     // after 6. The idea is that in a mixed 
>>>>>>>>>>>>>>>>>>> failure mode, you'd get different timeouts
>>>>>>>>>>>>>>>>>>>                     // of different durations, and on 
>>>>>>>>>>>>>>>>>>> average 10 x fails < 4m.
>>>>>>>>>>>>>>>>>>>                     // That said, 4m is arbitrary, so 
>>>>>>>>>>>>>>>>>>> adjust as and when needed.
>>>>>>>>>>>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>>>>>>>>>>>                 )
>>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>>>>>>>>             .withIndexFn { f: JsonNode -> 
>>>>>>>>>>>>>>>>>>> f["schema_name"].asText() }
>>>>>>>>>>>>>>>>>>>             .withIsDeleteFn { f: JsonNode -> 
>>>>>>>>>>>>>>>>>>> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately
>>>>>>>>>>>>>>>>>>> hit a bug in the consumer, due to alleged time skew, 
>>>>>>>>>>>>>>>>>>> specifically
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker: 
>>>>>>>>>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with 
>>>>>>>>>>>>>>>>>>> timestamp 2022-03-07T10:43:38.640Z. Output timestamps must 
>>>>>>>>>>>>>>>>>>> be no earlier than the timestamp of the
>>>>>>>>>>>>>>>>>>> current input (2022-03-07T10:43:43.562Z) minus the allowed 
>>>>>>>>>>>>>>>>>>> skew (0 milliseconds) and no later than 
>>>>>>>>>>>>>>>>>>> 294247-01-10T04:00:54.775Z. See the 
>>>>>>>>>>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc
>>>>>>>>>>>>>>>>>>> for details on changing the allowed skew.
>>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’ve bisected it: 2.34 works fine, 2.35 is the first
>>>>>>>>>>>>>>>>>>> version where this breaks, and it seems like the code in the trace is
>>>>>>>>>>>>>>>>>>> largely
>>>>>>>>>>>>>>>>>>> trace is largely
>>>>>>>>>>>>>>>>>>> added by the PR linked above. The error usually claims a 
>>>>>>>>>>>>>>>>>>> skew of a few
>>>>>>>>>>>>>>>>>>> seconds, but obviously I can’t override
>>>>>>>>>>>>>>>>>>> getAllowedTimestampSkew() on the internal Elastic DoFn,
>>>>>>>>>>>>>>>>>>> and it’s marked deprecated anyway.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the
>>>>>>>>>>>>>>>>>>> code was intending to fix, and additionally, I’d also be 
>>>>>>>>>>>>>>>>>>> happy if someone
>>>>>>>>>>>>>>>>>>> else can reproduce this or knows of similar reports. I feel 
>>>>>>>>>>>>>>>>>>> like what we’re
>>>>>>>>>>>>>>>>>>> doing is not *that* uncommon a scenario, so I would
>>>>>>>>>>>>>>>>>>> have thought someone else would have hit this by now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Emils
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
