This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bda5c7c3f67c3861db1b7a161f41bd1d852f7b8d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Mar 27 09:28:09 2025 +0100

    [FLINK-37399][runtime][source] Buffer watermarks for watermark alignment
    
    Before this change, watermark alignment, when enabled, could prevent backlogged jobs
    from using all available resources. Inadvertently, watermark alignment configured with
    maxAllowedWatermarkDrift and updateInterval was de facto capping the backlog processing
    speed at maxAllowedWatermarkDrift (event time) per updateInterval (wall clock). For example,
    when maxAllowedWatermarkDrift=30s and updateInterval=1s, the backlog could not be processed
    faster than 30s of event time per 1s of wall-clock time. In that case, if a job had 1 day of
    records to process in the backlog (for example after 24h of downtime), this backlog could
    not be processed in less than 48 minutes (86400s / 30 = 2880s of wall-clock time),
    regardless of the available resources and the actual number of records.
    
    This change buffers sampled watermarks in a ring buffer (WatermarkSampler), which hides the
    round-trip latency between the SourceOperators and the SourceCoordinator. For more
    information, please see the ticket and/or the FLIP.
---
 .../flink/configuration/PipelineOptions.java       | 25 ++++++++
 .../streaming/api/operators/SourceOperator.java    | 74 ++++++++++++++++------
 .../api/operators/SourceOperatorAlignmentTest.java | 65 ++++++++++++++++---
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 71 ++++++++++++++++++---
 .../api/operators/SourceOperatorTest.java          | 15 ++---
 .../api/operators/SourceOperatorTestContext.java   | 21 +++++-
 .../operators/source/TestingSourceOperator.java    |  4 +-
 7 files changed, 226 insertions(+), 49 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 9df2d7201a6..4742b6dbca8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel;
 import org.apache.flink.configuration.description.Description;
@@ -328,4 +329,28 @@ public class PipelineOptions {
                                     + "while still using watermark alignment, 
set this parameter to true. "
                                     + "The default value is false. Note: This 
parameter may be "
                                     + "removed in future releases.");
+
+    @Experimental
+    public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE =
+            key("pipeline.watermark-alignment.buffer-size")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "Controls size of the ring buffer used to smooth 
out watermark alignment "
+                                    + "due to the inherent latency of the 
alignment process. Allowed watermarks "
+                                    + "are announced at the updateInterval and 
this means they are often out of date "
+                                    + "after the round trip, which means that 
watermark alignment might be pausing splits too much using this outdated 
information. "
+                                    + "To address this problem, when pausing 
consumption of records, "
+                                    + "max allowed watermark is not checked 
against the latest value of the watermark in "
+                                    + "any given split/source, but against the 
oldest value in the ring buffer, that is "
+                                    + "updated at every updateInterval. This 
is the config option that controls "
+                                    + "the size of the ring buffer. The 
default buffer size is 3. Buffer size of 1 "
+                                    + "can result in under utilised job's 
resources when processing backlog of records. "
+                                    + "Size of the buffer de facto delays the 
application of the watermark alignment "
+                                    + "process by that many updateIntervals. 
With the default size 3, splits can produce "
+                                    + "arbitrary amount of records for the 
duration of 3 * updateInterval before watermark "
+                                    + "alignment might pause them. You can set 
the watermarkBufferSize to 0 to restore "
+                                    + "pre Flink 2.3 behaviour with sampling 
disabled and always using the latest watermark. "
+                                    + "The default value of 3 has been chosen 
to cover the round trip delay of watermark alignment "
+                                    + "that is equal to 2 updateIntervals plus 
one more to cover for network/GC or other hiccups. ");
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index eeb1924a653..12723254ffc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
+import org.apache.flink.streaming.api.operators.source.WatermarkSampler;
 import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -87,6 +88,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
+import static 
org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -154,8 +156,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private DataOutput<OUT> lastInvokedOutput;
 
-    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
-
     private boolean idle = false;
 
     /** The state that holds the currently assigned splits. */
@@ -177,8 +177,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();
 
