pnowojski commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r697436574



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
##########
@@ -71,7 +75,7 @@
             prioritizedResultSubpartitionState;
 
     /** Signal flag if this represents state for a restored operator. */
-    private final boolean restored;
+    private final @Nullable Long restoredCheckpointId;

Review comment:
       Outdated comment? The Javadoc still says "Signal flag", but the field is now a nullable checkpoint ID.
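
One way the Javadoc could be brought in line with the new field type, as a rough sketch only. `RestoredFlagSketch` and its `isRestored()` accessor are hypothetical stand-ins and not the real `PrioritizedOperatorSubtaskState`; the `@Nullable` annotation is only mentioned in a comment so the snippet compiles without extra dependencies.

```java
/** Hypothetical stand-in for the class under review; not Flink code. */
public class RestoredFlagSketch {

    /**
     * ID of the checkpoint this state was restored from, or null if the
     * state does not come from a restore. (Would be annotated @Nullable.)
     */
    private final Long restoredCheckpointId;

    public RestoredFlagSketch(Long restoredCheckpointId) {
        this.restoredCheckpointId = restoredCheckpointId;
    }

    /** Derived equivalent of the old boolean flag. */
    public boolean isRestored() {
        return restoredCheckpointId != null;
    }
}
```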

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
##########
@@ -254,11 +254,9 @@ private void assignNonFinishedStateToTask(
                     operatorID.getGeneratedOperatorID(), operatorSubtaskState);
         }
 
-        if (!statelessTask) {
-            JobManagerTaskRestore taskRestore =
-                    new JobManagerTaskRestore(restoreCheckpointId, taskState);
-            currentExecutionAttempt.setInitialState(taskRestore);
-        }
+        JobManagerTaskRestore taskRestore =
+                new JobManagerTaskRestore(restoreCheckpointId, taskState);
+        currentExecutionAttempt.setInitialState(taskRestore);

Review comment:
       Apart from that, there are tons of `@Nullable` annotations around `JobManagerTaskRestore` that are no longer needed in your version.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
##########
@@ -219,22 +239,22 @@ public static PrioritizedOperatorSubtaskState emptyNotRestored() {
         @Nonnull private final List<OperatorSubtaskState> alternativesByPriority;
 
         /** Flag if the states have been restored. */
-        private final boolean restored;
+        private final @Nullable Long restoredCheckpointId;

Review comment:
       Ditto: this Javadoc still describes a flag, but the field is now a nullable checkpoint ID.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
##########
@@ -254,11 +254,9 @@ private void assignNonFinishedStateToTask(
                     operatorID.getGeneratedOperatorID(), operatorSubtaskState);
         }
 
-        if (!statelessTask) {
-            JobManagerTaskRestore taskRestore =
-                    new JobManagerTaskRestore(restoreCheckpointId, taskState);
-            currentExecutionAttempt.setInitialState(taskRestore);
-        }
+        JobManagerTaskRestore taskRestore =
+                new JobManagerTaskRestore(restoreCheckpointId, taskState);
+        currentExecutionAttempt.setInitialState(taskRestore);

Review comment:
       I fear this change might have side effects in:
   - `ExecutionVertex#getPreferredLocationBasedOnState`
   - `TaskStateManagerImpl#jobManagerTaskRestore`
   
   Have you checked how this will alter the behaviour, now that stateless tasks also get a `JobManagerTaskRestore`?
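
To illustrate the concern, a minimal sketch. It assumes that some caller treats "a restore object is present" as "this task has state"; whether the two places listed above actually do so is exactly what needs checking. `TaskRestore`, `hasState`, and both `prefersPreviousLocation...` methods are hypothetical stand-ins, not Flink's classes or signatures.

```java
/** Hypothetical stand-ins that mimic the null-check pattern; not Flink code. */
public class RestoreNullCheckSketch {

    /** Simplified stand-in for a task restore: checkpoint ID plus "has any state". */
    public static final class TaskRestore {
        public final long restoreCheckpointId;
        public final boolean hasState;

        public TaskRestore(long restoreCheckpointId, boolean hasState) {
            this.restoreCheckpointId = restoreCheckpointId;
            this.hasState = hasState;
        }
    }

    /** Caller that relies on null meaning "stateless": any restore object counts. */
    public static boolean prefersPreviousLocationByNullCheck(TaskRestore restore) {
        return restore != null;
    }

    /** Caller that additionally checks whether the restore carries state. */
    public static boolean prefersPreviousLocationByState(TaskRestore restore) {
        return restore != null && restore.hasState;
    }
}
```

With the `if (!statelessTask)` guard removed, a stateless task goes from passing `null` to passing a restore object with no state, so the null-check variant flips from `false` to `true` while the state-aware variant stays `false`.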




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
