This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c90b9e4eb68ab9f3c8430d05ae1bc5196ce6c2f
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Jan 19 00:55:22 2026 +0100

    [FLINK-38939][runtime] Pause Sources until the first checkpoint barrier is 
received
    
    This allows prioritizing the processing of recovered records
    (when recovering from an unaligned checkpoint).
---
 .../generated/checkpointing_configuration.html     |   6 +
 .../flink/configuration/CheckpointingOptions.java  |  12 ++
 .../tasks/CheckpointCoordinatorConfiguration.java  |  17 +-
 .../api/environment/CheckpointConfig.java          |  13 ++
 .../flink/streaming/api/graph/StreamGraph.java     |   2 +
 .../api/graph/StreamingJobGraphGenerator.java      |  13 ++
 .../streaming/api/operators/SourceOperator.java    |  34 +++-
 .../api/operators/SourceOperatorFactory.java       |  17 +-
 .../api/operators/SourceOperatorAlignmentTest.java |   6 +-
 .../SourceOperatorSplitWatermarkAlignmentTest.java |   2 +
 .../api/operators/SourceOperatorTest.java          | 190 +++++++++++++++------
 .../api/operators/SourceOperatorTestContext.java   |  65 ++++++-
 .../operators/source/TestingSourceOperator.java    |   9 +-
 .../apache/flink/streaming/util/MockOutput.java    |   4 +
 .../tasks/StreamTaskMailboxTestHarness.java        |   2 +-
 .../OperatorEventSendingCheckpointITCase.java      |  22 ++-
 .../CheckpointIntervalDuringBacklogITCase.java     |  10 +-
 .../UnalignedCheckpointFailureHandlingITCase.java  |   3 +-
 .../NumberSequenceSourceWithWaitForCheckpoint.java |  20 ++-
 19 files changed, 361 insertions(+), 86 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index cb38355a92a..6383e0bfc34 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -194,5 +194,11 @@
             <td>Integer</td>
             <td>The default size of the write buffer for the checkpoint 
streams that write to file systems. The actual write buffer size is determined 
to be the maximum of the value of this option and option 
'execution.checkpointing.data-inline-threshold'.</td>
         </tr>
+        <tr>
+            <td><h5>pipeline.sources.pause-until-first-checkpoint</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Don't pull any data from sources until the first checkpoint is 
triggered. This might be helpful in reducing recovery times in cases where 
recovered records from unaligned checkpoint compete with new incoming records 
for processing. Incompatible with value 0 (disabled) for 
execution.checkpointing.interval-during-backlog</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 9921ffc8f5f..ce205077474 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -627,6 +627,18 @@ public class CheckpointingOptions {
                                                     "the important 
considerations"))
                                     .build());
 
+    public static final ConfigOption<Boolean> 
PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT =
+            key("pipeline.sources.pause-until-first-checkpoint")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Don't pull any data from sources until the first 
checkpoint is triggered. "
+                                    + "This might be helpful in reducing 
recovery times in cases where "
+                                    + "recovered records from unaligned 
checkpoint compete with new incoming records for processing. "
+                                    + "Incompatible with value 0 (disabled) 
for "
+                                    + 
CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG
+                                            .key());
+
     // TODO: deprecated
     // Currently, both two file merging mechanism can work simultaneously:
     //  1. If UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE=1 and
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index cc755fc17a4..c0f4c07bba4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -73,6 +73,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
 
     private final boolean recoverOutputOnDownstreamTask;
 
+    private final boolean pauseSourcesUntilFirstCheckpoint;
+
     /**
      * @deprecated use {@link #builder()}.
      */
@@ -101,6 +103,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                 0,
                 checkpointIdOfIgnoredInFlightData,
                 false,
+                false,
                 false);
     }
 
@@ -117,7 +120,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
             long alignedCheckpointTimeout,
             long checkpointIdOfIgnoredInFlightData,
             boolean enableCheckpointsAfterTasksFinish,