-    private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
-
     private final Set<String> currentlyPausedSplits = new HashSet<>();
 
     private boolean waitingForCheckpoint;
@@ -200,6 +198,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private long currentMaxDesiredWatermark = 
Watermark.MAX_WATERMARK.getTimestamp();
 
+    /**
+     * {@link #currentMaxDesiredWatermark} is checked against the minimum of 
sample split
+     * watermarks.
+     */
+    private final Map<String, WatermarkSampler> sampledSplitWatermarks = new 
HashMap<>();
+
+    private final WatermarkSampler sampledLatestWatermark;
+
     /** Can be not completed only in {@link 
OperatingMode#WAITING_FOR_ALIGNMENT} mode. */
     private CompletableFuture<Void> waitingForAlignmentFuture =
             CompletableFuture.completedFuture(null);
@@ -208,6 +214,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private final boolean allowUnalignedSourceSplits;
 
+    private final int watermarkBufferSize;
+
     private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
 
     /**
@@ -237,6 +245,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             boolean supportsSplitReassignmentOnRecovery,
             boolean pauseSourcesUntilFirstCheckpoint) {
         super(parameters);
+        this.watermarkBufferSize = 
configuration.get(WATERMARK_ALIGNMENT_BUFFER_SIZE);
+        this.sampledLatestWatermark = new 
WatermarkSampler(watermarkBufferSize);
         this.readerFactory = checkNotNull(readerFactory);
         this.operatorEventGateway = checkNotNull(operatorEventGateway);
         this.splitSerializer = checkNotNull(splitSerializer);
@@ -379,14 +389,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     }
 
     protected InternalSourceSplitMetricGroup 
getOrCreateSplitMetricGroup(String splitId) {
+        sampledSplitWatermarks.computeIfAbsent(
+                splitId, k -> new WatermarkSampler(watermarkBufferSize));
         if (!this.splitMetricGroups.containsKey(splitId)) {
             InternalSourceSplitMetricGroup splitMetricGroup =
                     InternalSourceSplitMetricGroup.wrap(
                             getMetricGroup(),
                             splitId,
-                            () ->
-                                    splitCurrentWatermarks.getOrDefault(
-                                            splitId, 
Watermark.UNINITIALIZED.getTimestamp()));
+                            () -> 
sampledSplitWatermarks.get(splitId).getLatest());
             splitMetricGroup.markSplitStart();
             this.splitMetricGroups.put(splitId, splitMetricGroup);
         }
@@ -535,7 +545,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                     // introduces a small performance regression (probably 
because of an extra
                     // virtual call)
                     processingTimeService.scheduleWithFixedDelay(
-                            time -> emitLatestWatermark(),
+                            time -> sampleAndEmitLatestWatermark(),
                             watermarkAlignmentParams.getUpdateInterval(),
                             watermarkAlignmentParams.getUpdateInterval());
                 }
@@ -551,8 +561,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                 return DataInputStatus.END_OF_DATA;
             case DATA_FINISHED:
                 if (watermarkAlignmentParams.isEnabled()) {
-                    latestWatermark = Watermark.MAX_WATERMARK.getTimestamp();
-                    emitLatestWatermark();
+                    
this.sampledLatestWatermark.addLatest(Watermark.MAX_WATERMARK.getTimestamp());
+                    sampleAndEmitLatestWatermark();
                 }
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_INPUT;
@@ -612,8 +622,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         }
     }
 
+    private void sampleAndEmitLatestWatermark() {
+        sampleLatestWatermark();
+        emitLatestWatermark();
+    }
+
     private void emitLatestWatermark() {
         checkState(currentMainOutput != null);
+        long latestWatermark = sampledLatestWatermark.getLatest();
         if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
             return;
         }
@@ -622,6 +638,15 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                         idle ? Watermark.MAX_WATERMARK.getTimestamp() : 
latestWatermark));
     }
 
+    private void sampleLatestWatermark() {
+        sampledSplitWatermarks.values().forEach(WatermarkSampler::sample);
+        sampledLatestWatermark.sample();
+
+        // as we updated sampled latest watermarks, we should check watermark 
alignment status
+        checkWatermarkAlignment();
+        checkSplitWatermarkAlignment();
+    }
+
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         if (waitingForCheckpoint) {
@@ -748,14 +773,19 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void updateCurrentEffectiveWatermark(long watermark) {
-        latestWatermark = watermark;
+        sampledLatestWatermark.addLatest(watermark);
         checkWatermarkAlignment();
     }
 
     @Override
     public void updateCurrentSplitWatermark(String splitId, long watermark) {
-        splitCurrentWatermarks.put(splitId, watermark);
-        if (watermark > currentMaxDesiredWatermark && 
!currentlyPausedSplits.contains(splitId)) {
+        WatermarkSampler splitWatermarkSampler = 
checkNotNull(sampledSplitWatermarks.get(splitId));
+        splitWatermarkSampler.addLatest(watermark);
+        long oldestSampledWatermark = splitWatermarkSampler.getOldestSample();
+        // oldestSampledWatermark can be only updated after adding new latest 
if sampling capacity
+        // is 0, but we still need to handle that
+        if (oldestSampledWatermark > currentMaxDesiredWatermark
+                && !currentlyPausedSplits.contains(splitId)) {
             pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
             currentlyPausedSplits.add(splitId);
         }
@@ -772,9 +802,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void splitFinished(String splitId) {
-        splitCurrentWatermarks.remove(splitId);
         getOrCreateSplitMetricGroup(splitId).onSplitFinished();
         this.splitMetricGroups.remove(splitId);
+        sampledSplitWatermarks.remove(splitId);
     }
 
     /**
@@ -786,9 +816,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private void checkSplitWatermarkAlignment() {
         Collection<String> splitsToPause = new ArrayList<>();
         Collection<String> splitsToResume = new ArrayList<>();
-        splitCurrentWatermarks.forEach(
-                (splitId, splitWatermark) -> {
-                    if (splitWatermark > currentMaxDesiredWatermark) {
+        sampledSplitWatermarks.forEach(
+                (splitId, splitWatermarks) -> {
+                    if (splitWatermarks.getOldestSample() > 
currentMaxDesiredWatermark) {
                         splitsToPause.add(splitId);
                     } else if (currentlyPausedSplits.contains(splitId)) {
                         splitsToResume.add(splitId);
@@ -805,6 +835,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private void pauseOrResumeSplits(
             Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
         try {
+            LOG.info(
+                    "pauseOrResumeSplits [splitsToPause={}][splitsToResume={}]"
+                            + 
"[currentMaxDesiredWatermark={}][latestWatermark={}][oldestWatermark={}]",
+                    splitsToPause,
+                    splitsToResume,
+                    currentMaxDesiredWatermark,
+                    sampledLatestWatermark.getLatest(),
+                    sampledLatestWatermark.getOldestSample());
             sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
             eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume);
             reportPausedOrResumed(splitsToPause, splitsToResume);
@@ -844,7 +882,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     }
 
     private boolean shouldWaitForAlignment() {
-        return currentMaxDesiredWatermark < latestWatermark;
+        return currentMaxDesiredWatermark < 
sampledLatestWatermark.getOldestSample();
     }
 
     private void registerReader(List<SplitT> splits) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index 67f271994d5..455ddb7abce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -33,17 +33,22 @@ import 
org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for {@link SourceOperator} watermark alignment. */
 @SuppressWarnings("serial")
