Hi Petr,

I just stumbled across this (slightly older) mail. Your example on pastebin is 
not available anymore but I’m guessing you have roughly these two topologies:

1.

Source1 -> Map1 -> ExtractTimestamps -|
                                      | -> Map3 …
Source2 -> Map2 -> ExtractTimestamps -|

The union is not visible at the graph level, it’s implicit in the combination 
of the two input streams.

2.

Source1 -> Map1 -|
                 | -> ExtractTimestamps -> Map3 …
Source2 -> Map2 -|

The union is not visible at the graph level, it’s implicit in the combination 
of the two input streams.

I’m also guessing that you have a timestamp/watermark assigner where the
watermark is the highest-seen timestamp minus some lateness bound. I think the
behaviour is not necessarily an artefact of the Flink implementation (with maps
and extractors being fused together). Rather, it results from the graph itself,
from how watermarks are defined, and from how the extractor works. In the first
case, each stream (before the union) has its own watermark, and the watermark
at Map3 is the minimum over those watermarks. This explains why a lower
watermark on one stream holds back the overall watermark at Map3. In the second
case, the two streams are unioned before a timestamp/watermark is extracted,
and the choice of timestamp extractor (which takes the highest-seen timestamp)
means that the watermark now advances “faster” because, logically, there is no
slower, separate stream anymore.
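
To make the difference concrete, here is a small simulation of the two cases in plain Python. It is only a sketch and does not use Flink: the class `BoundedLatenessAssigner`, both `run_*` functions, the event timestamps and the bound of 5 are made up for illustration. It also assumes that the watermark is updated after every element and that an element counts as late when its timestamp is at or below the watermark seen so far, which is simpler than what Flink's operators actually do.

```python
import math


class BoundedLatenessAssigner:
    """Hypothetical assigner: watermark = highest timestamp seen - bound."""

    def __init__(self, bound):
        self.bound = bound
        self.max_ts = -math.inf  # nothing seen yet

    def observe(self, ts):
        self.max_ts = max(self.max_ts, ts)

    @property
    def watermark(self):
        return self.max_ts - self.bound


def run_assign_before_union(events, sources, bound):
    """Case 1: one assigner per input; downstream sees the minimum watermark."""
    assigners = {s: BoundedLatenessAssigner(bound) for s in sources}
    late = []
    for source, ts in events:
        downstream_wm = min(a.watermark for a in assigners.values())
        if ts <= downstream_wm:  # simplified lateness check (assumption)
            late.append((source, ts))
        assigners[source].observe(ts)
    return late, min(a.watermark for a in assigners.values())


def run_assign_after_union(events, bound):
    """Case 2: a single assigner sees the already-unioned stream."""
    assigner = BoundedLatenessAssigner(bound)
    late = []
    for source, ts in events:
        if ts <= assigner.watermark:  # simplified lateness check (assumption)
            late.append((source, ts))
        assigner.observe(ts)
    return late, assigner.watermark


# Made-up interleaving: source1 is far ahead of source2 in event time.
events = [("source1", 100), ("source2", 10), ("source1", 200), ("source2", 20)]

late_before, wm_before = run_assign_before_union(events, ["source1", "source2"], 5)
late_after, wm_after = run_assign_after_union(events, 5)

print("before union:", late_before, wm_before)
print("after union: ", late_after, wm_after)
```

With these inputs, the per-input variant flags nothing as late because the slower source keeps the combined watermark low, whereas the single assigner after the union jumps ahead with source1 and treats both source2 elements as late.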

Is that analysis correct? Does my description roughly make sense?

Best,
Aljoscha

> On 6. May 2017, at 15:00, Petr Novotnik <petr.novot...@firma.seznam.cz> wrote:
> 
> Hello Flinkers,
> 
> Given this small example program:
> 
>> https://pastebin.com/30JbbgpH
> 
> I'd expect the output:
> 
>> one|three
>> two|four
> 
> However, I consistently receive ...
> 
>> one
>> two|four
> 
> ... due to "three" being considered a late-comer which then gets
> discarded. When I remove `assignTimestampsAndWatermarks` after the
> `union` and place it separately on each of the union's inputs, i.e.
> before the `union`, I get what I expect.
> 
> Now, after digging through Flink's source code, this behavior actually
> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
> operators form one task). Though, from a user/api perspective, it is at
> least surprising.
> 
> I wanted to ask whether this kind of behavior is known, intended or maybe
> something to be improved to avoid the gotcha?
> 
> Many thanks in advance,
> Pete.
> 
