This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 37bcc5b5c7e [FLINK-28941][Runtime/Checkpointing] Add concurrent 
checkpoint support in Operator Coordinator
37bcc5b5c7e is described below

commit 37bcc5b5c7e5bb0b467f14b8effe978928211ef0
Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com>
AuthorDate: Tue Sep 6 21:36:59 2022 +0800

    [FLINK-28941][Runtime/Checkpointing] Add concurrent checkpoint support in 
Operator Coordinator
    
    This closes #20754.
---
 .../coordination/OperatorCoordinatorHolder.java    |  17 +-
 .../operators/coordination/SubtaskGatewayImpl.java |  99 +++++----
 .../CoordinatorEventsExactlyOnceITCase.java        |   4 +-
 .../OperatorCoordinatorHolderTest.java             |  20 +-
 .../coordination/SubtaskGatewayImplTest.java       |   4 +-
 ...ToStreamOperatorRecipientExactlyOnceITCase.java | 236 ++++++++++++++++++---
 6 files changed, 286 insertions(+), 94 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 17aa17a8a1e..7587795ba74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -58,12 +58,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <h3>Exactly-one Mechanism</h3>
  *
- * <p>This implementation can handle one checkpoint being triggered at a time. 
If another checkpoint
- * is triggered while the triggering of the first one was not completed or 
aborted, this class will
- * throw an exception. That is in line with the capabilities of the Checkpoint 
Coordinator, which
- * can handle multiple concurrent checkpoints on the TaskManagers, but only 
one concurrent
- * triggering phase.
- *
  * <p>The mechanism for exactly once semantics is as follows:
  *
  * <ul>
@@ -94,6 +88,11 @@ import static org.apache.flink.util.Preconditions.checkState;
  *       AcknowledgeCheckpointEvent}, it would be sent out immediately.
  * </ul>
  *
+ * <p>This implementation can handle concurrent checkpoints. In the behavior 
described above, if an
+ * event is generated after the coordinator has completed multiple 
checkpoints, and before it
+ * receives {@link AcknowledgeCheckpointEvent} about any of them, the event 
would be buffered until
+ * the coordinator has received {@link AcknowledgeCheckpointEvent} about all 
of these checkpoints.
+ *
  * <p><b>IMPORTANT:</b> A critical assumption is that all events from the 
scheduler to the Tasks are
  * transported strictly in order. Events being sent from the coordinator after 
the checkpoint
  * barrier was injected must not overtake the checkpoint barrier. This is 
currently guaranteed by
@@ -282,7 +281,7 @@ public class OperatorCoordinatorHolder
             mainThreadExecutor.assertRunningInMainThread();
         }
 
-        
subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint);
+        
subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);
         context.resetFailed();
 
         // when initial savepoints are restored, this call comes before the 
mainThreadExecutor
@@ -401,7 +400,9 @@ public class OperatorCoordinatorHolder
                 () ->
                         subtaskGatewayMap
                                 .values()
-                                
.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint));
+                                .forEach(
+                                        SubtaskGatewayImpl
+                                                
::openGatewayAndUnmarkLastCheckpointIfAny));
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 86bf373a3f0..2fb2bffce3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -30,8 +30,10 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 
@@ -39,9 +41,12 @@ import java.util.concurrent.CompletableFuture;
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface 
that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
  *
- * <p>Instances of this class can be temporarily closed, blocking events from 
going through,
- * buffering them, and releasing them later. It is used for "alignment" of 
operator event streams
- * with checkpoint barrier injection, similar to how the input channels are 
aligned during a common
+ * <p>Instances of this class can be closed, blocking events from going 
through, buffering them, and
+ * releasing them later. If the instance is closed for a specific checkpoint, 
events arriving after
+ * that would be blocked temporarily, and released after the checkpoint 
finishes. If an event is
+ * blocked and buffered when there are multiple ongoing checkpoints, the event 
would be released after
+ * all these checkpoints finish. It is used for "alignment" of operator event 
streams with
+ * checkpoint barrier injection, similar to how the input channels are aligned 
during a common
  * checkpoint.
  *
  * <p>The methods on the critical communication path, including 
closing/reopening the gateway and
@@ -63,13 +68,13 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
 
     private final IncompleteFuturesTracker incompleteFuturesTracker;
 