-            boolean recoverOutputOnDownstreamTask) {
+            boolean recoverOutputOnDownstreamTask,
+            boolean pauseSourcesUntilFirstCheckpoint) {
 
         if (checkpointIntervalDuringBacklog < MINIMAL_CHECKPOINT_TIME) {
             // interval of max value means disable periodic checkpoint
@@ -149,6 +153,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         this.checkpointIdOfIgnoredInFlightData = 
checkpointIdOfIgnoredInFlightData;
         this.enableCheckpointsAfterTasksFinish = 
enableCheckpointsAfterTasksFinish;
         this.recoverOutputOnDownstreamTask = recoverOutputOnDownstreamTask;
+        this.pauseSourcesUntilFirstCheckpoint = 
pauseSourcesUntilFirstCheckpoint;
     }
 
     public long getCheckpointInterval() {
@@ -297,6 +302,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         private long checkpointIdOfIgnoredInFlightData;
         private boolean enableCheckpointsAfterTasksFinish;
         private boolean recoverOutputOnDownstreamTask;
+        private boolean pauseSourcesUntilFirstCheckpoint;
 
         public CheckpointCoordinatorConfiguration build() {
             return new CheckpointCoordinatorConfiguration(
@@ -312,7 +318,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                     alignedCheckpointTimeout,
                     checkpointIdOfIgnoredInFlightData,
                     enableCheckpointsAfterTasksFinish,
-                    recoverOutputOnDownstreamTask);
+                    recoverOutputOnDownstreamTask,
+                    pauseSourcesUntilFirstCheckpoint);
         }
 
         public CheckpointCoordinatorConfigurationBuilder setCheckpointInterval(
@@ -386,6 +393,12 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
             return this;
         }
 
+        public CheckpointCoordinatorConfigurationBuilder 
setPauseSourcesUntilFirstCheckpoint(
+                boolean pauseSourcesUntilFirstCheckpoint1) {
+            pauseSourcesUntilFirstCheckpoint = 
pauseSourcesUntilFirstCheckpoint1;
+            return this;
+        }
+
         public CheckpointCoordinatorConfigurationBuilder 
setRecoverOutputOnDownstreamTask(
                 boolean recoverOutputOnDownstreamTask) {
             this.recoverOutputOnDownstreamTask = recoverOutputOnDownstreamTask;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 42e2be79cf3..8d0982a772d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -640,6 +640,19 @@ public class CheckpointConfig implements 
java.io.Serializable {
         configuration
                 .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
                 .ifPresent(this::setCheckpointDirectory);
+        configuration
+                
.getOptional(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT)
+                .ifPresent(this::setPauseSourcesUntilFirstCheckpoint);
+    }
+
+    public void setPauseSourcesUntilFirstCheckpoint(boolean 
pauseCheckpointsIfTasksNotRunning) {
+        this.configuration.set(
+                CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT,
+                pauseCheckpointsIfTasksNotRunning);
+    }
+
+    public boolean isPauseSourcesUntilFirstCheckpoint() {
+        return 
this.configuration.get(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT);
     }
 
     private void setCheckpointDirectory(String checkpointDirectory) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 887c22a395e..db80a3bca41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -394,6 +394,8 @@ public class StreamGraph implements Pipeline, ExecutionPlan 
{
                                 jobConfiguration.get(
                                         CheckpointingOptions
                                                 
.UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM))
+                        .setPauseSourcesUntilFirstCheckpoint(
+                                cfg.isPauseSourcesUntilFirstCheckpoint())
                         .build(),
                 serializedStateBackend,
                 getJobConfiguration()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 14ee3fdffc1..aa503e3b20b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
 import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
 import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import 
org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
 import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
@@ -536,6 +537,18 @@ public class StreamingJobGraphGenerator {
                 }
             }
         }
+
+        if (checkpointConfig.isCheckpointingEnabled()
+                && checkpointConfig.getCheckpointIntervalDuringBacklog()
+                        == 
CheckpointCoordinatorConfiguration.DISABLED_CHECKPOINT_INTERVAL
+                && checkpointConfig.isPauseSourcesUntilFirstCheckpoint()) {
+            throw new IllegalArgumentException(
+                    "Pausing sources until first checkpoint is incompatible 
with disabling checkpoints during backlog processing. "
+                            + "Please review and choose whether you require "
+                            + 
CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT.key()
+                            + " or "
+                            + 
CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG.key());
+        }
     }
 
     private static boolean hasCustomPartitioner(StreamNode node) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 09b12007d33..eeb1924a653 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -181,6 +181,10 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private final Set<String> currentlyPausedSplits = new HashSet<>();
 
+    private boolean waitingForCheckpoint;
+
+    private final CompletableFuture<Void> checkpointsStartedFuture;
+
     private final Map<String, InternalSourceSplitMetricGroup> 
splitMetricGroups = new HashMap<>();
 
     private enum OperatingMode {
@@ -230,7 +234,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             boolean emitProgressiveWatermarks,
             CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
             Map<String, Boolean> watermarkIsAlignedMap,
-            boolean supportsSplitReassignmentOnRecovery) {
+            boolean supportsSplitReassignmentOnRecovery,
+            boolean pauseSourcesUntilFirstCheckpoint) {
         super(parameters);
         this.readerFactory = checkNotNull(readerFactory);
         this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -246,6 +251,13 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
         this.watermarkIsAlignedMap = watermarkIsAlignedMap;
         this.supportsSplitReassignmentOnRecovery = 
supportsSplitReassignmentOnRecovery;
+        this.waitingForCheckpoint = pauseSourcesUntilFirstCheckpoint;
+        //noinspection unchecked
+        this.checkpointsStartedFuture =
+                waitingForCheckpoint
+                        ? new CompletableFuture<>()
+                        : (CompletableFuture<Void>) AVAILABLE;
+        LOG.info("SourceOperator initialized, wait for 1st checkpoint: {}", 
waitingForCheckpoint);
     }
 
     @Override
