zhuzhurk commented on a change in pull request #13749:
URL: https://github.com/apache/flink/pull/13749#discussion_r511632430



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
##########
@@ -121,7 +122,12 @@ public RestartPipelinedRegionFailoverStrategy(
                // calculate the tasks to restart based on the result of regions to restart
                Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
                for (SchedulingPipelinedRegion region : getRegionsToRestart(failedRegion)) {
-                       region.getVertices().forEach(vertex -> tasksToRestart.add(vertex.getId()));
+                       for (SchedulingExecutionVertex vertex : region.getVertices()) {
+                               // we do not need to restart tasks which are already in the initial state
+                               if (vertex.getState() != ExecutionState.CREATED) {

Review comment:
How about skipping regions in which all vertices are `CREATED` while traversing in `getRegionsToRestart()`?
This may significantly reduce the computational complexity for most batch jobs, because we may only need to check N vertices and N edges instead of K*N vertices and K*N^2 edges (N is the parallelism and K is the JobVertex count). The exceptions are `PartitionException` cases and jobs using `LazyFromSourcesSchedulingStrategy` with `InputDependencyConstraint=ANY`, but they may also benefit from it.
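
A rough sketch of the idea, not the actual Flink code: `State`, `Region`, and `regionsToRestart` below are hypothetical stand-ins for `ExecutionState`, `SchedulingPipelinedRegion`, and `getRegionsToRestart()`, and the walk only follows downstream consumers. It assumes a region whose vertices are all `CREATED` has nothing to restart and does not need to be traversed further.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RegionSkipSketch {

    // Hypothetical stand-in for ExecutionState; only the values needed here.
    enum State { CREATED, RUNNING, FINISHED }

    // Hypothetical stand-in for a pipelined region: vertex states plus downstream regions.
    static final class Region {
        final String name;
        final List<State> vertexStates;
        final List<Region> consumers = new ArrayList<>();

        Region(String name, List<State> vertexStates) {
            this.name = name;
            this.vertexStates = vertexStates;
        }

        // True when no vertex of this region has left the initial state.
        boolean allCreated() {
            for (State state : vertexStates) {
                if (state != State.CREATED) {
                    return false;
                }
            }
            return true;
        }
    }

    // Breadth-first walk over downstream regions that never enters a region
    // whose vertices are all CREATED (assumption: such regions need no restart).
    static Set<String> regionsToRestart(Region failed) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visited.add(failed.name);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region current = queue.poll();
            for (Region consumer : current.consumers) {
                if (consumer.allCreated()) {
                    continue; // skipped: neither restarted nor traversed further
                }
                if (visited.add(consumer.name)) {
                    queue.add(consumer);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Region source = new Region("source", Arrays.asList(State.FINISHED, State.RUNNING));
        Region map = new Region("map", Arrays.asList(State.RUNNING, State.CREATED));
        Region sink = new Region("sink", Arrays.asList(State.CREATED, State.CREATED));
        source.consumers.add(map);
        map.consumers.add(sink);
        System.out.println(regionsToRestart(source)); // prints [source, map]
    }
}
```

With this check in the traversal, the per-vertex `CREATED` filter from the diff is still needed for regions that are only partially started, such as `map` in the example.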





