huwh commented on code in PR #21565:
URL: https://github.com/apache/flink/pull/21565#discussion_r1061433261
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java:
##########
@@ -435,9 +487,19 @@ private void startTaskExecutorInContainerAsync(
                 containerLaunchContextFuture.handleAsync(
                         (context, exception) -> {
                             if (exception == null) {
-                                nodeManagerClient.startContainerAsync(container, context);
-                                requestResourceFuture.complete(
-                                        new YarnWorkerNode(container, resourceId));
+                                if (FutureUtils.isCompletedWithException(
+                                        requestResourceFuture,
+                                        ActiveResourceManager.RequestCancelledException.class)) {
+                                    log.info(
+                                            "container {} already be cancelled.",
+                                            container.getId());
+                                    resourceManagerClient.releaseAssignedContainer(
+                                            container.getId());
+                                } else {
+                                    nodeManagerClient.startContainerAsync(container, context);
+                                    requestResourceFuture.complete(
+                                            new YarnWorkerNode(container, resourceId));
+                                }

Review Comment:
   Currently, the requestFuture is removed from requestResourceFutures in #onContainersOfPriorityAllocated, but it is only completed once #createTaskExecutorLaunchContext is done. So startTaskExecutorInContainerAsync may receive a requestFuture that gets cancelled while the launch context is still being created.

   Maybe we can complete the requestFuture directly in #onContainersOfPriorityAllocated and, if createTaskExecutorLaunchContext finishes exceptionally, call getResourceEventHandler().onWorkerTerminated to tell the RM that this worker has terminated.
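The following Java sketch illustrates the alternative suggested in the review comment. It is minimal and single-threaded, and it relies on a heavily simplified model. Worker IDs are plain strings. `pendingRequests` stands in for `requestResourceFutures`, `onContainerAllocated` for `#onContainersOfPriorityAllocated`, and `onWorkerTerminated` for `getResourceEventHandler().onWorkerTerminated`. All of these names are hypothetical and are not Flink APIs. Cancellation is modelled as completing the future exceptionally with a stand-in `RequestCancelledException`, which mirrors the check in the diff. The sketch shows two things. First, once the request future is completed at allocation time, a later cancellation attempt is a no-op. Second, a failed launch context is reported as a terminated worker instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, single-threaded model of the suggestion in the review comment.
 * None of these names are Flink APIs; they only mirror the roles of the real ones.
 */
public class EagerCompletionSketch {

    /** Stand-in for ActiveResourceManager.RequestCancelledException. */
    static class RequestCancelledException extends RuntimeException {
        RequestCancelledException(String message) {
            super(message);
        }
    }

    /** Stand-in for the driver's requestResourceFutures. */
    final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();

    /** Worker IDs reported through the stand-in onWorkerTerminated callback. */
    final List<String> terminatedWorkers = new ArrayList<>();

    /** Registers a new pending request; its future resolves to a worker ID. */
    CompletableFuture<String> requestResource() {
        CompletableFuture<String> request = new CompletableFuture<>();
        pendingRequests.add(request);
        return request;
    }

    /**
     * Stand-in for #onContainersOfPriorityAllocated: the request future is removed from
     * the pending queue and completed immediately, before the launch context is ready.
     */
    CompletableFuture<Void> onContainerAllocated(
            String workerId, CompletableFuture<String> launchContextFuture) {
        CompletableFuture<String> request = pendingRequests.poll();
        if (request == null) {
            // Nothing is waiting for this container; the real driver would release it.
            return CompletableFuture.completedFuture(null);
        }
        // Completing here means a later exceptional completion (cancellation) is a no-op.
        request.complete(workerId);
        return launchContextFuture.handle(
                (context, error) -> {
                    if (error != null) {
                        // Launch context failed: tell the RM the handed-out worker is gone.
                        onWorkerTerminated(workerId, String.valueOf(error.getMessage()));
                    }
                    // On success the real driver would start the container here.
                    return null;
                });
    }

    /** Stand-in for getResourceEventHandler().onWorkerTerminated(...). */
    void onWorkerTerminated(String workerId, String diagnostics) {
        terminatedWorkers.add(workerId);
    }

    public static void main(String[] args) {
        EagerCompletionSketch driver = new EagerCompletionSketch();
        CompletableFuture<String> request = driver.requestResource();
        CompletableFuture<String> launchContext = new CompletableFuture<>();

        driver.onContainerAllocated("worker-1", launchContext);
        System.out.println("request resolved to: " + request.join()); // worker-1

        // A cancellation arriving after allocation no longer changes the outcome.
        boolean cancelled =
                request.completeExceptionally(new RequestCancelledException("cancelled"));
        System.out.println("late cancel succeeded: " + cancelled); // false

        launchContext.completeExceptionally(new RuntimeException("launch context failed"));
        System.out.println("terminated workers: " + driver.terminatedWorkers); // [worker-1]
    }
}
```

With this approach, the cancellation check inside `handleAsync` would no longer be needed for this race. The cost is that the RM may briefly count a worker whose launch context later fails, and the `onWorkerTerminated` call then corrects that count.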