zhuzhurk commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425754123



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -122,7 +135,20 @@ public boolean canRestart() {
         * @return result of a set of tasks to restart to recover from the failure
         */
        public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {

Review comment:
       Minor: If we make `globalFailure` a parameter instead of adding new factory methods, it seems the changes in both `FailureHandlingResult` and `ExecutionFailureHandler` could be much simpler.
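       To illustrate the suggestion, here is a minimal sketch under simplifying assumptions: `FailureHandlingResultSketch` and its `handle` helper are hypothetical stand-ins for `FailureHandlingResult` and `ExecutionFailureHandler`, and vertex IDs are plain `String`s rather than `ExecutionVertexID`. The only point it makes is that each factory method takes `globalFailure` as an argument, so the handler forwards the flag instead of choosing between parallel factory methods.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for FailureHandlingResult (not the real class).
// Vertex IDs are plain Strings here; the real class uses ExecutionVertexID.
final class FailureHandlingResultSketch {

    private final Set<String> verticesToRestart;
    private final long restartDelayMS;
    private final Throwable error;
    private final boolean globalFailure;

    private FailureHandlingResultSketch(
            Set<String> verticesToRestart, long restartDelayMS, Throwable error, boolean globalFailure) {
        this.verticesToRestart = verticesToRestart;
        this.restartDelayMS = restartDelayMS;
        this.error = error;
        this.globalFailure = globalFailure;
    }

    // One factory covers both local and global restartable failures:
    // the caller passes the flag rather than picking a different factory.
    static FailureHandlingResultSketch restartable(
            Set<String> verticesToRestart, long restartDelayMS, boolean globalFailure) {
        return new FailureHandlingResultSketch(
                Collections.unmodifiableSet(new HashSet<>(verticesToRestart)),
                restartDelayMS, null, globalFailure);
    }

    // Same idea for failures that cannot be recovered from.
    static FailureHandlingResultSketch unrecoverable(Throwable error, boolean globalFailure) {
        return new FailureHandlingResultSketch(Collections.emptySet(), -1L, error, globalFailure);
    }

    // Hypothetical handler path: a single branch forwards globalFailure unchanged.
    static FailureHandlingResultSketch handle(
            Set<String> verticesToRestart, boolean restartAllowed, long restartDelayMS,
            Throwable cause, boolean globalFailure) {
        return restartAllowed
                ? restartable(verticesToRestart, restartDelayMS, globalFailure)
                : unrecoverable(cause, globalFailure);
    }

    boolean canRestart() {
        return error == null;
    }

    boolean isGlobalFailure() {
        return globalFailure;
    }

    Set<String> getVerticesToRestart() {
        return verticesToRestart;
    }

    long getRestartDelayMS() {
        return restartDelayMS;
    }
}
```

       With this shape, supporting global failures does not double the number of factory methods; the trade-off is a boolean argument at each call site, which is less self-documenting than a named factory.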

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
                        boolean errorIfNoCheckpoint,
                        boolean allowNonRestoredState) throws Exception {
 
-               return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+               return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
        }
 
        /**
-        * Restores the latest checkpointed state.
+        * Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+        * or "regional" failover and does restore states to coordinators. Note that a regional failover
+        * might still include all tasks.
+        *
+        * @param tasks Set of job vertices to restore. State for these vertices is
+        * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+        * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+        * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+        * @throws IllegalStateException If no completed checkpoint is available and
+        *                               the <code>failIfNoCheckpoint</code> flag has been set.
+        * @throws IllegalStateException If the checkpoint contains state that cannot be
+        *                               mapped to any job vertex in <code>tasks</code> and the
+        *                               <code>allowNonRestoredState</code> flag has not been set.
+        * @throws IllegalStateException If the max parallelism changed for an operator
+        *                               that restores state from this checkpoint.
+        * @throws IllegalStateException If the parallelism changed for an operator
+        *                               that restores <i>non-partitioned</i> state from this
+        *                               checkpoint.
+        */
+       public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
+               // when restoring subtasks only we accept potentially unmatched state because what we

Review comment:
       I think another reason why `allowNonRestoredState` must be true is that `tasks` does not necessarily contain every `JobVertex`, so the state of a `JobVertex` that is absent from the set cannot be matched.
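       A rough sketch of this argument, using a hypothetical `RegionalRestoreSketch` class rather than the real `CheckpointCoordinator` or state-assignment code: checkpointed state is keyed by a `String` vertex ID for simplicity, and `tasks` holds only the vertices of the restarted region. Any state whose key is not in `tasks` counts as unmatched, so a restore limited to a region fails unless `allowNonRestoredState` is set.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the "unmatched state" check; this is not
// the actual CheckpointCoordinator code. State is keyed by a String vertex ID.
final class RegionalRestoreSketch {

    // Returns the keys of checkpointed state that no vertex in `tasks` claims.
    static Set<String> unmatchedState(Map<String, byte[]> checkpointState, Set<String> tasks) {
        Set<String> unmatched = new HashSet<>(checkpointState.keySet());
        unmatched.removeAll(tasks);
        return unmatched;
    }

    // Unmatched state is only an error when allowNonRestoredState is false.
    static void restore(Map<String, byte[]> checkpointState, Set<String> tasks, boolean allowNonRestoredState) {
        Set<String> unmatched = unmatchedState(checkpointState, tasks);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Checkpoint contains state for vertices not in tasks: " + unmatched);
        }
        // Assigning the matched state to the vertices in tasks is omitted here.
    }
}
```

       For example, with state for `source`, `map` and `sink` and a restarted region containing only `map`, the state of `source` and `sink` is unmatched by design, so the restore only succeeds with `allowNonRestoredState = true`.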




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

