On Wed, Mar 9, 2022 at 10:02 AM Evan Galpin <evan.gal...@gmail.com> wrote:

> But I'm not sure if an IO throwing away windows (in favour of Global)
>> would be acceptable either.  BigQueryIO#write (streaming) also has a notion
>> of "getFailedWrites" with the same intent as what's in ElasticsearchIO;  I
>> wonder how that's implemented?
>>
>
> It appears that BigQueryIO[1] just re-windows inputs into the global
> window and does not concern itself with preserving input windows of
> elements nor holding the watermark.  Is that a good precedent to follow?
> It would certainly simplify the implementation of ElasticsearchIO#bulkIO.
>

Big picture answer: Windowing into the global window happens whether you
remember to do it or not. The world outside of Beam doesn't relate to
Beam's windows. Doing the re-windowing explicitly might make you more
likely to author the best code and/or offer the users the needed API to
reify windowing information and/or document that they need to do it
themselves before applying the write transform.

Even more pedantically big picture answer: holding the watermark is exactly
and only "reserving the right to output an element with that timestamp later"
so you can guide your thinking with this. If you don't need to output an
element at a timestamp, don't hold it.
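
To make the "hold" idea concrete, here is a toy model in plain Java. It is not
Beam code and every name in it is hypothetical; it only assumes that a step's
output watermark may never pass the earliest timestamp it still intends to
emit, which is the rule described above.

```java
import java.util.PriorityQueue;

// Toy model only: this is NOT Beam's API, and every name here is hypothetical.
final class WatermarkHoldSketch {
    // Event timestamps (millis) of elements buffered but not yet output.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    void buffer(long timestampMillis) {
        buffered.add(timestampMillis);
    }

    void flush() {
        // Once everything is emitted there is nothing left to reserve.
        buffered.clear();
    }

    // The output watermark is capped by the earliest buffered timestamp,
    // so a buffered element can never turn from "on time" into "late".
    long outputWatermark(long inputWatermark) {
        Long earliest = buffered.peek();
        return earliest == null ? inputWatermark : Math.min(inputWatermark, earliest);
    }

    public static void main(String[] args) {
        WatermarkHoldSketch step = new WatermarkHoldSketch();
        step.buffer(100L);
        step.buffer(50L);
        System.out.println(step.outputWatermark(200L)); // held back at 50
        step.flush();
        System.out.println(step.outputWatermark(200L)); // hold released: 200
    }
}
```

If nothing is buffered, the model holds nothing, which matches "if you don't
need to output an element at a timestamp, don't hold it".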

BUT write transforms should almost always output elements describing what
was written. You want to think about the timestamps and windowing on these
elements. I'm not sure what the best practice is here. TBH I could see this
actually functioning as a new "source" where everything is global window
and timestamps describe the moment the data was committed, but not sure
this is easily expressed. What do other IOs do about windowing these
outputs?

Kenn


>
> [1]
> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java#L313-L318
>
> On Wed, Mar 9, 2022 at 10:13 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Oops, unfortunate misfire before being done writing my prior email.
>>
>> Thanks Jan for the thoughtful responses!  It definitely sounds as though
>> the ElasticsearchIO#write transform would be safer to either hold the
>> watermark via similar machinery employed by GroupIntoBatches or possibly
>> output everything into a global window; either sounds more safe in terms of
>> data correctness than potentially outputting "on time" data as "late"
>> data.  But I'm not sure if an IO throwing away windows (in favour of
>> Global) would be acceptable either.  BigQueryIO#write (streaming) also has
>> a notion of "getFailedWrites" with the same intent as what's in
>> ElasticsearchIO;  I wonder how that's implemented?
>>
>> I'm keen to follow any further discussion on watermark holds for DoFn's
>> that implement both @StartBundle and @FinishBundle!
>>
>> - Evan
>>
>> On Wed, Mar 9, 2022 at 10:05 AM Evan Galpin <evan.gal...@gmail.com>
>> wrote:
>>
>>> Got it.  So ProcessContext#output has the semantics of "using the window
>>> associated with the current @Element, output the data being passed in into
>>> that window."  Makes complete sense.
>>>
>>> On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Interesting discussion. :-)
>>>>
>>>> Answers inline.
>>>> On 3/8/22 22:00, Evan Galpin wrote:
>>>>
>>>> Thanks Jan for confirming that the fix looks alright.  I also found a
>>>> PR[1] that appears to be a good case study of the Timer watermark hold
>>>> technique that you previously mentioned so I'll study that a bit for my own
>>>> understanding and future use.  I was also previously missing the notion
>>>> that a singular bundle could contain elements from many disparate Windows,
>>>> so that's a great upgrade to my own mental model haha.
>>>>
>>>> I have a few follow-up questions for my own understanding (hopefully
>>>> not off topic).
>>>>
>>>>    1. Am I understanding correctly that using ProcessContext#output to
>>>>    output a given element will preserve the element's input window(s)? Or
>>>>    is it the case that when using ProcessContext#output, any buffered
>>>>    elements will all be output to whatever window the most recent element
>>>>    processed by @ProcessElement belongs to?
>>>>
>>>> There is no way Beam can distinguish *how* you produced elements
>>>> emitted from @ProcessElement (if those are result of some in-memory
>>>> buffering or direct result of computation of the single input element
>>>> currently being processed). From that it follows, that window(s) associated
>>>> with the output will be equal to window(s) of the input element currently
>>>> processed (generally, the model allows multiple applications of window
>>>> function, which is why window.maxTimestamp is 1 ms shifted backwards, to be
>>>> still part of the window itself, so that the window functions based on
>>>> timestamps are idempotent).
>>>>
>>>>
>>>>    2. Is it safe to emit outputs from @FinishBundle to windows which
>>>>    may be older than the watermark?  I believe this could still be
>>>>    happening given the current ElasticsearchIO implementation where any
>>>>    buffered elements are output in @FinishBundle using the same window they
>>>>    were associated with on input.  Intuitively, that sounds as though it
>>>>    could be a correctness bug if the watermark had progressed beyond those
>>>>    windows.
>>>>
>>>> This is a great question that probably deserves deeper attention.
>>>>
>>>>  a) first of all - the fact that you could potentially output late
>>>> data that previously was not late is _potentially_ a correctness bug, if
>>>> you have any downstream logic that performs any additional grouping and -
>>>> most notably - relies on causality being preserved (which is how our
>>>> universe works, so this is a natural requirement). Changing an element from
>>>> "on time" to "late" can result in downstream processing seeing an effect
>>>> prior to its cause (in event time!, because the watermark move can cause
>>>> downstream timers to fire). I wrote something about this in [1]. Generally,
>>>> if there is no grouping (or your application logic accounts for the swapping
>>>> of cause and effect), then this is not an issue. If you produce a PCollection
>>>> for the user, then you don't know what the user does with it and therefore
>>>> should pay attention to it.
>>>>
>>>>  b) second, this effect would not appear if mid-bundle output
>>>> watermark updates were not possible. I emphasize that I don't know if this
>>>> is allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke can
>>>> you correct me if I'm wrong?). I have some doubts if it is correct, though.
>>>> It seems that it generally causes issues with in-memory buffering and
>>>> 'outputWithTimestamp' in stateless DoFns. If a DoFn implements @StartBundle
>>>> and @FinishBundle, it seems that we should hold the output watermark between
>>>> these two calls. But that would turn a stateless DoFn into a stateful one,
>>>> which is ... unfortunate. :)
>>>>
>>>> I hope I made things a little more clear rather than more obfuscated. :)
>>>>
>>>>  Jan
>>>>
>>>> [1] https://twitter.com/janl_apache/status/1478757956263071745
>>>>
>>>>
>>>> It's definitely interesting to think about the idea of enforcing
>>>> watermark update only between bundles, but presumably that would mean quite
>>>> extensive alterations.
>>>>
>>>> - Evan
>>>>
>>>> [1] https://github.com/apache/beam/pull/15249
>>>>
>>>> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Evan,
>>>>>
>>>>> the fix looks good to me, as long as the timestamp of the buffered
>>>>> data need to be preserved downstream. Generally I think it *should* be
>>>>> possible to output in-memory buffered data in @ProcessElement (and
>>>>> @FinishBundle); the case where you need timers is when your buffer needs
>>>>> to span multiple bundles. In that case it also must be stored in regular
>>>>> state and not in-memory.
>>>>>
>>>>> It seems to me that there is some logical gap in how we handle the
>>>>> per-bundle buffering. I see two possibilities:
>>>>>
>>>>>  a) either we allow in the model to change the output watermark
>>>>> *while* processing a bundle - in which case it is a logical requirement to
>>>>> have the output timestamp from @ProcessElement no earlier than the timestamp
>>>>> of the current element (because that way we preserve the "on time", "late"
>>>>> status of the current element, we don't swap anything), or
>>>>>
>>>>>  b) we enforce output watermark updates only in between bundles - in
>>>>> that case the requirement could be relaxed so that the output timestamp from
>>>>> @ProcessElement might be no earlier than the minimum of timestamps inside
>>>>> the bundle
>>>>>
>>>>> I'm afraid that our current position is a). But in that case it is
>>>>> somewhat questionable if it is semantically correct to use
>>>>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can
>>>>> move timestamps only to a future instant (and inside the same window!),
>>>>> which has little semantic meaning to me. Looks more error prone than useful.
>>>>>
>>>>>  Jan
>>>>> On 3/8/22 15:53, Evan Galpin wrote:
>>>>>
>>>>> Thanks Jan, it's interesting to read about the handling of timestamps
>>>>> in cases employing a buffering pattern.  In the case of the ES write
>>>>> transform, buffered data could be output from ProcessElement or
>>>>> FinishBundle.  It's the case where data is output from ProcessElement that
>>>>> the error reported at the start of this thread shows up.  Based on what
>>>>> you described, it sounds like the PR[1] I made to "fix" this issue is
>>>>> actually fixing it by transitioning data from being late to being on time
>>>>> and outputting all buffered data into a non-deterministic window where the
>>>>> output heuristic is later satisfied (number of elements buffered or max
>>>>> time since last output).  It seems there are 2 issues as a result:
>>>>>
>>>>> 1. The window to which input elements belong is not being preserved.
>>>>> Presumably we want IOs to leave an element's window unaltered?
>>>>> 2. The watermark of the system might be incorrect given that buffered
>>>>> data is assumed processed and allows the watermark to pass?
>>>>>
>>>>> It would definitely add complexity to the IO to employ timers to
>>>>> address this, but if that's the preferred or only solution I'll put some
>>>>> thought into how to implement that solution.
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>
>>>>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update
>>>>>> the output watermark while a bundle is being processed? It seems that
>>>>>> could also cause the "watermark skip" problem, which is definitely an
>>>>>> issue (and is probably the reason why the check fails?).
>>>>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>>>>
>>>>>> The buffering seems incorrect to me. Whenever there is a buffer, we
>>>>>> need to make sure we hold the output watermark, otherwise the watermark
>>>>>> might "jump over" a buffered element, transitioning it from "on-time" to
>>>>>> "late", which would be a correctness bug (we can transition elements only
>>>>>> from "late" to "on-time", never the other way around). The alternative is
>>>>>> to use @FinishBundle to do the flushing, but that might not be appropriate
>>>>>> here.
>>>>>>
>>>>>> Currently, the only way to limit the progress of the output watermark is
>>>>>> by setting a timer with an output timestamp equal to the timestamp of the
>>>>>> earliest element in the buffer. There was a thread that discussed this
>>>>>> in more detail [1].
>>>>>>
>>>>>>  Jan
>>>>>>
>>>>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>>>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>>>>
>>>>>> In general, I'd also really like to improve my understanding and
>>>>>> learn more about how the employed buffering can cause this skew.  Is it
>>>>>> because the call to "flush" is happening from a different "current
>>>>>> window" than the elements were originally buffered from?  I'm actually
>>>>>> thinking that the PR[1] to "fix" this would have had the side effect of
>>>>>> outputting buffered elements into the window from which "flush" was called
>>>>>> rather than the window from which the buffered data originated. I suppose
>>>>>> that could be problematic, but should at least satisfy the validation code.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>
>>>>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <evan.gal...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> x-post from the associated Jira ticket[0]
>>>>>>>
>>>>>>>
>>>>>>> Fortunately/unfortunately this same issue struck me as well, and I
>>>>>>> opened a PR[1] to use `ProcessContext#output` rather than
>>>>>>> `ProcessContext#outputWithTimestamp`.  I believe that should resolve
>>>>>>> this issue; it has for me when running jobs with a vendored SDK with that
>>>>>>> change included.  Do folks feel this change should be cherry-picked into
>>>>>>> 2.37.0?
>>>>>>>
>>>>>>> The change also prompted a question to the mailing list[2] about
>>>>>>> skew validation difference between ProcessContext vs FinishBundleContext
>>>>>>> (where there is no ability to compute skew as I understand it).
>>>>>>>
>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/16744
>>>>>>>
>>>>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>>>>
>>>>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> This is specifically a case where the @ProcessElement saw window X
>>>>>>>> for element X0 and buffered it into memory and then, when processing
>>>>>>>> window Y and element Y0, wanted to flush previously buffered element X0.
>>>>>>>> This all occurred as part of the same bundle.
>>>>>>>>
>>>>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>>>>
>>>>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Outputting to an earlier window is problematic, as the watermark
>>>>>>>>> can never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> A good question would be: should I be able to output to a
>>>>>>>>>> different window from the current @ProcessElement call, like what we
>>>>>>>>>> can do from @FinishBundle to handle these buffering scenarios?
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> The issue is that ElasticsearchIO is collecting results from
>>>>>>>>>>> elements in window X and then trying to output them in window Y when
>>>>>>>>>>> flushing the batch. This exposed a bug where elements that were being
>>>>>>>>>>> buffered were being output as part of a different window than the
>>>>>>>>>>> window that produced them.
>>>>>>>>>>>
>>>>>>>>>>> This became visible because validation was added recently to
>>>>>>>>>>> ensure that when the pipeline is processing elements in window X,
>>>>>>>>>>> output with a timestamp is valid for window X. Note that this
>>>>>>>>>>> validation only occurs in @ProcessElement since output is associated
>>>>>>>>>>> with the current window of the input element that is being processed.
>>>>>>>>>>>
>>>>>>>>>>> It is ok to do this in @FinishBundle since there is no existing
>>>>>>>>>>> windowing context and when you output that element is assigned to an
>>>>>>>>>>> appropriate window.
>>>>>>>>>>>
>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>>>>> emils.solma...@rvu.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>>>>>>>
>>>>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>>>>
>>>>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and write to
>>>>>>>>>>>> Elastic in a streaming job, the config for the source and sink is
>>>>>>>>>>>> respectively
>>>>>>>>>>>>
>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>>>>
>>>>>>>>>>>> and
>>>>>>>>>>>>
>>>>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>>>>             // # of docs
>>>>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>>>>                     arrayOf(host),
>>>>>>>>>>>>                     "fubar",
>>>>>>>>>>>>                     "_doc"
>>>>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>>>>             )
>>>>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>>>>                     10,
>>>>>>>>>>>>                     // the duration is wall clock, against the connection and socket timeouts specified
>>>>>>>>>>>>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>>>>>>>>>>>>                     // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>>>>>>>>>>>>                     // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>>>>>>>>>>>>                     // of different durations, and on average 10 x fails < 4m.
>>>>>>>>>>>>                     // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>>>>                 )
>>>>>>>>>>>>             )
>>>>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>>>>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>>>>>>>
>>>>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a
>>>>>>>>>>>> bug in the consumer, due to alleged time skew, specifically
>>>>>>>>>>>>
>>>>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
>>>>>>>>>>>> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
>>>>>>>>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
>>>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>>>>
>>>>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first version
>>>>>>>>>>>> this breaks, and it seems like the code in the trace is largely
>>>>>>>>>>>> added by the PR linked above. The error usually claims a skew of a
>>>>>>>>>>>> few seconds, but obviously I can’t override getAllowedTimestampSkew()
>>>>>>>>>>>> on the internal Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>>>>
>>>>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the code
>>>>>>>>>>>> was intending to fix, and additionally, I’d also be happy if
>>>>>>>>>>>> someone else can reproduce this or knows of similar reports. I feel
>>>>>>>>>>>> like what we’re doing is not *that* uncommon a scenario, so I would
>>>>>>>>>>>> have thought someone else would have hit this by now.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Emils
>>>>>>>>>>>>
>>>>>>>>>>>
