Repository: hadoop Updated Branches: refs/heads/branch-2 736fb3b66 -> fa86fdc3c
YARN-7339. LocalityMulticastAMRMProxyPolicy should handle cancel request properly. (Botong Huang via curino) Edited cherry-pick from 1c5c2b5dde6f2cffc587ca8f79a18828e1b1faf9 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa86fdc3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa86fdc3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa86fdc3 Branch: refs/heads/branch-2 Commit: fa86fdc3cc3cc9c4d4bfabcd6dfadcf4635dc92e Parents: 736fb3b6 Author: Carlo Curino <[email protected]> Authored: Wed Oct 25 13:56:35 2017 -0700 Committer: Carlo Curino <[email protected]> Committed: Wed Oct 25 13:56:35 2017 -0700 ---------------------------------------------------------------------- .../LocalityMulticastAMRMProxyPolicy.java | 41 +++++++------- .../TestLocalityMulticastAMRMProxyPolicy.java | 57 ++++++++++++++++++++ 2 files changed, 78 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa86fdc3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index e07c7c2..06c9ede 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -326,10 +326,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // any RM we have previously contacted (this might be the user way // to cancel a previous request). if (numContainer == 0) { - for (SubClusterId targetId : targetSubclusters) { - if (headroom.containsKey(targetId)) { - allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); - } + for (SubClusterId targetId : headroom.keySet()) { + allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); } return; } @@ -562,24 +560,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { Preconditions .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); - if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { - countContainersPerRM.put(rr.getAllocationRequestId(), - new HashMap<SubClusterId, AtomicLong>()); - } - if (!countContainersPerRM.get(rr.getAllocationRequestId()) - .containsKey(targetId)) { - countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, - new AtomicLong(0)); - } - countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) - .addAndGet(rr.getNumContainers()); + if (rr.getNumContainers() > 0) { + if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { + countContainersPerRM.put(rr.getAllocationRequestId(), + new HashMap<SubClusterId, AtomicLong>()); + } + if (!countContainersPerRM.get(rr.getAllocationRequestId()) + .containsKey(targetId)) { + countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, + new AtomicLong(0)); + } + countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) + .addAndGet(rr.getNumContainers()); - if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) { - 
totNumLocalizedContainers.put(rr.getAllocationRequestId(), - new AtomicLong(0)); + if (!totNumLocalizedContainers + .containsKey(rr.getAllocationRequestId())) { + totNumLocalizedContainers.put(rr.getAllocationRequestId(), + new AtomicLong(0)); + } + totNumLocalizedContainers.get(rr.getAllocationRequestId()) + .addAndGet(rr.getNumContainers()); } - totNumLocalizedContainers.get(rr.getAllocationRequestId()) - .addAndGet(rr.getNumContainers()); internalAddToAnswer(targetId, rr); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa86fdc3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index 46a6011..f66bbb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -659,4 +659,61 @@ public class TestLocalityMulticastAMRMProxyPolicy "Expect sum to be 19 in array: " + printList(allocations), 19, sum); } } + + @Test + public void testCancelWithLocalizedResource() throws YarnException { + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + List<ResourceRequest> 
resourceRequests = new ArrayList<>(); + + // Initialize the headroom map + prepPolicyWithHeadroom(); + + // Cancel at ANY level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 1, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 0, null, false)); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + checkExpectedAllocation(response, "subcluster0", 3, 1); + checkExpectedAllocation(response, "subcluster1", 1, 0); + checkExpectedAllocation(response, "subcluster2", 1, 0); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", -1, -1); + + resourceRequests.clear(); + // Cancel at node level only + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 0, null, false)); + resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, false)); + + response = ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + /* + * Since node request is a cancel, it should not be considered associated + * with localized requests. 
Based on headroom, we expect 75 containers to + * go to subcluster0 (60) and subcluster2 (15) according to the advertised + * headroom (40 and 10), no containers for subcluster1 as it advertises zero + * headroom, and 25 to subcluster5 which has unknown headroom, and so it + * gets 1/4th of the load + */ + checkExpectedAllocation(response, "subcluster0", 3, 60); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 15); + checkExpectedAllocation(response, "subcluster5", 1, 25); + checkTotalContainerAllocation(response, 100); + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