-    private final List<BlockedEvent> blockedEvents;
+    private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap;
 
-    private long currentCheckpointId;
+    /** The ids of the checkpoints that have been marked but not unmarked yet. 
*/
+    private final TreeSet<Long> currentMarkedCheckpointIds;
 
-    private long lastCheckpointId;
-
-    private boolean isClosed;
+    /** The id of the latest checkpoint that has ever been marked. */
+    private long latestAttemptedCheckpointId;
 
     SubtaskGatewayImpl(
             SubtaskAccess subtaskAccess,
@@ -78,10 +83,9 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
         this.subtaskAccess = subtaskAccess;
         this.mainThreadExecutor = mainThreadExecutor;
         this.incompleteFuturesTracker = incompleteFuturesTracker;
-        this.blockedEvents = new ArrayList<>();
-        this.currentCheckpointId = NO_CHECKPOINT;
-        this.lastCheckpointId = Long.MIN_VALUE;
-        this.isClosed = false;
+        this.blockedEventsMap = new TreeMap<>();
+        this.currentMarkedCheckpointIds = new TreeSet<>();
+        this.latestAttemptedCheckpointId = NO_CHECKPOINT;
     }
 
     @Override
@@ -134,8 +138,8 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
             CompletableFuture<Acknowledge> result) {
         checkRunsInMainThread();
 
-        if (isClosed) {
-            blockedEvents.add(new BlockedEvent(sendAction, result));
+        if (!blockedEventsMap.isEmpty()) {
+            blockedEventsMap.lastEntry().getValue().add(new 
BlockedEvent(sendAction, result));
         } else {
             callSendAction(sendAction, result);
         }
@@ -180,21 +184,14 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
     void markForCheckpoint(long checkpointId) {
         checkRunsInMainThread();
 
-        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != 
checkpointId) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Cannot mark for checkpoint %d, already marked for 
checkpoint %d",
-                            checkpointId, currentCheckpointId));
-        }
-
-        if (checkpointId > lastCheckpointId) {
-            currentCheckpointId = checkpointId;
-            lastCheckpointId = checkpointId;
+        if (checkpointId > latestAttemptedCheckpointId) {
+            currentMarkedCheckpointIds.add(checkpointId);
+            latestAttemptedCheckpointId = checkpointId;
         } else {
             throw new IllegalStateException(
                     String.format(
                             "Regressing checkpoint IDs. Previous checkpointId 
= %d, new checkpointId = %d",
-                            lastCheckpointId, checkpointId));
+                            latestAttemptedCheckpointId, checkpointId));
         }
     }
 
