Re: Issues with watermark alignment in Flink 1.15

2022-07-14 Thread Jun Qin
Found the reason: watermark alignment does not work together with 
.withIdleness(Duration.ofSeconds(1))

Isn't this a valid scenario: one subtask has multiple input streams/channels, 
some of which are idle while others have large watermark skew?
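
To make the scenario concrete, here is a minimal plain-Java sketch. It is not 
Flink's coordinator code. It assumes, consistent with the logs quoted below, 
that an idle subtask reports Long.MAX_VALUE and that the distributed limit is 
the minimum reported watermark plus maxDrift. The class name, the method names 
and the clamping guard are all hypothetical.

```java
import java.util.stream.LongStream;

class AlignmentScenarioSketch {
    /** Assumed marker for an idle subtask (the 9223372036854775807 seen in the logs). */
    static final long IDLE = Long.MAX_VALUE;

    /** Minimum over all reported watermarks; IDLE only wins if every subtask is idle. */
    static long minWatermark(long... reported) {
        return LongStream.of(reported).min().orElse(IDLE);
    }

    /** Hypothetical guard: adds the drift but clamps instead of wrapping around. */
    static long saturatingAdd(long watermark, long driftMillis) {
        long sum = watermark + driftMillis;
        // Overflow occurred if both operands share a sign and the sum's sign differs.
        boolean overflow = ((watermark ^ sum) & (driftMillis ^ sum)) < 0;
        if (!overflow) {
            return sum;
        }
        return watermark < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long drift = 10_000L; // Duration.ofSeconds(10) in milliseconds

        // Some subtasks idle, others active: the slowest active one sets the limit.
        long mixed = minWatermark(IDLE, 1657435956048L, IDLE, 1657436016053L);
        System.out.println(saturatingAdd(mixed, drift));

        // All subtasks idle: the clamped sum stays at Long.MAX_VALUE.
        long allIdle = minWatermark(IDLE, IDLE, IDLE, IDLE);
        System.out.println(saturatingAdd(allIdle, drift));
    }
}
```

In this model the mixed case is unproblematic, because the minimum comes from 
an active subtask. Only the all-idle case reaches Long.MAX_VALUE, and the clamp 
keeps the limit from turning negative.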

In addition, is the watermark update interval in:
.withWatermarkAlignment("wm-group", maxDrift, updateInterval)
expected to be at the millisecond level? If so, I think the following log 
message should be at the DEBUG level
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]

to avoid streaming logs filling up the disk space.

Thanks
Jun


> On Jul 10, 2022, at 9:10 AM, Jun Qin wrote:
> 
> Hi All
> 
> I am trying watermark alignment in Flink 1.15 with:
> 
> watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
> Duration.ofMillis(outOfOrderness))
> .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), 
> Duration.ofSeconds(1))
> .withTimestampAssigner(
> (element, timestamp) -> element.getTimestamp())
> .withIdleness(Duration.ofSeconds(1));
> 
> And got the following in DEBUG logs:
> 2022-07-10 06:53:35,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
> 2022-07-10 06:53:36,606 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from 
> subTaskId=2
> 2022-07-10 06:53:36,619 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from 
> subTaskId=1
> 2022-07-10 06:53:36,639 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from 
> subTaskId=3
> 2022-07-10 06:53:36,702 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from 
> subTaskId=0
> 2022-07-10 06:53:36,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
> 2022-07-10 06:53:37,229 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
> lock acquire time to keep lease
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> TryAcquireOrRenew return success
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> Successfully renewed lease
> 2022-07-10 06:53:37,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:37,605 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:37,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:37,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:37,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 
> 3]
> 2022-07-10 06:53:38,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:38,604 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:38,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:38,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:38,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 
> 3]
> 
> 
> Then it stays with maxAllowedWatermark=-9223372036854765809 all the time. The 
> watermark looks to be correct at beginning, then 

Issues with watermark alignment in Flink 1.15

2022-07-10 Thread Jun Qin
Hi All

I am trying watermark alignment in Flink 1.15 with:

watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
Duration.ofMillis(outOfOrderness))
.withWatermarkAlignment("wm-group", Duration.ofSeconds(10), 
Duration.ofSeconds(1))
.withTimestampAssigner(
(element, timestamp) -> element.getTimestamp())
.withIdleness(Duration.ofSeconds(1));

And got the following in DEBUG logs:
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
2022-07-10 06:53:36,606 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
2022-07-10 06:53:36,619 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
2022-07-10 06:53:36,639 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
2022-07-10 06:53:36,702 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
2022-07-10 06:53:36,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:37,229 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
lock acquire time to keep lease
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
TryAcquireOrRenew return success
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
Successfully renewed lease
2022-07-10 06:53:37,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:37,605 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:37,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:37,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:37,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:38,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:38,604 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:38,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:38,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:38,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]


Then it stays at maxAllowedWatermark=-9223372036854765809 indefinitely. The 
watermark looks correct at the beginning, then changes to something related 
to Long.MAX_VALUE. It feels like an overflow issue.
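
The logged values are consistent with plain long arithmetic wrapping around. 
The sketch below is an assumption about how the limit is derived (minimum 
reported watermark plus the 10 s maxDrift), not code taken from Flink; the 
class and method names are hypothetical.

```java
import java.time.Duration;

class WatermarkOverflowSketch {
    /** Plain long addition, which silently wraps around on overflow. */
    static long maxAllowed(long minWatermark, long maxDriftMillis) {
        return minWatermark + maxDriftMillis;
    }

    public static void main(String[] args) {
        long maxDrift = Duration.ofSeconds(10).toMillis(); // 10000 ms

        // No subtask has reported yet (assumed starting point: Long.MIN_VALUE).
        System.out.println(maxAllowed(Long.MIN_VALUE, maxDrift));  // -9223372036854765808

        // Slowest subtask in the log (subTaskId=1) reported 1657435956048.
        System.out.println(maxAllowed(1657435956048L, maxDrift));  // 1657435966048

        // Every subtask reports Long.MAX_VALUE: the sum wraps to a negative value.
        System.out.println(maxAllowed(Long.MAX_VALUE, maxDrift));  // -9223372036854765809
    }
}
```

All three results match the maxAllowedWatermark values in the log above, which 
suggests that the negative limit comes from adding maxDrift to Long.MAX_VALUE.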

As soon as I remove the .withWatermarkAlignment() call, everything works fine.

Any ideas?

Thanks
Jun