Hi Jan,

I am not sure what is happening. Operator A does not seem to be chained to the source (which produces the watermarks), so the check for increasing watermarks should also be applied there. BTW, I assume that by "bottom left" you mean the operators that start with "activeDevices:takePresent..." (Op. A) and "activeDevices:stepLength..." (Op. B).
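
To illustrate the check mentioned above, here is a minimal Python model. It is loosely based on how Flink 1.8's periodic watermark assigner operator (TimestampsAndPeriodicWatermarksOperator) only forwards a watermark that is strictly greater than the last one it emitted. MonotonicAssigner and on_candidate are hypothetical names for this sketch, not Flink API.

```python
LONG_MIN = -(2**63)  # Java's Long.MIN_VALUE, i.e. the "beginning of time"


class MonotonicAssigner:
    """Hypothetical model of an intermediate watermark assigner."""

    def __init__(self):
        self.current = LONG_MIN  # last watermark actually forwarded downstream
        self.emitted = []

    def on_candidate(self, wm):
        # Forward the candidate only if it is strictly greater than the last one;
        # equal or smaller candidates are silently dropped.
        if wm > self.current:
            self.current = wm
            self.emitted.append(wm)


assigner = MonotonicAssigner()
for candidate in [10, 20, 15, 20, 30]:
    assigner.on_candidate(candidate)

print(assigner.emitted)  # [10, 20, 30]
```

In this model, a source that emits watermarks itself does not pass through such a guard, so a decreasing sequence could still be visible on the source operator.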

I am wondering whether the WebUI is simply not consistent across different operators. For example, the watermark of Op. B may simply not have been updated in the WebUI. I am also cc'ing Chesnay, who may have better insight into the WebUI.

Cheers,
Kostas

On Wed, Aug 7, 2019 at 3:25 PM Jan Lukavský <je...@seznam.cz> wrote:
> Code would be a little complicated, because it is wrapped with several
> layers of other APIs (Beam being one of them, but there is also another
> layer).
>
> I can provide the job graph [1] and a screenshot of the two watermarks [2].
> The watermarks are taken from the two operators on the bottom left.
>
> Essentially, the job reads from Google Cloud Storage and simultaneously
> from Kafka. The cloud storage holds blob files containing historical
> events, and these blobs are marked with an event-time range (e.g. a file is
> named BLOB_EVENTS_TIMESTAMP1_TIMESTAMP2). Those timestamps are used
> to generate watermarks from the batch storage (files are read in sorted
> order).
>
> Does that help, or would you like more details?
>
> Jan
>
> [1] https://transfer.sh/v473f/jobgraph.png
> [2] https://transfer.sh/iDg1A/watermarks.png
>
> On 8/7/19 3:04 PM, Kostas Kloudas wrote:
> > But are they chained together? Could you provide the code from your job,
> > at least until operator A?
> >
> > On Wed, Aug 7, 2019 at 3:03 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Actually, operator A is intermediate; the source precedes it.
> >>
> >> On 8/7/19 2:44 PM, Kostas Kloudas wrote:
> >>> Hi Jan,
> >>>
> >>> After looking at the code, my point 1) is false for *intermediate* tasks
> >>> if you are using a watermark assigner. This means that in these cases,
> >>> Flink checks that the "next" watermark is greater than the "previous" one.
> >>>
> >>> But if your operator A is a source and you emit watermarks from the
> >>> source, then it can happen that your watermark appears to go backwards
> >>> on operator A, but operator B does the "correction" by discarding
> >>> smaller watermarks. That can explain your observation.
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>> On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>>> Hi Kostas,
> >>>>
> >>>> thanks for the reaction, comments inline.
> >>>>
> >>>> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> >>>>> Hi Jan,
> >>>>>
> >>>>> Two pointers that may help you explain the behavior are the following:
> >>>>>
> >>>>> 1) If you have a custom watermark generator, I do not think that Flink
> >>>>> checks if it emits only monotonically increasing watermarks. This is
> >>>>> the responsibility of the generator itself. This means that although
> >>>>> your operator A is topologically before operator B, operator A may
> >>>>> have a smaller watermark if your watermark generator allows it.
> >>>> I do generate watermarks by a custom source, but I believe that the
> >>>> generated sequence is monotonic. But still, I'm not sure: even if the
> >>>> generated watermark actually decreased, would that mean that the
> >>>> downstream operator after the source (operator A) would actually
> >>>> "go back in time"?
> >>>>> 2) Flink currently does not checkpoint the last seen watermark
> >>>>> (https://issues.apache.org/jira/browse/FLINK-5601).
> >>>>> This means that after restoring, your (event) time is assumed to be
> >>>>> Long.MIN_VALUE until the first new watermark comes.
> >>>>> So if you observed late data not being late anymore or something
> >>>>> similar, then it may not be that the two operators have different
> >>>>> watermarks, but that after restoring, event time rolls back to the
> >>>>> "beginning of time".
> >>>> I actually didn't observe any wrong or unexpected behavior, exceptions,
> >>>> or wrong outputs. I just noticed this in Flink's WebUI and it looked
> >>>> strange to me. Could it be just that the WebUI showed an older watermark
> >>>> for operator A? What was strange is that the watermarks stayed on my
> >>>> screen long enough to take a screenshot (so at least, say, 10 seconds
> >>>> of displaying a watermark of operator A less than the one of operator B).
> >>>> Even if watermarks are not checkpointed, would it still be possible for
> >>>> the watermark of operator B to actually be greater? I'm still confused
> >>>> about how this could happen, because (in my understanding) the output
> >>>> watermark of operator A should be greater than or equal to the input
> >>>> watermark of B (because B takes the minimum of its inputs).
> >>>>
> >>>> Sorry if I'm digging too deep into this, but I don't like things I
> >>>> cannot explain, as they might point to some bugs somewhere. :-) Or to
> >>>> my mental model not being aligned with reality.
> >>>>
> >>>> Jan
> >>>>
> >>>>> I hope this helps,
> >>>>> Kostas
> >>>>>
> >>>>> On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have just come across a weird state of operators after a restore
> >>>>>> from a checkpoint. After the restore, two operators that are connected
> >>>>>> (i.e. operator A is an input of operator B) ended up with the watermark
> >>>>>> of operator A being less than the watermark of operator B. I don't know
> >>>>>> how to explain this. Can it be normal, or does it signal a bug
> >>>>>> somewhere? If I understand Flink's checkpointing correctly, the
> >>>>>> checkpoint barrier flows from one operator to another, so the
> >>>>>> watermarks should be aligned.
> >>>>>>
> >>>>>> I'm running a Beam pipeline on Flink 1.8.1.
> >>>>>>
> >>>>>> Am I missing something?
> >>>>>>
> >>>>>> Many thanks for comments,
> >>>>>>
> >>>>>> Jan
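
Jan's "minimum of inputs" reasoning and Kostas's remark that operator B discards smaller watermarks can be sketched with a small Python model of how an operator combines watermarks from its input channels. It is loosely inspired by Flink's StatusWatermarkValve but is not its actual code; InputValve and on_watermark are hypothetical names. Each channel only moves forward, and the operator's watermark is the minimum across channels, emitted only when it advances.

```python
LONG_MIN = -(2**63)  # Java's Long.MIN_VALUE


class InputValve:
    """Hypothetical model of how an operator combines watermarks from its inputs."""

    def __init__(self, num_channels):
        self.channel_wm = [LONG_MIN] * num_channels  # highest watermark seen per input
        self.output_wm = LONG_MIN                     # the operator's own watermark
        self.emitted = []

    def on_watermark(self, channel, wm):
        # A watermark that would move a channel backwards is discarded.
        if wm <= self.channel_wm[channel]:
            return
        self.channel_wm[channel] = wm
        combined = min(self.channel_wm)
        # Emit only when the minimum over all inputs advances.
        if combined > self.output_wm:
            self.output_wm = combined
            self.emitted.append(combined)


# Operator B with two inputs: channel 0 fed by operator A, channel 1 by another upstream.
b = InputValve(2)
last_from_a = None
for channel, wm in [(0, 100), (1, 150), (0, 90)]:
    if channel == 0:
        last_from_a = wm  # what A itself most recently emitted
    b.on_watermark(channel, wm)

print(last_from_a, b.output_wm)  # 90 100: B is ahead of A's latest watermark

# After a restore, a fresh valve starts again from LONG_MIN (cf. FLINK-5601).
restored = InputValve(2)
print(restored.output_wm == LONG_MIN)  # True
```

In this model B's watermark never exceeds the highest watermark A has emitted, but it can exceed A's most recent one if A's sequence went backwards, which is the "correction" described in the thread.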