Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5087#discussion_r156731976
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -965,11 +965,20 @@ public void cancel() {
     
                                        // we build a future that is complete once all vertices have reached a terminal state
                                        final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
    -                                   allTerminal.thenAccept(
    -                                           (Void value) -> {
    -                                                   // cancellations may currently be overridden by failures which trigger
    -                                                   // restarts, so we need to pass a proper restart global version here
    -                                                   allVerticesInTerminalState(globalVersionForRestart);
    +                                   allTerminal.whenCompleteAsync(
    --- End diff --
    
    It does not have to run asynchronously. I will remove it.
    
    We need the `transitionState` because it might happen that we cannot
    properly cancel all `ExecutionJobVertices`, e.g. if the `SlotOwner` cannot
    return the allocated slot. This is currently not the case, but we should
    guard against it in case this changes in the future.
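
    To illustrate the point about guarding against a failed cancellation,
    here is a minimal sketch using plain `java.util.concurrent.CompletableFuture`
    rather than Flink's `ConjunctFuture`. The class `TerminalStateSketch`, the
    `State` enum and both helper methods are hypothetical names made up for this
    example; they are not taken from the diff, and the actual change in
    `ExecutionGraph` may look different. The sketch only shows the general
    behaviour: a `thenAccept` callback is skipped when the future completes
    exceptionally, whereas a `whenComplete` callback runs in both cases and can
    therefore move the state to a failure state.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class TerminalStateSketch {

    /** Hypothetical states for illustration only; this is not Flink's JobStatus. */
    enum State { CANCELLING, CANCELED, FAILED }

    /** Uses whenComplete, so the callback sees both normal and exceptional completion. */
    static AtomicReference<State> trackWithWhenComplete(CompletableFuture<Void> allTerminal) {
        AtomicReference<State> state = new AtomicReference<>(State.CANCELLING);
        allTerminal.whenComplete((ignored, failure) -> {
            if (failure == null) {
                // all vertices reached a terminal state
                state.compareAndSet(State.CANCELLING, State.CANCELED);
            } else {
                // cancellation itself went wrong, so leave CANCELLING explicitly
                state.compareAndSet(State.CANCELLING, State.FAILED);
            }
        });
        return state;
    }

    /** Uses thenAccept, so the callback is skipped on exceptional completion. */
    static AtomicBoolean trackWithThenAccept(CompletableFuture<Void> allTerminal) {
        AtomicBoolean callbackRan = new AtomicBoolean(false);
        allTerminal.thenAccept(ignored -> callbackRan.set(true));
        return callbackRan;
    }

    public static void main(String[] args) {
        // Simulate a cancellation that cannot complete properly.
        CompletableFuture<Void> broken = new CompletableFuture<>();
        AtomicReference<State> state = trackWithWhenComplete(broken);
        AtomicBoolean thenAcceptRan = trackWithThenAccept(broken);

        broken.completeExceptionally(new IllegalStateException("slot could not be returned"));

        System.out.println(state.get());         // FAILED
        System.out.println(thenAcceptRan.get()); // false
    }
}
```

    With `thenAccept` alone, the sketch would stay in `CANCELLING` after the
    exceptional completion, which is the situation the explicit state
    transition is meant to avoid.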

