This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 88b309b7dca [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager
88b309b7dca is described below

commit 88b309b7dcad269ad084eab5e2944724daf6dee4
Author: 鲍健昕 <1411643...@qq.com>
AuthorDate: Wed Jul 20 10:35:40 2022 +0800

    [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 12 ++--
 .../checkpoint/CheckpointFailureManager.java       |  3 +-
 .../runtime/checkpoint/DefaultCheckpointPlan.java  |  5 +-
 .../checkpoint/FinishedTaskStateProvider.java      | 23 ++++++-
 .../filesystem/FsCheckpointStorageAccess.java      | 10 ++-
 .../checkpoint/CheckpointFailureManagerTest.java   |  5 +-
 .../CheckpointFailureManagerITCase.java            | 79 ++++++++++++++++++++--
 7 files changed, 117 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1051dbc6bc0..0f5033ac8ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -1365,18 +1366,21 @@ public class CheckpointCoordinator {
         } catch (Exception e1) {
             // abort the current pending checkpoint if we fails to finalize the pending
             // checkpoint.
+            final CheckpointFailureReason failureReason =
+                    e1 instanceof PartialFinishingNotSupportedByStateException
+                            ? CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING
+                            : CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE;
+
             if (!pendingCheckpoint.isDisposed()) {
                 abortPendingCheckpoint(
-                        pendingCheckpoint,
-                        new CheckpointException(
-                                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+                        pendingCheckpoint, new CheckpointException(failureReason, e1));
             }
 
             throw new CheckpointException(
                     "Could not finalize the pending checkpoint "
                             + pendingCheckpoint.getCheckpointID()
                             + '.',
-                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
+                    failureReason,
                     e1);
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 08cf49e41ee..8db1fe307a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -238,8 +238,8 @@ public class CheckpointFailureManager {
             case TASK_FAILURE:
             case TASK_CHECKPOINT_FAILURE:
             case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
+                // there are some edge cases shouldn't be counted as a 
failure, e.g. shutdown
             case TRIGGER_CHECKPOINT_FAILURE:
-            case FINALIZE_CHECKPOINT_FAILURE:
                 // ignore
                 break;
 
@@ -247,6 +247,7 @@ public class CheckpointFailureManager {
             case CHECKPOINT_ASYNC_EXCEPTION:
             case CHECKPOINT_DECLINED:
             case CHECKPOINT_EXPIRED:
+            case FINALIZE_CHECKPOINT_FAILURE:
                 // we should make sure one checkpoint only be counted once
                 if (checkpointId == UNKNOWN_CHECKPOINT_ID
                         || countedCheckpointIds.add(checkpointId)) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
index eaa7a595e7d..9253799a17a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -152,7 +151,7 @@ public class DefaultCheckpointPlan implements CheckpointPlan {
             Map<OperatorID, OperatorState> operatorStates) {
         for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) {
             if (hasUsedUnionListState(vertex, operatorStates)) {
-                throw new FlinkRuntimeException(
+                throw new PartialFinishingNotSupportedByStateException(
                         String.format(
                                 "The vertex %s (id = %s) has used"
                                         + " UnionListState, but part of its 
tasks are FINISHED.",
@@ -183,7 +182,7 @@ public class DefaultCheckpointPlan implements CheckpointPlan {
 
             if (entry.getValue() != vertex.getParallelism()
                     && hasUsedUnionListState(vertex, operatorStates)) {
-                throw new FlinkRuntimeException(
+                throw new PartialFinishingNotSupportedByStateException(
                         String.format(
                                 "The vertex %s (id = %s) has used"
                                         + " UnionListState, but part of its 
tasks has called operators' finish method.",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
index 167d3686042..4b17899b2ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Map;
 
@@ -33,5 +34,25 @@ public interface FinishedTaskStateProvider {
     void reportTaskHasFinishedOperators(ExecutionVertex task);
 
     /** Fulfills the state for the finished subtasks and operators to indicate 
they are finished. */
-    void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> 
operatorStates);
+    void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> 
operatorStates)
+            throws PartialFinishingNotSupportedByStateException;
+
+    /**
+     * Thrown when some subtasks of the operator have been finished but state 
doesn't support that
+     * (e.g. Union).
+     */
+    class PartialFinishingNotSupportedByStateException extends 
FlinkRuntimeException {
+
+        public PartialFinishingNotSupportedByStateException(String message) {
+            super(message);
+        }
+
+        public PartialFinishingNotSupportedByStateException(Throwable cause) {
+            super(cause);
+        }
+
+        public PartialFinishingNotSupportedByStateException(String message, 
Throwable cause) {
+            super(message, cause);
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index af373242c64..11807c3de85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -113,8 +113,14 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess
 
     @Override
     public void initializeBaseLocationsForCheckpoint() throws IOException {
-        fileSystem.mkdirs(sharedStateDirectory);
-        fileSystem.mkdirs(taskOwnedStateDirectory);
+        if (!fileSystem.mkdirs(sharedStateDirectory)) {
+            throw new IOException(
+                    "Failed to create directory for shared state: " + 
sharedStateDirectory);
+        }
+        if (!fileSystem.mkdirs(taskOwnedStateDirectory)) {
+            throw new IOException(
+                    "Failed to create directory for task owned state: " + 
taskOwnedStateDirectory);
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 8ed9b037ade..a2084c6616a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -119,8 +119,9 @@ public class CheckpointFailureManagerTest extends TestLogger {
                     checkpointProperties, new CheckpointException(reason), -2);
         }
 
-        // IO_EXCEPTION, CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and 
CHECKPOINT_ASYNC_EXCEPTION
-        assertEquals(4, callback.getInvokeCounter());
+        // IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE, 
CHECKPOINT_EXPIRED and
+        // CHECKPOINT_ASYNC_EXCEPTION
+        assertEquals(5, callback.getInvokeCounter());
     }
 
     @Test
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
index c23a4cdf420..27f6d78501f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,7 +29,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,7 +45,10 @@ import org.apache.flink.runtime.state.SnapshotExecutionType;
 import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import 
org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import 
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -60,13 +69,41 @@ import javax.annotation.Nonnull;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE;
+import static org.junit.Assert.fail;
+
 /** Tests to verify end-to-end logic of checkpoint failure manager. */
 public class CheckpointFailureManagerITCase extends TestLogger {
 
+    /**
+     * Test that checkpoint finalization failure is counted by {@link 
CheckpointFailureManager} and
+     * eventually fails the job. In this test, finalization is failed by 
throwing an exception from
+     * {@link 
org.apache.flink.runtime.state.CheckpointStorageLocation#createMetadataOutputStream}
+     * which should fail the job.
+     */
+    @Test
+    public void testFinalizationFailureCounted() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10);
+        env.getCheckpointConfig().setCheckpointStorage(new 
FailingFinalizationCheckpointStorage());
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new 
DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            TestUtils.submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+            fail("The job should fail");
+        } catch (JobExecutionException jobException) {
+            if (!isCheckpointFailure(jobException)) {
+                throw jobException;
+            }
+        }
+    }
+
     @ClassRule
     public static MiniClusterWithClientResource cluster =
             new MiniClusterWithClientResource(
@@ -85,12 +122,9 @@ public class CheckpointFailureManagerITCase extends TestLogger {
             TestUtils.submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
-            Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+            if (!isCheckpointFailure(jobException)) {
+                throw jobException;
+            }
         }
         // assert that the job only failed once.
         Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
@@ -202,4 +236,35 @@ public class CheckpointFailureManagerITCase extends TestLogger {
             return this;
         }
     }
+
+    private static class FailingFinalizationCheckpointStorage implements 
CheckpointStorage {
+        private static final long serialVersionUID = 8134582566514272546L;
+
+        @Override
+        public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) {
+            return new TestCompletedCheckpointStorageLocation();
+        }
+
+        @Override
+        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) {
+            return new TestingCheckpointStorageAccessCoordinatorView() {
+                @Override
+                public CheckpointStorageLocation 
initializeLocationForCheckpoint(
+                        long checkpointId) {
+                    return new 
NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE) {
+                        @Override
+                        public CheckpointMetadataOutputStream 
createMetadataOutputStream() {
+                            throw new RuntimeException("finalization failure");
+                        }
+                    };
+                }
+            };
+        }
+    }
+
+    private boolean isCheckpointFailure(JobExecutionException jobException) {
+        return ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class)
+                .filter(ex -> 
ex.getMessage().equals(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
+                .isPresent();
+    }
 }

Reply via email to