+@ExtendWith(ParameterizedTestExtension.class)
 class SourceOperatorAlignmentTest {
 
     private static final int updateIntervalMillis = 1;
@@ -60,6 +66,13 @@ class SourceOperatorAlignmentTest {
     @Nullable private SourceOperatorTestContext context;
     @Nullable private SourceOperator<Integer, MockSourceSplit> operator;
 
+    @Parameter public int watermarkBufferSize;
+
+    @Parameters(name = "watermarkBufferSize: {0}")
+    public static Collection<Integer> parameters() {
+        return Arrays.asList(0, 1, 3);
+    }
+
     @BeforeEach
     void setup() throws Exception {
         context =
@@ -71,6 +84,7 @@ class SourceOperatorAlignmentTest {
                                                 "group1",
                                                 Duration.ofMillis(100),
                                                 
Duration.ofMillis(updateIntervalMillis)))
+                        .setWatermarkBufferSize(watermarkBufferSize)
                         .build();
         operator = context.getOperator();
     }
@@ -82,8 +96,14 @@ class SourceOperatorAlignmentTest {
         operator = null;
     }
 
-    @Test
+    @TestTemplate
     void testWatermarkAlignment() throws Exception {
+        // how many steps to advance before all watermark samples are 
overwritten
+        int sampleWatermarksStep1 = Math.max(watermarkBufferSize - 1, 0);
+        // advance one more step for the final sample to be overwritten (or 
don't advance if
+        // buffering is disabled)
+        int sampleWatermarksStep2 = Math.min(1, watermarkBufferSize);
+
         operator.initializeState(context.createStateContext());
         operator.open();
         MockSourceSplit newSplit = new MockSourceSplit(2);
@@ -108,6 +128,10 @@ class SourceOperatorAlignmentTest {
         assertOutput(actualOutput, expectedOutput);
         assertThat(operator.isAvailable()).isTrue();
 
+        sampleWatermarks(context.getTimeService(), sampleWatermarksStep1 + 
sampleWatermarksStep2);
+        // WatermarkAlignmentEvent hasn't been yet sent, so the operator is 
still available
+        assertThat(operator.isAvailable()).isTrue();
+
         operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 - 1));
 
         assertThat(operator.isAvailable()).isFalse();
@@ -120,6 +144,14 @@ class SourceOperatorAlignmentTest {
 
         assertThat(operator.isAvailable()).isTrue();
         operator.emitNext(actualOutput);
+
+        if (sampleWatermarksStep1 > 0) {
+            sampleWatermarks(context.getTimeService(), sampleWatermarksStep1);
+            assertThat(operator.isAvailable()).isTrue();
+        }
+
+        sampleWatermarks(context.getTimeService(), sampleWatermarksStep2);
+
         // Try to poll a record second time. Technically speaking previous 
emitNext call could have
         // already switch the operator status to unavailable, but that's an 
implementation detail.
         // However, this second call can not emit anything and should after 
that second call
@@ -132,8 +164,16 @@ class SourceOperatorAlignmentTest {
         assertThat(operator.isAvailable()).isFalse();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @TestTemplate
+    void testWatermarkAlignmentWithIdleness() throws Exception {
+        testWatermarkAlignmentWithIdleness(false);
+    }
+
+    @TestTemplate
+    void testWatermarkAlignmentWithIdlenessAllSubtasksIdle() throws Exception {
+        testWatermarkAlignmentWithIdleness(true);
+    }
+
     void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws 
Exception {
         // we use a separate context, because we need to enable idleness
         try (SourceOperatorTestContext context =
@@ -213,7 +253,7 @@ class SourceOperatorAlignmentTest {
         }
     }
 
-    @Test
+    @TestTemplate
     void testWatermarkAlignmentWithoutSplit() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -246,7 +286,7 @@ class SourceOperatorAlignmentTest {
         assertOutput(actualOutput, expectedOutput);
     }
 
-    @Test
+    @TestTemplate
     void testStopWhileWaitingForWatermarkAlignment() throws Exception {
         testWatermarkAlignment();
 
@@ -257,7 +297,7 @@ class SourceOperatorAlignmentTest {
         assertThat(operator.isAvailable()).isTrue();
     }
 
-    @Test
+    @TestTemplate
     void testReportedWatermarkDoNotDecrease() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -287,7 +327,7 @@ class SourceOperatorAlignmentTest {
         assertLatestReportedWatermarkEvent(record1);
     }
 
-    @Test
+    @TestTemplate
     void testWatermarkAlignmentWhileSubtaskFinished() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.getReaderState().clear();
@@ -317,6 +357,11 @@ class SourceOperatorAlignmentTest {
         
assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp());
     }
 
+    private void sampleWatermarks(TestProcessingTimeService timeService, int 
times)
+            throws Exception {
+        timeService.advance(updateIntervalMillis * times);
+    }
+
     private void assertOutput(
             CollectingDataOutput<Integer> actualOutput, List<Integer> 
expectedOutput) {
         assertThat(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 2c26b920a77..742a8bc6d43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -70,6 +70,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -78,15 +79,23 @@ class SourceOperatorSplitWatermarkAlignmentTest {
 
     private static final int updateIntervalMillis = 1;
 
-    @Test
-    void testSplitWatermarkAlignment() throws Exception {
-
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, 3})
+    public void testSplitWatermarkAlignment(int watermarkRingBufferCapacity) 
throws Exception {
+        // how many steps to advance before all watermark samples are 
overwritten
+        int sampleWatermarksStep1 = Math.max(watermarkRingBufferCapacity - 1, 
0);
+        // advance one more step for the final sample to be overwritten (or 
don't advance if
+        // buffering is disabled)
+        int sampleWatermarksStep2 = Math.min(1, watermarkRingBufferCapacity);
+
+        TestProcessingTimeService timeService = new 
TestProcessingTimeService();
         MockSourceReader sourceReader =
                 new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);
-        Environment env = getTestingEnvironment();
+        Configuration configuration = new Configuration();
+        configuration.set(WATERMARK_ALIGNMENT_BUFFER_SIZE, 
watermarkRingBufferCapacity);
         SourceOperator<Integer, MockSourceSplit> operator =
-                createAndOpenSourceOperatorWithIdleness(
-                        sourceReader, new TestProcessingTimeService(), 0);
+                createAndOpenSourceOperatorWithIdlenessAndEnv(
+                        sourceReader, timeService, configuration, 0, 
getTestingEnvironment());
 
         MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
         MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
@@ -101,9 +110,17 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
 
         operator.emitNext(dataOutput); // split 1 emits 5
+        assertThat(sourceReader.getPausedSplits()).doesNotContain("0");
 
         operator.handleOperatorEvent(
                 new WatermarkAlignmentEvent(4)); // pause by coordinator 
message
+
+        if (sampleWatermarksStep1 > 0) {
+            sampleWatermarks(timeService, sampleWatermarksStep1);
+            assertThat(sourceReader.getPausedSplits()).doesNotContain("0");
+        }
+
+        sampleWatermarks(timeService, sampleWatermarksStep2);
         assertThat(sourceReader.getPausedSplits()).containsExactly("0");
 
         operator.handleOperatorEvent(new WatermarkAlignmentEvent(5));
@@ -112,10 +129,22 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         operator.emitNext(dataOutput); // split 1 emits 11
         operator.emitNext(dataOutput); // split 2 emits 3
 
+        if (sampleWatermarksStep1 > 0) {
+            sampleWatermarks(timeService, sampleWatermarksStep1);
+            assertThat(sourceReader.getPausedSplits()).doesNotContain("0");
+        }
+
+        sampleWatermarks(timeService, sampleWatermarksStep2);
         assertThat(sourceReader.getPausedSplits()).containsExactly("0");
 
-        operator.emitNext(dataOutput); // split 2 emits 6
+        operator.emitNext(dataOutput); // split 2 emits 12
+
+        if (sampleWatermarksStep1 > 0) {
+            sampleWatermarks(timeService, sampleWatermarksStep1);
+            assertThat(sourceReader.getPausedSplits()).containsExactly("0");
+        }
 
+        sampleWatermarks(timeService, sampleWatermarksStep2);
         assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1");
     }
 
@@ -251,6 +280,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
 
         operator.emitNext(dataOutput); // split0 emits first (and only) record 
(maxEmittedWatermark)
+        sampleAllWatermarks(processingTimeService);
 
         operator.handleOperatorEvent(
                 new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks 
split0
@@ -370,9 +400,11 @@ class SourceOperatorSplitWatermarkAlignmentTest {
 
         operator.emitNext(dataOutput); // split0 emits 5
         operator.emitNext(dataOutput); // split1 emits 3
+        sampleAllWatermarks(processingTimeService);
 
         operator.handleOperatorEvent(
                 new WatermarkAlignmentEvent(allowedWatermark4)); // blocks 
split0
+
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
         
assertThat(operator.getSplitMetricGroup(split1.splitId()).isActive()).isTrue();
 
@@ -390,6 +422,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         
assertThat(operator.getSplitMetricGroup(split1.splitId()).isIdle()).isTrue();
 
         operator.emitNext(dataOutput); // split1 emits 8
+        sampleAllWatermarks(processingTimeService);
         
assertThat(operator.getSplitMetricGroup(split1.splitId()).isPaused()).isTrue();
 
         operator.handleOperatorEvent(
@@ -420,6 +453,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         CollectingDataOutput<Integer> actualOutput = new 
CollectingDataOutput<>();
 
         operator.emitNext(actualOutput);
+        sampleAllWatermarks(processingTimeService);
         assertOutput(actualOutput, Arrays.asList(5));
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
 
@@ -447,6 +481,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
 
         // testing transition idle -> paused
         operator.emitNext(actualOutput);
+        sampleAllWatermarks(processingTimeService);
+
         assertOutput(actualOutput, Arrays.asList(5, 6));
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isFalse();
@@ -465,6 +501,17 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
     }
 
+    private void sampleAllWatermarks(TestProcessingTimeService timeService) 
throws Exception {
+        sampleWatermarks(timeService, 
WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue());
+    }
+
+    private void sampleWatermarks(TestProcessingTimeService timeService, int 
times)
+            throws Exception {
+        for (int i = 0; i < times; i++) {
+            timeService.advance(updateIntervalMillis);
+        }
+    }
+
     private void assertOutput(
             CollectingDataOutput<Integer> actualOutput, List<Integer> 
expectedOutput) {
         assertThat(
@@ -483,7 +530,11 @@ class SourceOperatorSplitWatermarkAlignmentTest {
             throws Exception {
 
         return createAndOpenSourceOperatorWithIdlenessAndEnv(
-                sourceReader, processingTimeService, idleTimeout, 
getTestingEnvironment());
+                sourceReader,
+                processingTimeService,
+                new Configuration(),
+                idleTimeout,
+                getTestingEnvironment());
     }
 
     private SourceOperator<Integer, MockSourceSplit>
@@ -504,12 +555,13 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         .addTask(createExecutionAttemptId(), "test");
         env.setTaskMetricGroup(metricGroup);
         return createAndOpenSourceOperatorWithIdlenessAndEnv(
-                sourceReader, processingTimeService, idleTimeout, env);
+                sourceReader, processingTimeService, new Configuration(), 
idleTimeout, env);
     }
 
     private SourceOperator<Integer, MockSourceSplit> 
createAndOpenSourceOperatorWithIdlenessAndEnv(
             MockSourceReader sourceReader,
             TestProcessingTimeService processingTimeService,
+            Configuration configuration,
             long idleTimeout,
             Environment env)
             throws Exception {
@@ -536,6 +588,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         sourceReader,
                         watermarkStrategy,
                         processingTimeService,
+                        configuration,
                         new MockOperatorEventGateway(),
                         1,
                         5,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 554e253e1fa..8ba9304344b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.CollectorOutput;
-import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
@@ -127,12 +126,11 @@ class SourceOperatorTest {
     void testOpen() throws Exception {
         for (boolean supportsSplitReassignmentOnRecovery : new boolean[] 
{true, false}) {
             try (SourceOperatorTestContext context =
-                    new SourceOperatorTestContext(
-                            false,
-                            false,
-                            WatermarkStrategy.noWatermarks(),
-                            new MockOutput<>(new ArrayList<>()),
-                            supportsSplitReassignmentOnRecovery)) {
+                    SourceOperatorTestContext.builder()
+                            
.setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint)
+                            .setSupportsSplitReassignmentOnRecovery(
+                                    supportsSplitReassignmentOnRecovery)
+                            .build()) {
                 SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
                 // Initialize the operator.
                 operator.initializeState(context.createStateContext());
@@ -260,7 +258,8 @@ class SourceOperatorTest {
                         .setOutput(new CollectorOutput<>(out))
                         
.setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint)
                         .setPreInit(
-                                // recover with some state, so the source will 
pause until a checkpoint
+                                // recover with some state, so the source will 
pause until a
+                                // checkpoint
                                 // to speedup recovery (if 
pauseSourcesUntilCheckpoint)
                                 (stateManager, operatorID) -> {
                                     long checkpointID = 1L;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index 2f88a71036a..5e3087759cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.execution.Environment;
@@ -66,6 +67,7 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
         return new Builder();
     }
 
+    /** Builder to construct {@link SourceOperatorTestContext}. */
     public static class Builder {
         private boolean idle = false;
         private boolean usePerSplitOutputs = false;
@@ -74,6 +76,8 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
         private boolean supportsSplitReassignmentOnRecovery = false;
         private boolean pauseSourcesUntilFirstCheckpoint = false;
         private BiConsumer<TestTaskStateManager, OperatorID> preInit = (ign0, 
ign1) -> {};
+        private int watermarkBufferSize =
+                PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue();
 
         public Builder setIdle(boolean idle) {
             this.idle = idle;
@@ -95,7 +99,8 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
             return this;
         }
 
-        public Builder setSupportsSplitReassignmentOnRecovery(boolean 
supportsSplitReassignmentOnRecovery) {
+        public Builder setSupportsSplitReassignmentOnRecovery(
+                boolean supportsSplitReassignmentOnRecovery) {
             this.supportsSplitReassignmentOnRecovery = 
supportsSplitReassignmentOnRecovery;
             return this;
         }
@@ -111,7 +116,14 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
             return this;
         }
 
+        public Builder setWatermarkBufferSize(int watermarkBufferSize) {
+            this.watermarkBufferSize = watermarkBufferSize;
+            return this;
+        }
+
         public SourceOperatorTestContext build() throws Exception {
+            Configuration configuration = new Configuration();
+            configuration.set(PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE, 
watermarkBufferSize);
             return new SourceOperatorTestContext(
                     idle,
                     usePerSplitOutputs,
@@ -119,7 +131,8 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
                     output,
                     supportsSplitReassignmentOnRecovery,
                     pauseSourcesUntilFirstCheckpoint,
-                    preInit);
+                    preInit,
+                    configuration);
         }
     }
 
@@ -136,7 +149,8 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
             Output<StreamRecord<Integer>> output,
             boolean supportsSplitReassignmentOnRecovery,
             boolean pauseSourcesUntilFirstCheckpoint,
-            BiConsumer<TestTaskStateManager, OperatorID> preInit)
+            BiConsumer<TestTaskStateManager, OperatorID> preInit,
+            Configuration configuration)
             throws Exception {
         mockSourceReader =
                 new MockSourceReader(
@@ -161,6 +175,7 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
                         mockSourceReader,
                         watermarkStrategy,
                         timeService,
+                        configuration,
                         mockGateway,
                         SUBTASK_INDEX,
                         5,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index fed65a42ccc..da22db8d0e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -72,6 +72,7 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 reader,
                 watermarkStrategy,
                 timeService,
+                new Configuration(),
                 new MockOperatorEventGateway(),
                 1,
                 5,
@@ -85,6 +86,7 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
             SourceReader<T, MockSourceSplit> reader,
             WatermarkStrategy<T> watermarkStrategy,
             ProcessingTimeService timeService,
+            Configuration configuration,
             OperatorEventGateway eventGateway,
             int subtaskIndex,
             int parallelism,
@@ -99,7 +101,7 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 new MockSourceSplitSerializer(),
                 watermarkStrategy,
                 timeService,
-                new Configuration(),
+                configuration,
                 "localhost",
                 emitProgressiveWatermarks,
                 () -> false,


Reply via email to