Hi Jan,

I am not sure what is happening. Operator A does not seem to be chained to
the source (which produces the watermarks) so
the check about increasing watermarks should be also applied there. BTW, I
assume that bottom left you mean the one that
starts with "activeDevices:takePresent..." (Op. A)  and
"activeDevices:stepLength..." (Op. B).

I am wondering if it can be that the WebUi is not consistent across
different operators.
For example, the watermark of Op B was simply not updated in the WebUI.

I also cc Chesnay who may have a better insight about the WebUi.

Cheers,
Kostas


On Wed, Aug 7, 2019 at 3:25 PM Jan Lukavský <je...@seznam.cz> wrote:

> Code would be a little complicated, because it is wrapped with several
> layers of other APIs (Beam being one of them, but there is also other
> layer).
>
> I can provide the job graph [1]  a screenshot of the two watermarks [2].
> The watermarks are taken from the two operators on bottom left.
>
> Essentially, the job reads from Google cloud storage and simultaneously
> from Kafka. On cloud storage are stored blob files containing historical
> events and these blobs are marked with event time range (e.g. file is
> named BLOB_EVENTS_TIMESTAMP1_TIMESTAMP2), and those timestamps are used
> to generate watermarks from the batch storage (files are read in sorted
> order).
>
> Does that help, or would you like more details?
>
> Jan
>
> [1] https://transfer.sh/v473f/jobgraph.png
> [2] https://transfer.sh/iDg1A/watermarks.png
>
> On 8/7/19 3:04 PM, Kostas Kloudas wrote:
> > But are they chained together? Could you provide the code from your job,
> at
> > least until operator A?
> >
> > On Wed, Aug 7, 2019 at 3:03 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Actually, operator A is intermediate, source is preceding it.
> >>
> >> On 8/7/19 2:44 PM, Kostas Kloudas wrote:
> >>> Hi Jan,
> >>>
> >>> After looking at the code, my point 1) is false for *intermediate*
> tasks
> >>> and if you are
> >>> using a watermark assigner. This means that in these cases, Flink
> checks
> >>> that the
> >>> "next" watermark is greater than the "previous" one.
> >>>
> >>> But if your operator A is a source and you emit watermarks from the
> >> source,
> >>> then
> >>> it can happen that your watermark appears to go backwards on operator
> A,
> >>> but
> >>> operator B does the "correction" by discarding smaller watermarks. That
> >> can
> >>> explain
> >>> your observation.
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>> On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>>> Hi Kostas,
> >>>>
> >>>> thanks for reaction, comments inline.
> >>>>
> >>>> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> >>>>> Hi Jan,
> >>>>>
> >>>>> Two pointers that may help you explain the behavior are the
> following:
> >>>>>
> >>>>> 1) If you have a custom watermark generator, I do not think that
> Flink
> >>>>> checks if it emits only monotonically
> >>>>> increasing watermarks. This is the responsibility of the generator
> >>>> itself.
> >>>>> This means that although you operator A
> >>>>> is topologically before operator B, operator A may have a smaller
> >>>> watermark
> >>>>> if your watermark generator allows so.
> >>>> I do generate watermarks by custom source, but I believe that the
> >>>> generated sequence is monotonic. But still, I'm not sure, that even if
> >>>> it was the case, that the generated watermark actually decreases,
> would
> >>>> that mean, that downstream operator after source (operator A) would
> >>>> actually "go back in time"?
> >>>>> 2) Flink currently does not checkpoint the last seen watermark (
> >>>>> https://issues.apache.org/jira/browse/FLINK-5601).
> >>>>> This means that after restoring, your (event) time is assumed to be
> >>>>> Long.Min until the first new watermark comes.
> >>>>> So if you observed late data not being late anymore or sth similar,
> >> then
> >>>> it
> >>>>> may not be that the two operators have
> >>>>> different watermarks but that after restoring event time rolls back
> to
> >>>> the
> >>>>> "beginning of time".
> >>>> I actually didn't observe any wrong or unexpected behavior, exceptions
> >>>> or wrong outputs. I just noticed this on Flink's WebUI and it looked
> >>>> strange to me. Could it be just that the WebUI showed older watermark
> >>>> for operator A? Strange was, that the watermarks were my screen long
> >>>> enough to take a screenshot (so at least say 10 seconds displaying
> >>>> watermark of operator A less than the one of operator B). Even if
> >>>> watermarks are not checkpointed, would it still be possible for
> >>>> watermark of operator B to be actually greater? I'm still confused of
> >>>> how this could happen, because (in my understanding) output watermark
> of
> >>>> operator A should be greater or equal to input watermark of B (because
> >>>> it takes minimum of inputs).
> >>>>
> >>>> Sorry if I'm too digging into this, but I don't like things I cannot
> >>>> explain, as they might point out to some bugs somewhere. :-) Or that
> my
> >>>> mental model it not aligned with reality.
> >>>>
> >>>> Jan
> >>>>
> >>>>> I hope this helps,
> >>>>> Kostas
> >>>>>
> >>>>> On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have just come across a weird state of operators after restore
> from
> >>>>>> checkpoint. After the restore, two operators that are connected
> (i.e.
> >>>>>> operator A is input of operator B) ended up with watermark of
> >> operator A
> >>>>>> being less than watermark of operator B. I don't know how to explain
> >>>>>> this. Can it be normal or does it signal a bug somewhere? If I
> >>>>>> understand Flink's checkpointing correctly, the checkpoint barrier
> >> flows
> >>>>>> from one operator to another, so the watermark should be aligned.
> >>>>>>
> >>>>>> I'm running a Beam pipeline on Flink 1.8.1.
> >>>>>>
> >>>>>> Am I missing something?
> >>>>>>
> >>>>>> Many thanks for comments,
> >>>>>>
> >>>>>>      Jan
> >>>>>>
> >>>>>>
>

Reply via email to