In general, I'd also really like to improve my understanding of how the
buffering employed here can cause this skew.  Is it because the call to
"flush" happens from a different "current window" than the one the elements
were originally buffered from?  I'm also thinking that the PR[1] to "fix"
this would have the side effect of outputting buffered elements into the
window from which "flush" was called rather than the window from which the
buffered data originated. I suppose that could be problematic, but it should
at least satisfy the validation code.

[1] https://github.com/apache/beam/pull/16744
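
To make my question concrete, here is a minimal sketch of the two output
paths as I understand them. The class, method, and variable names below are
mine for illustration only, not the actual ElasticsearchIO code, so please
treat it as an assumption about the shape of the change rather than a
description of it.

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TimestampedValue;

// Hypothetical flush helper, only meant to contrast the two output calls.
class FlushSketch {
  static void flushBuffered(
      DoFn<String, String>.ProcessContext context,
      List<TimestampedValue<String>> buffered) {
    for (TimestampedValue<String> result : buffered) {
      // Before the PR (as I read it): keep the buffered element's original
      // timestamp. The runner validates that timestamp against the element
      // currently being processed, hence the skew error.
      // context.outputWithTimestamp(result.getValue(), result.getTimestamp());

      // After the PR: the result inherits the timestamp and window of
      // whatever element happened to trigger the flush.
      context.output(result.getValue());
    }
  }
}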

On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin <[email protected]> wrote:

> x-post from the associated Jira ticket[0]
>
>
> Fortunately/unfortunately this same issue struck me as well, and I opened
> a PR[1] to use `ProcessContext#output` rather than
> `ProcessContext#outputWithTimestamp`.  I believe that should resolve this
> issue; it has for me when running jobs with a vendored SDK that includes the
> change.  Do folks feel this change should be cherry-picked into 2.37.0?
>
> The change also prompted a question to the mailing list[2] about the
> difference in skew validation between ProcessContext and FinishBundleContext
> (where there is no ability to compute skew, as I understand it).
>
> [0] https://issues.apache.org/jira/browse/BEAM-14064
>
> [1] https://github.com/apache/beam/pull/16744
>
> [2] https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
>
> On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik <[email protected]> wrote:
>
>> This is specifically a case where @ProcessElement saw window X for
>> element X0 and buffered it into memory, and then, when processing window Y
>> and element Y0, wanted to flush the previously buffered element X0. This
>> all occurred as part of the same bundle.
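
A minimal sketch of that failure mode, with made-up names and thresholds
(this is not the actual ElasticsearchIO code): element X0 from window X gets
buffered, and a later element Y0 from window Y in the same bundle triggers
the flush.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TimestampedValue;

// Hypothetical buffering DoFn reproducing the described failure mode.
class BufferingSketchFn extends DoFn<String, String> {
  private transient List<TimestampedValue<String>> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Call 1: X0 arrives with a timestamp in window X and is only buffered.
    // Call 2: Y0 arrives with a later timestamp in window Y; the batch is
    // now "full", so we flush X0 while the runner's current element is Y0.
    buffer.add(TimestampedValue.of(c.element(), c.timestamp()));
    if (buffer.size() >= 2) {
      for (TimestampedValue<String> buffered : buffer) {
        // Emitting X0 with its original timestamp here fails the validation,
        // because X0's timestamp is earlier than Y0's (allowed skew is 0 ms).
        c.outputWithTimestamp(buffered.getValue(), buffered.getTimestamp());
      }
      buffer.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // Left empty in this sketch; a real implementation would also flush
    // here, where outputting into the original windows is allowed.
  }
}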
>>
>> In general, yes, outputting to an earlier window is problematic.
>>
>> On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax <[email protected]> wrote:
>>
>>> Outputting to an earlier window is problematic, as the watermark can
>>> never be correct if a DoFn can move time backwards arbitrarily.
>>>
>>> On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> A good question would be: should we be able to output to a different
>>>> window from the current @ProcessElement call, like we can from
>>>> @FinishBundle, to handle these buffering scenarios?
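
For reference, the shape of the two APIs being compared (method signatures
per the Beam Java SDK; the class itself is just a skeleton for illustration):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

// Skeleton DoFn showing the output methods available in each context.
class OutputApiSketch extends DoFn<String, String> {

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // The current window is observable here, but there is no output method
    // that takes a window: output always lands in the window(s) of the
    // current input element, and outputWithTimestamp is subject to the
    // allowed-skew check against that element's timestamp.
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // FinishBundleContext#output takes an explicit timestamp and window:
    //   c.output(element, timestamp, window);
    // so a buffering DoFn can emit each buffered element back into the
    // window it was buffered from, provided it remembered both at buffering
    // time.
  }
}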
>>>>
>>>> On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The issue is that ElasticsearchIO is collecting results from elements
>>>>> in window X and then trying to output them in window Y when flushing the
>>>>> batch. This exposed a bug where buffered elements were being output as
>>>>> part of a different window than the one that produced them.
>>>>>
>>>>> This became visible because validation was added recently to ensure
>>>>> that, when the pipeline is processing an element in window X, any output
>>>>> with a timestamp is valid for window X. Note that this validation only
>>>>> occurs in @ProcessElement, since output there is associated with the
>>>>> window of the input element that is being processed.
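
For anyone following along, the rule that validation enforces seems to
amount to roughly the following (paraphrased from the error message quoted
further down in the thread; this is not the actual SimpleDoFnRunner code,
and the upper bound here just stands in for the runner's maximum allowed
timestamp):

import org.joda.time.Duration;
import org.joda.time.Instant;

// Rough paraphrase of the timestamp check, not the real runner code.
class SkewCheckSketch {
  static void checkTimestamp(
      Instant outputTimestamp,
      Instant currentInputTimestamp,
      Duration allowedSkew,
      Instant maxAllowedTimestamp) {
    Instant earliestAllowed = currentInputTimestamp.minus(allowedSkew);
    if (outputTimestamp.isBefore(earliestAllowed)
        || outputTimestamp.isAfter(maxAllowedTimestamp)) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + outputTimestamp
              + ". Output timestamps must be no earlier than the timestamp of"
              + " the current input (" + currentInputTimestamp + ") minus the"
              + " allowed skew (" + allowedSkew.getMillis() + " milliseconds)"
              + " and no later than " + maxAllowedTimestamp + ".");
    }
  }
}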
>>>>>
>>>>> It is ok to do this in @FinishBundle since there is no existing
>>>>> windowing context, and when you output, the element is assigned to an
>>>>> appropriate window.
>>>>>
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-14064
>>>>>
>>>>> On Mon, Mar 7, 2022 at 7:44 AM Emils Solmanis <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>> I think we’re hitting a regression in ElasticIO batch writing.
>>>>>>
>>>>>> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably
>>>>>> certain it’s this PR https://github.com/apache/beam/pull/15381
>>>>>>
>>>>>> Our scenario is pretty trivial: we read off Pubsub and write to
>>>>>> Elastic in a streaming job. The config for the source and sink,
>>>>>> respectively, is:
>>>>>>
>>>>>> pipeline.apply(
>>>>>>             PubsubIO.readStrings().fromSubscription(subscription)
>>>>>>         ).apply(ParseJsons.of(OurObject::class.java))
>>>>>>             .setCoder(KryoCoder.of())
>>>>>>
>>>>>> and
>>>>>>
>>>>>> ElasticsearchIO.write()
>>>>>>             .withUseStatefulBatches(true)
>>>>>>             .withMaxParallelRequestsPerWindow(1)
>>>>>>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>>>>>>             // 5 bytes -> KiB -> MiB, so 5 MiB
>>>>>>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>>>>>>             // # of docs
>>>>>>             .withMaxBatchSize(1000)
>>>>>>             .withConnectionConfiguration(
>>>>>>                 ElasticsearchIO.ConnectionConfiguration.create(
>>>>>>                     arrayOf(host),
>>>>>>                     "fubar",
>>>>>>                     "_doc"
>>>>>>                 ).withConnectTimeout(5000)
>>>>>>                     .withSocketTimeout(30000)
>>>>>>             )
>>>>>>             .withRetryConfiguration(
>>>>>>                 ElasticsearchIO.RetryConfiguration.create(
>>>>>>                     10,
>>>>>>                     // the duration is wall clock, against the connection
>>>>>>                     // and socket timeouts specified above. I.e., 10 x 30s
>>>>>>                     // is gonna be more than 3 minutes, so if we're getting
>>>>>>                     // 10 socket timeouts in a row, this would ignore the
>>>>>>                     // "10" part and terminate after 6. The idea is that in
>>>>>>                     // a mixed failure mode, you'd get different timeouts
>>>>>>                     // of different durations, and on average 10 x fails < 4m.
>>>>>>                     // That said, 4m is arbitrary, so adjust as and when needed.
>>>>>>                     Duration.standardMinutes(4)
>>>>>>                 )
>>>>>>             )
>>>>>>             .withIdFn { f: JsonNode -> f["id"].asText() }
>>>>>>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>>>>>>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
>>>>>>
>>>>>> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug
>>>>>> in the consumer due to alleged time skew, specifically:
>>>>>>
>>>>>> 2022-03-07 10:48:37.886 GMTError message from worker: 
>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp 
>>>>>> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the 
>>>>>> timestamp of the
>>>>>> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 
>>>>>> milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the 
>>>>>> DoFn#getAllowedTimestampSkew() Javadoc
>>>>>> for details on changing the allowed skew.
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
>>>>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
>>>>>>
>>>>>> I’ve bisected it: 2.34 works fine, 2.35 is the first version where this
>>>>>> breaks, and it seems like the code in the trace was largely added by the
>>>>>> PR linked above. The error usually claims a skew of a few seconds, but
>>>>>> obviously I can’t override getAllowedTimestampSkew() on the internal
>>>>>> Elastic DoFn, and it’s marked deprecated anyway.
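
For completeness, the deprecated skew override only exists on DoFns you
control, which is exactly the problem here. In a DoFn of your own it would
look roughly like the sketch below; the class name and durations are
arbitrary example values.

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;

// Sketch of the (deprecated) skew override on a user-owned DoFn; not an
// option for the DoFn buried inside ElasticsearchIO.
class SkewTolerantFn extends DoFn<String, String> {
  @Override
  public Duration getAllowedTimestampSkew() {
    // Arbitrary example value: permit outputs up to 5 minutes behind the
    // current element's timestamp.
    return Duration.standardMinutes(5);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Allowed, as long as the backdating stays within the declared skew.
    c.outputWithTimestamp(c.element(), c.timestamp().minus(Duration.standardMinutes(1)));
  }
}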
>>>>>>
>>>>>> I’m happy to raise a JIRA, but I’m not 100% sure what the code was
>>>>>> intending to fix. I’d also be happy if someone else can reproduce this
>>>>>> or knows of similar reports. I feel like what we’re doing is not *that*
>>>>>> uncommon a scenario, so I would have thought someone else would have hit
>>>>>> this by now.
>>>>>>
>>>>>> Best,
>>>>>> Emils
>>>>>>
>>>>>
