GJL commented on a change in pull request #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states
URL: https://github.com/apache/flink/pull/9860#discussion_r333976919
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -211,7 +215,8 @@ private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexV
 	}

 	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
-		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId))
+			.whenComplete((Object ignored, Throwable t) -> executionSlotAllocator.cancel(executionVertexId));

Review comment:

> We can add a check in allocateSlotsAndDeploy() to guarantee that DefaultExecutionSlotAllocator has no pending slot assignment for the vertices to schedule. The check will fail only if there is a bug.

That would be good to have.

> It should not happen because the cancelFuture returned by cancelExecutionVertex completes only if executionSlotAllocator.cancel() completes. And a vertex will only be restarted if its cancelFuture completes.

That sounds right. I still don't like the use of `whenComplete` here. If the slot is already provisioned, then `DefaultExecutionSlotAllocator#cancel()` should be a no-op. If the slot future is not completed, then it cannot be assigned to the execution. I don't see any harm in cancelling the slot allocation independently of the vertex state.

I think the difference from `Execution#allocateAndAssignSlotForExecution()`:

```
// register call back to cancel slot request in case that the execution gets canceled
releaseFuture.whenComplete(
	(Object ignored, Throwable throwable) -> {
		if (logicalSlotFuture.cancel(false)) {
			slotProviderStrategy.cancelSlotRequest(
				slotRequestId,
				slotSharingGroupId,
				new FlinkException("Execution " + this + " was released."));
		}
	});
```

is that `DefaultScheduler#cancelExecutionVertex()` is not an asynchronous callback. If `cancelExecutionVertex()` is invoked, we already know that the vertex must be cancelled.

Using `whenComplete` implies that the `executionSlotAllocator.cancel()` must be invoked after the vertex cancellation is finished. If there is no such dependency, I'd prefer to keep the code a bit simpler by removing `whenComplete`. Of course, if there is such a dependency, let's keep it.
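
To illustrate the ordering question, here is a minimal, self-contained sketch. It does not use the Flink classes: `cancelChained`, `cancelIndependently`, and `trace` are hypothetical names introduced only for this example, a plain `CompletableFuture<Void>` stands in for the future returned by `executionVertexOperations.cancel(...)`, and a `Runnable` stands in for `executionSlotAllocator.cancel(executionVertexId)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CancelOrderingSketch {

    // Variant from the diff: the slot request is cancelled only once the
    // vertex cancellation future has completed (normally or exceptionally).
    public static CompletableFuture<Void> cancelChained(
            CompletableFuture<Void> vertexCancelFuture, Runnable cancelSlotRequest) {
        return vertexCancelFuture.whenComplete((ignored, t) -> cancelSlotRequest.run());
    }

    // Simpler variant: cancel the slot request right away, independently of
    // when the vertex cancellation finishes.
    public static CompletableFuture<Void> cancelIndependently(
            CompletableFuture<Void> vertexCancelFuture, Runnable cancelSlotRequest) {
        cancelSlotRequest.run();
        return vertexCancelFuture;
    }

    // Records the order of events for one of the two variants.
    public static List<String> trace(boolean chained) {
        List<String> events = new ArrayList<>();
        // Hypothetical stand-in for the vertex cancellation; still pending here.
        CompletableFuture<Void> vertexCancelFuture = new CompletableFuture<>();
        Runnable cancelSlotRequest = () -> events.add("slot request cancelled");

        if (chained) {
            cancelChained(vertexCancelFuture, cancelSlotRequest);
        } else {
            cancelIndependently(vertexCancelFuture, cancelSlotRequest);
        }

        // The vertex reaches a terminal state later; completing the future
        // runs any callback registered via whenComplete in this thread.
        events.add("vertex reached terminal state");
        vertexCancelFuture.complete(null);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("chained:     " + trace(true));
        System.out.println("independent: " + trace(false));
    }
}
```

In this sketch the chained variant cancels the slot request only after the vertex has reached a terminal state, while the independent variant cancels it immediately. Whether the real scheduler needs the first ordering is exactly the open question in the comment above.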