@@ -207,8 +204,8 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
     boolean tryCloseGateway(long checkpointId) {
         checkRunsInMainThread();
 
-        if (checkpointId == currentCheckpointId) {
-            isClosed = true;
+        if (currentMarkedCheckpointIds.contains(checkpointId)) {
+            blockedEventsMap.putIfAbsent(checkpointId, new LinkedList<>());
             return true;
         }
 
@@ -221,38 +218,56 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
         // Gateways should always be marked and closed for a specific 
checkpoint before it can be
         // reopened for that checkpoint. If a gateway is to be opened for an 
unforeseen checkpoint,
         // exceptions should be thrown.
-        if (lastCheckpointId < checkpointId) {
+        if (latestAttemptedCheckpointId < checkpointId) {
             throw new IllegalStateException(
                     String.format(
-                            "Gateway closed for different checkpoint: closed 
for = %d, expected = %d",
-                            currentCheckpointId, checkpointId));
+                            "Trying to open gateway for unseen checkpoint: "
+                                    + "latest known checkpoint = %d, incoming 
checkpoint = %d",
+                            latestAttemptedCheckpointId, checkpointId));
         }
 
         // The message to open gateway with a specific checkpoint id might 
arrive after the
         // checkpoint has been aborted, or even after a new checkpoint has 
started. In these cases
         // this message should be ignored.
-        if (currentCheckpointId == NO_CHECKPOINT || checkpointId < 
lastCheckpointId) {
+        if (!currentMarkedCheckpointIds.contains(checkpointId)) {
             return;
         }
 
-        openGatewayAndUnmarkCheckpoint();
+        if (blockedEventsMap.containsKey(checkpointId)) {
+            if (blockedEventsMap.firstKey() == checkpointId) {
+                for (BlockedEvent blockedEvent : 
blockedEventsMap.firstEntry().getValue()) {
+                    callSendAction(blockedEvent.sendAction, 
blockedEvent.future);
+                }
+            } else {
+                blockedEventsMap
+                        .floorEntry(checkpointId - 1)
+                        .getValue()
+                        .addAll(blockedEventsMap.get(checkpointId));
+            }
+            blockedEventsMap.remove(checkpointId);
+        }
+
+        currentMarkedCheckpointIds.remove(checkpointId);
     }
 
     /** Opens the gateway, releasing all buffered events. */
-    void openGatewayAndUnmarkCheckpoint() {
+    void openGatewayAndUnmarkAllCheckpoint() {
         checkRunsInMainThread();
 
-        currentCheckpointId = NO_CHECKPOINT;
-        if (!isClosed) {
-            return;
+        for (List<BlockedEvent> blockedEvents : blockedEventsMap.values()) {
+            for (BlockedEvent blockedEvent : blockedEvents) {
+                callSendAction(blockedEvent.sendAction, blockedEvent.future);
+            }
         }
 
-        for (BlockedEvent blockedEvent : blockedEvents) {
-            callSendAction(blockedEvent.sendAction, blockedEvent.future);
-        }
-        blockedEvents.clear();
+        blockedEventsMap.clear();
+        currentMarkedCheckpointIds.clear();
+    }
 
-        isClosed = false;
+    void openGatewayAndUnmarkLastCheckpointIfAny() {
+        if (!currentMarkedCheckpointIds.isEmpty()) {
+            openGatewayAndUnmarkCheckpoint(currentMarkedCheckpointIds.last());
+        }
     }
 
     private void checkRunsInMainThread() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 06b2466be93..020021c6b40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -312,7 +312,7 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
         protected int nextNumber;
 
         protected CompletableFuture<byte[]> nextToComplete;
-        private CompletableFuture<byte[]> requestedCheckpoint;
+        protected CompletableFuture<byte[]> requestedCheckpoint;
 
         private SubtaskGateway subtaskGateway;
         private boolean workLoopRunning;
@@ -683,7 +683,7 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
             return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new 
TestScript());
         }
 
-        static void reset() {
+        public static void reset() {
             MAP_FOR_OPERATOR.clear();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index e6082f31d6e..cdd46f103e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -133,7 +133,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
     }
 
     @Test
-    public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception 
{
+    public void acknowledgeCheckpointEventReleasesBlockedEvents() throws 
Exception {
         final EventReceivingTasks tasks = 
EventReceivingTasks.createForRunningTasks();
         final OperatorCoordinatorHolder holder =
                 createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
@@ -183,19 +183,23 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
     }
 
     @Test
-    public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
+    public void triggerConcurrentCheckpoints() throws Exception {
         final EventReceivingTasks tasks = 
EventReceivingTasks.createForRunningTasks();
         final OperatorCoordinatorHolder holder =
                 createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
 
-        holder.checkpointCoordinator(11L, new CompletableFuture<>());
+        triggerAndCompleteCheckpoint(holder, 1111L);
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(1337));
+        triggerAndCompleteCheckpoint(holder, 1112L);
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(1338));
+        assertThat(tasks.getSentEventsForSubtask(0)).isEmpty();
 
-        final CompletableFuture<byte[]> future = new CompletableFuture<>();
-        holder.checkpointCoordinator(12L, future);
+        holder.handleEventFromOperator(0, 0, new 
AcknowledgeCheckpointEvent(1111L));
+        assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new 
TestOperatorEvent(1337));
 
-        assertThat(future).isCompletedExceptionally();
-        assertThat(globalFailure).isNotNull();
-        globalFailure = null;
+        holder.handleEventFromOperator(0, 0, new 
AcknowledgeCheckpointEvent(1112L));
+        assertThat(tasks.getSentEventsForSubtask(0))
+                .containsExactly(new TestOperatorEvent(1337), new 
TestOperatorEvent(1338));
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
index 459d8c34099..ad15f61eec6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
@@ -139,7 +139,7 @@ public class SubtaskGatewayImplTest {
         final CompletableFuture<Acknowledge> future1 = 
gateway3.sendEvent(event1);
         final CompletableFuture<Acknowledge> future2 = 
gateway0.sendEvent(event2);
 
-        gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint);
+        
gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);
 
         assertThat(receiver.events)
                 .containsExactly(new EventWithSubtask(event1, 3), new 
EventWithSubtask(event2, 0));
@@ -161,7 +161,7 @@ public class SubtaskGatewayImplTest {
         gateway.tryCloseGateway(17L);
 
         final CompletableFuture<Acknowledge> future = gateway.sendEvent(new 
TestOperatorEvent());
-        gateway.openGatewayAndUnmarkCheckpoint();
+        gateway.openGatewayAndUnmarkAllCheckpoint();
 
         assertThat(future).isCompletedExceptionally();
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
index a21d9dea702..5af4f469140 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
@@ -114,7 +114,12 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
         env.setParallelism(1);
         env.enableCheckpointing(100);
         ManuallyClosedSourceFunction.shouldCloseSource = false;
-        
EventSendingCoordinatorWithGuaranteedCheckpoint.isEventSentAfterFirstCheckpoint 
= false;
+        EventReceivingOperator.shouldUnblockAllCheckpoint = false;
+        EventReceivingOperator.shouldUnblockNextCheckpoint = false;
+        EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+                        .isCheckpointAbortedBeforeScriptFailure =
+                false;
+        TestScript.reset();
     }
 
     @Test
