For completeness, this still happens with Flink 1.14.4

Regards,
Alexis.

________________________________
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Friday, March 11, 2022 12:21 AM
To: user@flink.apache.org <user@flink.apache.org>
Cc: pnowoj...@apache.org <pnowoj...@apache.org>
Subject: Re: Interval join operator is not forwarding watermarks correctly

I think I managed to create a reproducible example [1], I think it's due to the 
use of window + join + window. When I run the test, I never see the print 
output, but if I uncomment part of the code in the watermark generator to mark 
it as idle more quickly, it starts working after a while.

[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.

________________________________
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, March 10, 2022 7:47 PM
To: user@flink.apache.org <user@flink.apache.org>
Cc: pnowoj...@apache.org <pnowoj...@apache.org>
Subject: RE: Interval join operator is not forwarding watermarks correctly


I found [1] and [2], which are closed, but could be related?



[1] https://issues.apache.org/jira/browse/FLINK-23698

[2] https://issues.apache.org/jira/browse/FLINK-18934



Regards,

Alexis.



From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Donnerstag, 10. März 2022 19:27
To: user@flink.apache.org
Subject: Interval join operator is not forwarding watermarks correctly



Hello,



I’m in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the 
interval join in my pipeline is no longer working. More specifically, I have a 
sliding window after the interval join, and the window isn’t firing. After many 
tests, I ended up creating a custom operator that extends IntervalJoinOperator 
and I overrode processWatermark1() and processWatermark2() to add logs and 
check when they are called. I can see that processWatermark1() isn’t called.



For completeness, this is how I use my custom operator:



joinOperator = new CustomIntervalJoinOperator(…);



stream1.connect(stream2)

    .keyBy(selector1, selector2)

    .transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);



---



Some more information in case it’s relevant:



- stream2 is obtained from a side output.

- both stream1 and stream2 have watermarks assigned by custom strategies. I 
also log watermark creation, and I can see that watermarks are indeed emitted 
as expected in both streams.



Strangely, my watermark strategies mark themselves idle if they don’t receive 
new events after 10 minutes, and if I send some events and wait 10 minutes, 
processWatermark1() is called! On the other hand, if I continuously send 
events, it is never called.



Is this a known issue?



Regards,

Alexis.


Reply via email to