Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5436#discussion_r168197381 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) { @Override public void onContainersAllocated(List<Container> containers) { for (Container container : containers) { - numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); - log.info("Received new container: {} - Remaining pending container requests: {}", - container.getId(), numPendingContainerRequests); - final String containerIdStr = container.getId().toString(); - workerNodeMap.put(new ResourceID(containerIdStr), - new YarnWorkerNode(container)); - try { - /** Context information used to start a TaskExecutor Java process */ - ContainerLaunchContext taskExecutorLaunchContext = - createTaskExecutorLaunchContext( - container.getResource(), containerIdStr, container.getNodeId().getHost()); - nodeManagerClient.startContainer(container, taskExecutorLaunchContext); - } - catch (Throwable t) { - // failed to launch the container, will release the failed one and ask for a new one - log.error("Could not start TaskManager in container {},", container, t); + log.info( + "Received new container: {} - Remaining pending container requests: {}", + container.getId(), + numPendingContainerRequests); + + if (numPendingContainerRequests > 0) { + numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); + + final String containerIdStr = container.getId().toString(); + + workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); + + try { + // Context information used to start a TaskExecutor Java process + ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( + container.getResource(), + containerIdStr, + container.getNodeId().getHost()); + + nodeManagerClient.startContainer(container, 
taskExecutorLaunchContext); + } catch (Throwable t) { + log.error("Could not start TaskManager in container {}.", container.getId(), t); + + // release the failed container + resourceManagerClient.releaseAssignedContainer(container.getId()); + // and ask for a new one + requestYarnContainer(container.getResource(), container.getPriority()); + } + } else { + // return the excessive containers + log.info("Returning excess container {}.", container.getId()); resourceManagerClient.releaseAssignedContainer(container.getId()); - requestYarnContainer(container.getResource(), container.getPriority()); } } + + // if we are waiting for no further containers, we can go to the + // regular heartbeat interval if (numPendingContainerRequests <= 0) { --- End diff -- Yes, indeed. However, using `<=` as the comparison operator should not affect the logic.
---