Hi Kostas,

thanks for the reaction, comments inline.

On 8/7/19 1:59 PM, Kostas Kloudas wrote:
Hi Jan,

Two pointers that may help you explain the behavior are the following:

1) If you have a custom watermark generator, I do not think that Flink
checks if it emits only monotonically
increasing watermarks. This is the responsibility of the generator itself.
This means that although your operator A
is topologically before operator B, operator A may have a smaller watermark
if your watermark generator allows it.
I do generate watermarks with a custom source, but I believe that the generated sequence is monotonic. Still, even if the generated watermark actually decreased, I'm not sure that would mean that the downstream operator after the source (operator A) would actually "go back in time"?
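On point 1: since monotonicity is left to the generator, a custom source can enforce it itself before emitting. A minimal sketch, assuming the source keeps track of the last watermark it emitted; `MonotonicWatermarkGuard` is a hypothetical helper, not a Flink or Beam API:

```java
import java.util.OptionalLong;

// Hypothetical helper (not a Flink/Beam API): a custom source passes every
// candidate watermark through it and emits only what it returns.
public final class MonotonicWatermarkGuard {
    // Highest watermark emitted so far; Long.MIN_VALUE means "nothing emitted yet".
    private long lastEmitted = Long.MIN_VALUE;

    /** Returns the watermark to emit, or empty if it would not advance event time. */
    public OptionalLong advance(long candidate) {
        if (candidate <= lastEmitted) {
            // Monotonicity is the generator's responsibility, so a
            // non-advancing candidate is dropped here instead of emitted.
            return OptionalLong.empty();
        }
        lastEmitted = candidate;
        return OptionalLong.of(candidate);
    }

    public long lastEmitted() {
        return lastEmitted;
    }

    public static void main(String[] args) {
        MonotonicWatermarkGuard guard = new MonotonicWatermarkGuard();
        long[] candidates = {100L, 250L, 200L, 250L, 300L};
        for (long c : candidates) {
            System.out.println(c + " -> " + guard.advance(c));
        }
    }
}
```

With this guard in place, a decreasing candidate never leaves the source, so the question of what downstream does with it does not arise.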

2) Flink currently does not checkpoint the last seen watermark (
https://issues.apache.org/jira/browse/FLINK-5601).
This means that after restoring, your (event) time is assumed to be
Long.MIN_VALUE until the first new watermark arrives.
So if you observed late data not being late anymore, or something similar,
then it may not be that the two operators have
different watermarks, but that after restoring, event time rolls back to the
"beginning of time".
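Point 2 can be illustrated with a toy model, assuming an element counts as late when its timestamp is at or below the current watermark, and that a restore rebuilds the operator without its watermark. `RestoredWatermarkModel` and its methods are hypothetical, not Flink code:

```java
// Hypothetical model (not Flink code): the current watermark is not part of the
// checkpoint (FLINK-5601), so a restored operator starts again at Long.MIN_VALUE.
public final class RestoredWatermarkModel {
    private long currentWatermark = Long.MIN_VALUE;

    void onWatermark(long watermark) {
        // Ignore a watermark that would move event time backwards.
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // A watermark W says no more elements with timestamp <= W are expected,
    // so such an element counts as late.
    boolean isLate(long timestamp) {
        return timestamp <= currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Simulates restoring from a checkpoint: the watermark is not restored,
    // so the operator begins at the "beginning of time" again.
    static RestoredWatermarkModel restore() {
        return new RestoredWatermarkModel();
    }

    public static void main(String[] args) {
        RestoredWatermarkModel before = new RestoredWatermarkModel();
        before.onWatermark(1_000L);
        System.out.println("before restore, t=500 late? " + before.isLate(500L));

        RestoredWatermarkModel after = RestoredWatermarkModel.restore();
        System.out.println("after restore, t=500 late? " + after.isLate(500L));

        after.onWatermark(1_000L);
        System.out.println("after new watermark, t=500 late? " + after.isLate(500L));
    }
}
```

Under these assumptions, an element at timestamp 500 is late before the restore, on time right after it, and late again once a watermark of 1000 arrives.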

I actually didn't observe any wrong or unexpected behavior, exceptions or wrong outputs. I just noticed this in Flink's WebUI and it looked strange to me. Could it be just that the WebUI showed an older watermark for operator A? What was strange is that the watermarks stayed on my screen long enough to take a screenshot (so at least, say, 10 seconds of displaying a watermark for operator A that was less than the one for operator B). Even if watermarks are not checkpointed, would it still be possible for the watermark of operator B to actually be greater? I'm still confused about how this could happen, because (in my understanding) the output watermark of operator A should be greater than or equal to the input watermark of B (because B takes the minimum of its inputs).
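The reasoning that B's watermark is the minimum over its inputs can be sketched as a small model. It assumes each input's watermark only moves forward and that the operator forwards the minimum only when it increases; `MinInputWatermark` is a hypothetical class, and Flink's real input handling covers more cases (for example idle inputs):

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified model (not Flink code) of how an operator with
// several inputs derives its watermark from the watermarks it receives.
public final class MinInputWatermark {
    private final long[] inputWatermarks;
    private long outputWatermark = Long.MIN_VALUE;

    MinInputWatermark(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns a new output watermark if it advanced. */
    OptionalLong onWatermark(int input, long watermark) {
        // A per-input watermark only moves forward; a smaller value is ignored.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > outputWatermark) {
            outputWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    long outputWatermark() {
        return outputWatermark;
    }

    public static void main(String[] args) {
        MinInputWatermark b = new MinInputWatermark(2);
        System.out.println(b.onWatermark(0, 100L)); // input 1 still at Long.MIN_VALUE
        System.out.println(b.onWatermark(1, 80L));
        System.out.println(b.onWatermark(0, 50L));  // upstream "goes back in time"
        System.out.println(b.outputWatermark());
    }
}
```

Under this model, B's output watermark never exceeds the smallest watermark it has received, and a decreasing watermark from upstream is ignored rather than moving B backwards.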

Sorry if I'm digging too deep into this, but I don't like things I cannot explain, as they might point to a bug somewhere. :-) Or to my mental model not being aligned with reality.

Jan


I hope this helps,
Kostas

On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi all,

I have just come across a weird state of operators after a restore from a
checkpoint. After the restore, two operators that are connected (i.e.
operator A is the input of operator B) ended up with the watermark of
operator A being less than the watermark of operator B. I don't know how to
explain this. Is this normal, or does it signal a bug somewhere? If I
understand Flink's checkpointing correctly, the checkpoint barrier flows
from one operator to another, so the watermarks should be aligned.

I'm running a Beam pipeline on Flink 1.8.1.

Am I missing something?

Many thanks for any comments,

   Jan