@@ -155,6 +160,19 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
                 .isTrue();
     }
 
+    @Test
+    public void testConcurrentCheckpoint() throws Exception {
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
+        executeAndVerifyResults(
+                env,
+                new 
EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<>(
+                        "eventReceiving", NUM_EVENTS, DELAY));
+        assertThat(
+                        
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+                                .isCheckpointAbortedBeforeScriptFailure)
+                .isFalse();
+    }
+
     private void executeAndVerifyResults(
             StreamExecutionEnvironment env, 
EventReceivingOperatorFactory<Long, Long> factory)
             throws Exception {
@@ -162,10 +180,6 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
         // when checkpoint barriers are injected into sources, the event 
receiving operator has not
         // started checkpoint yet.
         env.addSource(new ManuallyClosedSourceFunction<>(), 
TypeInformation.of(Long.class))
-                .transform(
-                        "blockCheckpointBarrier",
-                        TypeInformation.of(Long.class),
-                        new BlockCheckpointBarrierOperator<>())
                 .disableChaining()
                 .transform(factory.name, TypeInformation.of(Long.class), 
factory)
                 .addSink(new DiscardingSink<>());
@@ -197,29 +211,6 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
         public void cancel() {}
     }
 
-    /**
-     * A stream operator that blocks the checkpoint barrier until the 
coordinator has sent events to
-     * its subtask. It helps to guarantee that there are events being sent 
when the coordinator has
-     * completed the first checkpoint while the subtask has not yet.
-     */
-    private static class BlockCheckpointBarrierOperator<T> extends 
AbstractStreamOperator<T>
-            implements OneInputStreamOperator<T, T> {
-
-        @Override
-        public void processElement(StreamRecord<T> element) throws Exception {
-            output.collect(element);
-        }
-
-        @Override
-        public void snapshotState(StateSnapshotContext context) throws 
Exception {
-            super.snapshotState(context);
-            while (!EventSendingCoordinatorWithGuaranteedCheckpoint
-                    .isEventSentAfterFirstCheckpoint) {
-                Thread.sleep(100);
-            }
-        }
-    }
-
     /**
      * A wrapper operator factory for {@link 
EventSendingCoordinatorWithGuaranteedCheckpoint} and
      * {@link EventReceivingOperator}.
@@ -232,7 +223,7 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
 
         protected final int numEvents;
 
-        private final int delay;
+        protected final int delay;
 
         public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
             this.name = name;
@@ -293,15 +284,15 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
     private static class EventSendingCoordinatorWithGuaranteedCheckpoint
             extends EventSendingCoordinator {
 
-        /** Whether the coordinator has sent any event to its subtask after 
any checkpoint. */
-        private static boolean isEventSentAfterFirstCheckpoint;
-
         /**
          * The max number that the coordinator might send out before it 
completes the first
          * checkpoint.
          */
         private final int maxNumberBeforeFirstCheckpoint;
 
