This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8069249703bbe7858e0c6a044deb54ce75e3989
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jun 3 14:25:01 2020 +0200

    [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed
    
    Motivation: stop writing channel state ASAP if the checkpoint is subsumed
    
    Changes:
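    1. complete CheckpointBarrierUnaligner.ThreadSafeUnaligner#allBarriersReceivedFuture
       exceptionally (CHECKPOINT_DECLINED_SUBSUMED) when a barrier for a new checkpoint
       arrives before all barriers of the current checkpoint were received
    2. abort the channel state write when that future completes exceptionally
    3. add a cleanup parameter to ChannelStateWriter.abort, so that the call above
       can pass cleanup=false and keep the write result for getAndRemoveWriteResult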
---
 .../checkpoint/channel/ChannelStateWriter.java     |  4 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java |  6 +++--
 .../channel/ChannelStateWriterImplTest.java        |  2 +-
 .../checkpoint/channel/MockChannelStateWriter.java |  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     | 31 +++++++++++-----------
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 +++++++----
 .../tasks/TestSubtaskCheckpointCoordinator.java    |  2 +-
 7 files changed, 35 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 02a3a69..2112444 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -139,7 +139,7 @@ public interface ChannelStateWriter extends Closeable {
         * Aborts the checkpoint and fails pending result for this checkpoint.
         * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
         */
-       void abort(long checkpointId, Throwable cause);
+       void abort(long checkpointId, Throwable cause, boolean cleanup);
 
        /**
         * Must be called after {@link #start(long, CheckpointOptions)} once.
@@ -174,7 +174,7 @@ public interface ChannelStateWriter extends Closeable {
                }
 
                @Override
-               public void abort(long checkpointId, Throwable cause) {
+               public void abort(long checkpointId, Throwable cause, boolean 
cleanup) {
                }
 
                @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 8996b3b..3e18050 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -144,11 +144,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
        }
 
        @Override
-       public void abort(long checkpointId, Throwable cause) {
+       public void abort(long checkpointId, Throwable cause, boolean cleanup) {
                LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
                enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
true); // abort already started
                enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
false); // abort enqueued but not started
-               results.remove(checkpointId);
+               if (cleanup) {
+                       results.remove(checkpointId);
+               }
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 0dae88e..d0193dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -290,7 +290,7 @@ public class ChannelStateWriterImplTest {
        }
 
        private void callAbort(ChannelStateWriter writer) {
-               writer.abort(CHECKPOINT_ID, new TestException());
+               writer.abort(CHECKPOINT_ID, new TestException(), false);
        }
 
        private void callFinish(ChannelStateWriter writer) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 88bd334..7641d36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -106,7 +106,7 @@ public class MockChannelStateWriter implements 
ChannelStateWriter {
        }
 
        @Override
-       public void abort(long checkpointId, Throwable cause) {
+       public void abort(long checkpointId, Throwable cause, boolean cleanup) {
                checkCheckpointId(checkpointId);
                
channelStateWriteResult.getInputChannelStateHandles().cancel(false);
                
channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index f978e72..114c03e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
 import static org.apache.flink.util.CloseableIterator.ofElement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -331,21 +332,21 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
 
                private synchronized void handleNewCheckpoint(CheckpointBarrier 
barrier) throws IOException {
                        long barrierId = barrier.getId();
-                       if (!allBarriersReceivedFuture.isDone() && 
isCheckpointPending()) {
-                               // we did not complete the current checkpoint, 
another started before
-                               LOG.warn("{}: Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.",
-                                       handler.taskName,
-                                       barrierId,
-                                       currentReceivedCheckpointId);
-
-                               // let the task know we are not completing this
-                               long currentCheckpointId = 
currentReceivedCheckpointId;
-                               handler.executeInTaskThread(() ->
-                                       handler.notifyAbort(
-                                               currentCheckpointId,
-                                               new 
CheckpointException("Barrier id: " + barrierId, 
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
-                                               "notifyAbort");
+                       if (!allBarriersReceivedFuture.isDone()) {
+                               CheckpointException exception = new 
CheckpointException("Barrier id: " + barrierId, CHECKPOINT_DECLINED_SUBSUMED);
+                               if (isCheckpointPending()) {
+                                       // we did not complete the current 
checkpoint, another started before
+                                       LOG.warn("{}: Received checkpoint 
barrier for checkpoint {} before completing current checkpoint {}. " +
+                                                       "Skipping current 
checkpoint.",
+                                               handler.taskName,
+                                               barrierId,
+                                               currentReceivedCheckpointId);
+
+                                       // let the task know we are not 
completing this
+                                       final long currentCheckpointId = 
currentReceivedCheckpointId;
+                                       handler.executeInTaskThread(() -> 
handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+                               }
+                               
allBarriersReceivedFuture.completeExceptionally(exception);
                        }
 
                        currentReceivedCheckpointId = barrierId;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index adfaa44..cf8a21e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -186,7 +186,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 
                checkpointStorage.clearCacheFor(checkpointId);
 
-               channelStateWriter.abort(checkpointId, cause);
+               channelStateWriter.abort(checkpointId, cause, true);
 
                // notify the coordinator that we decline this checkpoint
                env.declineCheckpoint(checkpointId, cause);
@@ -391,7 +391,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        CheckpointMetrics metrics,
                        Exception ex) {
 
-               channelStateWriter.abort(metadata.getCheckpointId(), ex);
+               channelStateWriter.abort(metadata.getCheckpointId(), ex, true);
                for (OperatorSnapshotFutures operatorSnapshotResult : 
operatorSnapshotsInProgress.values()) {
                        if (operatorSnapshotResult != null) {
                                try {
@@ -412,9 +412,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
        }
 
        private void prepareInflightDataSnapshot(long checkpointId) throws 
IOException {
-               prepareInputSnapshot.apply(channelStateWriter, checkpointId)
-                       .thenAccept(unused -> 
channelStateWriter.finishInput(checkpointId));
-
                ResultPartitionWriter[] writers = env.getAllWriters();
                for (ResultPartitionWriter writer : writers) {
                        for (int i = 0; i < writer.getNumberOfSubpartitions(); 
i++) {
@@ -427,6 +424,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        }
                }
                channelStateWriter.finishOutput(checkpointId);
+               prepareInputSnapshot.apply(channelStateWriter, checkpointId)
+                       .whenComplete((unused, ex) -> {
+                               if (ex != null) {
+                                       channelStateWriter.abort(checkpointId, 
ex, false /* result is needed and cleaned by getWriteResult */);
+                               } else {
+                                       
channelStateWriter.finishInput(checkpointId);
+                               }
+                       });
        }
 
        private void finishAndReportAsync(Map<OperatorID, 
OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, 
CheckpointMetrics metrics, CheckpointOptions options) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
index 3897ab1..9d71a2d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
@@ -68,7 +68,7 @@ public class TestSubtaskCheckpointCoordinator implements 
SubtaskCheckpointCoordi
 
        @Override
        public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause, OperatorChain<?, ?> operatorChain) {
-               channelStateWriter.abort(checkpointId, cause);
+               channelStateWriter.abort(checkpointId, cause, true);
        }
 
        @Override

Reply via email to