haishui created FLINK-31632:
-------------------------------

             Summary: watermark aligned idle source can't resume
                 Key: FLINK-31632
                 URL: https://issues.apache.org/jira/browse/FLINK-31632
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.4, 1.16.1, 1.17.0
            Reporter: haishui


 
{code:java}
// env and kafkaSource(...) are defined elsewhere in the reporter's job.
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
        // each record's payload is parsed as its event timestamp
        .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))
        // alignment group "group", max allowed drift 10 ms, update interval 2 s
        .withWatermarkAlignment("group", Duration.ofMillis(10), Duration.ofSeconds(2))
        // a source with no records for 10 s is marked idle
        .withIdleness(Duration.ofSeconds(10));
DataStreamSource<String> s1 = env.fromSource(kafkaSource("topic1"), watermarkStrategy, "S1");
DataStreamSource<String> s2 = env.fromSource(kafkaSource("topic2"), watermarkStrategy, "S2");
{code}
Send the message "0" to Kafka topics topic1 and topic2.

 

After 10 s, source1 and source2 are marked idle, and the logs show:

 
{code:java}
09:44:30,403 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
09:44:30,404 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
09:44:32,417 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
09:44:32,418 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
09:44:34,423 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
09:44:34,424 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
{code}
Now send a message to topic1 or topic2: the message is never consumed.

 

The reason is:

When a source is marked idle, lastEmittedWatermark = Long.MAX_VALUE and 
currentMaxDesiredWatermark = Long.MAX_VALUE + maxAllowedWatermarkDrift in 
org.apache.flink.streaming.api.operators.SourceOperator.
The addition overflows, so currentMaxDesiredWatermark is negative (the 
-9223372036854775799 in the log above) and always less than lastEmittedWatermark.
As a result, operatingMode always stays WAITING_FOR_ALIGNMENT and the source 
never resumes reading.
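
The arithmetic described above can be reproduced with plain long addition. The following is a minimal sketch, not Flink code: the class and method names are hypothetical, and mustWaitForAlignment is only a simplified model of the comparison the report attributes to SourceOperator. The saturating variant is one possible guard shown for illustration; it is not claimed to be how the project addresses the issue.

```java
class AlignmentOverflowDemo {

    // Hypothetical helper: the addition as described in the report.
    // Plain long addition silently wraps around on overflow.
    static long maxDesiredWatermark(long reportedWatermark, long maxAllowedDriftMillis) {
        return reportedWatermark + maxAllowedDriftMillis;
    }

    // Hypothetical guard: clamp to the nearest long bound instead of wrapping.
    static long saturatingMaxDesiredWatermark(long reportedWatermark, long maxAllowedDriftMillis) {
        try {
            return Math.addExact(reportedWatermark, maxAllowedDriftMillis);
        } catch (ArithmeticException overflow) {
            return maxAllowedDriftMillis >= 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    // Simplified model (an assumption): the source pauses while its last
    // emitted watermark is ahead of the maximum desired watermark.
    static boolean mustWaitForAlignment(long lastEmittedWatermark, long maxDesiredWatermark) {
        return lastEmittedWatermark > maxDesiredWatermark;
    }

    public static void main(String[] args) {
        long driftMillis = 10L;                // Duration.ofMillis(10) in the report
        long idleWatermark = Long.MAX_VALUE;   // watermark reported once the source is idle

        // Before idleness: -1 + 10 = 9, matching maxAllowedWatermark=9 in the log.
        System.out.println(maxDesiredWatermark(-1L, driftMillis));

        // After idleness: Long.MAX_VALUE + 10 wraps to -9223372036854775799.
        long wrapped = maxDesiredWatermark(idleWatermark, driftMillis);
        System.out.println(wrapped);
        System.out.println(mustWaitForAlignment(idleWatermark, wrapped));   // true

        // With saturation the limit stays at Long.MAX_VALUE, so no waiting.
        long clamped = saturatingMaxDesiredWatermark(idleWatermark, driftMillis);
        System.out.println(mustWaitForAlignment(idleWatermark, clamped));   // false
    }
}
```

In this model the wrapped value can never catch up with Long.MAX_VALUE, which matches the observation that the source remains in WAITING_FOR_ALIGNMENT.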



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
