This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bc9f0fc  [FLINK-16753][checkpointing] Use CheckpointException to wrap exceptions thrown from AsyncCheckpointRunnable (#14072)
bc9f0fc is described below

commit bc9f0fc7d6a02211e1e30ecc425d2dbfe4a0bcb5
Author: Jiayi Liao <buptliaoji...@gmail.com>
AuthorDate: Mon Nov 16 17:20:56 2020 +0800

    [FLINK-16753][checkpointing] Use CheckpointException to wrap exceptions thrown from AsyncCheckpointRunnable (#14072)
---
 .../checkpoint/CheckpointFailureManager.java       |   1 +
 .../checkpoint/CheckpointFailureReason.java        |   2 +
 .../runtime/tasks/AsyncCheckpointRunnable.java     |   6 +-
 .../runtime/tasks/AsyncCheckpointRunnableTest.java | 112 +++++++++++++++++++++
 4 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index fbf5d98..ca8f42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -117,6 +117,7 @@ public class CheckpointFailureManager {
                        case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
 
                        case EXCEPTION:
+                       case CHECKPOINT_ASYNC_EXCEPTION:
                        case TASK_FAILURE:
                        case TASK_CHECKPOINT_FAILURE:
                        case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index cd787d0..e20e57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -36,6 +36,8 @@ public enum CheckpointFailureReason {
 
        EXCEPTION(true, "An Exception occurred while triggering the checkpoint."),
 
+       CHECKPOINT_ASYNC_EXCEPTION(false, "Asynchronous task checkpoint failed."),
+
        CHECKPOINT_EXPIRED(false, "Checkpoint expired before completing."),
 
        CHECKPOINT_SUBSUMED(false, "Checkpoint has been subsumed."),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index af10411..e5aeb68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -195,7 +197,9 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
                                // We only report the exception for the original cause of fail and cleanup.
                                // Otherwise this followup exception could race the original exception in failing the task.
                                try {
-                                       taskEnvironment.declineCheckpoint(checkpointMetaData.getCheckpointId(), checkpointException);
+                                       taskEnvironment.declineCheckpoint(
+                                                       checkpointMetaData.getCheckpointId(),
+                                                       new CheckpointException(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, checkpointException));
                                } catch (Exception unhandled) {
                                        AsynchronousException asyncException = new AsynchronousException(unhandled);
                                        asyncExceptionHandler.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
new file mode 100644
index 0000000..30e8e80
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link AsyncCheckpointRunnable}.
+ */
+public class AsyncCheckpointRunnableTest {
+
+       @Test
+       public void testAsyncCheckpointException() {
+               final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+               snapshotsInProgress.put(
+                               new OperatorID(),
+                               new OperatorSnapshotFutures(
+                                               ExceptionallyDoneFuture.of(new RuntimeException("Async Checkpoint Exception")),
+                                               DoneFuture.of(SnapshotResult.empty()),
+                                               DoneFuture.of(SnapshotResult.empty()),
+                                               DoneFuture.of(SnapshotResult.empty()),
+                                               DoneFuture.of(SnapshotResult.empty()),
+                                               DoneFuture.of(SnapshotResult.empty())));
+
+               final TestEnvironment environment = new TestEnvironment();
+               final AsyncCheckpointRunnable runnable = new AsyncCheckpointRunnable(
+                               snapshotsInProgress,
+                               new CheckpointMetaData(1, 1L),
+                               new CheckpointMetrics(),
+                               1L,
+                               "Task Name",
+                               r -> {},
+                               r -> {},
+                               environment,
+                               (msg, ex) -> {});
+               runnable.run();
+
+               Assert.assertTrue(environment.getCause() instanceof CheckpointException);
+               Assert.assertSame(((CheckpointException) environment.getCause())
+                               .getCheckpointFailureReason(), CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
+       }
+
+       private static class TestEnvironment extends StreamMockEnvironment {
+
+               Throwable cause = null;
+
+               TestEnvironment() {
+                       this(
+                                       new Configuration(),
+                                       new Configuration(),
+                                       new ExecutionConfig(),
+                                       1L,
+                                       new MockInputSplitProvider(),
+                                       1,
+                                       new TestTaskStateManager());
+               }
+
+               TestEnvironment(
+                               Configuration jobConfig,
+                               Configuration taskConfig,
+                               ExecutionConfig executionConfig,
+                               long memorySize,
+                               MockInputSplitProvider inputSplitProvider,
+                               int bufferSize,
+                               TaskStateManager taskStateManager) {
+                       super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize, taskStateManager);
+               }
+
+               @Override
+               public void declineCheckpoint(long checkpointId, Throwable cause) {
+                       this.cause = cause;
+               }
+
+               Throwable getCause() {
+                       return cause;
+               }
+       }
+}

Reply via email to