[ https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846747#comment-17846747 ]
Yordan Pavlov commented on FLINK-33109:
---------------------------------------

Hi [~pnowojski], yes, we do not observe the problem in 1.18.

> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
>                 Key: FLINK-33109
>                 URL: https://issues.apache.org/jira/browse/FLINK-33109
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.1
>            Reporter: Yordan Pavlov
>            Priority: Major
>         Attachments: WatermarkTest-1.scala, image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka
> source watermarks start to diverge and no longer honor the watermark
> alignment setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg =>
>   msg.value.getTimestamp)
>   .withWatermarkAlignment("alignment-sources-group",
>     time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>
> This works well until my job needs to recover from a checkpoint. Once the
> recovery takes place, no alignment happens any more. This is best
> illustrated by the watermark metrics for various operators in the image:
> !image-2023-09-18-15-40-06-868.png!
>
> You can see how the watermarks disperse after the recovery. While debugging
> the problem I noticed that before the failure there would be calls to
> {code:java}
> SourceCoordinator::announceCombinedWatermark()
> {code}
> After the recovery no calls get there, so no value for
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem if I stop the job, clear all
> state from Zookeeper and then manually start Flink, providing the last
> checkpoint with the
> {code:java}
> '--fromSavepoint'{code}
> flag. This causes the SourceCoordinator to be constructed properly and the
> watermark drift to be checked. After this manual recovery the watermarks
> again converge to the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>
> Let me know if I can be helpful by providing any more information.
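
For readers following the issue, the alignment check that the report says stops running after recovery can be pictured with a small standalone model. This is only a sketch under assumptions and is not Flink's SourceCoordinator code: the class name WatermarkAlignmentSketch, its methods, and the split names are hypothetical. It assumes the combined watermark of an alignment group is the minimum watermark reported by its sources, and that a source should pause once it is ahead of that minimum by more than the configured maximum drift.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of watermark alignment; hypothetical names, not Flink's implementation. */
public class WatermarkAlignmentSketch {

    /** Combined watermark of the group: the minimum watermark reported by any source. */
    public static long combinedWatermark(Map<String, Long> reported) {
        long min = Long.MAX_VALUE;
        for (long watermark : reported.values()) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** Highest watermark a source may reach before it should stop reading. */
    public static long maxAllowedWatermark(long combined, long maxDriftMillis) {
        long sum = combined + maxDriftMillis;
        // Saturate instead of overflowing (assumes a non-negative drift).
        return sum < combined ? Long.MAX_VALUE : sum;
    }

    /** Sources whose watermark is ahead of the group by more than the allowed drift. */
    public static List<String> sourcesToPause(Map<String, Long> reported, long maxDriftMillis) {
        long limit = maxAllowedWatermark(combinedWatermark(reported), maxDriftMillis);
        List<String> paused = new ArrayList<>();
        for (Map.Entry<String, Long> entry : reported.entrySet()) {
            if (entry.getValue() > limit) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        // Hypothetical watermarks (epoch millis) reported by three source splits.
        Map<String, Long> reported = new TreeMap<>();
        reported.put("split-0", 1_000L);
        reported.put("split-1", 1_400L);
        reported.put("split-2", 9_000L);

        long maxDriftMillis = 500L;
        System.out.println("combined = " + combinedWatermark(reported));
        System.out.println("paused = " + sourcesToPause(reported, maxDriftMillis));
    }
}
```

In this model, if the periodic check never runs (as the report describes after recovery), no source is ever told to pause, which would match the diverging watermarks in the first screenshot.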