Found the reason: watermark alignment does not work together with .withIdleness(Duration.ofSeconds(1)).
Isn't this a valid scenario: one subtask has multiple input streams/channels, where some are idle and others have large watermark skew?

In addition, do we expect the watermark update interval in

    .withWatermarkAlignment("wm-group", maxDrift, updateInterval)

to be at the millisecond level? If so, I think the following log message should be at the DEBUG level, to avoid streaming logs filling up the disk space:

    2022-07-10 06:53:35,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]

Thanks
Jun

> On Jul 10, 2022, at 9:10 AM, Jun Qin <qinjunje...@gmail.com> wrote:
>
> Hi All
>
> I am trying watermark alignment in Flink 1.15 with:
>
> watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
>         Duration.ofMillis(outOfOrderness))
>     .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
>     .withTimestampAssigner((element, timestamp) -> element.getTimestamp())
>     .withIdleness(Duration.ofSeconds(1));
>
> And got the following in DEBUG logs:
>
> 2022-07-10 06:53:35,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
> 2022-07-10 06:53:36,606 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
> 2022-07-10 06:53:36,619 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
> 2022-07-10 06:53:36,639 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
> 2022-07-10 06:53:36,702 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
> 2022-07-10 06:53:36,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
> 2022-07-10 06:53:37,229 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update lock acquire time to keep lease
> 2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - TryAcquireOrRenew return success
> 2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Successfully renewed lease
> 2022-07-10 06:53:37,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:37,605 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:37,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:37,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:37,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
> 2022-07-10 06:53:38,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:38,604 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:38,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:38,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:38,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
>
> Then it stays at maxAllowedWatermark=-9223372036854765809 all the time. The
> watermark looks correct at the beginning, then changes to something related
> to Long.MAX_VALUE. It feels like an overflow issue.
>
> Once I remove the .withWatermarkAlignment() call, everything works fine.
>
> Any idea?
>
> Thanks
> Jun
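A note on the numbers in the logs above: all three maxAllowedWatermark values are consistent with a plain long addition of the lowest reported watermark and the 10-second max drift (10000 ms). The sketch below only reproduces that arithmetic. It assumes, without checking the Flink source, that the coordinator computes "minimum reported watermark + maxDrift" and that the starting value before any subtask reports is Long.MIN_VALUE. The class and method names are hypothetical, and saturatingMaxAllowed is just one possible guard, not Flink's behavior.

```java
public class WatermarkDriftOverflow {

    // Assumed rule: allowed watermark = lowest reported watermark + max drift,
    // using plain long addition, which wraps around silently on overflow.
    static long wrappingMaxAllowed(long minReportedWatermark, long maxDriftMillis) {
        return minReportedWatermark + maxDriftMillis;
    }

    // Hypothetical guard: clamp to the long range instead of wrapping.
    static long saturatingMaxAllowed(long minReportedWatermark, long maxDriftMillis) {
        try {
            return Math.addExact(minReportedWatermark, maxDriftMillis);
        } catch (ArithmeticException overflow) {
            // Overflow direction follows the sign of the drift.
            return maxDriftMillis >= 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        long maxDriftMillis = 10_000L; // Duration.ofSeconds(10) from the strategy above

        // No subtask has reported yet (assumed start value Long.MIN_VALUE).
        System.out.println(wrappingMaxAllowed(Long.MIN_VALUE, maxDriftMillis));

        // Lowest reported watermark in the 06:53:36 round (subTaskId=1).
        System.out.println(wrappingMaxAllowed(1657435956048L, maxDriftMillis));

        // All subtasks report Long.MAX_VALUE: the addition wraps to a negative value.
        System.out.println(wrappingMaxAllowed(Long.MAX_VALUE, maxDriftMillis));

        // With the clamping guard the result stays at Long.MAX_VALUE.
        System.out.println(saturatingMaxAllowed(Long.MAX_VALUE, maxDriftMillis));
    }
}
```

The first three printed values match the three distinct maxAllowedWatermark values in the logs, which suggests a wrapped long addition rather than a buffer problem once every subtask reports Long.MAX_VALUE.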