+        /** Whether the coordinator has sent any event to its subtask after 
any checkpoint. */
+        private boolean isEventSentAfterFirstCheckpoint;
+
         /** Whether the coordinator has completed the first checkpoint. */
         private boolean isCoordinatorFirstCheckpointCompleted;
 
@@ -312,6 +303,7 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
                 Context context, String name, int numEvents, int delay) {
             super(context, name, numEvents, delay);
             this.maxNumberBeforeFirstCheckpoint = new 
Random().nextInt(numEvents / 6);
+            this.isEventSentAfterFirstCheckpoint = false;
             this.isCoordinatorFirstCheckpointCompleted = false;
             this.isJobFirstCheckpointCompleted = false;
         }
@@ -331,6 +323,7 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
 
             if (!isEventSentAfterFirstCheckpoint && 
isCoordinatorFirstCheckpointCompleted) {
                 isEventSentAfterFirstCheckpoint = true;
+                EventReceivingOperator.shouldUnblockAllCheckpoint = true;
             }
         }
 
@@ -382,12 +375,23 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
     /**
      * The stream operator that receives the events and accumulates the 
numbers. The task is
      * stateful and checkpoints the accumulator.
+     *
+     * <p>The operator also supports blocking the checkpoint process until 
a certain signal is given
+     * (See {@link #shouldUnblockAllCheckpoint} and {@link 
#shouldUnblockNextCheckpoint}). It helps
+     * to guarantee that there are events being sent when the coordinator has 
completed a checkpoint
+     * while the subtask has not yet.
      */
     private static class EventReceivingOperator<T> extends 
AbstractStreamOperator<T>
             implements OneInputStreamOperator<T, T>, OperatorEventHandler {
 
         protected static final String ACCUMULATOR_NAME = "receivedIntegers";
 
+        /** Whether to unblock all the following checkpoints. */
+        private static boolean shouldUnblockAllCheckpoint;
+
+        /** Whether to unblock the next checkpoint. */
+        private static boolean shouldUnblockNextCheckpoint;
+
         protected final ListAccumulator<Integer> accumulator = new 
ListAccumulator<>();
 
         protected ListState<Integer> state;
@@ -426,6 +430,14 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
         @Override
         public void snapshotState(StateSnapshotContext context) throws 
Exception {
             super.snapshotState(context);
+            while (!shouldUnblockAllCheckpoint && 
!shouldUnblockNextCheckpoint) {
+                Thread.sleep(100);
+            }
+
+            if (shouldUnblockNextCheckpoint) {
+                shouldUnblockNextCheckpoint = false;
+            }
+
             state.update(accumulator.getLocalValue());
         }
 
@@ -550,4 +562,164 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
                             getOperatorID(), new SerializedValue<>(new 
StartEvent(lastValue)));
         }
     }
