Interesting discussion. :-)
Answers inline.
On 3/8/22 22:00, Evan Galpin wrote:
Thanks Jan for confirming that the fix looks alright. I also found a
PR[1] that appears to be a good case study of the Timer watermark hold
technique that you mentioned previously, so I'll study it a bit for my
own understanding and future use. I was also previously missing the
notion that a single bundle can contain elements from many disparate
windows, so that's a great upgrade to my own mental model haha.
I have a few follow-up questions for my own understanding (hopefully
not off topic).
1. Am I understanding correctly that using ProcessContext#output to
output a given element will preserve the element's input
window(s)? Or is it the case that when using
ProcessContext#output, any buffered elements will all be output to
whatever window the most recent element processed
by @ProcessElement belongs to?
There is no way Beam can distinguish *how* you produced the elements
emitted from @ProcessElement (whether they are the result of some
in-memory buffering or the direct result of computing the single input
element currently being processed). From that it follows that the
window(s) associated with the output will be equal to the window(s) of
the input element currently being processed. (Generally, the model
allows multiple applications of the window function, which is why
window.maxTimestamp is shifted backwards by 1 ms, so that it is still
part of the window itself and window functions based on timestamps are
idempotent.)
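To illustrate that 1 ms shift with plain millisecond arithmetic (a toy sketch, not Beam's actual WindowFn API; class and method names are made up): for fixed windows, maxTimestamp = end - 1 ms maps back into the same window when the window function is re-applied, while the end instant itself would fall into the next window.

```java
// Toy fixed-window arithmetic, illustrating why maxTimestamp is end - 1 ms.
public class FixedWindowSketch {
    // Start of the fixed window of size `sizeMs` containing timestamp `tsMs`.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - Math.floorMod(tsMs, sizeMs);
    }

    // maxTimestamp is the window end minus 1 ms, so it still lies inside.
    static long maxTimestamp(long tsMs, long sizeMs) {
        return windowStart(tsMs, sizeMs) + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 60_000; // 1-minute windows
        long ts = 123_456;  // falls in window [120000, 180000)
        long max = maxTimestamp(ts, size); // 179_999

        // Idempotent: re-windowing maxTimestamp yields the same window.
        System.out.println(windowStart(max, size) == windowStart(ts, size));     // true
        // The window end itself would land in the *next* window.
        System.out.println(windowStart(max + 1, size) == windowStart(ts, size)); // false
    }
}
```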
2. Is it safe to emit outputs from @FinishBundle into windows which may
be older than the watermark? I believe this could still be
happening given the current ElasticsearchIO implementation, where
any buffered elements are output in @FinishBundle using the same
window they were associated with on input. Intuitively, that
sounds as though it could be a correctness bug if the watermark
has progressed beyond those windows.
This is a great question that probably deserves deeper attention.
a) First of all, the fact that you could potentially output late data
that previously was not late is _potentially_ a correctness bug, if you
have any downstream logic that performs additional grouping and, most
notably, relies on causality being preserved (which is how our universe
works, so this is a natural requirement). Changing an element from "on
time" to "late" can result in downstream processing seeing an effect
prior to its cause (in event time!, because the watermark move can
cause downstream timers to fire). I wrote something about this in [1].
Generally, if there is no grouping (or your application logic accounts
for the swapping of cause and effect), then this is not an issue. If
you produce a PCollection for a user, then you don't know what the user
does with it, and therefore you should pay attention to this.
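A toy illustration of the on-time/late flip described above (plain Java, not Beam API; names are made up for the sketch): an element that was on time when buffered becomes late if the watermark advances past its timestamp before the buffer is flushed.

```java
// Minimal sketch of the on-time vs. late classification discussed above:
// an element is "late" when its timestamp is behind the output watermark.
public class LatenessSketch {
    static boolean isLate(long elementTsMs, long watermarkMs) {
        return elementTsMs < watermarkMs;
    }

    public static void main(String[] args) {
        long elementTs = 1_000;
        // On time when it enters the in-memory buffer...
        System.out.println(isLate(elementTs, 900));   // false
        // ...but the watermark advances while it sits buffered,
        // so it is late by the time it is flushed.
        System.out.println(isLate(elementTs, 2_000)); // true
    }
}
```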
b) Second, this effect would not appear if mid-bundle output watermark
updates were not possible. I emphasize that I don't know if this is
allowed in the model, but I *suppose* it is (@kenn, @reuven or @luke,
can you correct me if I'm wrong?). I have some doubts whether it is
correct, though. It seems to cause general issues with in-memory
buffering and 'outputWithTimestamp' in stateless DoFns. If a DoFn
implements @StartBundle and @FinishBundle, it seems that we should hold
the output watermark between these two calls. But that would turn a
stateless DoFn into a stateful one, which is ... unfortunate. :)
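A purely illustrative sketch of what such a hold could look like (class and method names are invented; real runners track watermark holds quite differently): while the bundle's buffer is non-empty, the reported output watermark is capped at the earliest buffered timestamp, and flushing in @FinishBundle releases the hold.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "hold the output watermark between @StartBundle and
// @FinishBundle": the output watermark may not pass the earliest
// element still sitting in the in-memory buffer.
public class BundleHoldSketch {
    private final List<Long> bufferedTimestamps = new ArrayList<>();

    void buffer(long tsMs) {            // element buffered during @ProcessElement
        bufferedTimestamps.add(tsMs);
    }

    void flush() {                      // @FinishBundle: emit and release the hold
        bufferedTimestamps.clear();
    }

    // Output watermark = min(input watermark, earliest buffered timestamp).
    long outputWatermark(long inputWatermarkMs) {
        long hold = bufferedTimestamps.stream()
                .mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        return Math.min(inputWatermarkMs, hold);
    }

    public static void main(String[] args) {
        BundleHoldSketch fn = new BundleHoldSketch();
        fn.buffer(1_000);
        fn.buffer(3_000);
        System.out.println(fn.outputWatermark(5_000)); // 1000: held by the buffer
        fn.flush();
        System.out.println(fn.outputWatermark(5_000)); // 5000: hold released
    }
}
```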
I hope I made things a little more clear rather than more obfuscated. :)
Jan
[1] https://twitter.com/janl_apache/status/1478757956263071745
It's definitely interesting to think about the idea of enforcing
watermark updates only between bundles, but presumably that would mean
quite extensive alterations.
- Evan
[1] https://github.com/apache/beam/pull/15249
On Tue, Mar 8, 2022 at 10:29 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Evan,
the fix looks good to me, as long as the timestamps of the buffered
data do not need to be preserved downstream. Generally I think it
*should* be possible to output in-memory buffered data in
@ProcessElement (and @FinishBundle); the case where you need timers is
when your buffer needs to span multiple bundles. In that case the
buffer also must be stored in regular state, not in memory.
It seems to me that there is some logical gap in how we handle
per-bundle buffering. I see two possibilities:
a) either we allow the model to change the output watermark
*while* processing a bundle, in which case it is a logical
requirement that the output timestamp from @ProcessElement be no
earlier than the timestamp of the current element (because that way we
preserve the "on time"/"late" status of the current element; we
don't swap anything), or
b) we enforce output watermark updates only in between bundles,
in which case the requirement could be relaxed so that the output
timestamp from @ProcessElement must be no earlier than the
minimum of the timestamps inside the bundle.
I'm afraid that our current position is a). But in that case it is
somewhat questionable whether it is semantically correct to use
outputWithTimestamp() in @ProcessElement of a stateless DoFn at all.
It can move timestamps only to a future instant (and inside the same
window!), which has little semantic meaning to me. It looks more
error-prone than useful.
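For reference, the skew validation being discussed boils down to a simple comparison. This is an illustrative stand-in, not Beam's actual SimpleDoFnRunner code, using millisecond offsets in place of the real instants from the error quoted at the bottom of the thread:

```java
// Sketch of the timestamp-skew check: an output timestamp is rejected
// when it is earlier than the current input element's timestamp minus
// the allowed skew (0 ms by default via DoFn#getAllowedTimestampSkew()).
public class SkewCheckSketch {
    static boolean isAllowed(long outputTsMs, long inputTsMs, long allowedSkewMs) {
        return outputTsMs >= inputTsMs - allowedSkewMs;
    }

    public static void main(String[] args) {
        // Buffered element's timestamp (43:38.640 as ms-of-hour) output while
        // processing a newer element (43:43.562): rejected with zero skew.
        System.out.println(isAllowed(2_618_640, 2_623_562, 0)); // false
        // Same timestamp as the current input: accepted.
        System.out.println(isAllowed(2_623_562, 2_623_562, 0)); // true
    }
}
```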
Jan
On 3/8/22 15:53, Evan Galpin wrote:
Thanks Jan, it's interesting to read about the handling of
timestamps in cases employing a buffering pattern. In the case of
the ES write transform, buffered data can be output from
@ProcessElement or @FinishBundle. It's the case where data is
output from @ProcessElement that produces the error reported at the
start of this thread. Based on what you described, it sounds
like the PR[1] I made to "fix" this issue is actually fixing it
by transitioning data from being late to being on time, and
outputting all buffered data into a non-deterministic window
where the output heuristic is later satisfied (number of elements
buffered, or max time since last output). It seems there are two
issues as a result:
1. The window to which input elements belong is not being
preserved. Presumably we want IOs to leave an element's window
unaltered?
2. The watermark of the system might be incorrect, given that
buffered data is assumed processed and the watermark is allowed to pass?
It would definitely add complexity to the IO to employ timers to
address this, but if that's the preferred or only solution I'll
put some thought into how to implement that solution.
Thanks,
Evan
[1] https://github.com/apache/beam/pull/16744
On Tue, Mar 8, 2022 at 3:47 AM Jan Lukavský <je...@seznam.cz> wrote:
Ah, sorry, the code flushes in @FinishBundle. Is it allowed
to update the output watermark while a bundle is being processed?
That seems like it could also cause the "watermark skip" problem,
which is definitely an issue (and is probably the reason why
the check fails?).
On 3/8/22 09:35, Jan Lukavský wrote:
The buffering seems incorrect to me. Whenever there is a
buffer, we need to make sure we hold the output watermark;
otherwise the watermark might "jump over" a buffered element,
transitioning it from "on-time" to "late", which would be a
correctness bug (we can transition elements only from "late"
to "on-time", never the other way around). The alternative
is to use @FinishBundle to do the flushing, but that might not be
appropriate here.
Currently, the only way to limit the progress of the output
watermark is to set a timer whose output timestamp is the
timestamp of the earliest element in the buffer.
There was a thread discussing this in more detail [1].
Jan
[1]
https://lists.apache.org/thread/y4n5rnhfmh47orhhr92og2r0plvgopl8
On 3/7/22 19:54, Evan Galpin wrote:
In general, I'd also really like to improve my
understanding and learn more about how the buffering employed
here can cause this skew. Is it because the call to
"flush" happens from a different "current window" than
the one the elements were originally buffered from? I'm actually
thinking that the PR[1] to "fix" this would have had the
side effect of outputting buffered elements into the window
from which "flush" was called, rather than the window from
which the buffered data originated. I suppose that could be
problematic, but it should at least satisfy the validation code.
[1] https://github.com/apache/beam/pull/16744
On Mon, Mar 7, 2022 at 1:39 PM Evan Galpin
<evan.gal...@gmail.com> wrote:
x-post from the associated Jira ticket[0]
Fortunately/unfortunately this same issue struck me as
well, and I opened a PR[1] to use
`ProcessContext#output` rather than
`ProcessContext#outputWithTimestamp`. I believe that
should resolve this issue; it has for me when running
jobs with a vendored SDK that includes the change. Do
folks feel this change should be cherry-picked into 2.37.0?
The change also prompted a question to the mailing
list[2] about the difference in skew validation between
ProcessContext and FinishBundleContext (where there is
no ability to compute skew, as I understand it).
[0] https://issues.apache.org/jira/browse/BEAM-14064
[1] https://github.com/apache/beam/pull/16744
[2]
https://lists.apache.org/thread/33kj1yjmn6kkvpl4vz02vnfyn2bpzycp
On Mon, Mar 7, 2022 at 12:41 PM Luke Cwik
<lc...@google.com> wrote:
This is specifically a case where
@ProcessElement saw window X for element X0 and
buffered it in memory, and then, while processing
window Y and element Y0, wanted to flush the
previously buffered element X0. This all occurred
as part of the same bundle.
In general, yes, outputting to an earlier window is
problematic.
On Mon, Mar 7, 2022 at 9:32 AM Reuven Lax
<re...@google.com> wrote:
Outputting to an earlier window is
problematic, as the watermark can never be
correct if a DoFn can move time backwards
arbitrarily.
On Mon, Mar 7, 2022 at 9:01 AM Luke Cwik
<lc...@google.com> wrote:
A good question would be: should I be able
to output to a different window from the
current @ProcessElement call, like what we
can do from @FinishBundle, to handle these
buffering scenarios?
On Mon, Mar 7, 2022 at 8:53 AM Luke Cwik
<lc...@google.com> wrote:
The issue is that ElasticsearchIO is
collecting results from elements in
window X and then trying to output them
in window Y when flushing the batch.
This exposed a bug where buffered
elements were being output as part of a
different window than the one that
produced them.
This became visible because
validation was added recently to ensure
that when the pipeline is processing
elements in window X, any output with a
timestamp is valid for window X. Note
that this validation only occurs in
@ProcessElement, since output there is
associated with the current window of
the input element being processed.
It is OK to do this in @FinishBundle,
since there is no existing windowing
context, and when you output, the
element is assigned to an appropriate
window.
Filed
https://issues.apache.org/jira/browse/BEAM-14064
On Mon, Mar 7, 2022 at 7:44 AM Emils
Solmanis <emils.solma...@rvu.co.uk> wrote:
Hi all,
I think we’re hitting a regression
in ElasticIO batch writing.
We’ve bisected it to being
introduced in 2.35.0, and I’m
reasonably certain it’s this PR
https://github.com/apache/beam/pull/15381
Our scenario is pretty trivial: we
read off Pubsub and write to
Elastic in a streaming job. The
config for the source and sink,
respectively, is
pipeline.apply(
    PubsubIO.readStrings().fromSubscription(subscription)
).apply(ParseJsons.of(OurObject::class.java))
    .setCoder(KryoCoder.of())
and
ElasticsearchIO.write()
    .withUseStatefulBatches(true)
    .withMaxParallelRequestsPerWindow(1)
    .withMaxBufferingDuration(Duration.standardSeconds(30))
    // 5 bytes -> KiB -> MiB, so 5 MiB
    .withMaxBatchSizeBytes(5L * 1024 * 1024)
    // # of docs
    .withMaxBatchSize(1000)
    .withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(
            arrayOf(host), "fubar", "_doc"
        ).withConnectTimeout(5000)
            .withSocketTimeout(30000)
    )
    .withRetryConfiguration(
        ElasticsearchIO.RetryConfiguration.create(
            10,
            // The duration is wall clock, against the connection and socket
            // timeouts specified above. I.e., 10 x 30s is gonna be more than
            // 3 minutes, so if we're getting 10 socket timeouts in a row,
            // this would ignore the "10" part and terminate after 6. The
            // idea is that in a mixed failure mode, you'd get different
            // timeouts of different durations, and on average
            // 10 x fails < 4m. That said, 4m is arbitrary, so adjust as and
            // when needed.
            Duration.standardMinutes(4)
        )
    )
    .withIdFn { f: JsonNode -> f["id"].asText() }
    .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
    .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
We recently tried upgrading from
2.33 to 2.36 and immediately hit a
bug in the consumer, due to alleged
time skew, specifically:
2022-03-07 10:48:37.886 GMT Error
message from worker:
java.lang.IllegalArgumentException:
Cannot output with timestamp
2022-03-07T10:43:38.640Z. Output
timestamps must be no earlier than
the timestamp of the current input
(2022-03-07T10:43:43.562Z) minus
the allowed skew (0 milliseconds)
and no later than
294247-01-10T04:00:54.775Z. See the
DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the
allowed skew.
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
I’ve bisected it: 2.34 works
fine, 2.35 is the first version
where this breaks, and it seems the
code in the trace was largely added
by the PR linked above. The error
usually claims a skew of a few
seconds, but obviously I can’t
override
getAllowedTimestampSkew() on the
internal Elastic DoFn, and it’s
marked deprecated anyway.
I’m happy to raise a JIRA, but I’m
not 100% sure what the code was
intending to fix. Additionally,
I’d be happy if someone else
can reproduce this or knows of
similar reports. I feel like what
we’re doing is not *that* uncommon
a scenario, so I would have thought
someone else would have hit this by
now.
Best,
Emils