This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new b5140e9  [FLINK-23809][checkpoint] Respect the finished flag when extracting operator states due to skip in-flight data
b5140e9 is described below

commit b5140e96fcf7095439a1bd6697ad307a0ad52fe5
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Fri Aug 20 23:01:46 2021 +0800

    [FLINK-23809][checkpoint] Respect the finished flag when extracting operator states due to skip in-flight data

    This closes #16913.
---
 .../runtime/checkpoint/CheckpointCoordinator.java | 23 ++--------
 .../checkpoint/FullyFinishedOperatorState.java | 6 +++
 .../flink/runtime/checkpoint/OperatorState.java | 18 ++++++++
 .../CheckpointCoordinatorRestoringTest.java | 49 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 1091c57..7e6d7b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1600,26 +1600,9 @@ public class CheckpointCoordinator { HashMap<OperatorID, OperatorState> newStates = new HashMap<>(); // Create the new operator states without in-flight data.
for (OperatorState originalOperatorState : originalOperatorStates.values()) { - OperatorState newState = - new OperatorState( - originalOperatorState.getOperatorID(), - originalOperatorState.getParallelism(), - originalOperatorState.getMaxParallelism()); - - newStates.put(newState.getOperatorID(), newState); - - for (Map.Entry<Integer, OperatorSubtaskState> originalSubtaskStateEntry : - originalOperatorState.getSubtaskStates().entrySet()) { - - newState.putState( - originalSubtaskStateEntry.getKey(), - originalSubtaskStateEntry - .getValue() - .toBuilder() - .setResultSubpartitionState(StateObjectCollection.empty()) - .setInputChannelState(StateObjectCollection.empty()) - .build()); - } + newStates.put( + originalOperatorState.getOperatorID(), + originalOperatorState.copyAndDiscardInFlightData()); } return newStates; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java index be2b196..1726ec9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java @@ -53,6 +53,12 @@ public class FullyFinishedOperatorState extends OperatorState { } @Override + public OperatorState copyAndDiscardInFlightData() { + return new FullyFinishedOperatorState( + getOperatorID(), getParallelism(), getMaxParallelism()); + } + + @Override public int hashCode() { return super.hashCode(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index d68130d..ace8878 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -141,6 +141,24 @@ 
public class OperatorState implements CompositeStateHandle { return maxParallelism; } + public OperatorState copyAndDiscardInFlightData() { + OperatorState newState = new OperatorState(operatorID, parallelism, maxParallelism); + + for (Map.Entry<Integer, OperatorSubtaskState> originalSubtaskStateEntry : + operatorSubtaskStates.entrySet()) { + newState.putState( + originalSubtaskStateEntry.getKey(), + originalSubtaskStateEntry + .getValue() + .toBuilder() + .setResultSubpartitionState(StateObjectCollection.empty()) + .setInputChannelState(StateObjectCollection.empty()) + .build()); + } + + return newState; + } + @Override public void discardState() throws Exception { for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index 5ff7efb..8b698c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -1208,6 +1208,55 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { } @Test + public void testRestoreFinishedStateWithoutInFlightData() throws Exception { + // given: Operator with not empty states. 
+ OperatorIDPair op1 = OperatorIDPair.generatedIDOnly(new OperatorID()); + final JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID, 1, 1, singletonList(op1), true) + .build(); + + CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore(); + Map<OperatorID, OperatorState> operatorStates = new HashMap<>(); + operatorStates.put( + op1.getGeneratedOperatorID(), + new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 1, 1)); + CompletedCheckpoint completedCheckpoint = + new CompletedCheckpoint( + graph.getJobID(), + 2, + System.currentTimeMillis(), + System.currentTimeMillis() + 3000, + operatorStates, + Collections.emptyList(), + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + new TestCompletedCheckpointStorageLocation()); + completedCheckpointStore.addCheckpoint( + completedCheckpoint, new CheckpointsCleaner(), () -> {}); + + CheckpointCoordinator coord = + new CheckpointCoordinatorBuilder() + .setExecutionGraph(graph) + .setCheckpointCoordinatorConfiguration( + new CheckpointCoordinatorConfigurationBuilder() + .setCheckpointIdOfIgnoredInFlightData(2) + .build()) + .setCompletedCheckpointStore(completedCheckpointStore) + .build(); + + ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID); + coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex)); + TaskStateSnapshot restoredState = + vertex.getTaskVertices()[0] + .getCurrentExecutionAttempt() + .getTaskRestore() + .getTaskStateSnapshot(); + assertTrue(restoredState.isFinishedOnRestore()); + } + + @Test public void testRestoringPartiallyFinishedChainsFails() throws Exception { final JobVertexID jobVertexID1 = new JobVertexID(); final JobVertexID jobVertexID2 = new JobVertexID();