This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a763489b9a889d628064e6208e054cf5f8c923ce Author: Efrat Levitan <[email protected]> AuthorDate: Thu Feb 19 16:08:35 2026 +0200 [FLINK-39073][runtime] Defer alignment check for idle splits If a split emits watermarks far into the future and then goes idle, the alignment check will incorrectly mark it paused. As the max allowed watermark advances, the source operator will transition the split back to the active state (while it is still idle). This change fixes the issue by deferring the alignment check for idle splits until they break out of idleness. --- .../streaming/api/operators/SourceOperator.java | 30 +++++++++++++- .../SourceOperatorSplitWatermarkAlignmentTest.java | 48 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index b5fff8915da..7703d1c294e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -178,6 +178,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final Map<String, Long> splitCurrentWatermarks = new HashMap<>(); private final Set<String> currentlyPausedSplits = new HashSet<>(); + private final Set<String> currentlyIdleSplits = new HashSet<>(); private final Map<String, InternalSourceSplitMetricGroup> splitMetricGroups = new HashMap<>(); @@ -711,6 +712,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentSplitWatermark(String splitId, long watermark) { splitCurrentWatermarks.put(splitId, watermark); + if (!currentlyIdleSplits.contains(splitId)) { + maybePauseSplit(splitId); + } + } + + private void maybePauseSplit(String splitId) { + final long watermark = 
splitCurrentWatermarks.getOrDefault( + splitId, Watermark.UNINITIALIZED.getTimestamp()); if (watermark > currentMaxDesiredWatermark && !currentlyPausedSplits.contains(splitId)) { pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); currentlyPausedSplits.add(splitId); @@ -719,10 +729,22 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentSplitIdle(String splitId, boolean idle) { + final InternalSourceSplitMetricGroup splitMetricGroup = + this.getOrCreateSplitMetricGroup(splitId); + if (idle == currentlyIdleSplits.contains(splitId)) { + return; + } if (idle) { - this.getOrCreateSplitMetricGroup(splitId).markIdle(); + LOG.info("[{}] Marking split idle", splitId); + currentlyIdleSplits.add(splitId); + splitMetricGroup.markIdle(); } else { - this.getOrCreateSplitMetricGroup(splitId).markNotIdle(); + LOG.info("[{}] Marking split not idle", splitId); + currentlyIdleSplits.remove(splitId); + splitMetricGroup.markNotIdle(); + // Since we skipped alignment check + // for this split while it was idle: + maybePauseSplit(splitId); } } @@ -731,6 +753,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr splitCurrentWatermarks.remove(splitId); getOrCreateSplitMetricGroup(splitId).onSplitFinished(); this.splitMetricGroups.remove(splitId); + currentlyIdleSplits.remove(splitId); } /** @@ -744,6 +767,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr Collection<String> splitsToResume = new ArrayList<>(); splitCurrentWatermarks.forEach( (splitId, splitWatermark) -> { + if (currentlyIdleSplits.contains(splitId)) { + return; + } if (splitWatermark > currentMaxDesiredWatermark) { splitsToPause.add(splitId); } else if (currentlyPausedSplits.contains(splitId)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 060689f0a55..54e33e2cf20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -481,6 +481,54 @@ class SourceOperatorSplitWatermarkAlignmentTest { assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue(); } + @Test + void testAlignmentCheckIsDeferredForIdleSplits() throws Exception { + final long idleTimeout = 100; + final MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true); + final TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + final SourceOperator<Integer, MockSourceSplit> operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10); + final int allowedWatermark4 = 4; + final int allowedWatermark7 = 7; + split0.addRecord(5); + split0.addRecord(6); + split0.addRecord(7); + split0.addRecord(8); + operator.handleOperatorEvent( + new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer())); + final CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); + + // Emit enough record to fill the sampler buffer + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + + // Transition the split to idle state: + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + } + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // Alignment check fires but doesn't pause the idle split + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark4)); + 
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // While the split is idle, we advance the allowed watermark to keep the source active + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark7)); + // The split is still idle: + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // The split emits a record to break out of idleness + operator.emitNext(actualOutput); + + // The split is marked not idle, then immediately paused by the deferred alignment check + assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); + } + private void assertOutput( CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) { assertThat(
