Hi Jan,

After looking at the code, my point 1) is false for *intermediate* tasks
and if you are using a watermark assigner. This means that in these
cases, Flink checks that the "next" watermark is greater than the
"previous" one.

But if your operator A is a source and you emit watermarks from the
source, then it can happen that your watermark appears to go backwards
on operator A, but operator B does the "correction" by discarding
smaller watermarks. That can explain your observation.
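
To make the "correction" concrete, here is a minimal sketch of how a
downstream operator could track its input watermarks. It is only an
illustration of the idea, not Flink's actual code: the class
InputWatermarkTracker and its methods are hypothetical names. It assumes
the operator keeps the last watermark per input channel, ignores any
watermark that does not advance its channel, and uses the minimum over
all channels as its own event time, starting from Long.MIN_VALUE.

```java
import java.util.Arrays;

// Illustrative sketch only: the class and method names are hypothetical,
// not part of Flink's API.
final class InputWatermarkTracker {
    // Last watermark accepted on each input channel.
    private final long[] perChannel;
    // Event time of the operator; stays at Long.MIN_VALUE until every
    // input channel has reported a watermark.
    private long combined = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        perChannel = new long[numChannels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns true if event time advanced. */
    boolean onWatermark(int channel, long watermark) {
        if (watermark <= perChannel[channel]) {
            return false; // smaller or equal watermark on this channel: discarded
        }
        perChannel[channel] = watermark;
        // The operator's event time is the minimum over all inputs.
        long min = Arrays.stream(perChannel).min().getAsLong();
        if (min > combined) {
            combined = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return combined;
    }

    public static void main(String[] args) {
        InputWatermarkTracker b = new InputWatermarkTracker(1);
        // Upstream emits 100, then appears to go backwards to 90, then 110.
        for (long wm : new long[] {100L, 90L, 110L}) {
            boolean advanced = b.onWatermark(0, wm);
            System.out.println("in=" + wm + " advanced=" + advanced
                    + " current=" + b.currentWatermark());
        }
    }
}
```

With a tracker like this, a source that momentarily reports a smaller
watermark would not pull the downstream operator back, so the downstream
value can be ahead of what the source currently shows.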

Cheers,
Kostas

On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kostas,
>
> thanks for the reaction, comments inline.
>
> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> > Hi Jan,
> >
> > Two pointers that may help you explain the behavior are the following:
> >
> > 1) If you have a custom watermark generator, I do not think that Flink
> > checks if it emits only monotonically increasing watermarks. This is
> > the responsibility of the generator itself. This means that although
> > your operator A is topologically before operator B, operator A may
> > have a smaller watermark if your watermark generator allows it.
> I do generate watermarks in a custom source, but I believe that the
> generated sequence is monotonic. Still, even if the generated
> watermark actually decreased, would that mean that the downstream
> operator after the source (operator A) would actually "go back in
> time"?
> >
> > 2) Flink currently does not checkpoint the last seen watermark (
> > https://issues.apache.org/jira/browse/FLINK-5601).
> > This means that after restoring, your (event) time is assumed to be
> > Long.MIN_VALUE until the first new watermark comes.
> > So if you observed late data not being late anymore or something
> > similar, then it may not be that the two operators have different
> > watermarks but that after restoring event time rolls back to the
> > "beginning of time".
>
> I actually didn't observe any wrong or unexpected behavior, exceptions
> or wrong outputs. I just noticed this on Flink's WebUI and it looked
> strange to me. Could it be just that the WebUI showed an older
> watermark for operator A? What was strange is that the watermarks were
> on my screen long enough to take a screenshot (so for at least, say,
> 10 seconds the displayed watermark of operator A was less than that of
> operator B). Even if watermarks are not checkpointed, would it still
> be possible for the watermark of operator B to be actually greater?
> I'm still confused about how this could happen, because (in my
> understanding) the output watermark of operator A should be greater
> than or equal to the input watermark of B (because it takes the
> minimum of its inputs).
>
> Sorry if I'm digging too deep into this, but I don't like things I
> cannot explain, as they might point to bugs somewhere. :-) Or that my
> mental model is not aligned with reality.
>
> Jan
>
> >
> > I hope this helps,
> > Kostas
> >
> > On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Hi all,
> >>
> >> I have just come across a weird state of operators after a restore
> >> from a checkpoint. After the restore, two operators that are
> >> connected (i.e. operator A is the input of operator B) ended up with
> >> the watermark of operator A being less than the watermark of
> >> operator B. I don't know how to explain
> >> this. Can it be normal or does it signal a bug somewhere? If I
> >> understand Flink's checkpointing correctly, the checkpoint barrier flows
> >> from one operator to another, so the watermark should be aligned.
> >>
> >> I'm running a Beam pipeline on Flink 1.8.1.
> >>
> >> Am I missing something?
> >>
> >> Many thanks for comments,
> >>
> >>    Jan
> >>
> >>
>
