YARN-7317. Fix overallocation resulting from ceiling in LocalityMulticastAMRMProxyPolicy. (contributed by Botong Huang via curino)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13fcfb3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13fcfb3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13fcfb3d

Branch: refs/heads/YARN-1011
Commit: 13fcfb3d46ee7a0d606b4bb221d1cd66ef2a5a7c
Parents: 075358e
Author: Carlo Curino <cur...@apache.org>
Authored: Thu Oct 12 10:38:58 2017 -0700
Committer: Carlo Curino <cur...@apache.org>
Committed: Thu Oct 12 10:38:58 2017 -0700

----------------------------------------------------------------------
 .../policies/FederationPolicyUtils.java        |  41 ++++++-
 .../LocalityMulticastAMRMProxyPolicy.java      | 103 ++++++++++++++---
 .../router/WeightedRandomRouterPolicy.java     |  33 ++----
 .../policies/TestFederationPolicyUtils.java    |  58 ++++++++++
 .../TestLocalityMulticastAMRMProxyPolicy.java  | 110 ++++++++++++++-----
 5 files changed, 279 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fcfb3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
index 7716a6f..aaa2c43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.server.federation.policies;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -46,6 +48,8 @@ public final class FederationPolicyUtils {
   public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
       "No active SubCluster available to submit the request.";
 
+  private static final Random RAND = new Random(System.currentTimeMillis());
+
   /** Disable constructor. */
   private FederationPolicyUtils() {
   }
@@ -200,4 +204,39 @@ public final class FederationPolicyUtils {
         FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
   }
 
-}
\ No newline at end of file
+  /**
+   * Select a random bin according to the weight array for the bins. Only bins
+   * with positive weights will be considered. If no positive weight found,
+   * return -1.
+   *
+   * @param weights the weight array
+   * @return the index of the sample in the array
+   */
+  public static int getWeightedRandom(ArrayList<Float> weights) {
+    int i;
+    float totalWeight = 0;
+    for (i = 0; i < weights.size(); i++) {
+      if (weights.get(i) > 0) {
+        totalWeight += weights.get(i);
+      }
+    }
+    if (totalWeight == 0) {
+      return -1;
+    }
+    float samplePoint = RAND.nextFloat() * totalWeight;
+    int lastIndex = 0;
+    for (i = 0; i < weights.size(); i++) {
+      if (weights.get(i) > 0) {
+        if (samplePoint <= weights.get(i)) {
+          return i;
+        } else {
+          lastIndex = i;
+          samplePoint -= weights.get(i);
+        }
+      }
+    }
+    // This can only happen if samplePoint is very close to totoalWeight and
+    // float rounding kicks in during subtractions
+    return lastIndex;
+  }
+}
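A note on the helper added above: getWeightedRandom picks an index with probability proportional to its weight, skips zero and negative entries, and returns -1 only when no entry is positive. The following small driver is only an illustration (the class name WeightedRandomDemo is invented here, and it assumes the patched hadoop-yarn-server-common is on the classpath); with these weights the positive total is 1.0, so index 1 should come back roughly 30% of the time and index 3 roughly 70%:

    import java.util.ArrayList;

    import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;

    public class WeightedRandomDemo {
      public static void main(String[] args) {
        ArrayList<Float> weights = new ArrayList<>();
        weights.add(0.0f);   // never selected, weight is not positive
        weights.add(0.3f);   // selected roughly 30% of the time
        weights.add(-1.0f);  // never selected, weight is not positive
        weights.add(0.7f);   // selected roughly 70% of the time

        int[] counts = new int[weights.size()];
        int samples = 100000;
        for (int i = 0; i < samples; i++) {
          counts[FederationPolicyUtils.getWeightedRandom(weights)]++;
        }
        for (int i = 0; i < counts.length; i++) {
          System.out.println("index " + i + " -> " + ((float) counts[i] / samples));
        }
      }
    }

The new TestFederationPolicyUtils further down does essentially this at a larger scale and asserts the empirical frequencies.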
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fcfb3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 454962f..da30d98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -314,25 +317,33 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
    */
   private void splitIndividualAny(ResourceRequest originalResourceRequest,
       Set<SubClusterId> targetSubclusters,
-      AllocationBookkeeper allocationBookkeeper) {
+      AllocationBookkeeper allocationBookkeeper) throws YarnException {
 
     long allocationId = originalResourceRequest.getAllocationRequestId();
-
-    for (SubClusterId targetId : targetSubclusters) {
-      float numContainer = originalResourceRequest.getNumContainers();
-
-      // If the ANY request has 0 containers to begin with we must forward it to
-      // any RM we have previously contacted (this might be the user way
-      // to cancel a previous request).
-      if (numContainer == 0 && headroom.containsKey(targetId)) {
-        allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
+    int numContainer = originalResourceRequest.getNumContainers();
+
+    // If the ANY request has 0 containers to begin with we must forward it to
+    // any RM we have previously contacted (this might be the user way
+    // to cancel a previous request).
+    if (numContainer == 0) {
+      for (SubClusterId targetId : targetSubclusters) {
+        if (headroom.containsKey(targetId)) {
+          allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
+        }
       }
+      return;
+    }
+
+    // List preserves iteration order
+    List<SubClusterId> targetSCs = new ArrayList<>(targetSubclusters);
+    // Compute the distribution weights
+    ArrayList<Float> weightsList = new ArrayList<>();
+    for (SubClusterId targetId : targetSCs) {
       // If ANY is associated with localized asks, split based on their ratio
       if (allocationBookkeeper.getSubClustersForId(allocationId) != null) {
-        float localityBasedWeight = getLocalityBasedWeighting(allocationId,
-            targetId, allocationBookkeeper);
-        numContainer = numContainer * localityBasedWeight;
+        weightsList.add(getLocalityBasedWeighting(allocationId, targetId,
+            allocationBookkeeper));
       } else {
         // split ANY based on load and policy configuration
         float headroomWeighting =
@@ -340,12 +351,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
         float policyWeighting =
             getPolicyConfigWeighting(targetId, allocationBookkeeper);
         // hrAlpha controls how much headroom influencing decision
-        numContainer = numContainer
-            * (hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting);
+        weightsList
+            .add(hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting);
       }
+    }
 
+    // Compute the integer container counts for each sub-cluster
+    ArrayList<Integer> containerNums =
+        computeIntegerAssignment(numContainer, weightsList);
+    int i = 0;
+    for (SubClusterId targetId : targetSCs) {
       // if the calculated request is non-empty add it to the answer
-      if (numContainer > 0) {
+      if (containerNums.get(i) > 0) {
         ResourceRequest out =
             ResourceRequest.newInstance(originalResourceRequest.getPriority(),
                 originalResourceRequest.getResourceName(),
@@ -355,14 +372,66 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
                 originalResourceRequest.getNodeLabelExpression(),
                 originalResourceRequest.getExecutionTypeRequest());
         out.setAllocationRequestId(allocationId);
-        out.setNumContainers((int) Math.ceil(numContainer));
+        out.setNumContainers(containerNums.get(i));
         if (ResourceRequest.isAnyLocation(out.getResourceName())) {
           allocationBookkeeper.addAnyRR(targetId, out);
         } else {
           allocationBookkeeper.addRackRR(targetId, out);
         }
       }
+      i++;
+    }
+  }
+
+  /**
+   * Split the integer into bins according to the weights.
+   *
+   * @param totalNum total number of containers to split
+   * @param weightsList the weights for each subcluster
+   * @return the container allocation after split
+   * @throws YarnException if fails
+   */
+  @VisibleForTesting
+  protected ArrayList<Integer> computeIntegerAssignment(int totalNum,
+      ArrayList<Float> weightsList) throws YarnException {
+    int i, residue;
+    ArrayList<Integer> ret = new ArrayList<>();
+    float totalWeight = 0, totalNumFloat = totalNum;
+
+    if (weightsList.size() == 0) {
+      return ret;
+    }
+    for (i = 0; i < weightsList.size(); i++) {
+      ret.add(0);
+      if (weightsList.get(i) > 0) {
+        totalWeight += weightsList.get(i);
+      }
+    }
+    if (totalWeight == 0) {
+      StringBuilder sb = new StringBuilder();
+      for (Float weight : weightsList) {
+        sb.append(weight + ", ");
+      }
+      throw new FederationPolicyException(
+          "No positive value found in weight array " + sb.toString());
+    }
+
+    // First pass, do flooring for all bins
+    residue = totalNum;
+    for (i = 0; i < weightsList.size(); i++) {
+      if (weightsList.get(i) > 0) {
+        int base = (int) (totalNumFloat * weightsList.get(i) / totalWeight);
+        ret.set(i, ret.get(i) + base);
+        residue -= base;
+      }
+    }
+
+    // By now residue < weights.length, assign one a time
+    for (i = 0; i < residue; i++) {
+      int index = FederationPolicyUtils.getWeightedRandom(weightsList);
+      ret.set(index, ret.get(index) + 1);
     }
+    return ret;
   }
 
   /**
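The hunk above is the heart of the fix in the subject line. The old code rounded every sub-cluster share up with (int) Math.ceil(numContainer), so an ANY request split across k sub-clusters could grow by up to k - 1 containers. The new computeIntegerAssignment floors each share first and then hands out the leftover containers one at a time through FederationPolicyUtils.getWeightedRandom, so the split always sums to exactly the requested total. A standalone sketch of the difference (the class name SplitComparison is invented, and the weights are already normalized here for brevity, whereas the real method divides by the total positive weight):

    import java.util.ArrayList;
    import java.util.Arrays;

    public class SplitComparison {
      public static void main(String[] args) {
        ArrayList<Float> weights = new ArrayList<>(Arrays.asList(0.375f, 0.375f, 0.25f));
        int total = 100;

        int ceilSum = 0;
        int floorSum = 0;
        for (float w : weights) {
          ceilSum += (int) Math.ceil(total * w); // old behavior, ceiling per sub-cluster
          floorSum += (int) (total * w);         // new behavior, flooring first pass
        }
        // ceilSum == 101: one container more than the 100 that were asked for.
        // floorSum == 99: the residue (1) is then given to a sub-cluster chosen by
        // getWeightedRandom, so the final split sums to exactly 100.
        System.out.println("ceil total = " + ceilSum + ", floor total = " + floorSum);
      }
    }

This matches the updated expectations in TestLocalityMulticastAMRMProxyPolicy further down (37 and 37 plus a randomly placed odd container, with the overall total pinned at 100).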
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fcfb3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index aec7576..b143410 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This policy implements a weighted random sample among currently active
@@ -38,10 +36,6 @@ import org.slf4j.LoggerFactory;
  */
 public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(WeightedRandomRouterPolicy.class);
-  private Random rand = new Random(System.currentTimeMillis());
-
   @Override
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext,
@@ -63,32 +57,25 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
     Map<SubClusterIdInfo, Float> weights =
         getPolicyInfo().getRouterPolicyWeights();
-    float totActiveWeight = 0;
+    ArrayList<Float> weightList = new ArrayList<>();
+    ArrayList<SubClusterId> scIdList = new ArrayList<>();
     for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
       if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
         continue;
       }
       if (entry.getKey() != null
           && activeSubclusters.containsKey(entry.getKey().toId())) {
-        totActiveWeight += entry.getValue();
+        weightList.add(entry.getValue());
+        scIdList.add(entry.getKey().toId());
       }
     }
-    float lookupValue = rand.nextFloat() * totActiveWeight;
-    for (SubClusterId id : activeSubclusters.keySet()) {
-      if (blacklist != null && blacklist.contains(id)) {
-        continue;
-      }
-      SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
-      if (weights.containsKey(idInfo)) {
-        lookupValue -= weights.get(idInfo);
-      }
-      if (lookupValue <= 0) {
-        return id;
-      }
+    int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
+    if (pickedIndex == -1) {
+      throw new FederationPolicyException(
+          "No positive weight found on active subclusters");
     }
-    // should never happen
-    return null;
+    return scIdList.get(pickedIndex);
   }
 }
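The router change above is mostly a refactor: WeightedRandomRouterPolicy drops its private Random and linear scan and reuses FederationPolicyUtils.getWeightedRandom over the weights of the active, non-blacklisted sub-clusters. The one behavioral difference is the failure mode: where the old code could fall through and return null, the new code throws a FederationPolicyException. A minimal sketch of that pattern (the helper class WeightedPickSketch is invented for illustration):

    import java.util.ArrayList;

    import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
    import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;

    public final class WeightedPickSketch {
      private WeightedPickSketch() {
      }

      /** Pick an index by weight, failing loudly instead of returning null. */
      public static int pickOrFail(ArrayList<Float> weightList)
          throws FederationPolicyException {
        int picked = FederationPolicyUtils.getWeightedRandom(weightList);
        if (picked == -1) {
          throw new FederationPolicyException(
              "No positive weight found on active subclusters");
        }
        return picked;
      }
    }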
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fcfb3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyUtils.java
new file mode 100644
index 0000000..d960978
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.util.ArrayList;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link FederationPolicyUtils}.
+ */
+public class TestFederationPolicyUtils {
+
+  @Test
+  public void testGetWeightedRandom() {
+    int i;
+    float[] weights =
+        new float[] {0, 0.1f, 0.2f, 0.2f, -0.1f, 0.1f, 0.2f, 0.1f, 0.1f};
+    float[] expectedWeights =
+        new float[] {0, 0.1f, 0.2f, 0.2f, 0, 0.1f, 0.2f, 0.1f, 0.1f};
+    int[] result = new int[weights.length];
+
+    ArrayList<Float> weightsList = new ArrayList<>();
+    for (float weight : weights) {
+      weightsList.add(weight);
+    }
+
+    int n = 10000000;
+    for (i = 0; i < n; i++) {
+      int sample = FederationPolicyUtils.getWeightedRandom(weightsList);
+      result[sample]++;
+    }
+    for (i = 0; i < weights.length; i++) {
+      double actualWeight = (float) result[i] / n;
+      System.out.println(i + " " + actualWeight);
+      Assert.assertTrue(
+          "Index " + i + " Actual weight: " + actualWeight
+              + " expected weight: " + expectedWeights[i],
+          Math.abs(actualWeight - expectedWeights[i]) < 0.01);
+    }
+  }
+}
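A note on the expected values in the test above: the positive weights (0.1 + 0.2 + 0.2 + 0.1 + 0.2 + 0.1 + 0.1) sum to 1.0, so each expected frequency equals the raw weight, while the zero and negative entries map to an expected frequency of 0. With 10 million samples the standard deviation of each empirical frequency is on the order of 0.0001, so the 0.01 tolerance leaves a very wide margin.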
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fcfb3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index 6e3a2f1..46a6011 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -157,18 +157,20 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     validateSplit(response, resourceRequests);
 
-    // based on headroom, we expect 75 containers to got to subcluster0,
-    // as it advertise lots of headroom (100), no containers for sublcuster1
-    // as it advertise zero headroom, 1 to subcluster 2 (as it advertise little
-    // headroom (1), and 25 to subcluster5 which has unknown headroom, and so
-    // it gets 1/4th of the load
-    checkExpectedAllocation(response, "subcluster0", 1, 75);
+    /*
+     * based on headroom, we expect 75 containers to got to subcluster0 (60) and
+     * subcluster2 (15) according to the advertised headroom (40 and 10), no
+     * containers for sublcuster1 as it advertise zero headroom, and 25 to
+     * subcluster5 which has unknown headroom, and so it gets 1/4th of the load
+     */
+    checkExpectedAllocation(response, "subcluster0", 1, 60);
     checkExpectedAllocation(response, "subcluster1", 1, -1);
-    checkExpectedAllocation(response, "subcluster2", 1, 1);
+    checkExpectedAllocation(response, "subcluster2", 1, 15);
     checkExpectedAllocation(response, "subcluster5", 1, 25);
+    checkTotalContainerAllocation(response, 100);
 
     // notify a change in headroom and try again
-    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
     response = ((FederationAMRMProxyPolicy) getPolicy())
@@ -178,14 +180,16 @@ public class TestLocalityMulticastAMRMProxyPolicy
     prettyPrintRequests(response);
     validateSplit(response, resourceRequests);
 
-    // we simulated a change in headroom for subcluster2, which will now
-    // have the same headroom of subcluster0 and so it splits the requests
-    // note that the total is still less or equal to (userAsk + numSubClusters)
-    checkExpectedAllocation(response, "subcluster0", 1, 38);
+    /*
+     * we simulated a change in headroom for subcluster2, which will now have
+     * the same headroom of subcluster0, so each 37.5, note that the odd one
+     * will be assigned to either one of the two subclusters
+     */
+    checkExpectedAllocation(response, "subcluster0", 1, 37);
     checkExpectedAllocation(response, "subcluster1", 1, -1);
-    checkExpectedAllocation(response, "subcluster2", 1, 38);
+    checkExpectedAllocation(response, "subcluster2", 1, 37);
     checkExpectedAllocation(response, "subcluster5", 1, 25);
-
+    checkTotalContainerAllocation(response, 100);
   }
 
   @Test(timeout = 5000)
@@ -250,6 +254,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     checkExpectedAllocation(response, "subcluster3", -1, -1);
     checkExpectedAllocation(response, "subcluster4", -1, -1);
     checkExpectedAllocation(response, "subcluster5", -1, -1);
+    checkTotalContainerAllocation(response, 0);
   }
 
   @Test
@@ -276,19 +281,19 @@ public class TestLocalityMulticastAMRMProxyPolicy
     validateSplit(response, resourceRequests);
 
     // in this case the headroom allocates 50 containers, while weights allocate
-    // the rest. due to weights we have 12.5 (round to 13) containers for each
+    // the rest. due to weights we have 12.5 containers for each
     // sublcuster, the rest is due to headroom.
-    checkExpectedAllocation(response, "subcluster0", 1, 50);
-    checkExpectedAllocation(response, "subcluster1", 1, 13);
-    checkExpectedAllocation(response, "subcluster2", 1, 13);
+    checkExpectedAllocation(response, "subcluster0", 1, 42); // 30 + 12.5
+    checkExpectedAllocation(response, "subcluster1", 1, 12); // 0 + 12.5
+    checkExpectedAllocation(response, "subcluster2", 1, 20); // 7.5 + 12.5
     checkExpectedAllocation(response, "subcluster3", -1, -1);
     checkExpectedAllocation(response, "subcluster4", -1, -1);
-    checkExpectedAllocation(response, "subcluster5", 1, 25);
-
+    checkExpectedAllocation(response, "subcluster5", 1, 25); // 12.5 + 12.5
+    checkTotalContainerAllocation(response, 100);
   }
 
   private void prepPolicyWithHeadroom() throws YarnException {
-    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
 
@@ -296,7 +301,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
 
-    ar = getAllocateResponseWithTargetHeadroom(1);
+    ar = getAllocateResponseWithTargetHeadroom(10);
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
   }
@@ -363,6 +368,9 @@ public class TestLocalityMulticastAMRMProxyPolicy
     // subcluster5 should get only part of the request-id 2 broadcast
     checkExpectedAllocation(response, "subcluster5", 1, 20);
 
+    // Check the total number of container asks in all RR
+    checkTotalContainerAllocation(response, 130);
+
     // check that the allocations that show up are what expected
     for (ResourceRequest rr : response.get(getHomeSubCluster())) {
       Assert.assertTrue(
@@ -401,8 +409,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
   // response should be null
   private void checkExpectedAllocation(
       Map<SubClusterId, List<ResourceRequest>> response, String subCluster,
-      long totResourceRequests, long totContainers) {
-    if (totContainers == -1) {
+      long totResourceRequests, long minimumTotalContainers) {
+    if (minimumTotalContainers == -1) {
       Assert.assertNull(response.get(SubClusterId.newInstance(subCluster)));
     } else {
       SubClusterId sc = SubClusterId.newInstance(subCluster);
@@ -412,8 +420,23 @@ public class TestLocalityMulticastAMRMProxyPolicy
       for (ResourceRequest rr : response.get(sc)) {
         actualContCount += rr.getNumContainers();
       }
-      Assert.assertEquals(totContainers, actualContCount);
+      Assert.assertTrue(
+          "Actual count " + actualContCount + " should be at least "
+              + minimumTotalContainers,
+          minimumTotalContainers <= actualContCount);
+    }
+  }
+
+  private void checkTotalContainerAllocation(
+      Map<SubClusterId, List<ResourceRequest>> response, long totalContainers) {
+    long actualContCount = 0;
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+        .entrySet()) {
+      for (ResourceRequest rr : entry.getValue()) {
+        actualContCount += rr.getNumContainers();
+      }
     }
+    Assert.assertEquals(totalContainers, actualContCount);
   }
 
   private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
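Two things stand out in the test changes for this file. First, checkExpectedAllocation now treats its last argument as a minimum rather than an exact count, because the residual containers are placed randomly, while the new checkTotalContainerAllocation pins the exact overall total, which is precisely the guarantee this patch adds. Second, the final hunk below adds testIntegerAssignment, whose expectedMin values are just the floors of each share: with 19 containers and normalized positive weights of 0.1 and 0.2, floor(19 * 0.1) = 1 and floor(19 * 0.2) = 3. Those floors sum to 13, so the remaining 6 containers land on randomly chosen positive-weight bins and every returned split sums to exactly 19.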
@@ -599,4 +622,41 @@ public class TestLocalityMulticastAMRMProxyPolicy
     return out;
   }
+
+  public String printList(ArrayList<Integer> list) {
+    StringBuilder sb = new StringBuilder();
+    for (Integer entry : list) {
+      sb.append(entry + ", ");
+    }
+    return sb.toString();
+  }
+
+  @Test
+  public void testIntegerAssignment() throws YarnException {
+    float[] weights =
+        new float[] {0, 0.1f, 0.2f, 0.2f, -0.1f, 0.1f, 0.2f, 0.1f, 0.1f};
+    int[] expectedMin = new int[] {0, 1, 3, 3, 0, 1, 3, 1, 1};
+    ArrayList<Float> weightsList = new ArrayList<>();
+    for (float weight : weights) {
+      weightsList.add(weight);
+    }
+
+    LocalityMulticastAMRMProxyPolicy policy =
+        (LocalityMulticastAMRMProxyPolicy) getPolicy();
+    for (int i = 0; i < 500000; i++) {
+      ArrayList<Integer> allocations =
+          policy.computeIntegerAssignment(19, weightsList);
+      int sum = 0;
+      for (int j = 0; j < weights.length; j++) {
+        sum += allocations.get(j);
+        if (allocations.get(j) < expectedMin[j]) {
+          Assert.fail(allocations.get(j) + " at index " + j
+              + " should be at least " + expectedMin[j] + ". Allocation array: "
+              + printList(allocations));
+        }
+      }
+      Assert.assertEquals(
+          "Expect sum to be 19 in array: " + printList(allocations), 19, sum);
+    }
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org