huwh commented on code in PR #21565:
URL: https://github.com/apache/flink/pull/21565#discussion_r1061433261


##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java:
##########
@@ -435,9 +487,19 @@ private void startTaskExecutorInContainerAsync(
                 containerLaunchContextFuture.handleAsync(
                         (context, exception) -> {
                             if (exception == null) {
-                                nodeManagerClient.startContainerAsync(container, context);
-                                requestResourceFuture.complete(
-                                        new YarnWorkerNode(container, resourceId));
+                                if (FutureUtils.isCompletedWithException(
+                                        requestResourceFuture,
+                                        ActiveResourceManager.RequestCancelledException.class)) {
+                                    log.info(
+                                            "container {} already be cancelled.",
+                                            container.getId());
+                                    resourceManagerClient.releaseAssignedContainer(
+                                            container.getId());
+                                } else {
+                                    nodeManagerClient.startContainerAsync(container, context);
+                                    requestResourceFuture.complete(
+                                            new YarnWorkerNode(container, resourceId));
+                                }

Review Comment:
   Currently the requestFuture is removed from requestResourceFutures in 
#onContainersOfPriorityAllocated, but it is only completed once 
#createTaskExecutorLaunchContext is done. So it is possible that 
startTaskExecutorInContainerAsync receives a requestFuture that is cancelled 
afterwards, before it has been completed.
   
   Maybe we can complete the requestFuture in #onContainersOfPriorityAllocated 
instead, and when createTaskExecutorLaunchContext finishes with an exception, 
call getResourceEventHandler().onWorkerTerminated to tell the RM that this 
resource has terminated.
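
   A rough sketch of that ordering, only to illustrate the idea. It uses 
simplified stand-in types rather than the real driver classes: 
EarlyCompleteSketch, EventHandler, onContainerAllocated, and the plain string 
ids are all hypothetical, and a list append stands in for startContainerAsync.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the driver; none of these names are the real Flink API.
final class EarlyCompleteSketch {

    // Stand-in for the handler that exposes onWorkerTerminated.
    interface EventHandler {
        void onWorkerTerminated(String containerId, String diagnostics);
    }

    private final Queue<CompletableFuture<String>> requestResourceFutures = new ArrayDeque<>();
    private final EventHandler handler;
    final List<String> startedContainers = new ArrayList<>();

    EarlyCompleteSketch(EventHandler handler) {
        this.handler = handler;
    }

    CompletableFuture<String> requestResource() {
        CompletableFuture<String> requestFuture = new CompletableFuture<>();
        requestResourceFutures.add(requestFuture);
        return requestFuture;
    }

    // Plays the role of onContainersOfPriorityAllocated for a single container.
    void onContainerAllocated(String containerId, CompletableFuture<String> launchContextFuture) {
        CompletableFuture<String> requestFuture = requestResourceFutures.poll();
        if (requestFuture == null) {
            return; // no pending request; real code would release the excess container
        }
        // Complete in the same step that removes the future from the pending queue,
        // so there is no window in which it is dequeued but still cancellable.
        requestFuture.complete(containerId);

        launchContextFuture.whenComplete(
                (context, exception) -> {
                    if (exception == null) {
                        startedContainers.add(containerId); // stands in for startContainerAsync
                    } else {
                        // The worker was already handed out, so report its termination.
                        handler.onWorkerTerminated(containerId, String.valueOf(exception));
                    }
                });
    }

    public static void main(String[] args) {
        List<String> terminated = new ArrayList<>();
        EarlyCompleteSketch driver = new EarlyCompleteSketch((id, diag) -> terminated.add(id));

        CompletableFuture<String> request = driver.requestResource();
        CompletableFuture<String> launchContext = new CompletableFuture<>();
        driver.onContainerAllocated("container_1", launchContext);
        System.out.println("request completed: " + request.isDone());

        launchContext.completeExceptionally(new IllegalStateException("launch context failed"));
        System.out.println("terminated: " + terminated);
    }
}
```

   With this ordering the isCompletedWithException check in the launch callback 
would no longer be needed, since a future that has left the pending queue is 
already completed and cannot be cancelled afterwards.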


