This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bc9f0fc  [FLINK-16753][checkpointing] Use CheckpointException to wrap exceptions thrown from AsyncCheckpointRunnable (#14072)
bc9f0fc is described below

commit bc9f0fc7d6a02211e1e30ecc425d2dbfe4a0bcb5
Author: Jiayi Liao <buptliaoji...@gmail.com>
AuthorDate: Mon Nov 16 17:20:56 2020 +0800

    [FLINK-16753][checkpointing] Use CheckpointException to wrap exceptions thrown from AsyncCheckpointRunnable (#14072)
---
 .../checkpoint/CheckpointFailureManager.java       |   1 +
 .../checkpoint/CheckpointFailureReason.java        |   2 +
 .../runtime/tasks/AsyncCheckpointRunnable.java     |   6 +-
 .../runtime/tasks/AsyncCheckpointRunnableTest.java | 112 +++++++++++++++++++++
 4 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index fbf5d98..ca8f42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -117,6 +117,7 @@ public class CheckpointFailureManager {
 
 			case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
 			case EXCEPTION:
+			case CHECKPOINT_ASYNC_EXCEPTION:
 			case TASK_FAILURE:
 			case TASK_CHECKPOINT_FAILURE:
 			case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index cd787d0..e20e57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -36,6 +36,8 @@ public enum CheckpointFailureReason {
 
 	EXCEPTION(true, "An Exception occurred while triggering the checkpoint."),
 
+	CHECKPOINT_ASYNC_EXCEPTION(false, "Asynchronous task checkpoint failed."),
+
 	CHECKPOINT_EXPIRED(false, "Checkpoint expired before completing."),
 
 	CHECKPOINT_SUBSUMED(false, "Checkpoint has been subsumed."),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index af10411..e5aeb68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -195,7 +197,9 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 				// We only report the exception for the original cause of fail and cleanup.
 				// Otherwise this followup exception could race the original exception in failing the task.
 				try {
-					taskEnvironment.declineCheckpoint(checkpointMetaData.getCheckpointId(), checkpointException);
+					taskEnvironment.declineCheckpoint(
+						checkpointMetaData.getCheckpointId(),
+						new CheckpointException(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, checkpointException));
 				} catch (Exception unhandled) {
 					AsynchronousException asyncException = new AsynchronousException(unhandled);
 					asyncExceptionHandler.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
new file mode 100644
index 0000000..30e8e80
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link AsyncCheckpointRunnable}.
+ */
+public class AsyncCheckpointRunnableTest {
+
+	@Test
+	public void testAsyncCheckpointException() {
+		final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+		snapshotsInProgress.put(
+			new OperatorID(),
+			new OperatorSnapshotFutures(
+				ExceptionallyDoneFuture.of(new RuntimeException("Async Checkpoint Exception")),
+				DoneFuture.of(SnapshotResult.empty()),
+				DoneFuture.of(SnapshotResult.empty()),
+				DoneFuture.of(SnapshotResult.empty()),
+				DoneFuture.of(SnapshotResult.empty()),
+				DoneFuture.of(SnapshotResult.empty())));
+
+		final TestEnvironment environment = new TestEnvironment();
+		final AsyncCheckpointRunnable runnable = new AsyncCheckpointRunnable(
+			snapshotsInProgress,
+			new CheckpointMetaData(1, 1L),
+			new CheckpointMetrics(),
+			1L,
+			"Task Name",
+			r -> {},
+			r -> {},
+			environment,
+			(msg, ex) -> {});
+		runnable.run();
+
+		Assert.assertTrue(environment.getCause() instanceof CheckpointException);
+		Assert.assertSame(((CheckpointException) environment.getCause())
+			.getCheckpointFailureReason(), CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
+	}
+
+	private static class TestEnvironment extends StreamMockEnvironment {
+
+		Throwable cause = null;
+
+		TestEnvironment() {
+			this(
+				new Configuration(),
+				new Configuration(),
+				new ExecutionConfig(),
+				1L,
+				new MockInputSplitProvider(),
+				1,
+				new TestTaskStateManager());
+		}
+
+		TestEnvironment(
+				Configuration jobConfig,
+				Configuration taskConfig,
+				ExecutionConfig executionConfig,
+				long memorySize,
+				MockInputSplitProvider inputSplitProvider,
+				int bufferSize,
+				TaskStateManager taskStateManager) {
+			super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize, taskStateManager);
+		}
+
+		@Override
+		public void declineCheckpoint(long checkpointId, Throwable cause) {
+			this.cause = cause;
+		}
+
+		Throwable getCause() {
+			return cause;
+		}
+	}
+}