[ https://issues.apache.org/jira/browse/FLINK-7804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401580#comment-16401580 ]
ASF GitHub Bot commented on FLINK-7804: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5675#discussion_r175020997 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -325,67 +328,74 @@ public float getProgress() { @Override public void onContainersCompleted(List<ContainerStatus> list) { - for (ContainerStatus container : list) { - if (container.getExitStatus() < 0) { - closeTaskManagerConnection(new ResourceID( - container.getContainerId().toString()), new Exception(container.getDiagnostics())); + runAsync(() -> { + for (ContainerStatus container : list) { + if (container.getExitStatus() < 0) { + closeTaskManagerConnection(new ResourceID( + container.getContainerId().toString()), new Exception(container.getDiagnostics())); + } + workerNodeMap.remove(new ResourceID(container.getContainerId().toString())); + } } - workerNodeMap.remove(new ResourceID(container.getContainerId().toString())); - } + ); } @Override public void onContainersAllocated(List<Container> containers) { - for (Container container : containers) { - log.info( - "Received new container: {} - Remaining pending container requests: {}", - container.getId(), - numPendingContainerRequests); - - if (numPendingContainerRequests > 0) { - numPendingContainerRequests--; - - final String containerIdStr = container.getId().toString(); - - workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); - - try { - // Context information used to start a TaskExecutor Java process - ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( - container.getResource(), - containerIdStr, - container.getNodeId().getHost()); - - nodeManagerClient.startContainer(container, taskExecutorLaunchContext); - } catch (Throwable t) { - log.error("Could not start TaskManager in container {}.", container.getId(), t); - - // release the failed container + runAsync(() 
-> { + for (Container container : containers) { + log.info( + "Received new container: {} - Remaining pending container requests: {}", + container.getId(), + numPendingContainerRequests); + + if (numPendingContainerRequests > 0) { + numPendingContainerRequests--; + + final String containerIdStr = container.getId().toString(); + + workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); + + try { + // Context information used to start a TaskExecutor Java process + ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( + container.getResource(), + containerIdStr, + container.getNodeId().getHost()); + + nodeManagerClient.startContainer(container, taskExecutorLaunchContext); + } catch (Throwable t) { + log.error("Could not start TaskManager in container {}.", container.getId(), t); + + // release the failed container + resourceManagerClient.releaseAssignedContainer(container.getId()); + // and ask for a new one + requestYarnContainer(container.getResource(), container.getPriority()); + } + } else { + // return the excessive containers + log.info("Returning excess container {}.", container.getId()); resourceManagerClient.releaseAssignedContainer(container.getId()); - // and ask for a new one - requestYarnContainer(container.getResource(), container.getPriority()); } - } else { - // return the excessive containers - log.info("Returning excess container {}.", container.getId()); - resourceManagerClient.releaseAssignedContainer(container.getId()); } - } - // if we are waiting for no further containers, we can go to the - // regular heartbeat interval - if (numPendingContainerRequests <= 0) { - resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); - } + // if we are waiting for no further containers, we can go to the + // regular heartbeat interval + if (numPendingContainerRequests <= 0) { + resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); + } + }); } @Override public void 
onShutdownRequest() { - try { - shutDown(); - } catch (Exception e) { - log.warn("Fail to shutdown the YARN resource manager.", e); - } + runAsync(() -> { + try { + shutDown(); --- End diff -- done > YarnResourceManager does not execute AMRMClientAsync callbacks in main thread > ----------------------------------------------------------------------------- > > Key: FLINK-7804 > URL: https://issues.apache.org/jira/browse/FLINK-7804 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN > Affects Versions: 1.4.0, 1.5.0 > Reporter: Till Rohrmann > Assignee: Gary Yao > Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The {{YarnResourceManager}} registers callbacks with an {{AMRMClientAsync}}, > which it uses to react to Yarn container allocations. These callbacks (e.g. > {{onContainersAllocated}}) modify the internal state of the > {{YarnResourceManager}}. This can lead to race conditions with the > {{requestYarnContainer}} method. > In order to solve this problem, we have to execute the state-changing > operations in the main thread of the {{YarnResourceManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)