This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-2.2 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43d524dafb05e8d4ce29fe2c29dd41b2913c2f02 Author: Efrat Levitan <[email protected]> AuthorDate: Thu Feb 19 16:08:35 2026 +0200 [FLINK-39073][runtime] Defer alignment check for idle splits If a split emits watermarks far into the future and then goes idle, the alignment check will incorrectly mark it paused. As the max allowed watermark advances, the Source operator will transition the split back to the active state (while it is still idle). This change aims to fix the issue by deferring the alignment check for idle splits until they break out of idleness. --- .../streaming/api/operators/SourceOperator.java | 30 +++++++++++++- .../SourceOperatorSplitWatermarkAlignmentTest.java | 48 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 09b12007d33..ea2110554ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -180,6 +180,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final Map<String, Long> splitCurrentWatermarks = new HashMap<>(); private final Set<String> currentlyPausedSplits = new HashSet<>(); + private final Set<String> currentlyIdleSplits = new HashSet<>(); private final Map<String, InternalSourceSplitMetricGroup> splitMetricGroups = new HashMap<>(); @@ -723,6 +724,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentSplitWatermark(String splitId, long watermark) { splitCurrentWatermarks.put(splitId, watermark); + if (!currentlyIdleSplits.contains(splitId)) { + maybePauseSplit(splitId); + } + } + + private void maybePauseSplit(String splitId) { + final long watermark = 
splitCurrentWatermarks.getOrDefault( + splitId, Watermark.UNINITIALIZED.getTimestamp()); if (watermark > currentMaxDesiredWatermark && !currentlyPausedSplits.contains(splitId)) { pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); currentlyPausedSplits.add(splitId); @@ -731,10 +741,22 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentSplitIdle(String splitId, boolean idle) { + final InternalSourceSplitMetricGroup splitMetricGroup = + this.getOrCreateSplitMetricGroup(splitId); + if (idle == currentlyIdleSplits.contains(splitId)) { + return; + } if (idle) { - this.getOrCreateSplitMetricGroup(splitId).markIdle(); + LOG.info("[{}] Marking split idle", splitId); + currentlyIdleSplits.add(splitId); + splitMetricGroup.markIdle(); } else { - this.getOrCreateSplitMetricGroup(splitId).markNotIdle(); + LOG.info("[{}] Marking split not idle", splitId); + currentlyIdleSplits.remove(splitId); + splitMetricGroup.markNotIdle(); + // Since we skipped alignment check + // for this split while it was idle: + maybePauseSplit(splitId); } } @@ -743,6 +765,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr splitCurrentWatermarks.remove(splitId); getOrCreateSplitMetricGroup(splitId).onSplitFinished(); this.splitMetricGroups.remove(splitId); + currentlyIdleSplits.remove(splitId); } /** @@ -756,6 +779,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr Collection<String> splitsToResume = new ArrayList<>(); splitCurrentWatermarks.forEach( (splitId, splitWatermark) -> { + if (currentlyIdleSplits.contains(splitId)) { + return; + } if (splitWatermark > currentMaxDesiredWatermark) { splitsToPause.add(splitId); } else if (currentlyPausedSplits.contains(splitId)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 35621fbf5bc..6074d6f506e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -482,6 +482,54 @@ class SourceOperatorSplitWatermarkAlignmentTest { assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue(); } + @Test + void testAlignmentCheckIsDeferredForIdleSplits() throws Exception { + final long idleTimeout = 100; + final MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true); + final TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + final SourceOperator<Integer, MockSourceSplit> operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10); + final int allowedWatermark4 = 4; + final int allowedWatermark7 = 7; + split0.addRecord(5); + split0.addRecord(6); + split0.addRecord(7); + split0.addRecord(8); + operator.handleOperatorEvent( + new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer())); + final CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); + + // Emit enough record to fill the sampler buffer + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + + // Transition the split to idle state: + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + } + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // Alignment check fires but doesn't pause the idle split + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark4)); + 
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // While the split is idle, we advance the allowed watermark to keep the source active + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark7)); + // The split is still idle: + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // The split emits a record to break out of idleness + operator.emitNext(actualOutput); + + // The split is marked not idle, then immediately paused by the deferred alignment check + assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); + } + private void assertOutput( CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) { assertThat(
