yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r944382861


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -299,9 +364,78 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
     }
 
     private void checkpointCoordinatorInternal(
-            final long checkpointId, final CompletableFuture<byte[]> result) {
+            long checkpointId, CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
+        try {
+            if (currentPendingCheckpointId != NO_CHECKPOINT
+                    && currentPendingCheckpointId != checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Cannot checkpoint coordinator for checkpoint %d, "
+                                        + "since checkpoint %d has already started.",
+                                checkpointId, currentPendingCheckpointId));
+            }
+
+            if (latestAttemptedCheckpointId >= checkpointId) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                                latestAttemptedCheckpointId, checkpointId));
+            }
+
+            subtaskGatewayMap.forEach(
+                    (subtask, gateway) -> gateway.markForCheckpoint(checkpointId));
+
+            Preconditions.checkState(acknowledgeCloseGatewayFutureMap.isEmpty());
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+            globalFailureHandler.handleGlobalFailure(t);
+            return;
+        }
+
+        currentPendingCheckpointId = checkpointId;
+        latestAttemptedCheckpointId = checkpointId;
+
+        for (int subtask : subtaskGatewayMap.keySet()) {
+            acknowledgeCloseGatewayFutureMap.put(subtask, new CompletableFuture<>());
+            final OperatorEvent closeGatewayEvent = new CloseGatewayEvent(checkpointId, subtask);
+            subtaskGatewayMap
+                    .get(subtask)
+                    .sendEventWithCallBackOnCompletion(
+                            closeGatewayEvent,
+                            (success, failure) -> {
+                                if (failure != null) {
+                                    // If the close gateway event failed to reach the subtask for
+                                    // some reason, the coordinator would trigger a fail-over on
+                                    // the subtask if the subtask is still running. This behavior
+                                    // also guarantees that the coordinator won't receive more
+                                    // events from this subtask before the current checkpoint
+                                    // finishes, which is equivalent to receiving ACK from this
+                                    // subtask.
+                                    if (!(failure instanceof TaskNotRunningException)) {
+                                        boolean isFailoverTriggered =
+                                                subtaskGatewayMap
+                                                        .get(subtask)
+                                                        .tryTriggerTaskFailover(

Review Comment:
   `Runnables.assertNoException` was previously used only around
   `subtaskAccess.triggerTaskFailover`, not around
   `SubtaskGatewayImpl.tryTriggerTaskFailover`. That existing usage has been
   preserved.
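
To illustrate the distinction, here is a minimal, self-contained sketch in which the guard wraps only the inner failover call rather than the outer `try...` method. It is not the code in this PR: the `SubtaskAccess` interface, the `assertNoException` helper, and the `tryTriggerTaskFailover` signature below are simplified, hypothetical stand-ins, and the real `Runnables.assertNoException` may handle an escaping throwable differently from the recording done here.

```java
import java.util.ArrayList;
import java.util.List;

public class AssertNoExceptionSketch {

    /** Hypothetical stand-in for a subtask access object; not Flink's real interface. */
    interface SubtaskAccess {
        void triggerTaskFailover(Throwable cause);
    }

    /** Records throwables that the wrapper treats as unexpected (assumption for this sketch). */
    static final List<Throwable> unexpected = new ArrayList<>();

    /**
     * Simplified stand-in for an assertNoException-style wrapper: any throwable
     * escaping the runnable is reported instead of propagating to the caller.
     */
    static Runnable assertNoException(Runnable runnable) {
        return () -> {
            try {
                runnable.run();
            } catch (Throwable t) {
                unexpected.add(t);
            }
        };
    }

    /** Hypothetical gateway-level method: returns whether a failover was triggered. */
    static boolean tryTriggerTaskFailover(
            SubtaskAccess access, boolean taskRunning, Throwable cause) {
        if (!taskRunning) {
            return false;
        }
        // The guard wraps only the inner call, not this whole method.
        assertNoException(() -> access.triggerTaskFailover(cause)).run();
        return true;
    }

    public static void main(String[] args) {
        SubtaskAccess failing = cause -> {
            throw new IllegalStateException("failover itself failed");
        };
        boolean triggered =
                tryTriggerTaskFailover(failing, true, new RuntimeException("event lost"));
        System.out.println(triggered + " " + unexpected.size());
    }
}
```

With this shape, the caller still receives the boolean result from the outer method, while an unexpected exception from the inner call is routed to the guard instead of the callback.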