@@ -487,6 +499,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
+        if (waitingForCheckpoint && operatingMode == 
OperatingMode.SOURCE_DRAINED) {
+            return DataInputStatus.END_OF_DATA;
+        }
         // guarding an assumptions we currently make due to the fact that 
certain classes
         // assume a constant output, this assumption does not need to stand if 
we emitted all
         // records. In that case the output will change to FinishedDataOutput
@@ -512,6 +527,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws 
Exception {
         switch (operatingMode) {
             case OUTPUT_NOT_INITIALIZED:
+                if (waitingForCheckpoint) {
+                    return DataInputStatus.NOTHING_AVAILABLE;
+                }
                 if (watermarkAlignmentParams.isEnabled()) {
                     // Only wrap the output when watermark alignment is 
enabled, as otherwise this
                     // introduces a small performance regression (probably 
because of an extra
@@ -606,6 +624,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
+        if (waitingForCheckpoint) {
+            waitingForCheckpoint = false;
+            checkpointsStartedFuture.complete(null);
+            LOG.info("Source un-paused (checkpoint barrier received)");
+        }
         long checkpointId = context.getCheckpointId();
         LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
         readerState.update(sourceReader.snapshotState(checkpointId));
@@ -617,6 +640,10 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             case WAITING_FOR_ALIGNMENT:
                 return availabilityHelper.update(waitingForAlignmentFuture);
             case OUTPUT_NOT_INITIALIZED:
+                return availabilityHelper.update(
+                        waitingForCheckpoint
+                                ? checkpointsStartedFuture
+                                : sourceReader.isAvailable());
             case READING:
                 return availabilityHelper.update(sourceReader.isAvailable());
             case SOURCE_STOPPED:
@@ -634,6 +661,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         final ListState<byte[]> rawState =
                 
context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
         readerState = new SimpleVersionedListState<>(rawState, 
splitSerializer);
+        if (waitingForCheckpoint && !context.isRestored()) {
+            LOG.debug("Not a recovery, won't wait for the checkpoint to emit 
records");
+            waitingForCheckpoint = false;
+            checkpointsStartedFuture.complete(null);
+        }
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 6d1cbc88897..dbe5a302190 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
 import 
org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -130,7 +131,15 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                         emitProgressiveWatermarks,
                         
parameters.getContainingTask().getCanEmitBatchOfRecords(),
                         getSourceWatermarkDeclarations(),
-                        source instanceof SupportsSplitReassignmentOnRecovery);
+                        source instanceof SupportsSplitReassignmentOnRecovery,
+                        CheckpointingOptions.isCheckpointingEnabled(
+                                        
parameters.getContainingTask().getJobConfiguration())
+                                && parameters
+                                        .getContainingTask()
+                                        .getJobConfiguration()
+                                        .get(
+                                                CheckpointingOptions
+                                                        
.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT));
 
         
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, 
sourceOperator);
 
@@ -202,7 +211,8 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                     boolean emitProgressiveWatermarks,
                     CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
                     Collection<? extends WatermarkDeclaration> 
watermarkDeclarations,
-                    boolean supportsSplitReassignmentOnRecovery) {
+                    boolean supportsSplitReassignmentOnRecovery,
+                    boolean pauseSourcesUntilFirstCheckpoint) {
 
         // jumping through generics hoops: cast the generics away to then cast 
them back more
         // strictly typed
@@ -235,6 +245,7 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                 emitProgressiveWatermarks,
                 canEmitBatchOfRecords,
                 watermarkIsAlignedMap,
-                supportsSplitReassignmentOnRecovery);
+                supportsSplitReassignmentOnRecovery,
+                pauseSourcesUntilFirstCheckpoint);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index 30b8fdd5063..b531d9b5d18 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -66,7 +66,8 @@ class SourceOperatorAlignmentTest {
                         WatermarkStrategy.forGenerator(ctx -> new 
PunctuatedGenerator())
                                 .withTimestampAssigner((r, t) -> r)
                                 .withWatermarkAlignment(
-                                        "group1", Duration.ofMillis(100), 
Duration.ofMillis(1)));
+                                        "group1", Duration.ofMillis(100), 
Duration.ofMillis(1)),
+                        false);
         operator = context.getOperator();
     }
 
@@ -140,7 +141,8 @@ class SourceOperatorAlignmentTest {
                                                         
PunctuatedGenerator.GenerationMode.ODD))
                                 .withWatermarkAlignment(
                                         "group1", Duration.ofMillis(100), 
Duration.ofMillis(1))
-                                .withTimestampAssigner((r, t) -> r))) {
+                                .withTimestampAssigner((r, t) -> r),
+                        false)) {
             final SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
             operator.initializeState(context.createStateContext());
             operator.open();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 35621fbf5bc..b5246f7b518 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -100,6 +100,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         1,
                         5,
                         true,
+                        false,
                         false);
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
@@ -549,6 +550,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                         1,
                         5,
                         true,
