This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a763489b9a889d628064e6208e054cf5f8c923ce
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Feb 19 16:08:35 2026 +0200

    [FLINK-39073][runtime] Defer alignment check for idle splits
    
    If a split emits watermarks far into the future and then goes idle, 
the alignment check will incorrectly mark it paused.
    As the max allowed watermark advances, the Source operator will transition the 
split back to the active state (while it's still idle).
    This change aims to fix the issue by deferring the alignment check for idle 
splits until they break out of idleness.
---
 .../streaming/api/operators/SourceOperator.java    | 30 +++++++++++++-
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 48 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b5fff8915da..7703d1c294e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -178,6 +178,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
 
     private final Set<String> currentlyPausedSplits = new HashSet<>();
+    private final Set<String> currentlyIdleSplits = new HashSet<>();
 
     private final Map<String, InternalSourceSplitMetricGroup> 
splitMetricGroups = new HashMap<>();
 
@@ -711,6 +712,15 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     @Override
     public void updateCurrentSplitWatermark(String splitId, long watermark) {
         splitCurrentWatermarks.put(splitId, watermark);
+        if (!currentlyIdleSplits.contains(splitId)) {
+            maybePauseSplit(splitId);
+        }
+    }
+
+    private void maybePauseSplit(String splitId) {
+        final long watermark =
+                splitCurrentWatermarks.getOrDefault(
+                        splitId, Watermark.UNINITIALIZED.getTimestamp());
         if (watermark > currentMaxDesiredWatermark && 
!currentlyPausedSplits.contains(splitId)) {
             pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
             currentlyPausedSplits.add(splitId);
@@ -719,10 +729,22 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void updateCurrentSplitIdle(String splitId, boolean idle) {
+        final InternalSourceSplitMetricGroup splitMetricGroup =
+                this.getOrCreateSplitMetricGroup(splitId);
+        if (idle == currentlyIdleSplits.contains(splitId)) {
+            return;
+        }
         if (idle) {
-            this.getOrCreateSplitMetricGroup(splitId).markIdle();
+            LOG.info("[{}] Marking split idle", splitId);
+            currentlyIdleSplits.add(splitId);
+            splitMetricGroup.markIdle();
         } else {
-            this.getOrCreateSplitMetricGroup(splitId).markNotIdle();
+            LOG.info("[{}] Marking split not idle", splitId);
+            currentlyIdleSplits.remove(splitId);
+            splitMetricGroup.markNotIdle();
+            // Since we skipped alignment check
+            // for this split while it was idle:
+            maybePauseSplit(splitId);
         }
     }
 
@@ -731,6 +753,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         splitCurrentWatermarks.remove(splitId);
         getOrCreateSplitMetricGroup(splitId).onSplitFinished();
         this.splitMetricGroups.remove(splitId);
+        currentlyIdleSplits.remove(splitId);
     }
 
     /**
@@ -744,6 +767,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         Collection<String> splitsToResume = new ArrayList<>();
         splitCurrentWatermarks.forEach(
                 (splitId, splitWatermark) -> {
+                    if (currentlyIdleSplits.contains(splitId)) {
+                        return;
+                    }
                     if (splitWatermark > currentMaxDesiredWatermark) {
                         splitsToPause.add(splitId);
                     } else if (currentlyPausedSplits.contains(splitId)) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 060689f0a55..54e33e2cf20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -481,6 +481,54 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
     }
 
+    @Test
+    void testAlignmentCheckIsDeferredForIdleSplits() throws Exception {
+        final long idleTimeout = 100;
+        final MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
true, true);
+        final TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        final SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+        final int allowedWatermark4 = 4;
+        final int allowedWatermark7 = 7;
+        split0.addRecord(5);
+        split0.addRecord(6);
+        split0.addRecord(7);
+        split0.addRecord(8);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(Arrays.asList(split0), new 
MockSourceSplitSerializer()));
+        final CollectingDataOutput<Integer> actualOutput = new 
CollectingDataOutput<>();
+
+        // Emit enough record to fill the sampler buffer
+        operator.emitNext(actualOutput);
+        operator.emitNext(actualOutput);
+        operator.emitNext(actualOutput);
+
+        // Transition the split to idle state:
+        for (int i = 0; i < 10; i++) {
+            processingTimeService.advance(idleTimeout);
+        }
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // Alignment check fires but doesn't pause the idle split
+        operator.handleOperatorEvent(new 
WatermarkAlignmentEvent(allowedWatermark4));
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // While the split is idle, we advance the allowed watermark to keep 
the source active
+        operator.handleOperatorEvent(new 
WatermarkAlignmentEvent(allowedWatermark7));
+        // The split is still idle:
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // The split emits a record to break out of idleness
+        operator.emitNext(actualOutput);
+
+        // The split is marked not idle, then immediately paused by the 
deferred alignment check
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+    }
+
     private void assertOutput(
             CollectingDataOutput<Integer> actualOutput, List<Integer> 
expectedOutput) {
         assertThat(

Reply via email to