This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 88b309b7dca [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager 88b309b7dca is described below commit 88b309b7dcad269ad084eab5e2944724daf6dee4 Author: 鲍健昕 <1411643...@qq.com> AuthorDate: Wed Jul 20 10:35:40 2022 +0800 [FLINK-27570][runtime] Count checkpoint finalization failures in CheckpointFailureManager --- .../runtime/checkpoint/CheckpointCoordinator.java | 12 ++-- .../checkpoint/CheckpointFailureManager.java | 3 +- .../runtime/checkpoint/DefaultCheckpointPlan.java | 5 +- .../checkpoint/FinishedTaskStateProvider.java | 23 ++++++- .../filesystem/FsCheckpointStorageAccess.java | 10 ++- .../checkpoint/CheckpointFailureManagerTest.java | 5 +- .../CheckpointFailureManagerITCase.java | 79 ++++++++++++++++++++-- 7 files changed, 117 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 1051dbc6bc0..0f5033ac8ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -1365,18 +1366,21 @@ public class CheckpointCoordinator { } catch (Exception e1) { // abort the current pending checkpoint if we fails to finalize the pending 
// checkpoint. + final CheckpointFailureReason failureReason = + e1 instanceof PartialFinishingNotSupportedByStateException + ? CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING + : CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE; + if (!pendingCheckpoint.isDisposed()) { abortPendingCheckpoint( - pendingCheckpoint, - new CheckpointException( - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1)); + pendingCheckpoint, new CheckpointException(failureReason, e1)); } throw new CheckpointException( "Could not finalize the pending checkpoint " + pendingCheckpoint.getCheckpointID() + '.', - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, + failureReason, e1); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 08cf49e41ee..8db1fe307a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -238,8 +238,8 @@ public class CheckpointFailureManager { case TASK_FAILURE: case TASK_CHECKPOINT_FAILURE: case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE: + // there are some edge cases shouldn't be counted as a failure, e.g. 
shutdown case TRIGGER_CHECKPOINT_FAILURE: - case FINALIZE_CHECKPOINT_FAILURE: // ignore break; @@ -247,6 +247,7 @@ public class CheckpointFailureManager { case CHECKPOINT_ASYNC_EXCEPTION: case CHECKPOINT_DECLINED: case CHECKPOINT_EXPIRED: + case FINALIZE_CHECKPOINT_FAILURE: // we should make sure one checkpoint only be counted once if (checkpointId == UNKNOWN_CHECKPOINT_ID || countedCheckpointIds.add(checkpointId)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java index eaa7a595e7d..9253799a17a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.util.FlinkRuntimeException; import java.util.Collection; import java.util.HashMap; @@ -152,7 +151,7 @@ public class DefaultCheckpointPlan implements CheckpointPlan { Map<OperatorID, OperatorState> operatorStates) { for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) { if (hasUsedUnionListState(vertex, operatorStates)) { - throw new FlinkRuntimeException( + throw new PartialFinishingNotSupportedByStateException( String.format( "The vertex %s (id = %s) has used" + " UnionListState, but part of its tasks are FINISHED.", @@ -183,7 +182,7 @@ public class DefaultCheckpointPlan implements CheckpointPlan { if (entry.getValue() != vertex.getParallelism() && hasUsedUnionListState(vertex, operatorStates)) { - throw new FlinkRuntimeException( + throw new PartialFinishingNotSupportedByStateException( String.format( "The vertex %s (id = %s) has used" + " 
UnionListState, but part of its tasks has called operators' finish method.", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java index 167d3686042..4b17899b2ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.util.FlinkRuntimeException; import java.util.Map; @@ -33,5 +34,25 @@ public interface FinishedTaskStateProvider { void reportTaskHasFinishedOperators(ExecutionVertex task); /** Fulfills the state for the finished subtasks and operators to indicate they are finished. */ - void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> operatorStates); + void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> operatorStates) + throws PartialFinishingNotSupportedByStateException; + + /** + * Thrown when some subtasks of the operator have been finished but state doesn't support that + * (e.g. Union). 
+ */ + class PartialFinishingNotSupportedByStateException extends FlinkRuntimeException { + + public PartialFinishingNotSupportedByStateException(String message) { + super(message); + } + + public PartialFinishingNotSupportedByStateException(Throwable cause) { + super(cause); + } + + public PartialFinishingNotSupportedByStateException(String message, Throwable cause) { + super(message, cause); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java index af373242c64..11807c3de85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java @@ -113,8 +113,14 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess @Override public void initializeBaseLocationsForCheckpoint() throws IOException { - fileSystem.mkdirs(sharedStateDirectory); - fileSystem.mkdirs(taskOwnedStateDirectory); + if (!fileSystem.mkdirs(sharedStateDirectory)) { + throw new IOException( + "Failed to create directory for shared state: " + sharedStateDirectory); + } + if (!fileSystem.mkdirs(taskOwnedStateDirectory)) { + throw new IOException( + "Failed to create directory for task owned state: " + taskOwnedStateDirectory); + } } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java index 8ed9b037ade..a2084c6616a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java @@ -119,8 +119,9 @@ public class CheckpointFailureManagerTest 
extends TestLogger { checkpointProperties, new CheckpointException(reason), -2); } - // IO_EXCEPTION, CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and CHECKPOINT_ASYNC_EXCEPTION - assertEquals(4, callback.getInvokeCounter()); + // IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE, CHECKPOINT_EXPIRED and + // CHECKPOINT_ASYNC_EXCEPTION + assertEquals(5, callback.getInvokeCounter()); } @Test diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java index c23a4cdf420..27f6d78501f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.checkpointing; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -28,7 +29,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageAccess; +import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; import 
org.apache.flink.runtime.state.FunctionInitializationContext; @@ -39,7 +45,10 @@ import org.apache.flink.runtime.state.SnapshotExecutionType; import org.apache.flink.runtime.state.SnapshotResources; import org.apache.flink.runtime.state.SnapshotStrategy; import org.apache.flink.runtime.state.SnapshotStrategyRunner; +import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -60,13 +69,41 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.HashMap; -import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE; +import static org.junit.Assert.fail; + /** Tests to verify end-to-end logic of checkpoint failure manager. */ public class CheckpointFailureManagerITCase extends TestLogger { + /** + * Test that checkpoint finalization failure is counted by {@link CheckpointFailureManager} and + * eventually fails the job. In this test, finalization is failed by throwing an exception from + * {@link org.apache.flink.runtime.state.CheckpointStorageLocation#createMetadataOutputStream} + * which should fail the job. 
+ */ + @Test + public void testFinalizationFailureCounted() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10); + env.getCheckpointConfig().setCheckpointStorage(new FailingFinalizationCheckpointStorage()); + env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new DiscardingSink<>()); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + try { + TestUtils.submitJobAndWaitForResult( + cluster.getClusterClient(), jobGraph, getClass().getClassLoader()); + fail("The job should fail"); + } catch (JobExecutionException jobException) { + if (!isCheckpointFailure(jobException)) { + throw jobException; + } + } + } + @ClassRule public static MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( @@ -85,12 +122,9 @@ public class CheckpointFailureManagerITCase extends TestLogger { TestUtils.submitJobAndWaitForResult( cluster.getClusterClient(), jobGraph, getClass().getClassLoader()); } catch (JobExecutionException jobException) { - Optional<FlinkRuntimeException> throwable = - ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class); - Assert.assertTrue(throwable.isPresent()); - Assert.assertEquals( - CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, - throwable.get().getMessage()); + if (!isCheckpointFailure(jobException)) { + throw jobException; + } } // assert that the job only failed once. 
Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get()); @@ -202,4 +236,35 @@ public class CheckpointFailureManagerITCase extends TestLogger { return this; } } + + private static class FailingFinalizationCheckpointStorage implements CheckpointStorage { + private static final long serialVersionUID = 8134582566514272546L; + + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) { + return new TestCompletedCheckpointStorageLocation(); + } + + @Override + public CheckpointStorageAccess createCheckpointStorage(JobID jobId) { + return new TestingCheckpointStorageAccessCoordinatorView() { + @Override + public CheckpointStorageLocation initializeLocationForCheckpoint( + long checkpointId) { + return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE) { + @Override + public CheckpointMetadataOutputStream createMetadataOutputStream() { + throw new RuntimeException("finalization failure"); + } + }; + } + }; + } + } + + private boolean isCheckpointFailure(JobExecutionException jobException) { + return ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class) + .filter(ex -> ex.getMessage().equals(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE)) + .isPresent(); + } }