This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jun 3 21:43:56 2020 +0200

    [FLINK-17869][task][checkpointing] Ignore out-of-order checkpoints in SubtaskCheckpointCoordinator
    
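    Check (in the task thread) whether the current checkpoint has already been aborted, which can happen in the following scenario:
    1. On a checkpoint barrier, ThreadSafeUnaligner (netty thread) sends a mail to start checkpointing.
    2. On a cancellation marker, CheckpointBarrierUnaligner (task thread) aborts that checkpoint.
    3. The task thread then processes the mail and tries to start the already-aborted checkpoint.
    In that case the barrier is treated as out of order: its channel state is aborted and the checkpoint is skipped, instead of failing the previous "last checkpoint-id < current checkpoint-id" precondition.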
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index cf8a21e..5d7a2c9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.slf4j.Logger;
@@ -57,10 +56,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
        @Override
        public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause, OperatorChain<?, ?> operatorChain) throws IOException {
                LOG.debug("Aborting checkpoint via cancel-barrier {} for task 
{}", checkpointId, taskName);
+               lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
+               Iterator<Long> iterator = abortedCheckpointIds.iterator();
+               while (iterator.hasNext()) {
+                       long next = iterator.next();
+                       if (next < lastCheckpointId) {
+                               iterator.remove();
+                       } else {
+                               break;
+                       }
+               }
 
                checkpointStorage.clearCacheFor(checkpointId);
 
@@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                // We generally try to emit the checkpoint barrier as soon as 
possible to not affect downstream
                // checkpoint alignments
 
+               if (lastCheckpointId >= metadata.getCheckpointId()) {
+                       LOG.info("Out of order checkpoint barrier (aborted 
previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
+                       channelStateWriter.abort(metadata.getCheckpointId(), 
new CancellationException(), true);
+                       checkAndClearAbortedStatus(metadata.getCheckpointId());
+                       return;
+               }
+
                // Step (0): Record the last triggered checkpointId.
-               Preconditions.checkArgument(lastCheckpointId < 
metadata.getCheckpointId(), String.format(
-                       "Unexpected current checkpoint-id: %s vs last 
checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
                lastCheckpointId = metadata.getCheckpointId();
                if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
                        LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());

Reply via email to