This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Wed Jun 3 21:43:56 2020 +0200 [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator Check (by task thread) whether the current checkpoint was already aborted in the following scenario: 1. on checkpoint barrier ThreadSafeUnaligner sends a mail to start checkpointing (netty thread) 2. on cancellation marker CheckpointBarrierUnaligner aborts it (task thread) 3. task thread processes a mail to start checkpointing --- .../tasks/SubtaskCheckpointCoordinatorImpl.java | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index cf8a21e..5d7a2c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -44,7 +44,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; @@ -57,10 +56,12 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @Override public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, OperatorChain<?, ?> operatorChain) throws IOException { LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, taskName); + lastCheckpointId = Math.max(lastCheckpointId, checkpointId); + Iterator<Long> iterator = abortedCheckpointIds.iterator(); + while (iterator.hasNext()) { + long next = iterator.next(); + if (next < lastCheckpointId) { + iterator.remove(); + } else { + break; + } + } checkpointStorage.clearCacheFor(checkpointId); @@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream // checkpoint alignments + if (lastCheckpointId >= metadata.getCheckpointId()) { + LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId()); + channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true); + checkAndClearAbortedStatus(metadata.getCheckpointId()); + return; + } + // Step (0): Record the last triggered checkpointId. - Preconditions.checkArgument(lastCheckpointId < metadata.getCheckpointId(), String.format( - "Unexpected current checkpoint-id: %s vs last checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId)); lastCheckpointId = metadata.getCheckpointId(); if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());