+
+    /**
+     * A wrapper operator factory for {@link
+     * EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint} and {@link
+     * EventReceivingOperator}.
+     */
+    private static class 
EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<IN, OUT>
+            extends EventReceivingOperatorFactory<IN, OUT> {
+
+        public EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint(
+                String name, int numEvents, int delay) {
+            super(name, numEvents, delay);
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new 
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(
+                            context, name, numEvents, delay);
+                }
+            };
+        }
+    }
+
+    /**
+     * A subclass of {@link EventSendingCoordinator} that additionally 
guarantees the following
+     * behavior around checkpoint.
+     *
+     * <ul>
+     *   <li>The job must have completed two checkpoints before the 
coordinator injects the failure.
+     *   <li>The two checkpoints must have an overlapping period. i.e. The 
second checkpoint must
+     *       have started before the first checkpoint finishes.
+     *   <li>The failure must be injected after the coordinator has completed 
its second checkpoint
+     *       and before it completes the third.
+     *   <li>There must be events being sent when the coordinator has 
completed the second
+     *       checkpoint while the subtask has not.
+     * </ul>
+     *
+     * <p>In order for this class to work correctly, make sure to invoke {@link
+     * 
org.apache.flink.streaming.api.environment.CheckpointConfig#setMaxConcurrentCheckpoints(int)}
+     * method with a parameter value larger than 1.
+     */
+    private static class 
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+            extends EventSendingCoordinator {
+
+        /** Whether there is a checkpoint aborted before the test script 
failure is triggered. */
+        private static boolean isCheckpointAbortedBeforeScriptFailure;
+
+        /**
+         * The max number that the coordinator might send out before it 
completes the second
+         * checkpoint.
+         */
+        private final int maxNumberBeforeSecondCheckpoint;
+
+        /** Whether the coordinator has sent out any event after the second 
checkpoint. */
+        private boolean isEventSentAfterSecondCheckpoint;
+
+        /** Whether the coordinator has completed the first checkpoint. */
+        private boolean isCoordinatorFirstCheckpointCompleted;
+
+        /** Whether the job (both coordinator and operator) has completed the 
first checkpoint. */
+        private boolean isJobFirstCheckpointCompleted;
+
+        /** Whether the coordinator has completed the second checkpoint. */
+        private boolean isCoordinatorSecondCheckpointCompleted;
+
+        /** Whether the job (both coordinator and operator) has completed the 
second checkpoint. */
+        private boolean isJobSecondCheckpointCompleted;
+
+        public EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(
+                Context context, String name, int numEvents, int delay) {
+            super(context, name, numEvents, delay);
+            this.maxNumberBeforeSecondCheckpoint = new 
Random().nextInt(numEvents / 6);
+            this.isEventSentAfterSecondCheckpoint = false;
+            this.isCoordinatorFirstCheckpointCompleted = false;
+            this.isJobFirstCheckpointCompleted = false;
+            this.isCoordinatorSecondCheckpointCompleted = false;
+            this.isJobSecondCheckpointCompleted = false;
+        }
+
+        @Override
+        protected void sendNextEvent() {
+            if (!isCoordinatorSecondCheckpointCompleted
+                    && nextNumber > maxNumberBeforeSecondCheckpoint) {
+                return;
+            }
+
+            if (!isJobSecondCheckpointCompleted && nextNumber >= 
maxNumberBeforeFailure) {
+                return;
+            }
+
+            super.sendNextEvent();
+
+            if (!isEventSentAfterSecondCheckpoint && 
isCoordinatorSecondCheckpointCompleted) {
+                isEventSentAfterSecondCheckpoint = true;
+                EventReceivingOperator.shouldUnblockAllCheckpoint = true;
+            }
+        }
+
+        @Override
+        protected void handleCheckpoint() {
+            if (nextToComplete != null) {
+                if (!isCoordinatorFirstCheckpointCompleted) {
+                    isCoordinatorFirstCheckpointCompleted = true;
+                } else if (!isCoordinatorSecondCheckpointCompleted) {
+                    isCoordinatorSecondCheckpointCompleted = true;
+                    EventReceivingOperator.shouldUnblockNextCheckpoint = true;
+                }
+            }
+
+            super.handleCheckpoint();
+
+            if (nextToComplete != null
+                    && isEventSentAfterSecondCheckpoint
+                    && !testScript.hasAlreadyFailed()) {
+                testScript.recordHasFailed();
+                context.failJob(new Exception("test failure"));
+            }
+        }
+
+        @Override
+        public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+                throws Exception {
+            super.resetToCheckpoint(checkpointId, checkpointData);
+            runInMailbox(
+                    () -> {
+                        isCoordinatorFirstCheckpointCompleted = true;
+                        isJobFirstCheckpointCompleted = true;
+                    });
+        }
+
+        @Override
+        public void notifyCheckpointAborted(long checkpointId) {
+            if (!testScript.hasAlreadyFailed()) {
+                isCheckpointAbortedBeforeScriptFailure = true;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            super.notifyCheckpointComplete(checkpointId);
+            runInMailbox(
+                    () -> {
+                        if (!isJobFirstCheckpointCompleted) {
+                            isJobFirstCheckpointCompleted = true;
+                        } else if (!isJobSecondCheckpointCompleted) {
+                            isJobSecondCheckpointCompleted = true;
+                        }
+                    });
+        }
+    }
 }

Reply via email to