Got it.  So ProcessContext#output has the semantics of "using the window
associated with the current @Element, output the data being passed in into
that window."  Makes complete sense.
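
To check my understanding, here is a toy sketch in plain Java. It is not Beam code: the class OutputWindowSketch, the Windowed type and the batch-size trigger are all made up for illustration, and windows are just string labels. It only models the behaviour described above, where anything emitted while an element is being processed lands in that element's window.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Beam code: flushed elements inherit the window of the current input. */
public class OutputWindowSketch {

    /** Hypothetical stand-in for a windowed value: a payload plus a window label. */
    public static final class Windowed {
        public final String value;
        public final String window;

        public Windowed(String value, String window) {
            this.value = value;
            this.window = window;
        }

        @Override
        public String toString() {
            return value + "@" + window;
        }
    }

    private final List<String> buffer = new ArrayList<>();
    private final int maxBatchSize;

    public OutputWindowSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Buffers the payload of the input. Once the batch is full, emits every
     * buffered payload into the window of the input being processed right now
     * (an assumption mirroring how ProcessContext#output is described above).
     */
    public List<Windowed> processElement(Windowed input) {
        buffer.add(input.value);
        List<Windowed> out = new ArrayList<>();
        if (buffer.size() >= maxBatchSize) {
            for (String value : buffer) {
                out.add(new Windowed(value, input.window));
            }
            buffer.clear();
        }
        return out;
    }

    public static void main(String[] args) {
        OutputWindowSketch fn = new OutputWindowSketch(2);
        fn.processElement(new Windowed("X0", "X")); // buffered, nothing emitted yet
        // The flush is triggered while processing Y0, so X0 is emitted into window Y.
        System.out.println(fn.processElement(new Windowed("Y0", "Y"))); // prints [X0@Y, Y0@Y]
    }
}
```

In this model X0 silently moves from window X to window Y, which matches the side effect of the fix discussed further down the thread.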

On Wed, Mar 9, 2022 at 4:15 AM Jan Lukavský <[email protected]> wrote:

> Interesting discussion. :-)
>
> Answers inline.
> On 3/8/22 22:00, Evan Galpin wrote:
>
> Thanks Jan for confirming that the fix looks alright.  I also found a
> PR[1] that appears to be a good case study of the Timer watermark hold
> technique that you previously mentioned so I'll study that a bit for my own
> understanding and future use.  I was also previously missing the notion
> that a single bundle could contain elements from many disparate Windows,
> so that's a great upgrade to my own mental model haha.
>
> I have a few follow-up questions for my own understanding (hopefully not
> off topic).
>
>    1. Am I understanding correctly that using ProcessContext#output to
>    output a given element will preserve the element's input window(s)? Or is
>    it the case that when using ProcessContext#output, any buffered elements
>    will all be output to whatever window the most recent element processed
>    by @ProcessElement belongs to?
>
> There is no way Beam can distinguish *how* you produced elements emitted
> from @ProcessElement (whether they are the result of some in-memory
> buffering or the direct result of computation on the single input element
> currently being processed). It follows that the window(s) associated with
> the output will be equal to the window(s) of the input element currently
> being processed (generally, the model allows multiple applications of a
> window function, which is why window.maxTimestamp is shifted 1 ms
> backwards, so that it is still part of the window itself and window
> functions based on timestamps are idempotent).
>
>
>    2. Is it safe to emit outputs from @FinishBundle to windows which may
>    be older than the watermark?  I believe this could still be happening given
>    the current ElasticsearchIO implementation where any buffered elements are
>    output in @FinishBundle using the same window they were associated
>    with on input.  Intuitively, that sounds as though it could be a
>    correctness bug if the watermark had progressed beyond those windows.
>
> This is a great question that probably deserves deeper attention.
>
>  a) first of all - the fact that you could output late data that
> previously was not late is _potentially_ a correctness bug, if you have
> any downstream logic that performs additional grouping and - most
> notably - relies on causality being preserved (which is how our universe
> works, so this is a natural requirement). Changing an element from "on
> time" to "late" can result in downstream processing seeing an effect prior
> to its cause (in event time!, because the watermark move can cause
> downstream timers to fire). I wrote something about this in [1].
> Generally, if there is no grouping (or your application logic accounts for
> the swapping of cause and effect), then this is not an issue. If you
> produce a PCollection for a user, then you don't know what the user does
> with it and therefore should pay attention to it.
>
>  b) second, this effect would not appear if mid-bundle output watermark
> updates were not possible. I emphasize that I don't know if this is allowed
> in the model, but I *suppose* it is (@kenn, @reuven or @luke can you
> correct me if I'm wrong?). I have some doubts about whether it is correct,
> though. It seems that it generally causes issues with in-memory buffering
> and 'outputWithTimestamp' in stateless DoFns. If a DoFn implements
> @StartBundle and @FinishBundle, it seems that we should hold the output
> watermark between these two calls. But that would turn a stateless DoFn
> into a stateful one, which is ... unfortunate. :)
>
> I hope I made things a little more clear rather than more obfuscated. :)
>
>  Jan
>
> [1] https://twitter.com/janl_apache/status/1478757956263071745
>
>
> It's definitely interesting to think about the idea of enforcing watermark
> update only between bundles, but presumably that would mean quite extensive
> alterations.
>
> - Evan
>
> [1] https://github.com/apache/beam/pull/15249
>
> On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Evan,
>>
>> the fix looks good to me, as long as the timestamp of the buffered data
>> needs to be preserved downstream. Generally I think it *should* be possible
>> to output in-memory buffered data in @ProcessElement (and @FinishBundle);
>> the case where you need timers is when your buffer needs to span multiple
>> bundles. In that case it also must be stored in regular state and not
>> in-memory.
>>
>> It seems to me that there is some logical gap in how we handle the
>> per-bundle buffering. I see two possibilities:
>>
>>  a) either we allow in the model to change the output watermark *while*
>> processing a bundle - in which case it is a logical requirement that the
>> output timestamp from @ProcessElement be no earlier than the timestamp of
>> the current element (because that way we preserve the "on time"/"late"
>> status of the current element, we don't swap anything), or
>>
>>  b) we enforce output watermark updates only between bundles - in that
>> case the requirement could be relaxed so that the output timestamp from
>> @ProcessElement may be no earlier than the minimum of the timestamps
>> inside the bundle
>>
>> I'm afraid that our current position is a). But in that case it is
>> somewhat questionable if it is semantically correct to use
>> outputWithTimestamp() in @ProcessElement of a stateless DoFn at all. It can
>> move timestamps only to a future instant (and inside the same window!),
>> which has little semantic meaning to me. Looks more error prone than useful.
>>
>>  Jan
>> On 3/8/22 15:53, Evan Galpin wrote:
>>
>> Thanks Jan, it's interesting to read about the handling of timestamp in
>> cases employing a buffering pattern.  In the case of the ES write
>> transform, buffered data could be output from ProcessElement or
>> FinishBundle.  It's the case where data is output from ProcessElement that
>> the error reported at the start of this thread shows up.  Based on what you
>> described, it sounds like the PR[1] I made to "fix" this issue is actually
>> fixing it by transitioning data from being late to being on time and
>> outputting all buffered data into a non-deterministic window where the
>> output heuristic is later satisfied (number of elements buffered or max
>> time since last output).  It seems there are 2 issues as a result:
>>
>> 1. The window to which input elements belong is not being preserved.
>> Presumably we want IOs to leave an element's window unaltered?
>> 2. The watermark of the system might be incorrect given that buffered
>> data is assumed processed and allows the watermark to pass?
>>
>> It would definitely add complexity to the IO to employ timers to address
>> this, but if that's the preferred or only solution I'll put some thought
>> into how to implement that solution.
>>
>> Thanks,
>> Evan
>>
>> [1] https://github.com/apache/beam/pull/16744
>>
>> On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Ah, sorry, the code flushes in @FinishBundle. Is it allowed to update
>>> the output watermark while a bundle is being processed? It seems that
>>> could also cause the "watermark skip" problem, which is definitely an
>>> issue (and is probably the reason why the check fails?).
>>> On 3/8/22 09:35, Jan Lukavský wrote:
>>>
>>> The buffering seems incorrect to me. Whenever there is a buffer, we need
>>> to make sure we hold the output watermark, otherwise the watermark might
>>> "jump over" a buffered element transitioning it from "on-time" to "late",
>>> which would be a correctness bug (we can transition elements only from
>>> "late" to "on-time", never the other way around). The alternative is to use
>>> @FinishBundle to do the flushing, but that might not be appropriate here.
>>>
>>> Currently, the only way to limit the progress of the output watermark is
>>> by setting a timer with an output timestamp equal to the timestamp of the
>>> earliest element in the buffer. There was a thread that discussed this in
>>> more detail [1].
>>>
>>>  Jan
>>>
>>> [1] https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
>>> On 3/7/22 19:54, Evan Galpin wrote:
>>>
>>> In general, I'd also really like to improve my understanding and learn
>>> more about how the employed buffering can cause this skew.  Is it because
>>> the call to "flush" is happening from a different "current window" than the
>>> elements were originally buffered from?  I'm actually thinking that the
>>> PR[1] to "fix" this would have had the side effect of outputting buffered
>>> elements into the window from which "flush" was called rather than the
>>> window from which the buffered data originated. I suppose that could be
>>> problematic, but should at least satisfy the validation code.
>>>
>>> [1] https://github.com/apache/beam/pull/16744
>>>
>>> On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <[email protected]>
>>> wrote:
>>>
>>>> x-post from the associated Jira ticket[0]
>>>>
>>>>
>>>> Fortunately/unfortunately this same issue struck me as well, and I
>>>> opened a PR[1] to use `ProcessContext#output` rather than
>>>> `ProcessContext#outputWithTimestamp`.  I believe that should resolve this
>>>> issue; it has for me when running jobs with a vendored SDK with that change
>>>> included.  Do folks feel this change should be cherry-picked into 2.37.0?
>>>>
>>>> The change also prompted a question to the mailing list[2] about skew
>>>> validation difference between ProcessContext vs FinishBundleContext (where
>>>> there is no ability to compute skew as I understand it).
>>>>
>>>> [0] https://issues.apache.org/jira/browse/BEAM-14064
>>>>
>>>> [1] https://github.com/apache/beam/pull/16744
>>>>
>>>> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>>>>
>>>> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> This is specifically a case where the @ProcessElement saw window X for
>>>>> element X0 and buffered it into memory and then when processing window Y
>>>>> and element Y0 wanted to flush previously buffered element X0. This all
>>>>> occurred as part of the same bundle.
>>>>>
>>>>> In general, yes, outputting to an earlier window is problematic.
>>>>>
>>>>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Outputting to an earlier window is problematic, as the watermark can
>>>>>> never be correct if a DoFn can move time backwards arbitrarily.
>>>>>>
>>>>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> A good question would be: should I be able to output to a different
>>>>>>> window from the current @ProcessElement call, like what we can do from
>>>>>>> @FinishBundle to handle these buffering scenarios?
>>>>>>>
>>>>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> The issue is that ElasticsearchIO is collecting results from
>>>>>>>> elements in window X and then trying to output them in window Y when
>>>>>>>> flushing the batch. This exposed a bug where buffered elements were
>>>>>>>> being output as part of a different window than the one that produced
>>>>>>>> them.
>>>>>>>>
>>>>>>>> This became visible because validation was added recently to ensure
>>>>>>>> that when the pipeline is processing elements in window X, output with
>>>>>>>> a timestamp is valid for window X. Note that this validation only
>>>>>>>> occurs in @ProcessElement, since output is associated with the current
>>>>>>>> window of the input element that is being processed.
>>>>>>>>
>>>>>>>> It is ok to do this in @FinishBundle since there is no existing
>>>>>>>> windowing context, and when you output, that element is assigned to an
>>>>>>>> appropriate window.
>>>>>>>>
>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>>>>
>>>>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>>>>
>>>>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m
>>>>>>>>> reasonably certain it’s this PR
>>>>>>>>> https://github.com/apache/beam/pull/15381
>>>>>>>>>
>>>>>>>>> Our scenario is pretty trivial, we read off Pubsub and write to
>>>>>>>>> Elastic in a streaming job, the config for the source and sink is
>>>>>>>>> respectively
>>>>>>>>>
>>>>>>>>> pipeline.apply(
>>>>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>>>>             .setCoder(KryoCoder.of())
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>> ElasticsearchIO.write()
>>>>>>>>>             .withUseStatefulBatches(true)
>>>>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>>>>             // # of docs
>>>>>>>>>             .withMaxBatchSize(1000)
>>>>>>>>>             .withConnectionConfiguration(
>>>>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>>>>                     arrayOf(host),
>>>>>>>>>                     "fubar",
>>>>>>>>>                     "_doc"
>>>>>>>>>                 ).withConnectTimeout(5000)
>>>>>>>>>                     .withSocketTimeout(30000)
>>>>>>>>>             )
>>>>>>>>>             .withRetryConfiguration(
>>>>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>>>>                     10,
>>>>>>>>>                     // the duration is wall clock, against the
>>>>>>>>>                     // connection and socket timeouts specified above.
>>>>>>>>>                     // I.e., 10 x 30s is gonna be more than 3 minutes,
>>>>>>>>>                     // so if we're getting 10 socket timeouts in a row,
>>>>>>>>>                     // this would ignore the "10" part and terminate
>>>>>>>>>                     // after 6. The idea is that in a mixed failure
>>>>>>>>>                     // mode, you'd get different timeouts of different
>>>>>>>>>                     // durations, and on average 10 x fails < 4m.
>>>>>>>>>                     // That said, 4m is arbitrary, so adjust as and
>>>>>>>>>                     // when needed.
>>>>>>>>>                     Duration.standardMinutes(4)
>>>>>>>>>                 )
>>>>>>>>>             )
>>>>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>>>>             .withIsDeleteFn { f: JsonNode ->
>>>>>>>>>                 f["_action"].asText("noop") == "delete"
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug
>>>>>>>>> in the consumer, due to alleged time skew, specifically
>>>>>>>>>
>>>>>>>>> 2022-03-07 10:48:37.886 GMT Error message from worker:
>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than
>>>>>>>>> the timestamp of the current input (2022-03-07T10:43:43.562Z) minus
>>>>>>>>> the allowed skew (0 milliseconds) and no later than
>>>>>>>>> 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew()
>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>>>>
>>>>>>>>> I’ve bisected it and 2.34 works fine, 2.35 is the first version
>>>>>>>>> this breaks, and it seems like the code in the trace is largely added
>>>>>>>>> by the PR linked above. The error usually claims a skew of a few
>>>>>>>>> seconds, but obviously I can’t override getAllowedTimestampSkew() on
>>>>>>>>> the internal Elastic DoFn, and it’s marked deprecated anyway.
>>>>>>>>>
>>>>>>>>> I’m happy to raise a JIRA but I’m not 100% sure what the code was
>>>>>>>>> intending to fix, and additionally, I’d also be happy if someone else
>>>>>>>>> can reproduce this or knows of similar reports. I feel like what we’re
>>>>>>>>> doing is not *that* uncommon a scenario, so I would have thought
>>>>>>>>> someone else would have hit this by now.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Emils
>>>>>>>>>
>>>>>>>>
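
For reference, the lower-bound check described by the error message in the original report can be approximated in plain Java. This is only a rough sketch and not the SimpleDoFnRunner code: the class SkewCheckSketch and its methods are made up for illustration, and the upper bound mentioned in the message is not modeled. The two timestamps are the ones from the reported exception.

```java
import java.time.Duration;
import java.time.Instant;

/** Rough model of the lower-bound timestamp check quoted in the error message. */
public class SkewCheckSketch {

    /** True when the output timestamp is no earlier than the current input minus the allowed skew. */
    public static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        return !output.isBefore(currentInput.minus(allowedSkew));
    }

    /** How far the output timestamp lies behind the current input. */
    public static Duration skew(Instant output, Instant currentInput) {
        return Duration.between(output, currentInput);
    }

    public static void main(String[] args) {
        // Timestamps taken from the exception in the original report.
        Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant current = Instant.parse("2022-03-07T10:43:43.562Z");

        System.out.println(isAllowed(buffered, current, Duration.ZERO)); // prints false
        System.out.println(skew(buffered, current).toMillis());          // prints 4922
    }
}
```

With an allowed skew of 0 ms, the buffered element's timestamp is about 4.9 seconds too early, which is consistent with the "skew of a few seconds" mentioned in the report.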
