This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8069249703bbe7858e0c6a044deb54ce75e3989 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Wed Jun 3 14:25:01 2020 +0200 [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed. Motivation: stop writing channel state as soon as possible if the checkpoint is subsumed. Changes: 1. complete CheckpointBarrierUnaligner.ThreadSafeUnaligner#allBarriersReceivedFuture exceptionally when the checkpoint is subsumed; 2. abort the channel state write if that future completes exceptionally; 3. add a cleanup parameter to ChannelStateWriter.abort so that the call above can pass cleanup=false. --- .../checkpoint/channel/ChannelStateWriter.java | 4 +-- .../checkpoint/channel/ChannelStateWriterImpl.java | 6 +++-- .../channel/ChannelStateWriterImplTest.java | 2 +- .../checkpoint/channel/MockChannelStateWriter.java | 2 +- .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++++++++++----------- .../tasks/SubtaskCheckpointCoordinatorImpl.java | 15 +++++++---- .../tasks/TestSubtaskCheckpointCoordinator.java | 2 +- 7 files changed, 35 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java index 02a3a69..2112444 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -139,7 +139,7 @@ public interface ChannelStateWriter extends Closeable { * Aborts the checkpoint and fails pending result for this checkpoint. * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards. */ - void abort(long checkpointId, Throwable cause); + void abort(long checkpointId, Throwable cause, boolean cleanup); /** * Must be called after {@link #start(long, CheckpointOptions)} once. 
@@ -174,7 +174,7 @@ public interface ChannelStateWriter extends Closeable { } @Override - public void abort(long checkpointId, Throwable cause) { + public void abort(long checkpointId, Throwable cause, boolean cleanup) { } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 8996b3b..3e18050 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -144,11 +144,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { } @Override - public void abort(long checkpointId, Throwable cause) { + public void abort(long checkpointId, Throwable cause, boolean cleanup) { LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId); enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started - results.remove(checkpointId); + if (cleanup) { + results.remove(checkpointId); + } } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java index 0dae88e..d0193dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java @@ -290,7 +290,7 @@ public class ChannelStateWriterImplTest { } private void callAbort(ChannelStateWriter writer) { - writer.abort(CHECKPOINT_ID, new TestException()); + writer.abort(CHECKPOINT_ID, new TestException(), false); } private void 
callFinish(ChannelStateWriter writer) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java index 88bd334..7641d36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java @@ -106,7 +106,7 @@ public class MockChannelStateWriter implements ChannelStateWriter { } @Override - public void abort(long checkpointId, Throwable cause) { + public void abort(long checkpointId, Throwable cause, boolean cleanup) { checkCheckpointId(checkpointId); channelStateWriteResult.getInputChannelStateHandles().cancel(false); channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index f978e72..114c03e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.IntStream; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED; import static org.apache.flink.util.CloseableIterator.ofElement; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -331,21 +332,21 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException { long barrierId = barrier.getId(); - if 
(!allBarriersReceivedFuture.isDone() && isCheckpointPending()) { - // we did not complete the current checkpoint, another started before - LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", - handler.taskName, - barrierId, - currentReceivedCheckpointId); - - // let the task know we are not completing this - long currentCheckpointId = currentReceivedCheckpointId; - handler.executeInTaskThread(() -> - handler.notifyAbort( - currentCheckpointId, - new CheckpointException("Barrier id: " + barrierId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)), - "notifyAbort"); + if (!allBarriersReceivedFuture.isDone()) { + CheckpointException exception = new CheckpointException("Barrier id: " + barrierId, CHECKPOINT_DECLINED_SUBSUMED); + if (isCheckpointPending()) { + // we did not complete the current checkpoint, another started before + LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. 
" + + "Skipping current checkpoint.", + handler.taskName, + barrierId, + currentReceivedCheckpointId); + + // let the task know we are not completing this + final long currentCheckpointId = currentReceivedCheckpointId; + handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort"); + } + allBarriersReceivedFuture.completeExceptionally(exception); } currentReceivedCheckpointId = barrierId; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index adfaa44..cf8a21e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -186,7 +186,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { checkpointStorage.clearCacheFor(checkpointId); - channelStateWriter.abort(checkpointId, cause); + channelStateWriter.abort(checkpointId, cause, true); // notify the coordinator that we decline this checkpoint env.declineCheckpoint(checkpointId, cause); @@ -391,7 +391,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { CheckpointMetrics metrics, Exception ex) { - channelStateWriter.abort(metadata.getCheckpointId(), ex); + channelStateWriter.abort(metadata.getCheckpointId(), ex, true); for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { if (operatorSnapshotResult != null) { try { @@ -412,9 +412,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } private void prepareInflightDataSnapshot(long checkpointId) throws IOException { - prepareInputSnapshot.apply(channelStateWriter, checkpointId) - .thenAccept(unused -> 
channelStateWriter.finishInput(checkpointId)); - ResultPartitionWriter[] writers = env.getAllWriters(); for (ResultPartitionWriter writer : writers) { for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) { @@ -427,6 +424,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } } channelStateWriter.finishOutput(checkpointId); + prepareInputSnapshot.apply(channelStateWriter, checkpointId) + .whenComplete((unused, ex) -> { + if (ex != null) { + channelStateWriter.abort(checkpointId, ex, false /* result is needed and cleaned by getWriteResult */); + } else { + channelStateWriter.finishInput(checkpointId); + } + }); } private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java index 3897ab1..9d71a2d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java @@ -68,7 +68,7 @@ public class TestSubtaskCheckpointCoordinator implements SubtaskCheckpointCoordi @Override public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, OperatorChain<?, ?> operatorChain) { - channelStateWriter.abort(checkpointId, cause); + channelStateWriter.abort(checkpointId, cause, true); } @Override