This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new 37bcc5b5c7e [FLINK-28941][Runtime/Checkpointing] Add concurrent checkpoint support in Operator Coordinator 37bcc5b5c7e is described below commit 37bcc5b5c7e5bb0b467f14b8effe978928211ef0 Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com> AuthorDate: Tue Sep 6 21:36:59 2022 +0800 [FLINK-28941][Runtime/Checkpointing] Add concurrent checkpoint support in Operator Coordinator This closes #20754. --- .../coordination/OperatorCoordinatorHolder.java | 17 +- .../operators/coordination/SubtaskGatewayImpl.java | 99 +++++---- .../CoordinatorEventsExactlyOnceITCase.java | 4 +- .../OperatorCoordinatorHolderTest.java | 20 +- .../coordination/SubtaskGatewayImplTest.java | 4 +- ...ToStreamOperatorRecipientExactlyOnceITCase.java | 236 ++++++++++++++++++--- 6 files changed, 286 insertions(+), 94 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 17aa17a8a1e..7587795ba74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -58,12 +58,6 @@ import static org.apache.flink.util.Preconditions.checkState; * * <h3>Exactly-one Mechanism</h3> * - * <p>This implementation can handle one checkpoint being triggered at a time. If another checkpoint - * is triggered while the triggering of the first one was not completed or aborted, this class will - * throw an exception. That is in line with the capabilities of the Checkpoint Coordinator, which - * can handle multiple concurrent checkpoints on the TaskManagers, but only one concurrent - * triggering phase. - * * <p>The mechanism for exactly once semantics is as follows: * * <ul> @@ -94,6 +88,11 @@ import static org.apache.flink.util.Preconditions.checkState; * AcknowledgeCheckpointEvent}, it would be sent out immediately. * </ul> * + * <p>This implementation can handle concurrent checkpoints. In the behavior described above, If an + * event is generated after the coordinator has completed multiple checkpoints, and before it + * receives {@link AcknowledgeCheckpointEvent} about any of them, the event would be buffered until + * the coordinator has received {@link AcknowledgeCheckpointEvent} about all of these checkpoints. + * * <p><b>IMPORTANT:</b> A critical assumption is that all events from the scheduler to the Tasks are * transported strictly in order. Events being sent from the coordinator after the checkpoint * barrier was injected must not overtake the checkpoint barrier. This is currently guaranteed by @@ -282,7 +281,7 @@ public class OperatorCoordinatorHolder mainThreadExecutor.assertRunningInMainThread(); } - subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint); + subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint); context.resetFailed(); // when initial savepoints are restored, this call comes before the mainThreadExecutor @@ -401,7 +400,9 @@ public class OperatorCoordinatorHolder () -> subtaskGatewayMap .values() - .forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint)); + .forEach( + SubtaskGatewayImpl + ::openGatewayAndUnmarkLastCheckpointIfAny)); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java index 86bf373a3f0..2fb2bffce3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java @@ -30,8 +30,10 @@ import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; import java.io.IOException; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -39,9 +41,12 @@ import java.util.concurrent.CompletableFuture; * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to * subtasks for status and event sending via {@link SubtaskAccess}. * - * <p>Instances of this class can be temporarily closed, blocking events from going through, - * buffering them, and releasing them later. It is used for "alignment" of operator event streams - * with checkpoint barrier injection, similar to how the input channels are aligned during a common + * <p>Instances of this class can be closed, blocking events from going through, buffering them, and + * releasing them later. If the instance is closed for a specific checkpoint, events arrived after + * that would be blocked temporarily, and released after the checkpoint finishes. If an event is + * blocked & buffered when there are multiple ongoing checkpoints, the event would be released after + * all these checkpoints finish. It is used for "alignment" of operator event streams with + * checkpoint barrier injection, similar to how the input channels are aligned during a common * checkpoint. * * <p>The methods on the critical communication path, including closing/reopening the gateway and @@ -63,13 +68,13 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { private final IncompleteFuturesTracker incompleteFuturesTracker; - private final List<BlockedEvent> blockedEvents; + private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap; - private long currentCheckpointId; + /** The ids of the checkpoints that have been marked but not unmarked yet. */ + private final TreeSet<Long> currentMarkedCheckpointIds; - private long lastCheckpointId; - - private boolean isClosed; + /** The id of the latest checkpoint that has ever been marked. */ + private long latestAttemptedCheckpointId; SubtaskGatewayImpl( SubtaskAccess subtaskAccess, @@ -78,10 +83,9 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { this.subtaskAccess = subtaskAccess; this.mainThreadExecutor = mainThreadExecutor; this.incompleteFuturesTracker = incompleteFuturesTracker; - this.blockedEvents = new ArrayList<>(); - this.currentCheckpointId = NO_CHECKPOINT; - this.lastCheckpointId = Long.MIN_VALUE; - this.isClosed = false; + this.blockedEventsMap = new TreeMap<>(); + this.currentMarkedCheckpointIds = new TreeSet<>(); + this.latestAttemptedCheckpointId = NO_CHECKPOINT; } @Override @@ -134,8 +138,8 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { CompletableFuture<Acknowledge> result) { checkRunsInMainThread(); - if (isClosed) { - blockedEvents.add(new BlockedEvent(sendAction, result)); + if (!blockedEventsMap.isEmpty()) { + blockedEventsMap.lastEntry().getValue().add(new BlockedEvent(sendAction, result)); } else { callSendAction(sendAction, result); } @@ -180,21 +184,14 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { void markForCheckpoint(long checkpointId) { checkRunsInMainThread(); - if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) { - throw new IllegalStateException( - String.format( - "Cannot mark for checkpoint %d, already marked for checkpoint %d", - checkpointId, currentCheckpointId)); - } - - if (checkpointId > lastCheckpointId) { - currentCheckpointId = checkpointId; - lastCheckpointId = checkpointId; + if (checkpointId > latestAttemptedCheckpointId) { + currentMarkedCheckpointIds.add(checkpointId); + latestAttemptedCheckpointId = checkpointId; } else { throw new IllegalStateException( String.format( "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", - lastCheckpointId, checkpointId)); + latestAttemptedCheckpointId, checkpointId)); } } @@ -207,8 +204,8 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { boolean tryCloseGateway(long checkpointId) { checkRunsInMainThread(); - if (checkpointId == currentCheckpointId) { - isClosed = true; + if (currentMarkedCheckpointIds.contains(checkpointId)) { + blockedEventsMap.putIfAbsent(checkpointId, new LinkedList<>()); return true; } @@ -221,38 +218,56 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway { // Gateways should always be marked and closed for a specific checkpoint before it can be // reopened for that checkpoint. If a gateway is to be opened for an unforeseen checkpoint, // exceptions should be thrown. - if (lastCheckpointId < checkpointId) { + if (latestAttemptedCheckpointId < checkpointId) { throw new IllegalStateException( String.format( - "Gateway closed for different checkpoint: closed for = %d, expected = %d", - currentCheckpointId, checkpointId)); + "Trying to open gateway for unseen checkpoint: " + + "latest known checkpoint = %d, incoming checkpoint = %d", + latestAttemptedCheckpointId, checkpointId)); } // The message to open gateway with a specific checkpoint id might arrive after the // checkpoint has been aborted, or even after a new checkpoint has started. In these cases // this message should be ignored. - if (currentCheckpointId == NO_CHECKPOINT || checkpointId < lastCheckpointId) { + if (!currentMarkedCheckpointIds.contains(checkpointId)) { return; } - openGatewayAndUnmarkCheckpoint(); + if (blockedEventsMap.containsKey(checkpointId)) { + if (blockedEventsMap.firstKey() == checkpointId) { + for (BlockedEvent blockedEvent : blockedEventsMap.firstEntry().getValue()) { + callSendAction(blockedEvent.sendAction, blockedEvent.future); + } + } else { + blockedEventsMap + .floorEntry(checkpointId - 1) + .getValue() + .addAll(blockedEventsMap.get(checkpointId)); + } + blockedEventsMap.remove(checkpointId); + } + + currentMarkedCheckpointIds.remove(checkpointId); } /** Opens the gateway, releasing all buffered events. */ - void openGatewayAndUnmarkCheckpoint() { + void openGatewayAndUnmarkAllCheckpoint() { checkRunsInMainThread(); - currentCheckpointId = NO_CHECKPOINT; - if (!isClosed) { - return; + for (List<BlockedEvent> blockedEvents : blockedEventsMap.values()) { + for (BlockedEvent blockedEvent : blockedEvents) { + callSendAction(blockedEvent.sendAction, blockedEvent.future); + } } - for (BlockedEvent blockedEvent : blockedEvents) { - callSendAction(blockedEvent.sendAction, blockedEvent.future); - } - blockedEvents.clear(); + blockedEventsMap.clear(); + currentMarkedCheckpointIds.clear(); + } - isClosed = false; + void openGatewayAndUnmarkLastCheckpointIfAny() { + if (!currentMarkedCheckpointIds.isEmpty()) { + openGatewayAndUnmarkCheckpoint(currentMarkedCheckpointIds.last()); + } } private void checkRunsInMainThread() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 06b2466be93..020021c6b40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -312,7 +312,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { protected int nextNumber; protected CompletableFuture<byte[]> nextToComplete; - private CompletableFuture<byte[]> requestedCheckpoint; + protected CompletableFuture<byte[]> requestedCheckpoint; private SubtaskGateway subtaskGateway; private boolean workLoopRunning; @@ -683,7 +683,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript()); } - static void reset() { + public static void reset() { MAP_FOR_OPERATOR.clear(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index e6082f31d6e..cdd46f103e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -133,7 +133,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { } @Test - public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception { + public void acknowledgeCheckpointEventReleasesBlockedEvents() throws Exception { final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); @@ -183,19 +183,23 @@ public class OperatorCoordinatorHolderTest extends TestLogger { } @Test - public void triggeringFailsIfOtherTriggeringInProgress() throws Exception { + public void triggerConcurrentCheckpoints() throws Exception { final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); - holder.checkpointCoordinator(11L, new CompletableFuture<>()); + triggerAndCompleteCheckpoint(holder, 1111L); + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337)); + triggerAndCompleteCheckpoint(holder, 1112L); + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1338)); + assertThat(tasks.getSentEventsForSubtask(0)).isEmpty(); - final CompletableFuture<byte[]> future = new CompletableFuture<>(); - holder.checkpointCoordinator(12L, future); + holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1111L)); + assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new TestOperatorEvent(1337)); - assertThat(future).isCompletedExceptionally(); - assertThat(globalFailure).isNotNull(); - globalFailure = null; + holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1112L)); + assertThat(tasks.getSentEventsForSubtask(0)) + .containsExactly(new TestOperatorEvent(1337), new TestOperatorEvent(1338)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java index 459d8c34099..ad15f61eec6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java @@ -139,7 +139,7 @@ public class SubtaskGatewayImplTest { final CompletableFuture<Acknowledge> future1 = gateway3.sendEvent(event1); final CompletableFuture<Acknowledge> future2 = gateway0.sendEvent(event2); - gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint); + gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint); assertThat(receiver.events) .containsExactly(new EventWithSubtask(event1, 3), new EventWithSubtask(event2, 0)); @@ -161,7 +161,7 @@ public class SubtaskGatewayImplTest { gateway.tryCloseGateway(17L); final CompletableFuture<Acknowledge> future = gateway.sendEvent(new TestOperatorEvent()); - gateway.openGatewayAndUnmarkCheckpoint(); + gateway.openGatewayAndUnmarkAllCheckpoint(); assertThat(future).isCompletedExceptionally(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java index a21d9dea702..5af4f469140 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java @@ -114,7 +114,12 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase env.setParallelism(1); env.enableCheckpointing(100); ManuallyClosedSourceFunction.shouldCloseSource = false; - EventSendingCoordinatorWithGuaranteedCheckpoint.isEventSentAfterFirstCheckpoint = false; + EventReceivingOperator.shouldUnblockAllCheckpoint = false; + EventReceivingOperator.shouldUnblockNextCheckpoint = false; + EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint + .isCheckpointAbortedBeforeScriptFailure = + false; + TestScript.reset(); } @Test @@ -155,6 +160,19 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase .isTrue(); } + @Test + public void testConcurrentCheckpoint() throws Exception { + env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); + executeAndVerifyResults( + env, + new EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<>( + "eventReceiving", NUM_EVENTS, DELAY)); + assertThat( + EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint + .isCheckpointAbortedBeforeScriptFailure) + .isFalse(); + } + private void executeAndVerifyResults( StreamExecutionEnvironment env, EventReceivingOperatorFactory<Long, Long> factory) throws Exception { @@ -162,10 +180,6 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase // when checkpoint barriers are injected into sources, the event receiving operator has not // started checkpoint yet. env.addSource(new ManuallyClosedSourceFunction<>(), TypeInformation.of(Long.class)) - .transform( - "blockCheckpointBarrier", - TypeInformation.of(Long.class), - new BlockCheckpointBarrierOperator<>()) .disableChaining() .transform(factory.name, TypeInformation.of(Long.class), factory) .addSink(new DiscardingSink<>()); @@ -197,29 +211,6 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase public void cancel() {} } - /** - * A stream operator that blocks the checkpoint barrier until the coordinator has sent events to - * its subtask. It helps to guarantee that there are events being sent when the coordinator has - * completed the first checkpoint while the subtask has not yet. - */ - private static class BlockCheckpointBarrierOperator<T> extends AbstractStreamOperator<T> - implements OneInputStreamOperator<T, T> { - - @Override - public void processElement(StreamRecord<T> element) throws Exception { - output.collect(element); - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - super.snapshotState(context); - while (!EventSendingCoordinatorWithGuaranteedCheckpoint - .isEventSentAfterFirstCheckpoint) { - Thread.sleep(100); - } - } - } - /** * A wrapper operator factory for {@link EventSendingCoordinatorWithGuaranteedCheckpoint} and * {@link EventReceivingOperator}. @@ -232,7 +223,7 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase protected final int numEvents; - private final int delay; + protected final int delay; public EventReceivingOperatorFactory(String name, int numEvents, int delay) { this.name = name; @@ -293,15 +284,15 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase private static class EventSendingCoordinatorWithGuaranteedCheckpoint extends EventSendingCoordinator { - /** Whether the coordinator has sent any event to its subtask after any checkpoint. */ - private static boolean isEventSentAfterFirstCheckpoint; - /** * The max number that the coordinator might send out before it completes the first * checkpoint. */ private final int maxNumberBeforeFirstCheckpoint; + /** Whether the coordinator has sent any event to its subtask after any checkpoint. */ + private boolean isEventSentAfterFirstCheckpoint; + /** Whether the coordinator has completed the first checkpoint. */ private boolean isCoordinatorFirstCheckpointCompleted; @@ -312,6 +303,7 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase Context context, String name, int numEvents, int delay) { super(context, name, numEvents, delay); this.maxNumberBeforeFirstCheckpoint = new Random().nextInt(numEvents / 6); + this.isEventSentAfterFirstCheckpoint = false; this.isCoordinatorFirstCheckpointCompleted = false; this.isJobFirstCheckpointCompleted = false; } @@ -331,6 +323,7 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase if (!isEventSentAfterFirstCheckpoint && isCoordinatorFirstCheckpointCompleted) { isEventSentAfterFirstCheckpoint = true; + EventReceivingOperator.shouldUnblockAllCheckpoint = true; } } @@ -382,12 +375,23 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase /** * The stream operator that receives the events and accumulates the numbers. The task is * stateful and checkpoints the accumulator. + * + * <p>The operator also supports blocking the checkpoint process until certain signal is invoked + * (See {@link #shouldUnblockAllCheckpoint} and {@link #shouldUnblockNextCheckpoint}). It helps + * to guarantee that there are events being sent when the coordinator has completed a checkpoint + * while the subtask has not yet. */ private static class EventReceivingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T>, OperatorEventHandler { protected static final String ACCUMULATOR_NAME = "receivedIntegers"; + /** Whether to unblock all the following checkpoints. */ + private static boolean shouldUnblockAllCheckpoint; + + /** Whether to unblock the next checkpoint. */ + private static boolean shouldUnblockNextCheckpoint; + protected final ListAccumulator<Integer> accumulator = new ListAccumulator<>(); protected ListState<Integer> state; @@ -426,6 +430,14 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); + while (!shouldUnblockAllCheckpoint && !shouldUnblockNextCheckpoint) { + Thread.sleep(100); + } + + if (shouldUnblockNextCheckpoint) { + shouldUnblockNextCheckpoint = false; + } + state.update(accumulator.getLocalValue()); } @@ -550,4 +562,164 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase getOperatorID(), new SerializedValue<>(new StartEvent(lastValue))); } } + + /** + * A wrapper operator factory for {@link + * EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint} and {@link + * EventReceivingOperator}. + */ + private static class EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<IN, OUT> + extends EventReceivingOperatorFactory<IN, OUT> { + + public EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint( + String name, int numEvents, int delay) { + super(name, numEvents, delay); + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new OperatorCoordinator.Provider() { + + @Override + public OperatorID getOperatorId() { + return operatorID; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return new EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint( + context, name, numEvents, delay); + } + }; + } + } + + /** + * A subclass of {@link EventSendingCoordinator} that additionally guarantees the following + * behavior around checkpoint. + * + * <ul> + * <li>The job must have completed two checkpoints before the coordinator injects the failure. + * <li>The two checkpoints must have an overlapping period. i.e. The second checkpoint must + * have started before the first checkpoint finishes. + * <li>The failure must be injected after the coordinator has completed its second checkpoint + * and before it completes the third. + * <li>There must be events being sent when the coordinator has completed the second + * checkpoint while the subtask has not. + * </ul> + * + * <p>In order for this class to work correctly, make sure to invoke {@link + * org.apache.flink.streaming.api.environment.CheckpointConfig#setMaxConcurrentCheckpoints(int)} + * method with a parameter value larger than 1. + */ + private static class EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint + extends EventSendingCoordinator { + + /** Whether there is a checkpoint aborted before the test script failure is triggered. */ + private static boolean isCheckpointAbortedBeforeScriptFailure; + + /** + * The max number that the coordinator might send out before it completes the second + * checkpoint. + */ + private final int maxNumberBeforeSecondCheckpoint; + + /** Whether the coordinator has sent out any event after the second checkpoint. */ + private boolean isEventSentAfterSecondCheckpoint; + + /** Whether the coordinator has completed the first checkpoint. */ + private boolean isCoordinatorFirstCheckpointCompleted; + + /** Whether the job (both coordinator and operator) has completed the first checkpoint. */ + private boolean isJobFirstCheckpointCompleted; + + /** Whether the coordinator has completed the second checkpoint. */ + private boolean isCoordinatorSecondCheckpointCompleted; + + /** Whether the job (both coordinator and operator) has completed the second checkpoint. */ + private boolean isJobSecondCheckpointCompleted; + + public EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint( + Context context, String name, int numEvents, int delay) { + super(context, name, numEvents, delay); + this.maxNumberBeforeSecondCheckpoint = new Random().nextInt(numEvents / 6); + this.isEventSentAfterSecondCheckpoint = false; + this.isCoordinatorFirstCheckpointCompleted = false; + this.isJobFirstCheckpointCompleted = false; + this.isCoordinatorSecondCheckpointCompleted = false; + this.isJobSecondCheckpointCompleted = false; + } + + @Override + protected void sendNextEvent() { + if (!isCoordinatorSecondCheckpointCompleted + && nextNumber > maxNumberBeforeSecondCheckpoint) { + return; + } + + if (!isJobSecondCheckpointCompleted && nextNumber >= maxNumberBeforeFailure) { + return; + } + + super.sendNextEvent(); + + if (!isEventSentAfterSecondCheckpoint && isCoordinatorSecondCheckpointCompleted) { + isEventSentAfterSecondCheckpoint = true; + EventReceivingOperator.shouldUnblockAllCheckpoint = true; + } + } + + @Override + protected void handleCheckpoint() { + if (nextToComplete != null) { + if (!isCoordinatorFirstCheckpointCompleted) { + isCoordinatorFirstCheckpointCompleted = true; + } else if (!isCoordinatorSecondCheckpointCompleted) { + isCoordinatorSecondCheckpointCompleted = true; + EventReceivingOperator.shouldUnblockNextCheckpoint = true; + } + } + + super.handleCheckpoint(); + + if (nextToComplete != null + && isEventSentAfterSecondCheckpoint + && !testScript.hasAlreadyFailed()) { + testScript.recordHasFailed(); + context.failJob(new Exception("test failure")); + } + } + + @Override + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) + throws Exception { + super.resetToCheckpoint(checkpointId, checkpointData); + runInMailbox( + () -> { + isCoordinatorFirstCheckpointCompleted = true; + isJobFirstCheckpointCompleted = true; + }); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + if (!testScript.hasAlreadyFailed()) { + isCheckpointAbortedBeforeScriptFailure = true; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + super.notifyCheckpointComplete(checkpointId); + runInMailbox( + () -> { + if (!isJobFirstCheckpointCompleted) { + isJobFirstCheckpointCompleted = true; + } else if (!isJobSecondCheckpointCompleted) { + isJobSecondCheckpointCompleted = true; + } + }); + } + } }