Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/11612#discussion_r55891228 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -265,25 +265,52 @@ private[yarn] class YarnAllocator( // For locality unmatched and locality free container requests, cancel these container // requests, since required locality preference has been changed, recalculating using // container placement strategy. - val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality( + val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( hostToLocalTaskCounts, pendingAllocate) - // Remove the outdated container request and recalculate the requested container number - localityUnMatched.foreach(amClient.removeContainerRequest) - localityFree.foreach(amClient.removeContainerRequest) - val updatedNumContainer = missing + localityUnMatched.size + localityFree.size + // cancel "stale" requests for locations that are no longer needed + staleRequests.foreach { stale => + amClient.removeContainerRequest(stale) + } + val cancelledContainers = staleRequests.size + logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)") + + // consider the number of new containers and cancelled stale containers available + val availableContainers = missing + cancelledContainers + + // to maximize locality, include requests with no locality preference that can be cancelled + val potentialContainers = availableContainers + anyHostRequests.size val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( - updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts, - allocatedHostToContainersMap, localityMatched) + potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts, + allocatedHostToContainersMap, localRequests) + + val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest] + 
containerLocalityPreferences.foreach { + case ContainerLocalityPreferences(nodes, racks) if nodes != null => + newLocalityRequests.append(createContainerRequest(resource, nodes, racks)) + case _ => + } - for (locality <- containerLocalityPreferences) { - val request = createContainerRequest(resource, locality.nodes, locality.racks) + if (availableContainers >= newLocalityRequests.size) { + // more containers are available than needed for locality, fill in requests for any host + for (i <- 0 until (availableContainers - newLocalityRequests.size)) { + newLocalityRequests.append(createContainerRequest(resource, null, null)) + } + } else { + val numToCancel = newLocalityRequests.size - availableContainers --- End diff -- Can this ever become negative? `potentialContainers` is always greater than or equal to `availableContainers`, but is the same true for `newLocalityRequests.size`? Given the "catch all" in L292, it seems it could be less than `potentialContainers`. Or, if that's not expected to happen, maybe put an assert in L292, or make sure that case can't happen?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org