YARN-8697. LocalityMulticastAMRMProxyPolicy should fallback to random sub-cluster when cannot resolve resource. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ed458b2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ed458b2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ed458b2 Branch: refs/heads/HDFS-12943 Commit: 7ed458b255e492fd5bc2ca36f216ff1b16054db7 Parents: 3e18b95 Author: Giovanni Matteo Fumarola <gif...@apache.org> Authored: Tue Aug 28 16:01:35 2018 -0700 Committer: Giovanni Matteo Fumarola <gif...@apache.org> Committed: Tue Aug 28 16:01:35 2018 -0700 ---------------------------------------------------------------------- .../LocalityMulticastAMRMProxyPolicy.java | 105 +++++++++++++++---- .../TestLocalityMulticastAMRMProxyPolicy.java | 53 ++++++++-- 2 files changed, 125 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index 1ccd61c..e5f26d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -123,6 +126,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { public static final Logger LOG = LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); + private static Random rand = new Random(); + private Map<SubClusterId, Float> weights; private SubClusterResolver resolver; @@ -275,26 +280,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } // Handle node/rack requests that the SubClusterResolver cannot map to - // any cluster. Defaulting to home subcluster. + // any cluster. Pick a random sub-cluster from active and enabled ones. 
+ targetId = getSubClusterForUnResolvedRequest(bookkeeper, + rr.getAllocationRequestId()); if (LOG.isDebugEnabled()) { LOG.debug("ERROR resolving sub-cluster for resourceName: " - + rr.getResourceName() + " we are falling back to homeSubCluster:" - + homeSubcluster); + + rr.getResourceName() + ", picked a random subcluster to forward:" + + targetId); } - - // If home-subcluster is not active, ignore node/rack request - if (bookkeeper.isActiveAndEnabled(homeSubcluster)) { - if (targetIds != null && targetIds.size() > 0) { - bookkeeper.addRackRR(homeSubcluster, rr); - } else { - bookkeeper.addLocalizedNodeRR(homeSubcluster, rr); - } + if (targetIds != null && targetIds.size() > 0) { + bookkeeper.addRackRR(targetId, rr); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are " - + "defaulting to is not active, the ResourceRequest " - + "will be ignored."); - } + bookkeeper.addLocalizedNodeRR(targetId, rr); } } @@ -314,6 +311,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } /** + * For unit test to override. + */ + protected SubClusterId getSubClusterForUnResolvedRequest( + AllocationBookkeeper bookKeeper, long allocationId) { + return bookKeeper.getSubClusterForUnResolvedRequest(allocationId); + } + + /** * It splits a list of non-localized resource requests among sub-clusters. */ private void splitAnyRequests(List<ResourceRequest> originalResourceRequests, @@ -512,10 +517,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * This helper class is used to book-keep the requests made to each * subcluster, and maintain useful statistics to split ANY requests. 
*/ - private final class AllocationBookkeeper { + protected final class AllocationBookkeeper { // the answer being accumulated private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>(); + private Map<SubClusterId, Set<Long>> maskForRackDeletion = new HashMap<>(); // stores how many containers we have allocated in each RM for localized // asks, used to correctly "spread" the corresponding ANY @@ -523,6 +529,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { new HashMap<>(); private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>(); + // Store the randomly selected subClusterId for unresolved resource requests + // keyed by requestId + private Map<Long, SubClusterId> unResolvedRequestLocation = new HashMap<>(); + private Set<SubClusterId> activeAndEnabledSC = new HashSet<>(); private float totHeadroomMemory = 0; private int totHeadRoomEnabledRMs = 0; @@ -538,6 +548,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // reset data structures answer.clear(); + maskForRackDeletion.clear(); countContainersPerRM.clear(); totNumLocalizedContainers.clear(); activeAndEnabledSC.clear(); @@ -628,16 +639,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { .addAndGet(rr.getNumContainers()); } - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, false); } /** * Add a rack-local request to the final asnwer. 
*/ - public void addRackRR(SubClusterId targetId, ResourceRequest rr) { + private void addRackRR(SubClusterId targetId, ResourceRequest rr) { Preconditions .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, true); } /** @@ -646,11 +657,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { Preconditions .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName())); - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, false); } private void internalAddToAnswer(SubClusterId targetId, - ResourceRequest partialRR) { + ResourceRequest partialRR, boolean isRack) { + if (!isRack) { + if (!maskForRackDeletion.containsKey(targetId)) { + maskForRackDeletion.put(targetId, new HashSet<Long>()); + } + maskForRackDeletion.get(targetId) + .add(partialRR.getAllocationRequestId()); + } if (!answer.containsKey(targetId)) { answer.put(targetId, new ArrayList<ResourceRequest>()); } @@ -658,6 +676,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } /** + * For requests whose location cannot be resolved, choose an active and + * enabled sub-cluster to forward this requestId to. + */ + private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) { + if (unResolvedRequestLocation.containsKey(allocationId)) { + return unResolvedRequestLocation.get(allocationId); + } + int id = rand.nextInt(activeAndEnabledSC.size()); + for (SubClusterId subclusterId : activeAndEnabledSC) { + if (id == 0) { + unResolvedRequestLocation.put(allocationId, subclusterId); + return subclusterId; + } + id--; + } + throw new RuntimeException( + "Should not be here. activeAndEnabledSC size = " + + activeAndEnabledSC.size() + " id = " + id); + } + + /** * Return all known subclusters associated with an allocation id. 
* * @param allocationId the allocation id considered @@ -678,6 +717,28 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * @return the answer */ private Map<SubClusterId, List<ResourceRequest>> getAnswer() { + Iterator<Entry<SubClusterId, List<ResourceRequest>>> answerIter = + answer.entrySet().iterator(); + // Remove redundant rack RR before returning the answer + while (answerIter.hasNext()) { + Entry<SubClusterId, List<ResourceRequest>> entry = answerIter.next(); + SubClusterId scId = entry.getKey(); + Set<Long> mask = maskForRackDeletion.get(scId); + if (mask != null) { + Iterator<ResourceRequest> rrIter = entry.getValue().iterator(); + while (rrIter.hasNext()) { + ResourceRequest rr = rrIter.next(); + if (!mask.contains(rr.getAllocationRequestId())) { + rrIter.remove(); + } + } + } + if (mask == null || entry.getValue().size() == 0) { + answerIter.remove(); + LOG.info("removing {} from output because it has only rack RR", + scId); + } + } return answer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index cf9ac53..c49ab60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -69,12 +69,12 @@ public class TestLocalityMulticastAMRMProxyPolicy @Before public void setUp() throws Exception { - setPolicy(new LocalityMulticastAMRMProxyPolicy()); + setPolicy(new TestableLocalityMulticastAMRMProxyPolicy()); setPolicyInfo(new WeightedPolicyInfo()); Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>(); - // simulate 20 subclusters with a 5% chance of being inactive + // Six sub-clusters with one inactive and one disabled for (int i = 0; i < 6; i++) { SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i); // sub-cluster 3 is not active @@ -207,6 +207,7 @@ public class TestLocalityMulticastAMRMProxyPolicy getPolicyInfo().setHeadroomAlpha(1.0f); initializePolicy(); + addHomeSubClusterAsActive(); int numRR = 1000; List<ResourceRequest> resourceRequests = createLargeRandomList(numRR); @@ -324,14 +325,11 @@ public class TestLocalityMulticastAMRMProxyPolicy null, Collections.<NMToken> emptyList()); } - @Test - public void testSplitAllocateRequest() throws Exception { - - // Test a complex List<ResourceRequest> is split correctly - initializePolicy(); - - // modify default initialization to include a "homesubcluster" - // which we will use as the default for when nodes or racks are unknown + /** + * modify default initialization to include a "homesubcluster" which we will + * use as the default for when nodes or racks are unknown. 
+ */ + private void addHomeSubClusterAsActive() { SubClusterInfo sci = mock(SubClusterInfo.class); when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); when(sci.getSubClusterId()).thenReturn(getHomeSubCluster()); @@ -340,6 +338,14 @@ public class TestLocalityMulticastAMRMProxyPolicy getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f); getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f); + } + + @Test + public void testSplitAllocateRequest() throws Exception { + + // Test a complex List<ResourceRequest> is split correctly + initializePolicy(); + addHomeSubClusterAsActive(); FederationPoliciesTestUtil.initializePolicyContext( getFederationPolicyContext(), getPolicy(), getPolicyInfo(), @@ -502,7 +508,8 @@ public class TestLocalityMulticastAMRMProxyPolicy // Test target Ids for (SubClusterId targetId : split.keySet()) { - Assert.assertTrue("Target subclusters should be in the active set", + Assert.assertTrue( + "Target subcluster " + targetId + " should be in the active set", getActiveSubclusters().containsKey(targetId)); Assert.assertTrue( "Target subclusters (" + targetId + ") should have weight >0 in " @@ -787,4 +794,28 @@ public class TestLocalityMulticastAMRMProxyPolicy checkTotalContainerAllocation(response, 100); } + /** + * A testable version of LocalityMulticastAMRMProxyPolicy that + * deterministically falls back to home sub-cluster for unresolved requests. 
+ */ + private class TestableLocalityMulticastAMRMProxyPolicy + extends LocalityMulticastAMRMProxyPolicy { + @Override + protected SubClusterId getSubClusterForUnResolvedRequest( + AllocationBookkeeper bookkeeper, long allocationId) { + SubClusterId originalResult = + super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId); + Map<SubClusterId, SubClusterInfo> activeClusters = null; + try { + activeClusters = getActiveSubclusters(); + } catch (YarnException e) { + throw new RuntimeException(e); + } + // The randomly selected sub-cluster should at least be active + Assert.assertTrue(activeClusters.containsKey(originalResult)); + + // Always use home sub-cluster so that unit test is deterministic + return getHomeSubCluster(); + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org