Hi Jan,

After looking at the code, my point 1) is false for *intermediate* tasks and when you use a watermark assigner. In these cases, Flink checks that the "next" watermark is greater than the "previous" one.
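To make the check above concrete, here is a minimal Java sketch. It assumes a hypothetical WatermarkCombiner class (not part of Flink's API, and only a rough model of what an intermediate task does). Each input channel's watermark may only move forward, the task's event time is the minimum over all of its inputs, and a watermark is forwarded downstream only if it advances that minimum. The initial value Long.MIN_VALUE also models the state right after a restore, since the last seen watermark is not checkpointed (FLINK-5601).

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model of how an intermediate task combines
 * watermarks from its input channels. Illustration only, not Flink code.
 */
public class WatermarkCombiner {
    // Last watermark seen on each input channel. Starts at Long.MIN_VALUE,
    // which is also the assumed state after a restore (FLINK-5601).
    private final long[] channelWatermarks;
    // Last watermark forwarded downstream.
    private long lastEmitted = Long.MIN_VALUE;

    public WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the watermark to forward downstream, or empty if none. */
    public OptionalLong onWatermark(int channel, long watermark) {
        // A watermark that does not advance its own channel is discarded.
        if (watermark <= channelWatermarks[channel]) {
            return OptionalLong.empty();
        }
        channelWatermarks[channel] = watermark;
        // The task's event time is the minimum over all input channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // Forward only if the combined watermark strictly advances.
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WatermarkCombiner b = new WatermarkCombiner(1);
        System.out.println(b.onWatermark(0, 100)); // OptionalLong[100]
        System.out.println(b.onWatermark(0, 90));  // OptionalLong.empty (goes backwards, dropped)
        System.out.println(b.onWatermark(0, 120)); // OptionalLong[120]
    }
}
```

In the single-channel run in main, the watermark 90 arriving after 100 is silently dropped, so the downstream view of event time never moves backwards even if the upstream one does.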
But if your operator A is a source and you emit watermarks from the source, then your watermark can appear to go backwards on operator A, while operator B does the "correction" by discarding smaller watermarks. That can explain your observation.

Cheers,
Kostas

On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:
> Hi Kostas,
>
> thanks for the reaction, comments inline.
>
> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> > Hi Jan,
> >
> > Two pointers that may help you explain the behavior are the following:
> >
> > 1) If you have a custom watermark generator, I do not think that Flink
> > checks if it emits only monotonically increasing watermarks. This is the
> > responsibility of the generator itself. This means that although your
> > operator A is topologically before operator B, operator A may have a
> > smaller watermark if your watermark generator allows so.
>
> I do generate watermarks in a custom source, but I believe that the
> generated sequence is monotonic. Still, I'm not sure: even if the
> generated watermark actually decreased, would that mean that the
> downstream operator after the source (operator A) would actually "go
> back in time"?
>
> > 2) Flink currently does not checkpoint the last seen watermark
> > (https://issues.apache.org/jira/browse/FLINK-5601). This means that
> > after restoring, your (event) time is assumed to be Long.MIN_VALUE until
> > the first new watermark comes. So if you observed late data not being
> > late anymore or something similar, then it may not be that the two
> > operators have different watermarks, but that after restoring, event
> > time rolls back to the "beginning of time".
>
> I actually didn't observe any wrong or unexpected behavior, exceptions
> or wrong outputs. I just noticed this in Flink's WebUI and it looked
> strange to me. Could it be just that the WebUI showed an older watermark
> for operator A? What was strange is that the watermarks stayed on my
> screen long enough to take a screenshot (so at least, say, 10 seconds of
> displaying a watermark of operator A less than that of operator B).
> Even if watermarks are not checkpointed, would it still be possible for
> the watermark of operator B to actually be greater? I'm still confused
> about how this could happen, because (in my understanding) the output
> watermark of operator A should be greater than or equal to the input
> watermark of B (because B takes the minimum of its inputs).
>
> Sorry if I'm digging too deep into this, but I don't like things I
> cannot explain, as they might point to some bugs somewhere. :-) Or to
> my mental model not being aligned with reality.
>
> Jan
>
> > I hope this helps,
> > Kostas
> >
> > On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Hi all,
> >>
> >> I have just come across a weird state of operators after a restore
> >> from a checkpoint. After the restore, two operators that are connected
> >> (i.e. operator A is the input of operator B) ended up with the
> >> watermark of operator A being less than the watermark of operator B.
> >> I don't know how to explain this. Can it be normal or does it signal a
> >> bug somewhere? If I understand Flink's checkpointing correctly, the
> >> checkpoint barrier flows from one operator to another, so the
> >> watermark should be aligned.
> >>
> >> I'm running a Beam pipeline on Flink 1.8.1.
> >>
> >> Am I missing something?
> >>
> >> Many thanks for comments,
> >>
> >> Jan