>
> BigQuery...implementation could have preserved the windowing information
> and restored it later before outputting those records into the
> successful/failed write PCollections.


@Luke Cwik <[email protected]> By "preserve" do you mean creating an
in-memory data structure to hold windowing info and make use of that info
later?  Would that also imply holding the watermark so that any windows
in-memory could not be "passed over" before being restored?  I'd be keen to
hear more about how one would ideally preserve and restore window
information.
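
To make the question concrete, here is a toy model in plain Java of what
"preserve and restore" might mean. It is not Beam code and not how
BigQueryIO or ElasticsearchIO work; the names (WindowedBufferSketch,
watermarkHold, flush) are invented for illustration. Each buffered value
keeps the timestamp and window it arrived with, and the hold the buffer
would need is assumed to be the earliest buffered timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only (not Beam code); every name here is hypothetical. */
final class WindowedBufferSketch {

  /** A buffered value plus the windowing info it arrived with. */
  static final class Buffered {
    final String value;
    final long timestampMillis;
    final String window;

    Buffered(String value, long timestampMillis, String window) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
    }
  }

  private final List<Buffered> buffer = new ArrayList<>();

  /** "Preserve": store timestamp and window next to the value. */
  void add(String value, long timestampMillis, String window) {
    buffer.add(new Buffered(value, timestampMillis, window));
  }

  /** Earliest buffered timestamp, or Long.MAX_VALUE if the buffer is empty (no hold needed). */
  long watermarkHold() {
    long hold = Long.MAX_VALUE;
    for (Buffered b : buffer) {
      hold = Math.min(hold, b.timestampMillis);
    }
    return hold;
  }

  /** "Restore": return each element with its original timestamp and window, then clear. */
  List<Buffered> flush() {
    List<Buffered> out = new ArrayList<>(buffer);
    buffer.clear();
    return out;
  }

  public static void main(String[] args) {
    WindowedBufferSketch sketch = new WindowedBufferSketch();
    sketch.add("x0", 1_000L, "X");
    sketch.add("y0", 5_000L, "Y");
    System.out.println("hold=" + sketch.watermarkHold()); // hold=1000
    for (Buffered b : sketch.flush()) {
      System.out.println(b.value + " -> window " + b.window + " @ " + b.timestampMillis);
    }
  }
}
```

In a real DoFn the restore step would have to go through whatever output
call accepts an explicit timestamp and window, and the hold would have to
be communicated to the runner; neither mapping is shown here.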

I agree that outputting to arbitrary windows is Bad™.  It's worth noting
that RedisIO[1] also has this pattern of arbitrary window output via the
same mechanism that ElasticsearchIO employs.

[1]
https://github.com/apache/beam/blob/0d007a7336e3bbeaa159111f3888f118dc56d0fe/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L433-L455
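
For reference, the check that surfaced this pattern in ElasticsearchIO
(see the error message quoted at the bottom of this thread) reduces to a
lower bound on the output timestamp. The sketch below models only that
bound in plain Java, using the two timestamps and the zero allowed skew
from that error. isAllowed is a made-up helper, not the SimpleDoFnRunner
code, and the upper bound on timestamps is ignored.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the lower-bound skew check; not the actual runner code. */
final class SkewCheckSketch {

  /** True if outputTs is no earlier than currentInputTs minus allowedSkew. */
  static boolean isAllowed(Instant outputTs, Instant currentInputTs, Duration allowedSkew) {
    return !outputTs.isBefore(currentInputTs.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Timestamps copied from the error message quoted in this thread.
    Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
    Instant current = Instant.parse("2022-03-07T10:43:43.562Z");

    // Flushing the earlier, buffered element while processing the later one is rejected.
    System.out.println(isAllowed(buffered, current, Duration.ZERO)); // false
    // Outputting at the current element's own timestamp passes.
    System.out.println(isAllowed(current, current, Duration.ZERO)); // true
  }
}
```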

On Thu, Mar 10, 2022 at 2:37 PM Reuven Lax <[email protected]> wrote:

> IMO the fact that BigQueryIO doesn't preserve windowing when outputting
> successful or failed elements was a bug.
>
> On Thu, Mar 10, 2022 at 11:34 AM Luke Cwik <[email protected]> wrote:
>
>> I believe preserving the input window and element timestamp makes the
>> most sense. Outputting in the global window seems like a distant second.
>> Outputting in an arbitrary different window or with an arbitrary timestamp
>> is incorrect.
>>
>> BigQuery outputs successful and failed writes in the global window but
>> the implementation could have preserved the windowing information and
>> restored it later before outputting those records into the
>> successful/failed write PCollections. Based upon the code it seems like
>> global windows was used to get around an implementation detail that is
>> sub-optimal within Dataflow's GroupIntoBatches implementation.
>>
>> On Thu, Mar 10, 2022 at 6:26 AM Kenneth Knowles <[email protected]> wrote:
>>
>>>
>>>
>>> On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin <[email protected]>
>>> wrote:
>>>
>>>>> But I'm not sure if an IO throwing away windows (in favour of Global)
>>>>> would be acceptable either.  BigQueryIO#write (streaming) also has a
>>>>> notion of "getFailedWrites" with the same intent as what's in
>>>>> ElasticsearchIO;  I wonder how that's implemented?
>>>>>
>>>>
>>>> It appears that BigQueryIO[1] just re-windows inputs into the global
>>>> window and does not concern itself with preserving input windows of
>>>> elements nor holding the watermark.  Is that a good precedent to follow?
>>>> It would certainly simplify the implementation of ElasticsearchIO#bulkIO.
>>>>
>>>
>>> Big picture answer: Windowing into the global window happens whether you
>>> remember to do it or not. The world outside of Beam doesn't relate to
>>> Beam's windows. Doing the re-windowing explicitly might make you more
>>> likely to author the best code and/or offer the users the needed API to
>>> reify windowing information and/or document that they need to do it
>>> themselves before applying the write transform.
>>>
>>> Even more pedantically big picture answer: holding the watermark is
>>> exactly and only "reserving the right to output an element with that timestamp
>>> later" so you can guide your thinking with this. If you don't need to
>>> output an element at a timestamp, don't hold it.
>>>
>>> BUT write transforms should almost always output elements describing
>>> what was written. You want to think about the timestamps and windowing on
>>> these elements. I'm not sure what the best practice is here. TBH I could
>>> see this actually functioning as a new "source" where everything is global
>>> window and timestamps describe the moment the data was committed, but not
>>> sure this is easily expressed. What do other IOs do about windowing these
>>> outputs?
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>>>>
>>>> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <[email protected]>
>>>> wrote:
>>>>
>>>>> Oops, unfortunate misfire before being done writing my prior email.
>>>>>
>>>>> Thanks Jan for the thoughtful responses!  It definitely sounds as
>>>>> though the ElasticsearchIO#write transform would be safer to either hold
>>>>> the watermark via similar machinery employed by GroupIntoBatches or
>>>>> possibly output everything into a global window; either sounds safer
>>>>> in terms of data correctness than potentially outputting "on time" data
>>>>> as "late" data.  But I'm not sure if an IO throwing away windows (in
>>>>> favour of Global) would be acceptable either.  BigQueryIO#write
>>>>> (streaming) also has a notion of "getFailedWrites" with the same intent
>>>>> as what's in ElasticsearchIO;  I wonder how that's implemented?
>>>>>
>>>>> I'm keen to follow any further discussion on watermark holds for
>>>>> DoFn's that implement both @StartBundle and @FinishBundle!
>>>>>
>>>>> - Evan
>>>>>
>>>>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Got it.  So ProcessContext#output has the semantics of "using the
>>>>>> window associated with the current @Element, output the data being passed
>>>>>> in into that window."  Makes complete sense.
>>>>>>
>>>>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Interesting discussion. :-)
>>>>>>>
>>>>>>> Answers inline.
>>>>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>>>>
>>>>>>> Thanks Jan for confirming that the fix looks alright.  I also found
>>>>>>> a PR[1] that appears to be a good case study of the Timer watermark hold
>>>>>>> technique that you previously mentioned so I'll study that a bit for my
>>>>>>> own understanding and future use.  I was also previously missing the
>>>>>>> notion that a singular bundle could contain elements from many disparate
>>>>>>> Windows, so that's a great upgrade to my own mental model haha.
>>>>>>>
>>>>>>> I have a few follow-up questions for my own understanding (hopefully
>>>>>>> not off topic).
>>>>>>>
>>>>>>>    1. Am I understanding correctly that using ProcessContext#output
>>>>>>>    to output a given element will preserve the element's input
>>>>>>>    window(s)? Or is it the case that when using ProcessContext#output,
>>>>>>>    any buffered elements will all be output to whatever window the most
>>>>>>>    recent element processed by @ProcessElement belongs to?
>>>>>>>
>>>>>>> There is no way Beam can distinguish *how* you produced elements
>>>>>>> emitted from @ProcessElement (whether they are the result of some
>>>>>>> in-memory buffering or the direct result of computation on the single
>>>>>>> input element currently being processed). It follows that the window(s)
>>>>>>> associated with the output will be equal to the window(s) of the input
>>>>>>> element currently being processed (generally, the model allows multiple
>>>>>>> applications of a window function, which is why window.maxTimestamp is
>>>>>>> shifted 1 ms backwards, to still be part of the window itself, so that
>>>>>>> window functions based on timestamps are idempotent).
>>>>>>>
>>>>>>>
>>>>>>>    2. Is it safe to emit outputs from @FinishBundle to windows
>>>>>>>    which may be older than the watermark?  I believe this could still be
>>>>>>>    happening given the current ElasticsearchIO implementation where any
>>>>>>>    buffered elements are output in @FinishBundle using the same
>>>>>>>    window they were associated with on input.  Intuitively, that sounds
>>>>>>>    as though it could be a correctness bug if the watermark had
>>>>>>>    progressed beyond those windows.
>>>>>>>
>>>>>>> This is a great question that probably deserves deeper attention.
>>>>>>>
>>>>>>>  a) first of all - the fact that you could potentially output late
>>>>>>> data that previously was not late is _potentially_ a correctness bug
>>>>>>> if you have any downstream logic that performs any additional grouping
>>>>>>> and - most notably - relies on causality being preserved (which is how
>>>>>>> our universe works, so this is a natural requirement). Changing an
>>>>>>> element from "on time" to "late" can result in downstream processing
>>>>>>> seeing the effect prior to the cause (in event time!, because the
>>>>>>> watermark move can cause downstream timers to fire). I wrote something
>>>>>>> about this in [1]. Generally, if there is no grouping (or your
>>>>>>> application logic accounts for the swapping of cause and effect), then
>>>>>>> this is not an issue. If you produce a PCollection for the user, then
>>>>>>> you don't know what the user does with it and therefore should pay
>>>>>>> attention to it.
>>>>>>>
>>>>>>>  b) second, this effect would not appear if mid-bundle output
>>>>>>> watermark updates were not possible. I emphasize that I don't know if
>>>>>>> this is allowed in the model, but I *suppose* it is (@kenn, @reuven or
>>>>>>> @luke can you correct me if I'm wrong?). I have some doubts about
>>>>>>> whether it is correct, though. It seems to generally cause issues with
>>>>>>> in-memory buffering and 'outputWithTimestamp' in stateless DoFns. If a
>>>>>>> DoFn implements @StartBundle and @FinishBundle, it seems that we should
>>>>>>> hold the output watermark between these two calls. But that would turn
>>>>>>> a stateless DoFn into a stateful one, which is ... unfortunate. :)
>>>>>>>
>>>>>>> I hope I made things a little more clear rather than more
>>>>>>> obfuscated. :)
>>>>>>>
>>>>>>>  Jan
>>>>>>>
>>>>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>>>>
>>>>>>>
>>>>>>> It's definitely interesting to think about the idea of enforcing
>>>>>>> watermark update only between bundles, but presumably that would mean
>>>>>>> quite extensive alterations.
>>>>>>>
>>>>>>> - Evan
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/15249
>>>>>>>
>>>>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Evan,
>>>>>>>>
>>>>>>>> the fix looks good to me, as long as the timestamp of the buffered
>>>>>>>> data needs to be preserved downstream. Generally I think it *should* be
>>>>>>>> possible to output in-memory buffered data in @ProcessElement (and
>>>>>>>> @FinishBundle); the case where you need timers is when your buffer
>>>>>>>> needs to span multiple bundles. In that case it also must be stored in
>>>>>>>> regular state and not in-memory.
>>>>>>>>
>>>>>>>> It seems to me that there is some logical gap in how we handle the
>>>>>>>> per-bundle buffering. I see two possibilities:
>>>>>>>>
>>>>>>>>  a) either we allow in the model to change the output watermark
>>>>>>>> *while* processing a bundle - in which case it is a logical requirement
>>>>>>>> to have the output timestamp from @ProcessElement no earlier than the
>>>>>>>> timestamp of the current element (because that way we preserve the
>>>>>>>> "on time" / "late" status of the current element, we don't swap
>>>>>>>> anything), or
>>>>>>>>
>>>>>>>>  b) we enforce output watermark update only in-between bundles -
>>>>>>>> in that case the requirement could be relaxed so that the output
>>>>>>>> timestamp from @ProcessElement might be no earlier than the minimum of
>>>>>>>> the timestamps inside the bundle
>>>>>>>>
>>>>>>>> I'm afraid that our current position is a). But in that case it is
>>>>>>>> somewhat questionable if it is semantically correct to use
>>>>>>>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It
>>>>>>>> can move timestamps only to a future instant (and inside the same
>>>>>>>> window!), which has little semantic meaning to me. Looks more error
>>>>>>>> prone than useful.
>>>>>>>>
>>>>>>>>  Jan
>>>>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>>>>
>>>>>>>> Thanks Jan, it's interesting to read about the handling of
>>>>>>>> timestamp in cases employing a buffering pattern.  In the case of the
>>>>>>>> ES write transform, buffered data could be output from ProcessElement
>>>>>>>> or FinishBundle.  It's the case where data is output from
>>>>>>>> ProcessElement that the error reported at the start of this thread
>>>>>>>> shows up.  Based on what you described, it sounds like the PR[1] I made
>>>>>>>> to "fix" this issue is actually fixing it by transitioning data from
>>>>>>>> being late to being on time and outputting all buffered data into a
>>>>>>>> non-deterministic window where the output heuristic is later satisfied
>>>>>>>> (number of elements buffered or max time since last output).  It seems
>>>>>>>> there are 2 issues as a result:
>>>>>>>>
>>>>>>>> 1. The window to which input elements belong is not being
>>>>>>>> preserved. Presumably we want IOs to leave an element's window
>>>>>>>> unaltered?
>>>>>>>> 2. The watermark of the system might be incorrect given that
>>>>>>>> buffered data is assumed processed and allows the watermark to pass?
>>>>>>>>
>>>>>>>> It would definitely add complexity to the IO to employ timers to
>>>>>>>> address this, but if that's the preferred or only solution I'll put
>>>>>>>> some thought into how to implement that solution.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Evan
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>
>>>>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to
>>>>>>>>> update the output watermark while a bundle is being processed? It
>>>>>>>>> seems that could also cause the "watermark skip" problem, which is
>>>>>>>>> definitely an issue (and is probably the reason why the check fails?).
>>>>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>>>>
>>>>>>>>> The buffering seems incorrect to me. Whenever there is a buffer,
>>>>>>>>> we need to make sure we hold the output watermark, otherwise the
>>>>>>>>> watermark might "jump over" a buffered element, transitioning it from
>>>>>>>>> "on-time" to "late", which would be a correctness bug (we can
>>>>>>>>> transition elements only from "late" to "on-time", never the other way
>>>>>>>>> around). The alternative is to use @FinishBundle to do the flushing,
>>>>>>>>> but that might not be appropriate here.
>>>>>>>>>
>>>>>>>>> Currently, the only way to limit the progress of the output watermark
>>>>>>>>> is by setting a timer with an output timestamp equal to the timestamp
>>>>>>>>> of the earliest element in the buffer. There was a thread that
>>>>>>>>> discussed this in more detail [1].
>>>>>>>>>
>>>>>>>>>  Jan
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>>>>
>>>>>>>>> In general, I'd also really like to improve my understanding and
>>>>>>>>> learn more about how the employed buffering can cause this skew.  Is
>>>>>>>>> it because the call to "flush" is happening from a different "current
>>>>>>>>> window" than the elements were originally buffered from?  I'm actually
>>>>>>>>> thinking that the PR[1] to "fix" this would have had the side effect
>>>>>>>>> of outputting buffered elements into the window from which "flush" was
>>>>>>>>> called rather than the window from which the buffered data originated.
>>>>>>>>> I suppose that could be problematic, but should at least satisfy the
>>>>>>>>> validation code.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>
>>>>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Fortunately/unfortunately this same issue struck me as well, and
>>>>>>>>>> I opened a PR[1] to use `ProcessContext#output` rather than
>>>>>>>>>> `ProcessContext#outputWithTimestamp`.  I believe that should resolve
>>>>>>>>>> this issue; it has for me when running jobs with a vendored SDK with
>>>>>>>>>> that change included.  Do folks feel this change should be
>>>>>>>>>> cherry-picked into 2.37.0?
>>>>>>>>>>
>>>>>>>>>> The change also prompted a question to the mailing list[2] about
>>>>>>>>>> skew validation difference between ProcessContext vs
>>>>>>>>>> FinishBundleContext (where there is no ability to compute skew as I
>>>>>>>>>> understand it).
>>>>>>>>>>
>>>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>>>>
>>>>>>>>>> [2]
>>>>>>>>>> https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is specifically a case where the @ProcessElement saw window
>>>>>>>>>>> X for element X0 and buffered it into memory and then when
>>>>>>>>>>> processing window Y and element Y0 wanted to flush previously
>>>>>>>>>>> buffered element X0. This all occurred as part of the same bundle.
>>>>>>>>>>>
>>>>>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Outputting to an earlier window is problematic, as the watermark
>>>>>>>>>>>> can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> A good question would be: should I be able to output to a
>>>>>>>>>>>>> different window from the current @ProcessElement call, like what
>>>>>>>>>>>>> we can do from @FinishBundle to handle these buffering scenarios?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from
>>>>>>>>>>>>>> elements in window X and then trying to output them in window Y
>>>>>>>>>>>>>> when flushing the batch. This exposed a bug where elements that
>>>>>>>>>>>>>> were being buffered were being output as part of a different
>>>>>>>>>>>>>> window than the window that produced them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This became visible because validation was added recently to
>>>>>>>>>>>>>> ensure that when the pipeline is processing elements in window X,
>>>>>>>>>>>>>> any output with a timestamp is valid for window X. Note that this
>>>>>>>>>>>>>> validation only occurs in @ProcessElement since output is
>>>>>>>>>>>>>> associated with the current window of the input element that is
>>>>>>>>>>>>>> being processed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It is ok to do this in @FinishBundle since there is no
>>>>>>>>>>>>>> existing windowing context and when you output, that element is
>>>>>>>>>>>>>> assigned to an appropriate window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch
>>>>>>>>>>>>>>> writing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and write
>>>>>>>>>>>>>>> to Elastic in a streaming job, the config for the source and
>>>>>>>>>>>>>>> sink is respectively
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>>>>             // # of docs
>>>>>>>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>>>>                     arrayOf(host),
>>>>>>>>>>>>>>>                     "fubar",
>>>>>>>>>>>>>>>                     "_doc"
>>>>>>>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>>>>                     10,
>>>>>>>>>>>>>>>                     // the duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>>>>>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>>>>>>>                     // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>>>>>                     // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>>>>>                     // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>>>>>                     // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>>>>>>>                 )
>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>>>>>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit
>>>>>>>>>>>>>>> a bug in the consumer, due to alleged time skew, specifically
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than
>>>>>>>>>>>>>>> the timestamp of the current input (2022-03-07T10:43:43.562Z) minus
>>>>>>>>>>>>>>> the allowed skew (0 milliseconds) and no later than
>>>>>>>>>>>>>>> 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew()
>>>>>>>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first
>>>>>>>>>>>>>>> version this breaks, and it seems like the code in the trace is
>>>>>>>>>>>>>>> largely added by the PR linked above. The error usually claims a
>>>>>>>>>>>>>>> skew of a few seconds, but obviously I can’t override
>>>>>>>>>>>>>>> getAllowedTimestampSkew() on the internal Elastic DoFn, and
>>>>>>>>>>>>>>> it’s marked deprecated anyway.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the
>>>>>>>>>>>>>>> code was intending to fix, and additionally, I’d also be happy
>>>>>>>>>>>>>>> if someone else can reproduce this or knows of similar reports. I
>>>>>>>>>>>>>>> feel like what we’re doing is not *that* uncommon a scenario, so I
>>>>>>>>>>>>>>> would have thought someone else would have hit this by now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Emils
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