+                        false,
                         false);
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index aa4d2cf7081..cd44bc08700 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -23,6 +23,10 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
@@ -31,8 +35,10 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
@@ -44,28 +50,33 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.CollectorOutput;
 import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /** Unit test for {@link SourceOperator}. */
 @SuppressWarnings("serial")
+@ExtendWith(ParameterizedTestExtension.class)
 class SourceOperatorTest {
 
     @Nullable private SourceOperatorTestContext context;
@@ -73,9 +84,16 @@ class SourceOperatorTest {
     @Nullable private MockSourceReader mockSourceReader;
     @Nullable private MockOperatorEventGateway mockGateway;
 
+    @Parameter public boolean pauseSourcesUntilCheckpoint;
+
+    @Parameters(name = "pauseSourcesUntilCheckpoint = {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(true, false);
+    }
+
     @BeforeEach
     void setup() throws Exception {
-        context = new SourceOperatorTestContext();
+        context = new SourceOperatorTestContext(false, 
pauseSourcesUntilCheckpoint);
         operator = context.getOperator();
         mockSourceReader = context.getSourceReader();
         mockGateway = context.getGateway();
@@ -90,7 +108,7 @@ class SourceOperatorTest {
         mockGateway = null;
     }
 
-    @Test
+    @TestTemplate
     void testInitializeState() throws Exception {
         StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
@@ -102,49 +120,50 @@ class SourceOperatorTest {
                 .isNotNull();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testOpen(boolean supportsSplitReassignmentOnRecovery) throws 
Exception {
-        try (SourceOperatorTestContext context =
-                new SourceOperatorTestContext(
-                        false,
-                        false,
-                        WatermarkStrategy.noWatermarks(),
-                        new MockOutput<>(new ArrayList<>()),
-                        supportsSplitReassignmentOnRecovery)) {
-            SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
-            // Initialize the operator.
-            operator.initializeState(context.createStateContext());
-            // Open the operator.
-            operator.open();
-            // The source reader should have been assigned a split.
-            if (supportsSplitReassignmentOnRecovery) {
-                
assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
-            } else {
-                assertThat(context.getSourceReader().getAssignedSplits())
-                        .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
-            }
-
-            // The source reader should have started.
-            assertThat(context.getSourceReader().isStarted()).isTrue();
-
-            // A ReaderRegistrationRequest should have been sent.
-            assertThat(context.getGateway().getEventsSent()).hasSize(1);
-            OperatorEvent operatorEvent = 
context.getGateway().getEventsSent().get(0);
-            
assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
-            ReaderRegistrationEvent registrationEvent = 
(ReaderRegistrationEvent) operatorEvent;
-            assertThat(registrationEvent.subtaskId())
-                    .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
-            if (supportsSplitReassignmentOnRecovery) {
-                assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer()))
-                        .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
-            } else {
-                assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer())).isEmpty();
+    @TestTemplate
+    void testOpen() throws Exception {
+        for (boolean supportsSplitReassignmentOnRecovery : new boolean[] 
{true, false}) {
+            try (SourceOperatorTestContext context =
+                    new SourceOperatorTestContext(
+                            false,
+                            false,
+                            WatermarkStrategy.noWatermarks(),
+                            new MockOutput<>(new ArrayList<>()),
+                            supportsSplitReassignmentOnRecovery)) {
+                SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
+                // Initialize the operator.
+                operator.initializeState(context.createStateContext());
+                // Open the operator.
+                operator.open();
+                // The source reader should have been assigned a split.
+                if (supportsSplitReassignmentOnRecovery) {
+                    
assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
+                } else {
+                    assertThat(context.getSourceReader().getAssignedSplits())
+                            
.containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+                }
+
+                // The source reader should have started.
+                assertThat(context.getSourceReader().isStarted()).isTrue();
+
+                // A ReaderRegistrationRequest should have been sent.
+                assertThat(context.getGateway().getEventsSent()).hasSize(1);
+                OperatorEvent operatorEvent = 
context.getGateway().getEventsSent().get(0);
+                
assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
+                ReaderRegistrationEvent registrationEvent = 
(ReaderRegistrationEvent) operatorEvent;
+                assertThat(registrationEvent.subtaskId())
+                        .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+                if (supportsSplitReassignmentOnRecovery) {
+                    assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer()))
+                            
.containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+                } else {
+                    assertThat(registrationEvent.splits(new 
MockSourceSplitSerializer())).isEmpty();
+                }
             }
         }
     }
 
-    @Test
+    @TestTemplate
     void testStop() throws Exception {
         // Initialize the operator.
         operator.initializeState(context.createStateContext());
@@ -166,7 +185,7 @@ class SourceOperatorTest {
         assertThat(sourceStopped).isDone();
     }
 
-    @Test
+    @TestTemplate
     void testHandleAddSplitsEvent() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -179,7 +198,7 @@ class SourceOperatorTest {
                 .containsExactly(SourceOperatorTestContext.MOCK_SPLIT, 
newSplit);
     }
 
-    @Test
+    @TestTemplate
     void testHandleAddSourceEvent() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -189,7 +208,7 @@ class SourceOperatorTest {
         
assertThat(mockSourceReader.getReceivedSourceEvents()).containsExactly(event);
     }
 
-    @Test
+    @TestTemplate
     void testSnapshotState() throws Exception {
         StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
@@ -206,7 +225,7 @@ class SourceOperatorTest {
         
assertThat(splitsInState).containsExactly(SourceOperatorTestContext.MOCK_SPLIT, 
newSplit);
     }
 
-    @Test
+    @TestTemplate
     void testNotifyCheckpointComplete() throws Exception {
         StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
@@ -216,7 +235,7 @@ class SourceOperatorTest {
         
assertThat(mockSourceReader.getCompletedCheckpoints().get(0)).isEqualTo(100L);
     }
 
-    @Test
+    @TestTemplate
     void testNotifyCheckpointAborted() throws Exception {
         StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
@@ -226,7 +245,65 @@ class SourceOperatorTest {
         
assertThat(mockSourceReader.getAbortedCheckpoints().get(0)).isEqualTo(100L);
     }
 
-    @Test
+    @TestTemplate
+    public void testPausingUntilCheckpoint() throws Exception {
+        final List<StreamElement> out = new ArrayList<>();
+        try (SourceOperatorTestContext context =
+                new SourceOperatorTestContext(
+                        false,
+                        false,
+                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                .withTimestampAssigner((element, 
recordTimestamp) -> element),
+                        new CollectorOutput<>(out),
+                        false,
+                        pauseSourcesUntilCheckpoint,
+                        // recover with some state, so the source will pause 
until a checkpoint
+                        // to speed up recovery (if pauseSourcesUntilCheckpoint)
+                        (stateManager, operatorID) -> {
+                            long checkpointID = 1L;
+                            stateManager.setReportedCheckpointId(checkpointID);
+                            
stateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
+                                    singletonMap(
+                                            checkpointID,
+                                            new TaskStateSnapshot(
+                                                    singletonMap(
+                                                            operatorID,
+                                                            
OperatorSubtaskState.builder()
+                                                                    
.build()))));
+                        })) {
+
+            final SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
+            operator.open();
+
+            MockSourceSplit split = new MockSourceSplit(0);
+            split.addRecord(0);
+            operator.handleOperatorEvent(
+                    new AddSplitEvent<>(
+                            Collections.singletonList(split), new 
MockSourceSplitSerializer()));
+
+            operator.emitNext(new DataOutputToOutput<>(operator.output));
+
+            if (pauseSourcesUntilCheckpoint) {
+                assertThat(out).isEmpty();
+                assertThat(operator.isAvailable()).isFalse();
+                // un-pause
+                operator.snapshotState(
+                        2L,
+                        2L,
+                        CheckpointOptions.alignedNoTimeout(
+                                CheckpointType.CHECKPOINT,
+                                
CheckpointStorageLocationReference.getDefault()),
+                        new MemCheckpointStreamFactory(10240));
+                operator.emitNext(new DataOutputToOutput<>(operator.output));
+            }
+
+            assertThat(operator.isAvailable()).isTrue();
+            assertThat(out.stream().map(element -> 
element.asRecord().getValue()))
+                    .containsExactly(0);
+        }
+    }
+
+    @TestTemplate
     void testHandleBacklogEvent() throws Exception {
         List<StreamElement> outputStreamElements = new ArrayList<>();
         context =
@@ -236,7 +313,8 @@ class SourceOperatorTest {
                         WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                 .withTimestampAssigner((element, 
recordTimestamp) -> element),
                         new CollectorOutput<>(outputStreamElements),
-                        false);
+                        false,
+                        pauseSourcesUntilCheckpoint);
         operator = context.getOperator();
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -264,7 +342,7 @@ class SourceOperatorTest {
                         new RecordAttributes(false));
     }
 
-    @Test
+    @TestTemplate
     public void testMetricGroupIsCreatedForNewSplit() throws Exception {
         operator.initializeState(context.createStateContext());
         operator.open();
@@ -275,7 +353,7 @@ class SourceOperatorTest {
         assertNotNull(operator.getSplitMetricGroup(newSplit.splitId()));
     }
 
-    @Test
+    @TestTemplate
     public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
         MockSourceSplit restoredSplit = new MockSourceSplit((1));
         StateInitializationContext stateContext =
@@ -285,7 +363,7 @@ class SourceOperatorTest {
         assertNotNull(operator.getSplitMetricGroup(restoredSplit.splitId()));
     }
 
-    @Test
+    @TestTemplate
     public void testMetricGroupTracksSplitWatermark() throws Exception {
         long expectedWatermark = 1000;
         operator.initializeState(context.createStateContext());
@@ -300,7 +378,7 @@ class SourceOperatorTest {
                 
operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark());
     }
 
-    @Test
+    @TestTemplate
     public void testMetricGroupReturnsDefaultIfNoSplitWatermark() throws 
Exception {
         long expectedWatermark = Watermark.UNINITIALIZED.getTimestamp();
         operator.initializeState(context.createStateContext());
@@ -314,7 +392,7 @@ class SourceOperatorTest {
                 
operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark());
     }
 
-    @Test
+    @TestTemplate
     public void testMultipleMetricGroupsReturnWatermarkOrDefaultWatermark() 
throws Exception {
         long expectedWatermarkValueForSplit0 = 
Watermark.UNINITIALIZED.getTimestamp();
         long expectedWatermarkValueForSplit1 = 1000;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index 6e198512445..c0286a53405 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
@@ -50,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.BiConsumer;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -64,18 +66,29 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
     private MockOperatorEventGateway mockGateway;
     private TestProcessingTimeService timeService;
     private SourceOperator<Integer, MockSourceSplit> operator;
+    public Output<StreamRecord<Integer>> output;
 
     public SourceOperatorTestContext() throws Exception {
-        this(false);
+        this(false, false);
     }
 
-    public SourceOperatorTestContext(boolean idle) throws Exception {
-        this(idle, WatermarkStrategy.noWatermarks());
+    public SourceOperatorTestContext(boolean idle, boolean 
pauseSourcesUntilFirstCheckpoint)
+            throws Exception {
+        this(idle, WatermarkStrategy.noWatermarks(), 
pauseSourcesUntilFirstCheckpoint);
     }
 
-    public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> 
watermarkStrategy)
+    public SourceOperatorTestContext(
+            boolean idle,
+            WatermarkStrategy<Integer> watermarkStrategy,
+            boolean pauseSourcesUntilFirstCheckpoint)
             throws Exception {
-        this(idle, false, watermarkStrategy, new MockOutput<>(new 
ArrayList<>()), false);
+        this(
+                idle,
+                false,
+                watermarkStrategy,
+                new MockOutput<>(new ArrayList<>()),
+                false,
+                pauseSourcesUntilFirstCheckpoint);
     }
 
     public SourceOperatorTestContext(
@@ -85,6 +98,43 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
             Output<StreamRecord<Integer>> output,
             boolean supportsSplitReassignmentOnRecovery)
             throws Exception {
+        this(
+                idle,
+                usePerSplitOutputs,
+                watermarkStrategy,
+                output,
+                supportsSplitReassignmentOnRecovery,
+                false,
+                (ign0, ign1) -> {});
+    }
+
+    public SourceOperatorTestContext(
+            boolean idle,
+            boolean usePerSplitOutputs,
+            WatermarkStrategy<Integer> watermarkStrategy,
+            Output<StreamRecord<Integer>> output,
+            boolean supportsSplitReassignmentOnRecovery,
+            boolean pauseSourcesUntilFirstCheckpoint)
+            throws Exception {
+        this(
+                idle,
+                usePerSplitOutputs,
+                watermarkStrategy,
+                output,
+                supportsSplitReassignmentOnRecovery,
+                pauseSourcesUntilFirstCheckpoint,
+                (ign0, ign1) -> {});
+    }
+
+    public SourceOperatorTestContext(
+            boolean idle,
+            boolean usePerSplitOutputs,
+            WatermarkStrategy<Integer> watermarkStrategy,
+            Output<StreamRecord<Integer>> output,
+            boolean supportsSplitReassignmentOnRecovery,
+            boolean pauseSourcesUntilFirstCheckpoint,
+            BiConsumer<TestTaskStateManager, OperatorID> preInit)
+            throws Exception {
         mockSourceReader =
                 new MockSourceReader(
                         idle
@@ -94,6 +144,7 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
                         usePerSplitOutputs);
         mockGateway = new MockOperatorEventGateway();
         timeService = new TestProcessingTimeService();
+        this.output = output;
         Environment env = getTestingEnvironment();
         operator =
                 new TestingSourceOperator<>(
@@ -111,7 +162,9 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
                         SUBTASK_INDEX,
                         5,
                         true,
-                        supportsSplitReassignmentOnRecovery);
+                        supportsSplitReassignmentOnRecovery,
+                        pauseSourcesUntilFirstCheckpoint);
+        preInit.accept((TestTaskStateManager) env.getTaskStateManager(), 
operator.getOperatorID());
         operator.initializeState(
                 new StreamTaskStateInitializerImpl(env, new 
HashMapStateBackend()));
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index dc8ebc9806e..fed65a42ccc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -76,7 +76,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 1,
                 5,
                 emitProgressiveWatermarks,
-                supportsSplitReassignmentOnRecovery);
+                supportsSplitReassignmentOnRecovery,
+                false);
     }
 
     public TestingSourceOperator(
@@ -88,7 +89,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
             int subtaskIndex,
             int parallelism,
             boolean emitProgressiveWatermarks,
-            boolean supportsSplitReassignmentOnRecovery) {
+            boolean supportsSplitReassignmentOnRecovery,
+            boolean pauseSourcesUntilFirstCheckpoint) {
 
         super(
                 parameters,
@@ -102,7 +104,8 @@ public class TestingSourceOperator<T> extends 
SourceOperator<T, MockSourceSplit>
                 emitProgressiveWatermarks,
                 () -> false,
                 Collections.emptyMap(),
-                supportsSplitReassignmentOnRecovery);
+                supportsSplitReassignmentOnRecovery,
+                pauseSourcesUntilFirstCheckpoint);
 
         this.subtaskIndex = subtaskIndex;
         this.parallelism = parallelism;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index e12dcfa8361..9ed9f92e3c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -83,4 +83,8 @@ public class MockOutput<T> implements Output<StreamRecord<T>> 
{
 
     @Override
     public void close() {}
+
+    public Collection<T> getOutputs() {
+        return outputs;
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index 0929f3940e7..4e733481536 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -167,7 +167,7 @@ public class StreamTaskMailboxTestHarness<OUT> implements 
AutoCloseable {
         cleanUp();
     }
 
-    public  void cleanUp() throws Exception {
+    public void cleanUp() throws Exception {
         streamTask.cleanUp(null);
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 2ef37eddd13..953df0e08a5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -51,9 +51,13 @@ import org.apache.flink.util.function.TriFunction;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -70,11 +74,19 @@ import static org.junit.Assert.assertEquals;
  * A test suite for source enumerator (operator coordinator) for situations 
where RPC calls for
  * split assignments (operator events) fails from time to time.
  */
+@RunWith(Parameterized.class)
 public class OperatorEventSendingCheckpointITCase extends TestLogger {
 
     private static final int PARALLELISM = 1;
     private static MiniCluster flinkCluster;
 
+    @Parameterized.Parameter public boolean pauseSources;
+
+    @Parameterized.Parameters(name = "pauseSources: {0}")
+    public static Collection<Boolean> specs() {
+        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+    }
+
     @BeforeClass
     public static void setupMiniClusterAndEnv() throws Exception {
         Configuration config = new Configuration();
@@ -198,6 +210,7 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         env.enableCheckpointing(50);
+        
env.getCheckpointConfig().setPauseSourcesUntilFirstCheckpoint(pauseSources);
 
         // This test depends on checkpoints persisting progress from the 
source before the
         // artificial exception gets triggered. Otherwise, the job will run 
for a long time (or
@@ -217,7 +230,14 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
 
         final DataStream<Long> numbers =
                 env.fromSource(
-                                new 
NumberSequenceSourceWithWaitForCheckpoint(1L, numElements, 3),
+                                new NumberSequenceSourceWithWaitForCheckpoint(
+                                        1L,
+                                        numElements,
+                                        3,
+                                        env.getCheckpointConfig()
+                                                        
.isPauseSourcesUntilFirstCheckpoint()
+                                                ? 2
+                                                : 1),
                                 WatermarkStrategy.noWatermarks(),
                                 "numbers")
                         .map(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
index b73caf5a156..91cf8782dfc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
@@ -102,10 +102,10 @@ public class CheckpointIntervalDuringBacklogITCase {
         Source<Long, ?, ?> source =
                 HybridSource.builder(
                                 new NumberSequenceSourceWithWaitForCheckpoint(
-                                        0, NUM_RECORDS / 2 - 1, NUM_SPLITS))
+                                        0, NUM_RECORDS / 2 - 1, NUM_SPLITS, 1))
                         .addSource(
                                 new NumberSequenceSourceWithWaitForCheckpoint(
-                                        NUM_RECORDS / 2, NUM_RECORDS - 1, 
NUM_SPLITS))
+                                        NUM_RECORDS / 2, NUM_RECORDS - 1, 
NUM_SPLITS, 1))
                         .build();
 
         runAndVerifyResult(env, source);
@@ -127,7 +127,7 @@ public class CheckpointIntervalDuringBacklogITCase {
         Source<Long, ?, ?> source =
                 HybridSource.builder(
                                 new NumberSequenceSourceWithWaitForCheckpoint(
-                                        0, NUM_RECORDS / 2 - 1, NUM_SPLITS))
+                                        0, NUM_RECORDS / 2 - 1, NUM_SPLITS, 1))
                         .addSource(new NumberSequenceSource(NUM_RECORDS / 2, 
NUM_RECORDS - 1))
                         .build();
 
@@ -152,6 +152,7 @@ public class CheckpointIntervalDuringBacklogITCase {
         configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
firstCheckpointTime);
         configuration.set(
                 CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, 
Duration.ofMillis(0));
+        
configuration.set(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT, 
false);
         final StreamExecutionEnvironment env =
                 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(1);
@@ -174,6 +175,7 @@ public class CheckpointIntervalDuringBacklogITCase {
         configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofMillis(100));
         configuration.set(
                 CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, 
Duration.ofMillis(0));
+        
configuration.set(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT, 
false);
         final StreamExecutionEnvironment env =
                 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(1);
@@ -189,7 +191,7 @@ public class CheckpointIntervalDuringBacklogITCase {
         DataStream<Long> source =
                 env.fromSource(
                         new NumberSequenceSourceWithWaitForCheckpoint(
-                                2, NUM_RECORDS - 1, NUM_SPLITS),
+                                2, NUM_RECORDS - 1, NUM_SPLITS, 1),
                         WatermarkStrategy.noWatermarks(),
                         "non-backlog-source");
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index 1cb92405958..b16ff4e476e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -127,7 +127,8 @@ public class UnalignedCheckpointFailureHandlingITCase {
 
     private void configure(StreamExecutionEnvironment env, String 
storageFactory) {
         // enable checkpointing but only via API
-        env.enableCheckpointing(Long.MAX_VALUE, 
CheckpointingMode.EXACTLY_ONCE);
+        // use a big enough interval so a manual checkpoint can be performed
+        env.enableCheckpointing(100000, CheckpointingMode.EXACTLY_ONCE);
 
         CheckpointStorageUtils.configureCheckpointStorageWithFactory(env, 
storageFactory);
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java
 
b/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java
index 60455c094bc..3a361bb4dd0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java
@@ -43,14 +43,17 @@ import java.util.Queue;
  */
 public class NumberSequenceSourceWithWaitForCheckpoint extends 
NumberSequenceSource {
     private static final long serialVersionUID = 1L;
+    private final int numCheckpointsToWait;
 
     private final int numSplits;
     private final long numAllowedMessageBeforeCheckpoint;
 
-    public NumberSequenceSourceWithWaitForCheckpoint(long from, long to, int 
numSplits) {
+    public NumberSequenceSourceWithWaitForCheckpoint(
+            long from, long to, int numSplits, int numCheckpointsToWait) {
         super(from, to);
         this.numSplits = numSplits;
         this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits;
+        this.numCheckpointsToWait = numCheckpointsToWait;
     }
 
     @Override
@@ -63,7 +66,7 @@ public class NumberSequenceSourceWithWaitForCheckpoint 
extends NumberSequenceSou
     @Override
     public SourceReader<Long, NumberSequenceSplit> 
createReader(SourceReaderContext readerContext) {
         return new CheckpointListeningIteratorSourceReader<>(
-                readerContext, numAllowedMessageBeforeCheckpoint);
+                readerContext, numAllowedMessageBeforeCheckpoint, 
numCheckpointsToWait);
     }
 
     /**
@@ -112,19 +115,24 @@ public class NumberSequenceSourceWithWaitForCheckpoint 
extends NumberSequenceSou
     private static class CheckpointListeningIteratorSourceReader<
                     E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
             extends IteratorSourceReader<E, IterT, SplitT> {
-        private boolean checkpointed = false;
+        private int completedCheckpoints = 0;
         private long messagesProduced = 0;
         private final long numAllowedMessageBeforeCheckpoint;
+        private final int numCheckpointsToWait;
 
         public CheckpointListeningIteratorSourceReader(
-                SourceReaderContext context, long 
waitForCheckpointAfterMessages) {
+                SourceReaderContext context,
+                long waitForCheckpointAfterMessages,
+                int numCheckpointsToWait) {
             super(context);
             this.numAllowedMessageBeforeCheckpoint = 
waitForCheckpointAfterMessages;
+            this.numCheckpointsToWait = numCheckpointsToWait;
         }
 
         @Override
         public InputStatus pollNext(ReaderOutput<E> output) {
-            if (messagesProduced < numAllowedMessageBeforeCheckpoint || 
checkpointed) {
+            if (messagesProduced < numAllowedMessageBeforeCheckpoint
+                    || completedCheckpoints >= numCheckpointsToWait) {
                 messagesProduced++;
                 return super.pollNext(output);
             } else {
@@ -134,7 +142,7 @@ public class NumberSequenceSourceWithWaitForCheckpoint 
extends NumberSequenceSou
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) {
-            checkpointed = true;
+            completedCheckpoints++;
         }
     }
 }

Reply via email to