This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c90b9e4eb68ab9f3c8430d05ae1bc5196ce6c2f Author: Roman Khachatryan <[email protected]> AuthorDate: Mon Jan 19 00:55:22 2026 +0100 [FLINK-38939][runtime] Pause Sources until the first checkpoint barrier is received This allows prioritizing the processing of recovered records (when recovering from an unaligned checkpoint). --- .../generated/checkpointing_configuration.html | 6 + .../flink/configuration/CheckpointingOptions.java | 12 ++ .../tasks/CheckpointCoordinatorConfiguration.java | 17 +- .../api/environment/CheckpointConfig.java | 13 ++ .../flink/streaming/api/graph/StreamGraph.java | 2 + .../api/graph/StreamingJobGraphGenerator.java | 13 ++ .../streaming/api/operators/SourceOperator.java | 34 +++- .../api/operators/SourceOperatorFactory.java | 17 +- .../api/operators/SourceOperatorAlignmentTest.java | 6 +- .../SourceOperatorSplitWatermarkAlignmentTest.java | 2 + .../api/operators/SourceOperatorTest.java | 190 +++++++++++++++------ .../api/operators/SourceOperatorTestContext.java | 65 ++++++- .../operators/source/TestingSourceOperator.java | 9 +- .../apache/flink/streaming/util/MockOutput.java | 4 + .../tasks/StreamTaskMailboxTestHarness.java | 2 +- .../OperatorEventSendingCheckpointITCase.java | 22 ++- .../CheckpointIntervalDuringBacklogITCase.java | 10 +- .../UnalignedCheckpointFailureHandlingITCase.java | 3 +- .../NumberSequenceSourceWithWaitForCheckpoint.java | 20 ++- 19 files changed, 361 insertions(+), 86 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index cb38355a92a..6383e0bfc34 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -194,5 +194,11 @@ <td>Integer</td> <td>The default size of the write buffer for the checkpoint streams that write to file systems. 
The actual write buffer size is determined to be the maximum of the value of this option and option 'execution.checkpointing.data-inline-threshold'.</td> </tr> + <tr> + <td><h5>pipeline.sources.pause-until-first-checkpoint</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Don't pull any data from sources until the first checkpoint is triggered. This might be helpful in reducing recovery times in cases where recovered records from unaligned checkpoint compete with new incoming records for processing. Incompatible with value 0 (disabled) for execution.checkpointing.interval-during-backlog</td> + </tr> </tbody> </table> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index 9921ffc8f5f..ce205077474 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -627,6 +627,18 @@ public class CheckpointingOptions { "the important considerations")) .build()); + public static final ConfigOption<Boolean> PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT = + key("pipeline.sources.pause-until-first-checkpoint") + .booleanType() + .defaultValue(false) + .withDescription( + "Don't pull any data from sources until the first checkpoint is triggered. " + + "This might be helpful in reducing recovery times in cases where " + + "recovered records from unaligned checkpoint compete with new incoming records for processing. " + + "Incompatible with value 0 (disabled) for " + + CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG + .key()); + // TODO: deprecated // Currently, both two file merging mechanism can work simultaneously: // 1. 
If UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE=1 and diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index cc755fc17a4..c0f4c07bba4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -73,6 +73,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private final boolean recoverOutputOnDownstreamTask; + private final boolean pauseSourcesUntilFirstCheckpoint; + /** * @deprecated use {@link #builder()}. */ @@ -101,6 +103,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { 0, checkpointIdOfIgnoredInFlightData, false, + false, false); } @@ -117,7 +120,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { long alignedCheckpointTimeout, long checkpointIdOfIgnoredInFlightData, boolean enableCheckpointsAfterTasksFinish, - boolean recoverOutputOnDownstreamTask) { + boolean recoverOutputOnDownstreamTask, + boolean pauseSourcesUntilFirstCheckpoint) { if (checkpointIntervalDuringBacklog < MINIMAL_CHECKPOINT_TIME) { // interval of max value means disable periodic checkpoint @@ -149,6 +153,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData; this.enableCheckpointsAfterTasksFinish = enableCheckpointsAfterTasksFinish; this.recoverOutputOnDownstreamTask = recoverOutputOnDownstreamTask; + this.pauseSourcesUntilFirstCheckpoint = pauseSourcesUntilFirstCheckpoint; } public long getCheckpointInterval() { @@ -297,6 +302,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private long checkpointIdOfIgnoredInFlightData; 
private boolean enableCheckpointsAfterTasksFinish; private boolean recoverOutputOnDownstreamTask; + private boolean pauseSourcesUntilFirstCheckpoint; public CheckpointCoordinatorConfiguration build() { return new CheckpointCoordinatorConfiguration( @@ -312,7 +318,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { alignedCheckpointTimeout, checkpointIdOfIgnoredInFlightData, enableCheckpointsAfterTasksFinish, - recoverOutputOnDownstreamTask); + recoverOutputOnDownstreamTask, + pauseSourcesUntilFirstCheckpoint); } public CheckpointCoordinatorConfigurationBuilder setCheckpointInterval( @@ -386,6 +393,12 @@ public class CheckpointCoordinatorConfiguration implements Serializable { return this; } + public CheckpointCoordinatorConfigurationBuilder setPauseSourcesUntilFirstCheckpoint( + boolean pauseSourcesUntilFirstCheckpoint1) { + pauseSourcesUntilFirstCheckpoint = pauseSourcesUntilFirstCheckpoint1; + return this; + } + public CheckpointCoordinatorConfigurationBuilder setRecoverOutputOnDownstreamTask( boolean recoverOutputOnDownstreamTask) { this.recoverOutputOnDownstreamTask = recoverOutputOnDownstreamTask; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 42e2be79cf3..8d0982a772d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -640,6 +640,19 @@ public class CheckpointConfig implements java.io.Serializable { configuration .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY) .ifPresent(this::setCheckpointDirectory); + configuration + .getOptional(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT) + .ifPresent(this::setPauseSourcesUntilFirstCheckpoint); + } + + public void setPauseSourcesUntilFirstCheckpoint(boolean 
pauseCheckpointsIfTasksNotRunning) { + this.configuration.set( + CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT, + pauseCheckpointsIfTasksNotRunning); + } + + public boolean isPauseSourcesUntilFirstCheckpoint() { + return this.configuration.get(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT); } private void setCheckpointDirectory(String checkpointDirectory) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 887c22a395e..db80a3bca41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -394,6 +394,8 @@ public class StreamGraph implements Pipeline, ExecutionPlan { jobConfiguration.get( CheckpointingOptions .UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM)) + .setPauseSourcesUntilFirstCheckpoint( + cfg.isPauseSourcesUntilFirstCheckpoint()) .build(), serializedStateBackend, getJobConfiguration() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 14ee3fdffc1..aa503e3b20b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; import 
org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; @@ -536,6 +537,18 @@ public class StreamingJobGraphGenerator { } } } + + if (checkpointConfig.isCheckpointingEnabled() + && checkpointConfig.getCheckpointIntervalDuringBacklog() + == CheckpointCoordinatorConfiguration.DISABLED_CHECKPOINT_INTERVAL + && checkpointConfig.isPauseSourcesUntilFirstCheckpoint()) { + throw new IllegalArgumentException( + "Pausing sources until first checkpoint is incompatible with disabling checkpoints during backlog processing. " + + "Please review and choose whether you require " + + CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT.key() + + " or " + + CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG.key()); + } } private static boolean hasCustomPartitioner(StreamNode node) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 09b12007d33..eeb1924a653 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -181,6 +181,10 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final Set<String> currentlyPausedSplits = new HashSet<>(); + private boolean waitingForCheckpoint; + + private final CompletableFuture<Void> checkpointsStartedFuture; + private final Map<String, InternalSourceSplitMetricGroup> splitMetricGroups = new HashMap<>(); private enum OperatingMode { @@ -230,7 +234,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, Map<String, Boolean> watermarkIsAlignedMap, - boolean supportsSplitReassignmentOnRecovery) { + boolean 
supportsSplitReassignmentOnRecovery, + boolean pauseSourcesUntilFirstCheckpoint) { super(parameters); this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); @@ -246,6 +251,13 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords); this.watermarkIsAlignedMap = watermarkIsAlignedMap; this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery; + this.waitingForCheckpoint = pauseSourcesUntilFirstCheckpoint; + //noinspection unchecked + this.checkpointsStartedFuture = + waitingForCheckpoint + ? new CompletableFuture<>() + : (CompletableFuture<Void>) AVAILABLE; + LOG.info("SourceOperator initialized, wait for 1st checkpoint: {}", waitingForCheckpoint); } @Override @@ -487,6 +499,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception { + if (waitingForCheckpoint && operatingMode == OperatingMode.SOURCE_DRAINED) { + return DataInputStatus.END_OF_DATA; + } // guarding an assumptions we currently make due to the fact that certain classes // assume a constant output, this assumption does not need to stand if we emitted all // records. 
In that case the output will change to FinishedDataOutput @@ -512,6 +527,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception { switch (operatingMode) { case OUTPUT_NOT_INITIALIZED: + if (waitingForCheckpoint) { + return DataInputStatus.NOTHING_AVAILABLE; + } if (watermarkAlignmentParams.isEnabled()) { // Only wrap the output when watermark alignment is enabled, as otherwise this // introduces a small performance regression (probably because of an extra @@ -606,6 +624,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void snapshotState(StateSnapshotContext context) throws Exception { + if (waitingForCheckpoint) { + waitingForCheckpoint = false; + checkpointsStartedFuture.complete(null); + LOG.info("Source un-paused (checkpoint barrier received)"); + } long checkpointId = context.getCheckpointId(); LOG.debug("Taking a snapshot for checkpoint {}", checkpointId); readerState.update(sourceReader.snapshotState(checkpointId)); @@ -617,6 +640,10 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr case WAITING_FOR_ALIGNMENT: return availabilityHelper.update(waitingForAlignmentFuture); case OUTPUT_NOT_INITIALIZED: + return availabilityHelper.update( + waitingForCheckpoint + ? 
checkpointsStartedFuture + : sourceReader.isAvailable()); case READING: return availabilityHelper.update(sourceReader.isAvailable()); case SOURCE_STOPPED: @@ -634,6 +661,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC); readerState = new SimpleVersionedListState<>(rawState, splitSerializer); + if (waitingForCheckpoint && !context.isRestored()) { + LOG.debug("Not a recovery, won't wait for the checkpoint to emit records"); + waitingForCheckpoint = false; + checkpointsStartedFuture.complete(null); + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 6d1cbc88897..dbe5a302190 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -130,7 +131,15 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU emitProgressiveWatermarks, parameters.getContainingTask().getCanEmitBatchOfRecords(), getSourceWatermarkDeclarations(), - source instanceof SupportsSplitReassignmentOnRecovery); + source instanceof SupportsSplitReassignmentOnRecovery, + CheckpointingOptions.isCheckpointingEnabled( + 
parameters.getContainingTask().getJobConfiguration()) + && parameters + .getContainingTask() + .getJobConfiguration() + .get( + CheckpointingOptions + .PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT)); parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator); @@ -202,7 +211,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, Collection<? extends WatermarkDeclaration> watermarkDeclarations, - boolean supportsSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery, + boolean pauseSourcesUntilFirstCheckpoint) { // jumping through generics hoops: cast the generics away to then cast them back more // strictly typed @@ -235,6 +245,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU emitProgressiveWatermarks, canEmitBatchOfRecords, watermarkIsAlignedMap, - supportsSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery, + pauseSourcesUntilFirstCheckpoint); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index 30b8fdd5063..b531d9b5d18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -66,7 +66,8 @@ class SourceOperatorAlignmentTest { WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator()) .withTimestampAssigner((r, t) -> r) .withWatermarkAlignment( - "group1", Duration.ofMillis(100), Duration.ofMillis(1))); + "group1", Duration.ofMillis(100), Duration.ofMillis(1)), + false); operator = context.getOperator(); } @@ -140,7 +141,8 @@ class SourceOperatorAlignmentTest { PunctuatedGenerator.GenerationMode.ODD)) 
.withWatermarkAlignment( "group1", Duration.ofMillis(100), Duration.ofMillis(1)) - .withTimestampAssigner((r, t) -> r))) { + .withTimestampAssigner((r, t) -> r), + false)) { final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); operator.initializeState(context.createStateContext()); operator.open(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 35621fbf5bc..b5246f7b518 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -100,6 +100,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { 1, 5, true, + false, false); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); @@ -549,6 +550,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { 1, 5, true, + false, false); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index aa4d2cf7081..cd44bc08700 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -23,6 +23,10 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import 
org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; @@ -31,8 +35,10 @@ import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.DataInputStatus; @@ -44,28 +50,33 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.CollectorOutput; import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; 
+import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; /** Unit test for {@link SourceOperator}. */ @SuppressWarnings("serial") +@ExtendWith(ParameterizedTestExtension.class) class SourceOperatorTest { @Nullable private SourceOperatorTestContext context; @@ -73,9 +84,16 @@ class SourceOperatorTest { @Nullable private MockSourceReader mockSourceReader; @Nullable private MockOperatorEventGateway mockGateway; + @Parameter public boolean pauseSourcesUntilCheckpoint; + + @Parameters(name = "pauseSourcesUntilCheckpoint = {0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(true, false); + } + @BeforeEach void setup() throws Exception { - context = new SourceOperatorTestContext(); + context = new SourceOperatorTestContext(false, pauseSourcesUntilCheckpoint); operator = context.getOperator(); mockSourceReader = context.getSourceReader(); mockGateway = context.getGateway(); @@ -90,7 +108,7 @@ class SourceOperatorTest { mockGateway = null; } - @Test + @TestTemplate void testInitializeState() throws Exception { StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); @@ -102,49 +120,50 @@ class SourceOperatorTest { .isNotNull(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testOpen(boolean supportsSplitReassignmentOnRecovery) throws Exception { - try (SourceOperatorTestContext context = - new SourceOperatorTestContext( - false, - false, - WatermarkStrategy.noWatermarks(), - new MockOutput<>(new ArrayList<>()), - 
supportsSplitReassignmentOnRecovery)) { - SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); - // Initialize the operator. - operator.initializeState(context.createStateContext()); - // Open the operator. - operator.open(); - // The source reader should have been assigned a split. - if (supportsSplitReassignmentOnRecovery) { - assertThat(context.getSourceReader().getAssignedSplits()).isEmpty(); - } else { - assertThat(context.getSourceReader().getAssignedSplits()) - .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); - } - - // The source reader should have started. - assertThat(context.getSourceReader().isStarted()).isTrue(); - - // A ReaderRegistrationRequest should have been sent. - assertThat(context.getGateway().getEventsSent()).hasSize(1); - OperatorEvent operatorEvent = context.getGateway().getEventsSent().get(0); - assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class); - ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent; - assertThat(registrationEvent.subtaskId()) - .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX); - if (supportsSplitReassignmentOnRecovery) { - assertThat(registrationEvent.splits(new MockSourceSplitSerializer())) - .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); - } else { - assertThat(registrationEvent.splits(new MockSourceSplitSerializer())).isEmpty(); + @TestTemplate + void testOpen() throws Exception { + for (boolean supportsSplitReassignmentOnRecovery : new boolean[] {true, false}) { + try (SourceOperatorTestContext context = + new SourceOperatorTestContext( + false, + false, + WatermarkStrategy.noWatermarks(), + new MockOutput<>(new ArrayList<>()), + supportsSplitReassignmentOnRecovery)) { + SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); + // Initialize the operator. + operator.initializeState(context.createStateContext()); + // Open the operator. 
+ operator.open(); + // The source reader should have been assigned a split. + if (supportsSplitReassignmentOnRecovery) { + assertThat(context.getSourceReader().getAssignedSplits()).isEmpty(); + } else { + assertThat(context.getSourceReader().getAssignedSplits()) + .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); + } + + // The source reader should have started. + assertThat(context.getSourceReader().isStarted()).isTrue(); + + // A ReaderRegistrationRequest should have been sent. + assertThat(context.getGateway().getEventsSent()).hasSize(1); + OperatorEvent operatorEvent = context.getGateway().getEventsSent().get(0); + assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class); + ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent; + assertThat(registrationEvent.subtaskId()) + .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX); + if (supportsSplitReassignmentOnRecovery) { + assertThat(registrationEvent.splits(new MockSourceSplitSerializer())) + .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); + } else { + assertThat(registrationEvent.splits(new MockSourceSplitSerializer())).isEmpty(); + } } } } - @Test + @TestTemplate void testStop() throws Exception { // Initialize the operator. 
operator.initializeState(context.createStateContext()); @@ -166,7 +185,7 @@ class SourceOperatorTest { assertThat(sourceStopped).isDone(); } - @Test + @TestTemplate void testHandleAddSplitsEvent() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); @@ -179,7 +198,7 @@ class SourceOperatorTest { .containsExactly(SourceOperatorTestContext.MOCK_SPLIT, newSplit); } - @Test + @TestTemplate void testHandleAddSourceEvent() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); @@ -189,7 +208,7 @@ class SourceOperatorTest { assertThat(mockSourceReader.getReceivedSourceEvents()).containsExactly(event); } - @Test + @TestTemplate void testSnapshotState() throws Exception { StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); @@ -206,7 +225,7 @@ class SourceOperatorTest { assertThat(splitsInState).containsExactly(SourceOperatorTestContext.MOCK_SPLIT, newSplit); } - @Test + @TestTemplate void testNotifyCheckpointComplete() throws Exception { StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); @@ -216,7 +235,7 @@ class SourceOperatorTest { assertThat(mockSourceReader.getCompletedCheckpoints().get(0)).isEqualTo(100L); } - @Test + @TestTemplate void testNotifyCheckpointAborted() throws Exception { StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); @@ -226,7 +245,65 @@ class SourceOperatorTest { assertThat(mockSourceReader.getAbortedCheckpoints().get(0)).isEqualTo(100L); } - @Test + @TestTemplate + public void testPausingUntilCheckpoint() throws Exception { + final List<StreamElement> out = new ArrayList<>(); + try (SourceOperatorTestContext context = + new SourceOperatorTestContext( + false, + false, + WatermarkStrategy.<Integer>forMonotonousTimestamps() + .withTimestampAssigner((element, recordTimestamp) -> element), + new 
CollectorOutput<>(out), + false, + pauseSourcesUntilCheckpoint, + // recover with some state, so the source will pause until a checkpoint + // to speedup recovery (if pauseSourcesUntilCheckpoint) + (stateManager, operatorID) -> { + long checkpointID = 1L; + stateManager.setReportedCheckpointId(checkpointID); + stateManager.setJobManagerTaskStateSnapshotsByCheckpointId( + singletonMap( + checkpointID, + new TaskStateSnapshot( + singletonMap( + operatorID, + OperatorSubtaskState.builder() + .build())))); + })) { + + final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); + operator.open(); + + MockSourceSplit split = new MockSourceSplit(0); + split.addRecord(0); + operator.handleOperatorEvent( + new AddSplitEvent<>( + Collections.singletonList(split), new MockSourceSplitSerializer())); + + operator.emitNext(new DataOutputToOutput<>(operator.output)); + + if (pauseSourcesUntilCheckpoint) { + assertThat(out).isEmpty(); + assertThat(operator.isAvailable()).isFalse(); + // un-pause + operator.snapshotState( + 2L, + 2L, + CheckpointOptions.alignedNoTimeout( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault()), + new MemCheckpointStreamFactory(10240)); + operator.emitNext(new DataOutputToOutput<>(operator.output)); + } + + assertThat(operator.isAvailable()).isTrue(); + assertThat(out.stream().map(element -> element.asRecord().getValue())) + .containsExactly(0); + } + } + + @TestTemplate void testHandleBacklogEvent() throws Exception { List<StreamElement> outputStreamElements = new ArrayList<>(); context = @@ -236,7 +313,8 @@ class SourceOperatorTest { WatermarkStrategy.<Integer>forMonotonousTimestamps() .withTimestampAssigner((element, recordTimestamp) -> element), new CollectorOutput<>(outputStreamElements), - false); + false, + pauseSourcesUntilCheckpoint); operator = context.getOperator(); operator.initializeState(context.createStateContext()); operator.open(); @@ -264,7 +342,7 @@ class SourceOperatorTest { new 
RecordAttributes(false)); } - @Test + @TestTemplate public void testMetricGroupIsCreatedForNewSplit() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); @@ -275,7 +353,7 @@ class SourceOperatorTest { assertNotNull(operator.getSplitMetricGroup(newSplit.splitId())); } - @Test + @TestTemplate public void testMetricGroupIsCreatedForRestoredSplit() throws Exception { MockSourceSplit restoredSplit = new MockSourceSplit((1)); StateInitializationContext stateContext = @@ -285,7 +363,7 @@ class SourceOperatorTest { assertNotNull(operator.getSplitMetricGroup(restoredSplit.splitId())); } - @Test + @TestTemplate public void testMetricGroupTracksSplitWatermark() throws Exception { long expectedWatermark = 1000; operator.initializeState(context.createStateContext()); @@ -300,7 +378,7 @@ class SourceOperatorTest { operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark()); } - @Test + @TestTemplate public void testMetricGroupReturnsDefaultIfNoSplitWatermark() throws Exception { long expectedWatermark = Watermark.UNINITIALIZED.getTimestamp(); operator.initializeState(context.createStateContext()); @@ -314,7 +392,7 @@ class SourceOperatorTest { operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark()); } - @Test + @TestTemplate public void testMultipleMetricGroupsReturnWatermarkOrDefaultWatermark() throws Exception { long expectedWatermarkValueForSplit0 = Watermark.UNINITIALIZED.getTimestamp(); long expectedWatermarkValueForSplit1 = 1000; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index 6e198512445..c0286a53405 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -28,6 +28,7 @@ import 
org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; @@ -50,6 +51,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.function.BiConsumer; import static org.apache.flink.util.Preconditions.checkState; @@ -64,18 +66,29 @@ public class SourceOperatorTestContext implements AutoCloseable { private MockOperatorEventGateway mockGateway; private TestProcessingTimeService timeService; private SourceOperator<Integer, MockSourceSplit> operator; + public Output<StreamRecord<Integer>> output; public SourceOperatorTestContext() throws Exception { - this(false); + this(false, false); } - public SourceOperatorTestContext(boolean idle) throws Exception { - this(idle, WatermarkStrategy.noWatermarks()); + public SourceOperatorTestContext(boolean idle, boolean pauseSourcesUntilFirstCheckpoint) + throws Exception { + this(idle, WatermarkStrategy.noWatermarks(), pauseSourcesUntilFirstCheckpoint); } - public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy) + public SourceOperatorTestContext( + boolean idle, + WatermarkStrategy<Integer> watermarkStrategy, + boolean pauseSourcesUntilFirstCheckpoint) throws Exception { - this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>()), false); + this( + idle, + false, + watermarkStrategy, + new MockOutput<>(new ArrayList<>()), + false, + pauseSourcesUntilFirstCheckpoint); } public SourceOperatorTestContext( @@ -85,6 +98,43 @@ public class SourceOperatorTestContext implements 
AutoCloseable { Output<StreamRecord<Integer>> output, boolean supportsSplitReassignmentOnRecovery) throws Exception { + this( + idle, + usePerSplitOutputs, + watermarkStrategy, + output, + supportsSplitReassignmentOnRecovery, + false, + (ign0, ign1) -> {}); + } + + public SourceOperatorTestContext( + boolean idle, + boolean usePerSplitOutputs, + WatermarkStrategy<Integer> watermarkStrategy, + Output<StreamRecord<Integer>> output, + boolean supportsSplitReassignmentOnRecovery, + boolean pauseSourcesUntilFirstCheckpoint) + throws Exception { + this( + idle, + usePerSplitOutputs, + watermarkStrategy, + output, + supportsSplitReassignmentOnRecovery, + pauseSourcesUntilFirstCheckpoint, + (ign0, ign1) -> {}); + } + + public SourceOperatorTestContext( + boolean idle, + boolean usePerSplitOutputs, + WatermarkStrategy<Integer> watermarkStrategy, + Output<StreamRecord<Integer>> output, + boolean supportsSplitReassignmentOnRecovery, + boolean pauseSourcesUntilFirstCheckpoint, + BiConsumer<TestTaskStateManager, OperatorID> preInit) + throws Exception { mockSourceReader = new MockSourceReader( idle @@ -94,6 +144,7 @@ public class SourceOperatorTestContext implements AutoCloseable { usePerSplitOutputs); mockGateway = new MockOperatorEventGateway(); timeService = new TestProcessingTimeService(); + this.output = output; Environment env = getTestingEnvironment(); operator = new TestingSourceOperator<>( @@ -111,7 +162,9 @@ public class SourceOperatorTestContext implements AutoCloseable { SUBTASK_INDEX, 5, true, - supportsSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery, + pauseSourcesUntilFirstCheckpoint); + preInit.accept((TestTaskStateManager) env.getTaskStateManager(), operator.getOperatorID()); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index dc8ebc9806e..fed65a42ccc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -76,7 +76,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> 1, 5, emitProgressiveWatermarks, - supportsSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery, + false); } public TestingSourceOperator( @@ -88,7 +89,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> int subtaskIndex, int parallelism, boolean emitProgressiveWatermarks, - boolean supportsSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery, + boolean pauseSourcesUntilFirstCheckpoint) { super( parameters, @@ -102,7 +104,8 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> emitProgressiveWatermarks, () -> false, Collections.emptyMap(), - supportsSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery, + pauseSourcesUntilFirstCheckpoint); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java index e12dcfa8361..9ed9f92e3c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -83,4 +83,8 @@ public class MockOutput<T> implements Output<StreamRecord<T>> { @Override public void close() {} + + public Collection<T> getOutputs() { + return outputs; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java index 0929f3940e7..4e733481536 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java @@ -167,7 +167,7 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable { cleanUp(); } - public void cleanUp() throws Exception { + public void cleanUp() throws Exception { streamTask.cleanUp(null); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index 2ef37eddd13..953df0e08a5 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -51,9 +51,13 @@ import org.apache.flink.util.function.TriFunction; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.Serializable; import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -70,11 +74,19 @@ import static org.junit.Assert.assertEquals; * A test suite for source enumerator (operator coordinator) for situations where RPC calls for * split assignments (operator events) fails from time to time. 
*/ +@RunWith(Parameterized.class) public class OperatorEventSendingCheckpointITCase extends TestLogger { private static final int PARALLELISM = 1; private static MiniCluster flinkCluster; + @Parameterized.Parameter public boolean pauseSources; + + @Parameterized.Parameters(name = "pauseSources: {0}") + public static Collection<Boolean> specs() { + return Arrays.asList(Boolean.TRUE, Boolean.FALSE); + } + @BeforeClass public static void setupMiniClusterAndEnv() throws Exception { Configuration config = new Configuration(); @@ -198,6 +210,7 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(50); + env.getCheckpointConfig().setPauseSourcesUntilFirstCheckpoint(pauseSources); // This test depends on checkpoints persisting progress from the source before the // artificial exception gets triggered. Otherwise, the job will run for a long time (or @@ -217,7 +230,14 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { final DataStream<Long> numbers = env.fromSource( - new NumberSequenceSourceWithWaitForCheckpoint(1L, numElements, 3), + new NumberSequenceSourceWithWaitForCheckpoint( + 1L, + numElements, + 3, + env.getCheckpointConfig() + .isPauseSourcesUntilFirstCheckpoint() + ? 
2 + : 1), WatermarkStrategy.noWatermarks(), "numbers") .map( diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java index b73caf5a156..91cf8782dfc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java @@ -102,10 +102,10 @@ public class CheckpointIntervalDuringBacklogITCase { Source<Long, ?, ?> source = HybridSource.builder( new NumberSequenceSourceWithWaitForCheckpoint( - 0, NUM_RECORDS / 2 - 1, NUM_SPLITS)) + 0, NUM_RECORDS / 2 - 1, NUM_SPLITS, 1)) .addSource( new NumberSequenceSourceWithWaitForCheckpoint( - NUM_RECORDS / 2, NUM_RECORDS - 1, NUM_SPLITS)) + NUM_RECORDS / 2, NUM_RECORDS - 1, NUM_SPLITS, 1)) .build(); runAndVerifyResult(env, source); @@ -127,7 +127,7 @@ public class CheckpointIntervalDuringBacklogITCase { Source<Long, ?, ?> source = HybridSource.builder( new NumberSequenceSourceWithWaitForCheckpoint( - 0, NUM_RECORDS / 2 - 1, NUM_SPLITS)) + 0, NUM_RECORDS / 2 - 1, NUM_SPLITS, 1)) .addSource(new NumberSequenceSource(NUM_RECORDS / 2, NUM_RECORDS - 1)) .build(); @@ -152,6 +152,7 @@ public class CheckpointIntervalDuringBacklogITCase { configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, firstCheckpointTime); configuration.set( CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0)); + configuration.set(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT, false); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(1); @@ -174,6 +175,7 @@ public class CheckpointIntervalDuringBacklogITCase { configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100)); configuration.set( 
CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0)); + configuration.set(CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT, false); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(1); @@ -189,7 +191,7 @@ public class CheckpointIntervalDuringBacklogITCase { DataStream<Long> source = env.fromSource( new NumberSequenceSourceWithWaitForCheckpoint( - 2, NUM_RECORDS - 1, NUM_SPLITS), + 2, NUM_RECORDS - 1, NUM_SPLITS, 1), WatermarkStrategy.noWatermarks(), "non-backlog-source"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java index 1cb92405958..b16ff4e476e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java @@ -127,7 +127,8 @@ public class UnalignedCheckpointFailureHandlingITCase { private void configure(StreamExecutionEnvironment env, String storageFactory) { // enable checkpointing but only via API - env.enableCheckpointing(Long.MAX_VALUE, CheckpointingMode.EXACTLY_ONCE); + // use big enough interval so a manual checkpoint can be performed + env.enableCheckpointing(100000, CheckpointingMode.EXACTLY_ONCE); CheckpointStorageUtils.configureCheckpointStorageWithFactory(env, storageFactory); diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java b/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java index 60455c094bc..3a361bb4dd0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint.java @@ -43,14 +43,17 @@ import java.util.Queue; */ public class NumberSequenceSourceWithWaitForCheckpoint extends NumberSequenceSource { private static final long serialVersionUID = 1L; + private final int numCheckpointsToWait; private final int numSplits; private final long numAllowedMessageBeforeCheckpoint; - public NumberSequenceSourceWithWaitForCheckpoint(long from, long to, int numSplits) { + public NumberSequenceSourceWithWaitForCheckpoint( + long from, long to, int numSplits, int numCheckpointsToWait) { super(from, to); this.numSplits = numSplits; this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits; + this.numCheckpointsToWait = numCheckpointsToWait; } @Override @@ -63,7 +66,7 @@ public class NumberSequenceSourceWithWaitForCheckpoint extends NumberSequenceSou @Override public SourceReader<Long, NumberSequenceSplit> createReader(SourceReaderContext readerContext) { return new CheckpointListeningIteratorSourceReader<>( - readerContext, numAllowedMessageBeforeCheckpoint); + readerContext, numAllowedMessageBeforeCheckpoint, numCheckpointsToWait); } /** @@ -112,19 +115,24 @@ public class NumberSequenceSourceWithWaitForCheckpoint extends NumberSequenceSou private static class CheckpointListeningIteratorSourceReader< E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>> extends IteratorSourceReader<E, IterT, SplitT> { - private boolean checkpointed = false; + private int completedCheckpoints = 0; private long messagesProduced = 0; private final long numAllowedMessageBeforeCheckpoint; + private final int numCheckpointsToWait; public CheckpointListeningIteratorSourceReader( - SourceReaderContext context, long waitForCheckpointAfterMessages) { + SourceReaderContext context, + long waitForCheckpointAfterMessages, + int numCheckpointsToWait) { super(context); this.numAllowedMessageBeforeCheckpoint = waitForCheckpointAfterMessages; + 
this.numCheckpointsToWait = numCheckpointsToWait; } @Override public InputStatus pollNext(ReaderOutput<E> output) { - if (messagesProduced < numAllowedMessageBeforeCheckpoint || checkpointed) { + if (messagesProduced < numAllowedMessageBeforeCheckpoint + || completedCheckpoints >= numCheckpointsToWait) { messagesProduced++; return super.pollNext(output); } else { @@ -134,7 +142,7 @@ public class NumberSequenceSourceWithWaitForCheckpoint extends NumberSequenceSou @Override public void notifyCheckpointComplete(long checkpointId) { - checkpointed = true; + completedCheckpoints++; } } }
