This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b5140e9  [FLINK-23809][checkpoint] Respect the finished flag when extracting operator states due to skip in-flight data
b5140e9 is described below

commit b5140e96fcf7095439a1bd6697ad307a0ad52fe5
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Fri Aug 20 23:01:46 2021 +0800

    [FLINK-23809][checkpoint] Respect the finished flag when extracting operator states due to skip in-flight data
    
    This closes #16913.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 23 ++--------
 .../checkpoint/FullyFinishedOperatorState.java     |  6 +++
 .../flink/runtime/checkpoint/OperatorState.java    | 18 ++++++++
 .../CheckpointCoordinatorRestoringTest.java        | 49 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1091c57..7e6d7b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1600,26 +1600,9 @@ public class CheckpointCoordinator {
         HashMap<OperatorID, OperatorState> newStates = new HashMap<>();
         // Create the new operator states without in-flight data.
         for (OperatorState originalOperatorState : 
originalOperatorStates.values()) {
-            OperatorState newState =
-                    new OperatorState(
-                            originalOperatorState.getOperatorID(),
-                            originalOperatorState.getParallelism(),
-                            originalOperatorState.getMaxParallelism());
-
-            newStates.put(newState.getOperatorID(), newState);
-
-            for (Map.Entry<Integer, OperatorSubtaskState> 
originalSubtaskStateEntry :
-                    originalOperatorState.getSubtaskStates().entrySet()) {
-
-                newState.putState(
-                        originalSubtaskStateEntry.getKey(),
-                        originalSubtaskStateEntry
-                                .getValue()
-                                .toBuilder()
-                                
.setResultSubpartitionState(StateObjectCollection.empty())
-                                
.setInputChannelState(StateObjectCollection.empty())
-                                .build());
-            }
+            newStates.put(
+                    originalOperatorState.getOperatorID(),
+                    originalOperatorState.copyAndDiscardInFlightData());
         }
 
         return newStates;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
index be2b196..1726ec9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
@@ -53,6 +53,12 @@ public class FullyFinishedOperatorState extends OperatorState {
     }
 
     @Override
+    public OperatorState copyAndDiscardInFlightData() {
+        return new FullyFinishedOperatorState(
+                getOperatorID(), getParallelism(), getMaxParallelism());
+    }
+
+    @Override
     public int hashCode() {
         return super.hashCode();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index d68130d..ace8878 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -141,6 +141,24 @@ public class OperatorState implements CompositeStateHandle {
         return maxParallelism;
     }
 
+    public OperatorState copyAndDiscardInFlightData() {
+        OperatorState newState = new OperatorState(operatorID, parallelism, 
maxParallelism);
+
+        for (Map.Entry<Integer, OperatorSubtaskState> 
originalSubtaskStateEntry :
+                operatorSubtaskStates.entrySet()) {
+            newState.putState(
+                    originalSubtaskStateEntry.getKey(),
+                    originalSubtaskStateEntry
+                            .getValue()
+                            .toBuilder()
+                            
.setResultSubpartitionState(StateObjectCollection.empty())
+                            
.setInputChannelState(StateObjectCollection.empty())
+                            .build());
+        }
+
+        return newState;
+    }
+
     @Override
     public void discardState() throws Exception {
         for (OperatorSubtaskState operatorSubtaskState : 
operatorSubtaskStates.values()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 5ff7efb..8b698c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -1208,6 +1208,55 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
     }
 
     @Test
+    public void testRestoreFinishedStateWithoutInFlightData() throws Exception 
{
+        // given: Operator with not empty states.
+        OperatorIDPair op1 = OperatorIDPair.generatedIDOnly(new OperatorID());
+        final JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID, 1, 1, singletonList(op1), 
true)
+                        .build();
+
+        CompletedCheckpointStore completedCheckpointStore = new 
EmbeddedCompletedCheckpointStore();
+        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+        operatorStates.put(
+                op1.getGeneratedOperatorID(),
+                new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 
1, 1));
+        CompletedCheckpoint completedCheckpoint =
+                new CompletedCheckpoint(
+                        graph.getJobID(),
+                        2,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis() + 3000,
+                        operatorStates,
+                        Collections.emptyList(),
+                        CheckpointProperties.forCheckpoint(
+                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                        new TestCompletedCheckpointStorageLocation());
+        completedCheckpointStore.addCheckpoint(
+                completedCheckpoint, new CheckpointsCleaner(), () -> {});
+
+        CheckpointCoordinator coord =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointCoordinatorConfiguration(
+                                new CheckpointCoordinatorConfigurationBuilder()
+                                        
.setCheckpointIdOfIgnoredInFlightData(2)
+                                        .build())
+                        .setCompletedCheckpointStore(completedCheckpointStore)
+                        .build();
+
+        ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
+        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
+        TaskStateSnapshot restoredState =
+                vertex.getTaskVertices()[0]
+                        .getCurrentExecutionAttempt()
+                        .getTaskRestore()
+                        .getTaskStateSnapshot();
+        assertTrue(restoredState.isFinishedOnRestore());
+    }
+
+    @Test
     public void testRestoringPartiallyFinishedChainsFails() throws Exception {
         final JobVertexID jobVertexID1 = new JobVertexID();
         final JobVertexID jobVertexID2 = new JobVertexID();

Reply via email to