This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bda5c7c3f67c3861db1b7a161f41bd1d852f7b8d Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Mar 27 09:28:09 2025 +0100 [FLINK-37399][runtime][source] Buffer watermarks for watermark alignment Before this change, when watermark alignment was enabled, it could prevent backlogged jobs from using all available resources. Inadvertently watermark alignment configured with maxAllowedWatermarkDrift and updateInterval was de facto capping the backlog processing speed to maxAllowedWatermarkDrift (event time) / updateInterval (wall clock). For example when maxAllowedWatermarkDrift=30s and updateInterval=1s, backlog could not be processed faster than 30s (event time) / 1s (wall clock). In that case, if a job had 1 day of records to process in the backlog (for example after 24h downtime), this backlog could not be processed more quickly than in 48 minutes, regardless of available resources and number of actual records. This change adds SamplingWatermarksRingBuffer that will hide the latency between SourceOperators and SourceCoordinator. 
For more information please look into the ticket and/or FLIP --- .../flink/configuration/PipelineOptions.java | 25 ++++++++ .../streaming/api/operators/SourceOperator.java | 74 ++++++++++++++++------ .../api/operators/SourceOperatorAlignmentTest.java | 65 ++++++++++++++++--- .../SourceOperatorSplitWatermarkAlignmentTest.java | 71 ++++++++++++++++++--- .../api/operators/SourceOperatorTest.java | 15 ++--- .../api/operators/SourceOperatorTestContext.java | 21 +++++- .../operators/source/TestingSourceOperator.java | 4 +- 7 files changed, 226 insertions(+), 49 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 9df2d7201a6..4742b6dbca8 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel; import org.apache.flink.configuration.description.Description; @@ -328,4 +329,28 @@ public class PipelineOptions { + "while still using watermark alignment, set this parameter to true. " + "The default value is false. Note: This parameter may be " + "removed in future releases."); + + @Experimental + public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE = + key("pipeline.watermark-alignment.buffer-size") + .intType() + .defaultValue(3) + .withDescription( + "Controls size of the ring buffer used to smooth out watermark alignment " + + "due to the inherent latency of the alignment process. 
Allowed watermarks " + + "are announced at the updateInterval and this means they are often out of date " + + "after the round trip, which means that watermark alignment might be pausing splits too much using this outdated information. " + + "To address this problem, when pausing consumption of records, " + + "max allowed watermark is not checked against the latest value of the watermark in " + + "any given split/source, but against the oldest value in the ring buffer, that is " + + "updated at every updateInterval. This is the config option that controls " + + "the size of the ring buffer. The default buffer size is 3. Buffer size of 1 " + + "can result in under utilised job's resources when processing backlog of records. " + + "Size of the buffer de facto delays the application of the watermark alignment " + + "process by that many updateIntervals. With the default size 3, splits can produce " + + "arbitrary amount of records for the duration of 3 * updateInterval before watermark " + + "alignment might pause them. You can set the watermarkBufferSize to 0 to restore " + + "pre Flink 2.3 behaviour with sampling disabled and always using the latest watermark. " + + "The default value of 3 has been chosen to cover the round trip delay of watermark alignment " + + "that is equal to 2 updateIntervals plus one more to cover for network/GC or other hiccups. 
"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index eeb1924a653..12723254ffc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; +import org.apache.flink.streaming.api.operators.source.WatermarkSampler; import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.api.watermark.Watermark; @@ -87,6 +88,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS; +import static org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -154,8 +156,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private DataOutput<OUT> lastInvokedOutput; - private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp(); - private boolean idle = false; /** The state that holds the currently assigned splits. 
*/ @@ -177,8 +177,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final List<SplitT> splitsToInitializeOutput = new ArrayList<>(); - private final Map<String, Long> splitCurrentWatermarks = new HashMap<>(); - private final Set<String> currentlyPausedSplits = new HashSet<>(); private boolean waitingForCheckpoint; @@ -200,6 +198,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private long currentMaxDesiredWatermark = Watermark.MAX_WATERMARK.getTimestamp(); + /** + * {@link #currentMaxDesiredWatermark} is checked against the minimum of sample split + * watermarks. + */ + private final Map<String, WatermarkSampler> sampledSplitWatermarks = new HashMap<>(); + + private final WatermarkSampler sampledLatestWatermark; + /** Can be not completed only in {@link OperatingMode#WAITING_FOR_ALIGNMENT} mode. */ private CompletableFuture<Void> waitingForAlignmentFuture = CompletableFuture.completedFuture(null); @@ -208,6 +214,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final boolean allowUnalignedSourceSplits; + private final int watermarkBufferSize; + private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; /** @@ -237,6 +245,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr boolean supportsSplitReassignmentOnRecovery, boolean pauseSourcesUntilFirstCheckpoint) { super(parameters); + this.watermarkBufferSize = configuration.get(WATERMARK_ALIGNMENT_BUFFER_SIZE); + this.sampledLatestWatermark = new WatermarkSampler(watermarkBufferSize); this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); this.splitSerializer = checkNotNull(splitSerializer); @@ -379,14 +389,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } protected InternalSourceSplitMetricGroup getOrCreateSplitMetricGroup(String splitId) { + 
sampledSplitWatermarks.computeIfAbsent( + splitId, k -> new WatermarkSampler(watermarkBufferSize)); if (!this.splitMetricGroups.containsKey(splitId)) { InternalSourceSplitMetricGroup splitMetricGroup = InternalSourceSplitMetricGroup.wrap( getMetricGroup(), splitId, - () -> - splitCurrentWatermarks.getOrDefault( - splitId, Watermark.UNINITIALIZED.getTimestamp())); + () -> sampledSplitWatermarks.get(splitId).getLatest()); splitMetricGroup.markSplitStart(); this.splitMetricGroups.put(splitId, splitMetricGroup); } @@ -535,7 +545,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr // introduces a small performance regression (probably because of an extra // virtual call) processingTimeService.scheduleWithFixedDelay( - time -> emitLatestWatermark(), + time -> sampleAndEmitLatestWatermark(), watermarkAlignmentParams.getUpdateInterval(), watermarkAlignmentParams.getUpdateInterval()); } @@ -551,8 +561,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr return DataInputStatus.END_OF_DATA; case DATA_FINISHED: if (watermarkAlignmentParams.isEnabled()) { - latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); - emitLatestWatermark(); + this.sampledLatestWatermark.addLatest(Watermark.MAX_WATERMARK.getTimestamp()); + sampleAndEmitLatestWatermark(); } sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_INPUT; @@ -612,8 +622,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } } + private void sampleAndEmitLatestWatermark() { + sampleLatestWatermark(); + emitLatestWatermark(); + } + private void emitLatestWatermark() { checkState(currentMainOutput != null); + long latestWatermark = sampledLatestWatermark.getLatest(); if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) { return; } @@ -622,6 +638,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr idle ? 
Watermark.MAX_WATERMARK.getTimestamp() : latestWatermark)); } + private void sampleLatestWatermark() { + sampledSplitWatermarks.values().forEach(WatermarkSampler::sample); + sampledLatestWatermark.sample(); + + // as we updated sampled latest watermarks, we should check watermark alignment status + checkWatermarkAlignment(); + checkSplitWatermarkAlignment(); + } + @Override public void snapshotState(StateSnapshotContext context) throws Exception { if (waitingForCheckpoint) { @@ -748,14 +773,19 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentEffectiveWatermark(long watermark) { - latestWatermark = watermark; + sampledLatestWatermark.addLatest(watermark); checkWatermarkAlignment(); } @Override public void updateCurrentSplitWatermark(String splitId, long watermark) { - splitCurrentWatermarks.put(splitId, watermark); - if (watermark > currentMaxDesiredWatermark && !currentlyPausedSplits.contains(splitId)) { + WatermarkSampler splitWatermarkSampler = checkNotNull(sampledSplitWatermarks.get(splitId)); + splitWatermarkSampler.addLatest(watermark); + long oldestSampledWatermark = splitWatermarkSampler.getOldestSample(); + // oldestSampledWatermark can be only updated after adding new latest if sampling capacity + // is 0, but we still need to handle that + if (oldestSampledWatermark > currentMaxDesiredWatermark + && !currentlyPausedSplits.contains(splitId)) { pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); currentlyPausedSplits.add(splitId); } @@ -772,9 +802,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void splitFinished(String splitId) { - splitCurrentWatermarks.remove(splitId); getOrCreateSplitMetricGroup(splitId).onSplitFinished(); this.splitMetricGroups.remove(splitId); + sampledSplitWatermarks.remove(splitId); } /** @@ -786,9 +816,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends 
AbstractStr private void checkSplitWatermarkAlignment() { Collection<String> splitsToPause = new ArrayList<>(); Collection<String> splitsToResume = new ArrayList<>(); - splitCurrentWatermarks.forEach( - (splitId, splitWatermark) -> { - if (splitWatermark > currentMaxDesiredWatermark) { + sampledSplitWatermarks.forEach( + (splitId, splitWatermarks) -> { + if (splitWatermarks.getOldestSample() > currentMaxDesiredWatermark) { splitsToPause.add(splitId); } else if (currentlyPausedSplits.contains(splitId)) { splitsToResume.add(splitId); @@ -805,6 +835,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { try { + LOG.info( + "pauseOrResumeSplits [splitsToPause={}][splitsToResume={}]" + + "[currentMaxDesiredWatermark={}][latestWatermark={}][oldestWatermark={}]", + splitsToPause, + splitsToResume, + currentMaxDesiredWatermark, + sampledLatestWatermark.getLatest(), + sampledLatestWatermark.getOldestSample()); sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume); reportPausedOrResumed(splitsToPause, splitsToResume); @@ -844,7 +882,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } private boolean shouldWaitForAlignment() { - return currentMaxDesiredWatermark < latestWatermark; + return currentMaxDesiredWatermark < sampledLatestWatermark.getOldestSample(); } private void registerReader(List<SplitT> splits) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index 67f271994d5..455ddb7abce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -33,17 +33,22 @@ import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -53,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link SourceOperator} watermark alignment. 
*/ @SuppressWarnings("serial") +@ExtendWith(ParameterizedTestExtension.class) class SourceOperatorAlignmentTest { private static final int updateIntervalMillis = 1; @@ -60,6 +66,13 @@ class SourceOperatorAlignmentTest { @Nullable private SourceOperatorTestContext context; @Nullable private SourceOperator<Integer, MockSourceSplit> operator; + @Parameter public int watermarkBufferSize; + + @Parameters(name = "watermarkBufferSize: {0}") + public static Collection<Integer> parameters() { + return Arrays.asList(0, 1, 3); + } + @BeforeEach void setup() throws Exception { context = @@ -71,6 +84,7 @@ class SourceOperatorAlignmentTest { "group1", Duration.ofMillis(100), Duration.ofMillis(updateIntervalMillis))) + .setWatermarkBufferSize(watermarkBufferSize) .build(); operator = context.getOperator(); } @@ -82,8 +96,14 @@ class SourceOperatorAlignmentTest { operator = null; } - @Test + @TestTemplate void testWatermarkAlignment() throws Exception { + // how many steps to advance before all watermark samples are overwritten + int sampleWatermarksStep1 = Math.max(watermarkBufferSize - 1, 0); + // advance one more step for the final sample to be overwritten (or don't advance if + // buffering is disabled) + int sampleWatermarksStep2 = Math.min(1, watermarkBufferSize); + operator.initializeState(context.createStateContext()); operator.open(); MockSourceSplit newSplit = new MockSourceSplit(2); @@ -108,6 +128,10 @@ class SourceOperatorAlignmentTest { assertOutput(actualOutput, expectedOutput); assertThat(operator.isAvailable()).isTrue(); + sampleWatermarks(context.getTimeService(), sampleWatermarksStep1 + sampleWatermarksStep2); + // WatermarkAlignmentEvent hasn't been yet sent, so the operator is still available + assertThat(operator.isAvailable()).isTrue(); + operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 - 1)); assertThat(operator.isAvailable()).isFalse(); @@ -120,6 +144,14 @@ class SourceOperatorAlignmentTest { assertThat(operator.isAvailable()).isTrue(); 
operator.emitNext(actualOutput); + + if (sampleWatermarksStep1 > 0) { + sampleWatermarks(context.getTimeService(), sampleWatermarksStep1); + assertThat(operator.isAvailable()).isTrue(); + } + + sampleWatermarks(context.getTimeService(), sampleWatermarksStep2); + // Try to poll a record second time. Technically speaking previous emitNext call could have // already switch the operator status to unavailable, but that's an implementation detail. // However, this second call can not emit anything and should after that second call @@ -132,8 +164,16 @@ class SourceOperatorAlignmentTest { assertThat(operator.isAvailable()).isFalse(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) + @TestTemplate + void testWatermarkAlignmentWithIdleness() throws Exception { + testWatermarkAlignmentWithIdleness(false); + } + + @TestTemplate + void testWatermarkAlignmentWithIdlenessAllSubtasksIdle() throws Exception { + testWatermarkAlignmentWithIdleness(true); + } + void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws Exception { // we use a separate context, because we need to enable idleness try (SourceOperatorTestContext context = @@ -213,7 +253,7 @@ class SourceOperatorAlignmentTest { } } - @Test + @TestTemplate void testWatermarkAlignmentWithoutSplit() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); @@ -246,7 +286,7 @@ class SourceOperatorAlignmentTest { assertOutput(actualOutput, expectedOutput); } - @Test + @TestTemplate void testStopWhileWaitingForWatermarkAlignment() throws Exception { testWatermarkAlignment(); @@ -257,7 +297,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.isAvailable()).isTrue(); } - @Test + @TestTemplate void testReportedWatermarkDoNotDecrease() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); @@ -287,7 +327,7 @@ class SourceOperatorAlignmentTest { assertLatestReportedWatermarkEvent(record1); } - @Test + @TestTemplate void 
testWatermarkAlignmentWhileSubtaskFinished() throws Exception { operator.initializeState(context.createStateContext()); operator.getReaderState().clear(); @@ -317,6 +357,11 @@ class SourceOperatorAlignmentTest { assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp()); } + private void sampleWatermarks(TestProcessingTimeService timeService, int times) + throws Exception { + timeService.advance(updateIntervalMillis * times); + } + private void assertOutput( CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) { assertThat( diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 2c26b920a77..742a8bc6d43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -70,6 +70,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.assertj.core.api.Assertions.assertThat; @@ -78,15 +79,23 @@ class SourceOperatorSplitWatermarkAlignmentTest { private static final int updateIntervalMillis = 1; - @Test - void testSplitWatermarkAlignment() throws Exception { - + @ParameterizedTest + @ValueSource(ints = {0, 1, 3}) + public void testSplitWatermarkAlignment(int watermarkRingBufferCapacity) throws Exception { + // how many steps to advance before all watermark samples are overwritten + int sampleWatermarksStep1 = Math.max(watermarkRingBufferCapacity - 1, 0); + // advance one more step for the 
final sample to be overwritten (or don't advance if + // buffering is disabled) + int sampleWatermarksStep2 = Math.min(1, watermarkRingBufferCapacity); + + TestProcessingTimeService timeService = new TestProcessingTimeService(); MockSourceReader sourceReader = new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); - Environment env = getTestingEnvironment(); + Configuration configuration = new Configuration(); + configuration.set(WATERMARK_ALIGNMENT_BUFFER_SIZE, watermarkRingBufferCapacity); SourceOperator<Integer, MockSourceSplit> operator = - createAndOpenSourceOperatorWithIdleness( - sourceReader, new TestProcessingTimeService(), 0); + createAndOpenSourceOperatorWithIdlenessAndEnv( + sourceReader, timeService, configuration, 0, getTestingEnvironment()); MockSourceSplit split1 = new MockSourceSplit(0, 0, 10); MockSourceSplit split2 = new MockSourceSplit(1, 10, 20); @@ -101,9 +110,17 @@ class SourceOperatorSplitWatermarkAlignmentTest { CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); operator.emitNext(dataOutput); // split 1 emits 5 + assertThat(sourceReader.getPausedSplits()).doesNotContain("0"); operator.handleOperatorEvent( new WatermarkAlignmentEvent(4)); // pause by coordinator message + + if (sampleWatermarksStep1 > 0) { + sampleWatermarks(timeService, sampleWatermarksStep1); + assertThat(sourceReader.getPausedSplits()).doesNotContain("0"); + } + + sampleWatermarks(timeService, sampleWatermarksStep2); assertThat(sourceReader.getPausedSplits()).containsExactly("0"); operator.handleOperatorEvent(new WatermarkAlignmentEvent(5)); @@ -112,10 +129,22 @@ class SourceOperatorSplitWatermarkAlignmentTest { operator.emitNext(dataOutput); // split 1 emits 11 operator.emitNext(dataOutput); // split 2 emits 3 + if (sampleWatermarksStep1 > 0) { + sampleWatermarks(timeService, sampleWatermarksStep1); + assertThat(sourceReader.getPausedSplits()).doesNotContain("0"); + } + + sampleWatermarks(timeService, sampleWatermarksStep2); 
assertThat(sourceReader.getPausedSplits()).containsExactly("0"); - operator.emitNext(dataOutput); // split 2 emits 6 + operator.emitNext(dataOutput); // split 2 emits 12 + + if (sampleWatermarksStep1 > 0) { + sampleWatermarks(timeService, sampleWatermarksStep1); + assertThat(sourceReader.getPausedSplits()).containsExactly("0"); + } + sampleWatermarks(timeService, sampleWatermarksStep2); assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1"); } @@ -251,6 +280,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); operator.emitNext(dataOutput); // split0 emits first (and only) record (maxEmittedWatermark) + sampleAllWatermarks(processingTimeService); operator.handleOperatorEvent( new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks split0 @@ -370,9 +400,11 @@ class SourceOperatorSplitWatermarkAlignmentTest { operator.emitNext(dataOutput); // split0 emits 5 operator.emitNext(dataOutput); // split1 emits 3 + sampleAllWatermarks(processingTimeService); operator.handleOperatorEvent( new WatermarkAlignmentEvent(allowedWatermark4)); // blocks split0 + assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); assertThat(operator.getSplitMetricGroup(split1.splitId()).isActive()).isTrue(); @@ -390,6 +422,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { assertThat(operator.getSplitMetricGroup(split1.splitId()).isIdle()).isTrue(); operator.emitNext(dataOutput); // split1 emits 8 + sampleAllWatermarks(processingTimeService); assertThat(operator.getSplitMetricGroup(split1.splitId()).isPaused()).isTrue(); operator.handleOperatorEvent( @@ -420,6 +453,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); operator.emitNext(actualOutput); + sampleAllWatermarks(processingTimeService); assertOutput(actualOutput, Arrays.asList(5)); 
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue(); @@ -447,6 +481,8 @@ class SourceOperatorSplitWatermarkAlignmentTest { // testing transition idle -> paused operator.emitNext(actualOutput); + sampleAllWatermarks(processingTimeService); + assertOutput(actualOutput, Arrays.asList(5, 6)); assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isFalse(); @@ -465,6 +501,17 @@ class SourceOperatorSplitWatermarkAlignmentTest { assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue(); } + private void sampleAllWatermarks(TestProcessingTimeService timeService) throws Exception { + sampleWatermarks(timeService, WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue()); + } + + private void sampleWatermarks(TestProcessingTimeService timeService, int times) + throws Exception { + for (int i = 0; i < times; i++) { + timeService.advance(updateIntervalMillis); + } + } + private void assertOutput( CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) { assertThat( @@ -483,7 +530,11 @@ class SourceOperatorSplitWatermarkAlignmentTest { throws Exception { return createAndOpenSourceOperatorWithIdlenessAndEnv( - sourceReader, processingTimeService, idleTimeout, getTestingEnvironment()); + sourceReader, + processingTimeService, + new Configuration(), + idleTimeout, + getTestingEnvironment()); } private SourceOperator<Integer, MockSourceSplit> @@ -504,12 +555,13 @@ class SourceOperatorSplitWatermarkAlignmentTest { .addTask(createExecutionAttemptId(), "test"); env.setTaskMetricGroup(metricGroup); return createAndOpenSourceOperatorWithIdlenessAndEnv( - sourceReader, processingTimeService, idleTimeout, env); + sourceReader, processingTimeService, new Configuration(), idleTimeout, env); } private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndEnv( MockSourceReader sourceReader, TestProcessingTimeService 
processingTimeService, + Configuration configuration, long idleTimeout, Environment env) throws Exception { @@ -536,6 +588,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { sourceReader, watermarkStrategy, processingTimeService, + configuration, new MockOperatorEventGateway(), 1, 5, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 554e253e1fa..8ba9304344b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -49,7 +49,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.CollectorOutput; -import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; @@ -127,12 +126,11 @@ class SourceOperatorTest { void testOpen() throws Exception { for (boolean supportsSplitReassignmentOnRecovery : new boolean[] {true, false}) { try (SourceOperatorTestContext context = - new SourceOperatorTestContext( - false, - false, - WatermarkStrategy.noWatermarks(), - new MockOutput<>(new ArrayList<>()), - supportsSplitReassignmentOnRecovery)) { + SourceOperatorTestContext.builder() + .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint) + .setSupportsSplitReassignmentOnRecovery( + supportsSplitReassignmentOnRecovery) + .build()) { SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); // Initialize the operator. 
operator.initializeState(context.createStateContext()); @@ -260,7 +258,8 @@ class SourceOperatorTest { .setOutput(new CollectorOutput<>(out)) .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint) .setPreInit( - // recover with some state, so the source will pause until a checkpoint + // recover with some state, so the source will pause until a + // checkpoint // to speedup recovery (if pauseSourcesUntilCheckpoint) (stateManager, operatorID) -> { long checkpointID = 1L; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index 2f88a71036a..5e3087759cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.execution.Environment; @@ -66,6 +67,7 @@ public class SourceOperatorTestContext implements AutoCloseable { return new Builder(); } + /** Builder to construct {@link SourceOperatorTestContext}. 
*/ public static class Builder { private boolean idle = false; private boolean usePerSplitOutputs = false; @@ -74,6 +76,8 @@ public class SourceOperatorTestContext implements AutoCloseable { private boolean supportsSplitReassignmentOnRecovery = false; private boolean pauseSourcesUntilFirstCheckpoint = false; private BiConsumer<TestTaskStateManager, OperatorID> preInit = (ign0, ign1) -> {}; + private int watermarkBufferSize = + PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue(); public Builder setIdle(boolean idle) { this.idle = idle; @@ -95,7 +99,8 @@ public class SourceOperatorTestContext implements AutoCloseable { return this; } - public Builder setSupportsSplitReassignmentOnRecovery(boolean supportsSplitReassignmentOnRecovery) { + public Builder setSupportsSplitReassignmentOnRecovery( + boolean supportsSplitReassignmentOnRecovery) { this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery; return this; } @@ -111,7 +116,14 @@ public class SourceOperatorTestContext implements AutoCloseable { return this; } + public Builder setWatermarkBufferSize(int watermarkBufferSize) { + this.watermarkBufferSize = watermarkBufferSize; + return this; + } + public SourceOperatorTestContext build() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE, watermarkBufferSize); return new SourceOperatorTestContext( idle, usePerSplitOutputs, @@ -119,7 +131,8 @@ public class SourceOperatorTestContext implements AutoCloseable { output, supportsSplitReassignmentOnRecovery, pauseSourcesUntilFirstCheckpoint, - preInit); + preInit, + configuration); } } @@ -136,7 +149,8 @@ public class SourceOperatorTestContext implements AutoCloseable { Output<StreamRecord<Integer>> output, boolean supportsSplitReassignmentOnRecovery, boolean pauseSourcesUntilFirstCheckpoint, - BiConsumer<TestTaskStateManager, OperatorID> preInit) + BiConsumer<TestTaskStateManager, OperatorID> preInit, + 
Configuration configuration) throws Exception { mockSourceReader = new MockSourceReader( @@ -161,6 +175,7 @@ public class SourceOperatorTestContext implements AutoCloseable { mockSourceReader, watermarkStrategy, timeService, + configuration, mockGateway, SUBTASK_INDEX, 5, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index fed65a42ccc..da22db8d0e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -72,6 +72,7 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> reader, watermarkStrategy, timeService, + new Configuration(), new MockOperatorEventGateway(), 1, 5, @@ -85,6 +86,7 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, + Configuration configuration, OperatorEventGateway eventGateway, int subtaskIndex, int parallelism, @@ -99,7 +101,7 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> new MockSourceSplitSerializer(), watermarkStrategy, timeService, - new Configuration(), + configuration, "localhost", emitProgressiveWatermarks, () -> false,
