This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93025452714570a4d461519510375dd72af3a2c0
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 09:32:00 2025 +0200

    [FLINK-37605][runtime] Infer checkpoint id on endInput in sink

    So far, we used a special value for the final checkpoint on endInput. However,
    as shown in the description of this ticket, final doesn't mean final. Hence,
    multiple committables with EOI could be created at different times.

    With this commit, we stop using a special value for such committables and
    instead try to guess the checkpoint id of the next checkpoint. There are
    various factors that influence the checkpoint id, but we can mostly ignore
    them because we just need to pick a checkpoint id that is
    - higher than all checkpoint ids of the previous, successful checkpoints of
      this attempt,
    - higher than the checkpoint id of the restored checkpoint, and
    - lower than any future checkpoint id.

    Hence, we just remember the last observed checkpoint id (initialized with
    max(0, restored id)) and use last id + 1 for endInput. (A minimal illustrative
    sketch of this bookkeeping is appended after the diff.) Naturally, multiple
    endInput calls caused by restarts will result in unique checkpoint ids.

    Note that aborted checkpoints before endInput may result in diverged
    checkpoint ids across subtasks. However, each of these ids satisfies the above
    requirements, and any id produced by an earlier endInput will be smaller than
    any id produced by a later one. Thus, diverged checkpoint ids will not [...]
---
 .../sink/compactor/operator/CompactorOperator.java | 13 +++--
 .../api/connector/sink2/CommittableMessage.java | 12 ++++-
 .../api/connector/sink2/CommittableSummary.java | 4 +-
 .../connector/sink2/CommittableWithLineage.java | 2 +-
 .../runtime/operators/sink/CommitterOperator.java | 23 ++++----
 .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++----------------
 .../CheckpointCommittableManagerImpl.java | 1 +
 .../sink/committables/CommittableCollector.java | 12 -----
 .../sink/GlobalCommitterOperatorTest.java | 33 ------------
 .../committables/CommittableCollectorTest.java | 20 -------
 .../util/AbstractStreamOperatorTestHarness.java | 12 +++--
 .../sink/SinkV2CommitterOperatorTest.java | 39 --------------
 .../sink/SinkV2SinkWriterOperatorTest.java | 62 +++++++++++++++++++---
 13 files changed, 106 insertions(+), 185 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index fa4e2b3d54b..ae2ffe7793a 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -107,6 +108,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

+    private long lastKnownCheckpointId =
+            CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             StreamOperatorParameters<CommittableMessage<FileSinkCommittable>> parameters,
             FileCompactStrategy strategy,
@@ -139,15 +142,16 @@ public class CompactorOperator

     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();

         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();

         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }

@@ -225,6 +229,8 @@ public class CompactorOperator
     }

     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
@@ -252,7 +258,6 @@ public class CompactorOperator
                         getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                         checkpointId,
                         compacted.size(),
-                        compacted.size(),
                         0);
         output.collect(new StreamRecord<>(summary));
         for (FileSinkCommittable c : compacted) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index df77f658780..e0978e0ffcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -26,7 +26,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with Flink 2.2)
      */
+    @Deprecated(forRemoval = true)
     long EOI = Long.MAX_VALUE;

     /** The subtask that created this committable. */
@@ -35,6 +38,13 @@ public interface CommittableMessage<CommT> {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
-    long getCheckpointIdOrEOI();
+    @Deprecated(forRemoval = true)
+    default long getCheckpointIdOrEOI() {
+        return getCheckpointId();
+    }
+
+    long getCheckpointId();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index 252b10fadf4..7496013b046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -42,7 +42,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
     /** The number of committables coming from the given subtask in the particular checkpoint. */
     private final int numberOfCommittables;

-    @Deprecated
+    @Deprecated(forRemoval = true)
     /** The number of committables that have not been successfully committed.
*/ private final int numberOfPendingCommittables; @@ -88,7 +88,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { return numberOfSubtasks; } - public long getCheckpointIdOrEOI() { + public long getCheckpointId() { return checkpointId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java index 819b4fdfc4f..6641a352885 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java @@ -50,7 +50,7 @@ public class CommittableWithLineage<CommT> implements CommittableMessage<CommT> return subtaskId; } - public long getCheckpointIdOrEOI() { + public long getCheckpointId() { return checkpointId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 9cd85c4001a..2f1cffcbcce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -51,7 +52,6 @@ import java.util.Collection; import java.util.Collections; import java.util.OptionalLong; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage private SinkCommitterMetricGroup metricGroup; private Committer<CommT> committer; private CommittableCollector<CommT> committableCollector; - private long lastCompletedCheckpointId = -1; + private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; private int maxRetries; - private boolean endInput = false; - /** The operator's state descriptor. 
*/ private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>( @@ -134,11 +132,11 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), metricGroup)); - if (context.isRestored()) { + if (checkpointId.isPresent()) { committableCollectorState.get().forEach(cc -> committableCollector.merge(cc)); lastCompletedCheckpointId = checkpointId.getAsLong(); // try to re-commit recovered transactions as quickly as possible - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId); } } @@ -151,24 +149,23 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage @Override public void endInput() throws Exception { - endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId)); } - private void commitAndEmitCheckpoints() throws IOException, InterruptedException { - long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId; + private void commitAndEmitCheckpoints(long checkpointId) + throws IOException, InterruptedException { + lastCompletedCheckpointId = checkpointId; for (CheckpointCommittableManager<CommT> checkpointManager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { // ensure that all committables of the first checkpoint are fully committed before // attempting the next committable commitAndEmit(checkpointManager); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index f5cfbd2aa6a..712d6541eec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema.Initializat import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; @@ -54,8 +53,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.shaded.guava33.com.google.common.collect.Lists; - import javax.annotation.Nullable; import java.io.IOException; @@ -64,6 +61,7 @@ import java.util.Collection; import java.util.List; import java.util.OptionalLong; 
+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -93,13 +91,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer; private final List<CommT> legacyCommittables = new ArrayList<>(); - /** - * Used to remember that EOI has already happened so that we don't emit the last committables of - * the final checkpoints twice. - */ - private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC = - new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); - /** The runtime information of the input element. */ private final Context<InputT> context; @@ -118,10 +109,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab private boolean endOfInput = false; - /** - * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables. - */ - @Nullable private ListState<Boolean> endOfInputState; + private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1; SinkWriterOperator( StreamOperatorParameters<CommittableMessage<CommT>> parameters, @@ -164,8 +152,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId()); - if (context.isRestored()) { + OptionalLong restoredCheckpointId = context.getRestoredCheckpointId(); + WriterInitContext initContext = createInitContext(restoredCheckpointId); + if (restoredCheckpointId.isPresent()) { + lastKnownCheckpointId = restoredCheckpointId.getAsLong(); if (committableSerializer != null) { final ListState<List<CommT>> legacyCommitterState = new SimpleVersionedListState<>( @@ -179,41 +169,12 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab } sinkWriter = writerStateHandler.createWriter(initContext, context); - - if (emitDownstream) { - // Figure out if we have seen end of input before and if we can suppress creating - // transactions and sending them downstream to the CommitterOperator. We have the - // following - // cases: - // 1. state is empty: - // - First time initialization - // - Restoring from a previous version of Flink that didn't handle EOI - // - Upscaled from a final or regular checkpoint - // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries - // that the CommitterOperator needs to handle. - // 2. state is not empty: - // - This implies Flink restores from a version that handles EOI. - // - If there is one entry, no rescaling happened (for this subtask), so if it's true, - // we recover from a final checkpoint (for this subtask) and can ignore another EOI - // else we have a regular checkpoint. - // - If there are multiple entries, Flink downscaled, and we need to check if all are - // true and do the same as above. As soon as one entry is false, we regularly start - // the writer and potentially emit duplicate summaries if we indeed recovered from a - // final checkpoint. 
- endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); - ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get()); - endOfInput = !previousState.isEmpty() && !previousState.contains(false); - } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); writerStateHandler.snapshotState(context.getCheckpointId()); - if (endOfInputState != null) { - endOfInputState.clear(); - endOfInputState.add(this.endOfInput); - } } @Override @@ -243,17 +204,16 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Override public void endInput() throws Exception { + LOG.info("Received endInput"); if (!endOfInput) { endOfInput = true; - if (endOfInputState != null) { - endOfInputState.add(true); - } sinkWriter.flush(true); - emitCommittables(CommittableMessage.EOI); + emitCommittables(lastKnownCheckpointId + 1); } } private void emitCommittables(long checkpointId) throws IOException, InterruptedException { + lastKnownCheckpointId = checkpointId; if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 6aa7401a00a..2118874d777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -95,6 +95,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa summary.getSubtaskId(), checkpointId, metricGroup); + // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2) if (checkpointId == CommittableMessage.EOI) { SubtaskCommittableManager<CommT> merged = subtasksCommittableManagers.merge( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index be832152ee7..96585a632d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Objects; -import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; @@ -49,8 +48,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class CommittableCollector<CommT> { - private static final long EOI = Long.MAX_VALUE; - /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */ private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables; @@ -144,15 +141,6 @@ public class CommittableCollector<CommT> { return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values()); } - /** - * Returns {@link CheckpointCommittableManager} belonging to the last input. 
- * - * @return {@link CheckpointCommittableManager} - */ - public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() { - return Optional.ofNullable(checkpointCommittables.get(EOI)); - } - /** * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are * either committed or failed. diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java index a73dcc24d01..bf8da806b7b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class GlobalCommitterOperatorTest { @@ -140,38 +139,6 @@ class GlobalCommitterOperatorTest { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception { - final MockCommitter committer = new MockCommitter(); - final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = - createTestHarness(committer, commitOnInput); - testHarness.open(); - - final CommittableSummary<Integer> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<Integer> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - // commitOnInput implies that the global committer is not using notifyCheckpointComplete - if (commitOnInput) { - assertThat(committer.committed).containsExactly(1, 2); - } else { - assertThat(committer.committed).isEmpty(); - testHarness.notifyOfCompletedCheckpoint(EOI); - assertThat(committer.committed).containsExactly(1, 2); - } - - assertThat(testHarness.getOutput()).isEmpty(); - } - private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness( Committer<Integer> committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 6d311170d47..892b3785e25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -24,9 +24,6 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.junit.jupiter.api.Test; -import java.util.Optional; - -import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class CommittableCollectorTest { @@ -44,22 +41,5 @@ class CommittableCollectorTest { committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0)); assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2); - - assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent(); - } - - @Test - void testGetEndOfInputCommittable() { - final CommittableCollector<Integer> committableCollector = - new CommittableCollector<>(METRIC_GROUP); - CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, EOI, 1, 0); - committableCollector.addMessage(first); - - Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable = - committableCollector.getEndOfInputCommittable(); - assertThat(endOfInputCommittable).isPresent(); - assertThat(endOfInputCommittable) - .get() - .returns(EOI, CheckpointCommittableManager::getCheckpointId); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index e600f666204..561ed1fc4dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -185,6 +185,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { private volatile boolean wasFailedExternally = false; + private long restoredCheckpointId = 0; + public AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception { @@ -397,6 +399,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { return config; } + public void setRestoredCheckpointId(long restoredCheckpointId) { + this.restoredCheckpointId = restoredCheckpointId; + } + /** Get all the output from the task. This contains StreamRecords and Events interleaved. 
*/ public ConcurrentLinkedQueue<Object> getOutput() { return outputList; @@ -614,16 +620,16 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { jmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), jmOperatorStateHandles); - taskStateManager.setReportedCheckpointId(0); + taskStateManager.setReportedCheckpointId(restoredCheckpointId); taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, jmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot)); if (tmOperatorStateHandles != null) { TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); tmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), tmOperatorStateHandles); taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, tmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 950f76b75d5..94a22fb68bf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -41,7 +41,6 @@ import org.junit.jupiter.params.provider.ValueSource; import java.util.Collection; import java.util.function.IntSupplier; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -165,44 +164,6 @@ class SinkV2CommitterOperatorTest { testHarness.close(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2); - records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - @Test void testStateRestore() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index 55fe2cbfae9..ba3f80d66fa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; @@ -69,7 +68,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -264,7 +262,7 @@ class SinkV2SinkWriterOperatorTest { testHarness.processElement(1, 1); testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); } } @@ -411,6 +409,54 @@ class SinkV2SinkWriterOperatorTest { } } + @Test + void testDoubleEndOfInput() throws Exception { + TestSinkV2<Integer> sink = + TestSinkV2.newBuilder() + .setWriter(new DefaultCommittingSinkWriter<Integer>()) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWriterState(true) + .build(); + + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Record<Integer>>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); + + testHarness.endInput(); + testHarness.prepareSnapshotPreBarrier(1); + snapshot = testHarness.snapshot(1, 1); + + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); + } + + final TestSinkV2<Integer> restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWriter(new DefaultStatefulSinkWriter<Integer>()) + .setWriterState(true) + .build(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink))) { + restoredTestHarness.setRestoredCheckpointId(1L); + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + restoredTestHarness.processElement(2, 2); + + 
            restoredTestHarness.endInput();
+            restoredTestHarness.prepareSnapshotPreBarrier(3);
+            restoredTestHarness.snapshot(3, 1);
+
+            // asserts the guessed checkpoint id, which needs to be one higher than the restored id
+            assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L);
+        }
+    }
+
     @Test
     void testInitContext() throws Exception {
         final AtomicReference<WriterInitContext> initContext = new AtomicReference<>();
@@ -459,12 +505,12 @@ class SinkV2SinkWriterOperatorTest {
     }

     private static void assertBasicOutput(
-            List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) {
-        ListAssert<CommittableMessage<Integer>> records =
+            List<? extends CommittableMessage<?>> output,
+            int numberOfCommittables,
+            long checkpointId) {
+        ListAssert<? extends CommittableMessage<?>> records =
                 assertThat(output).hasSize(numberOfCommittables + 1);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasOverallCommittables(numberOfCommittables);
+        records.element(0, as(committableSummary())).hasOverallCommittables(numberOfCommittables);
         records.filteredOn(r -> r instanceof CommittableWithLineage)
                 .allSatisfy(
                         cl ->
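
For readers who want the gist without wading through the diff, below is a minimal,
self-contained sketch of the checkpoint-id bookkeeping described in the commit
message. It is illustrative only and not code from this commit: the class and
method names (CheckpointIdTracker, observe, checkpointIdForEndOfInput) are invented
for the example, and the initial id of 1 is an assumption mirroring what
CheckpointIDCounter.INITIAL_CHECKPOINT_ID is expected to be.

    // Sketch of the "remember the last observed checkpoint id, use last + 1 on endInput" scheme.
    final class CheckpointIdTracker {

        private static final long ASSUMED_INITIAL_CHECKPOINT_ID = 1;

        // Last checkpoint id observed by this subtask; starts one below the initial id
        // so that "last + 1" yields a valid first id even without any checkpoint.
        private long lastKnownCheckpointId = ASSUMED_INITIAL_CHECKPOINT_ID - 1;

        // Called on restore: the restored id is the highest id already completed.
        void restoredFrom(long restoredCheckpointId) {
            lastKnownCheckpointId = Math.max(lastKnownCheckpointId, restoredCheckpointId);
        }

        // Called whenever committables are emitted for a regular checkpoint.
        void observe(long checkpointId) {
            lastKnownCheckpointId = checkpointId;
        }

        // Id attached to committables produced on endInput: higher than every id observed
        // in this attempt and than the restored id, and lower than any future checkpoint id.
        long checkpointIdForEndOfInput() {
            return lastKnownCheckpointId + 1;
        }

        public static void main(String[] args) {
            CheckpointIdTracker tracker = new CheckpointIdTracker();
            tracker.restoredFrom(1); // restored from checkpoint 1
            tracker.observe(2);      // one successful checkpoint after the restore
            // endInput now uses 3 instead of the old special value Long.MAX_VALUE (EOI)
            System.out.println(tracker.checkpointIdForEndOfInput()); // prints 3
        }
    }

The diff applies this same invariant to SinkWriterOperator, CommitterOperator, and
CompactorOperator: each tracks the last observed checkpoint id and hands out
last + 1 when endInput arrives.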
