YARN-8697. LocalityMulticastAMRMProxyPolicy should fallback to random sub-cluster when cannot resolve resource. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ed458b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ed458b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ed458b2

Branch: refs/heads/HDFS-12943
Commit: 7ed458b255e492fd5bc2ca36f216ff1b16054db7
Parents: 3e18b95
Author: Giovanni Matteo Fumarola <gif...@apache.org>
Authored: Tue Aug 28 16:01:35 2018 -0700
Committer: Giovanni Matteo Fumarola <gif...@apache.org>
Committed: Tue Aug 28 16:01:35 2018 -0700

----------------------------------------------------------------------
 .../LocalityMulticastAMRMProxyPolicy.java       | 105 +++++++++++++++----
 .../TestLocalityMulticastAMRMProxyPolicy.java   |  53 ++++++++--
 2 files changed, 125 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 1ccd61c..e5f26d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -123,6 +126,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   public static final Logger LOG =
       LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
 
+  private static Random rand = new Random();
+
   private Map<SubClusterId, Float> weights;
   private SubClusterResolver resolver;
 
@@ -275,26 +280,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
       }
 
       // Handle node/rack requests that the SubClusterResolver cannot map to
-      // any cluster. Defaulting to home subcluster.
+      // any cluster. Pick a random sub-cluster from active and enabled ones.
+      targetId = getSubClusterForUnResolvedRequest(bookkeeper,
+          rr.getAllocationRequestId());
       if (LOG.isDebugEnabled()) {
         LOG.debug("ERROR resolving sub-cluster for resourceName: "
-            + rr.getResourceName() + " we are falling back to homeSubCluster:"
-            + homeSubcluster);
+            + rr.getResourceName() + ", picked a random subcluster to forward:"
+            + targetId);
       }
-
-      // If home-subcluster is not active, ignore node/rack request
-      if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
-        if (targetIds != null && targetIds.size() > 0) {
-          bookkeeper.addRackRR(homeSubcluster, rr);
-        } else {
-          bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
-        }
+      if (targetIds != null && targetIds.size() > 0) {
+        bookkeeper.addRackRR(targetId, rr);
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
-              + "defaulting to is not active, the ResourceRequest "
-              + "will be ignored.");
-        }
+        bookkeeper.addLocalizedNodeRR(targetId, rr);
       }
     }
 
@@ -314,6 +311,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   }
 
   /**
+   * For unit test to override.
+   */
+  protected SubClusterId getSubClusterForUnResolvedRequest(
+      AllocationBookkeeper bookKeeper, long allocationId) {
+    return bookKeeper.getSubClusterForUnResolvedRequest(allocationId);
+  }
+
+  /**
    * It splits a list of non-localized resource requests among sub-clusters.
    */
   private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
@@ -512,10 +517,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
    * This helper class is used to book-keep the requests made to each
    * subcluster, and maintain useful statistics to split ANY requests.
    */
-  private final class AllocationBookkeeper {
+  protected final class AllocationBookkeeper {
 
     // the answer being accumulated
     private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+    private Map<SubClusterId, Set<Long>> maskForRackDeletion = new HashMap<>();
 
     // stores how many containers we have allocated in each RM for localized
     // asks, used to correctly "spread" the corresponding ANY
@@ -523,6 +529,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
         new HashMap<>();
     private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
 
+    // Store the randomly selected subClusterId for unresolved resource requests
+    // keyed by requestId
+    private Map<Long, SubClusterId> unResolvedRequestLocation = new HashMap<>();
+
     private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
     private float totHeadroomMemory = 0;
     private int totHeadRoomEnabledRMs = 0;
@@ -538,6 +548,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
       // reset data structures
       answer.clear();
+      maskForRackDeletion.clear();
       countContainersPerRM.clear();
       totNumLocalizedContainers.clear();
       activeAndEnabledSC.clear();
@@ -628,16 +639,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
             .addAndGet(rr.getNumContainers());
       }
 
-      internalAddToAnswer(targetId, rr);
+      internalAddToAnswer(targetId, rr, false);
     }
 
     /**
      * Add a rack-local request to the final asnwer.
      */
-    public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
+    private void addRackRR(SubClusterId targetId, ResourceRequest rr) {
       Preconditions
           .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
-      internalAddToAnswer(targetId, rr);
+      internalAddToAnswer(targetId, rr, true);
     }
 
     /**
@@ -646,11 +657,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
       Preconditions
           .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
-      internalAddToAnswer(targetId, rr);
+      internalAddToAnswer(targetId, rr, false);
     }
 
     private void internalAddToAnswer(SubClusterId targetId,
-        ResourceRequest partialRR) {
+        ResourceRequest partialRR, boolean isRack) {
+      if (!isRack) {
+        if (!maskForRackDeletion.containsKey(targetId)) {
+          maskForRackDeletion.put(targetId, new HashSet<Long>());
+        }
+        maskForRackDeletion.get(targetId)
+            .add(partialRR.getAllocationRequestId());
+      }
       if (!answer.containsKey(targetId)) {
         answer.put(targetId, new ArrayList<ResourceRequest>());
       }
@@ -658,6 +676,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     }
 
     /**
+     * For requests whose location cannot be resolved, choose an active and
+     * enabled sub-cluster to forward this requestId to.
+     */
+    private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) {
+      if (unResolvedRequestLocation.containsKey(allocationId)) {
+        return unResolvedRequestLocation.get(allocationId);
+      }
+      int id = rand.nextInt(activeAndEnabledSC.size());
+      for (SubClusterId subclusterId : activeAndEnabledSC) {
+        if (id == 0) {
+          unResolvedRequestLocation.put(allocationId, subclusterId);
+          return subclusterId;
+        }
+        id--;
+      }
+      throw new RuntimeException(
+          "Should not be here. activeAndEnabledSC size = "
+              + activeAndEnabledSC.size() + " id = " + id);
+    }
+
+    /**
      * Return all known subclusters associated with an allocation id.
      *
      * @param allocationId the allocation id considered
@@ -678,6 +717,28 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
      * @return the answer
      */
     private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
+      Iterator<Entry<SubClusterId, List<ResourceRequest>>> answerIter =
+          answer.entrySet().iterator();
+      // Remove redundant rack RR before returning the answer
+      while (answerIter.hasNext()) {
+        Entry<SubClusterId, List<ResourceRequest>> entry = answerIter.next();
+        SubClusterId scId = entry.getKey();
+        Set<Long> mask = maskForRackDeletion.get(scId);
+        if (mask != null) {
+          Iterator<ResourceRequest> rrIter = entry.getValue().iterator();
+          while (rrIter.hasNext()) {
+            ResourceRequest rr = rrIter.next();
+            if (!mask.contains(rr.getAllocationRequestId())) {
+              rrIter.remove();
+            }
+          }
+        }
+        if (mask == null || entry.getValue().size() == 0) {
+          answerIter.remove();
+          LOG.info("removing {} from output because it has only rack RR",
+              scId);
+        }
+      }
       return answer;
     }
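
[Editor's note] The core of the new fallback is getSubClusterForUnResolvedRequest in the hunks above: it draws rand.nextInt(activeAndEnabledSC.size()), walks the set that many steps, and caches the choice per allocationRequestId in unResolvedRequestLocation so every request of the same allocation lands on the same sub-cluster. A minimal standalone sketch of that uniform pick-from-a-Set pattern; class and variable names here are illustrative, not taken from the patch:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Random;
import java.util.Set;

public class RandomSubClusterPick {
  private static final Random RAND = new Random();

  // Uniformly random element of a non-empty set: advance an iterator
  // rand.nextInt(size) steps, as the patched policy does.
  static <T> T pickRandom(Set<T> items) {
    int steps = RAND.nextInt(items.size()); // IllegalArgumentException if empty
    for (T item : items) {
      if (steps == 0) {
        return item;
      }
      steps--;
    }
    // Unreachable for a non-empty set; mirrors the defensive throw in the patch.
    throw new IllegalStateException("ran past the end of a non-empty set");
  }

  public static void main(String[] args) {
    Set<String> activeAndEnabled =
        new LinkedHashSet<>(Arrays.asList("subcluster0", "subcluster1", "subcluster2"));
    System.out.println("picked: " + pickRandom(activeAndEnabled));
  }
}

Caching the chosen sub-cluster per allocation id matters because node, rack, and ANY requests belonging to one allocation must be routed together.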
 

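[Editor's note] The other behavioral change in this file is in getAnswer(): maskForRackDeletion records, per sub-cluster, which allocation ids sent a node-local or ANY request there; requests whose allocation id is not in that mask (which can only be rack requests) are stripped before the answer is returned, and a sub-cluster that only ever received rack requests is dropped entirely. A hedged, self-contained sketch of that pruning step, using toy stand-ins rather than the real SubClusterId/ResourceRequest types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class RackPruneSketch {

  // Toy stand-in for a ResourceRequest: just an allocation id and resource name.
  static final class Req {
    final long allocationId;
    final String resourceName;
    Req(long allocationId, String resourceName) {
      this.allocationId = allocationId;
      this.resourceName = resourceName;
    }
    @Override
    public String toString() {
      return allocationId + ":" + resourceName;
    }
  }

  // Keep, per sub-cluster, only requests whose allocation id also sent a
  // node-local or ANY request there (the mask); drop sub-clusters left empty
  // or never masked at all.
  static Map<String, List<Req>> pruneRackOnly(Map<String, List<Req>> answer,
      Map<String, Set<Long>> mask) {
    Iterator<Map.Entry<String, List<Req>>> it = answer.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, List<Req>> e = it.next();
      Set<Long> allowed = mask.get(e.getKey());
      if (allowed != null) {
        e.getValue().removeIf(r -> !allowed.contains(r.allocationId));
      }
      if (allowed == null || e.getValue().isEmpty()) {
        it.remove();
      }
    }
    return answer;
  }

  public static void main(String[] args) {
    Map<String, List<Req>> answer = new TreeMap<>();
    answer.put("sc1", new ArrayList<>(Arrays.asList(
        new Req(1, "node1"), new Req(1, "/rack1"), new Req(2, "/rack2"))));
    answer.put("sc2", new ArrayList<>(Arrays.asList(new Req(2, "/rack2"))));

    Map<String, Set<Long>> mask = new HashMap<>();
    mask.put("sc1", Collections.singleton(1L)); // only allocation 1 had node/ANY asks on sc1

    // sc2 disappears (rack-only); sc1 keeps only allocation 1's requests.
    System.out.println(pruneRackOnly(answer, mask));
  }
}
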
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index cf9ac53..c49ab60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -69,12 +69,12 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
   @Before
   public void setUp() throws Exception {
-    setPolicy(new LocalityMulticastAMRMProxyPolicy());
+    setPolicy(new TestableLocalityMulticastAMRMProxyPolicy());
     setPolicyInfo(new WeightedPolicyInfo());
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
     Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
 
-    // simulate 20 subclusters with a 5% chance of being inactive
+    // Six sub-clusters with one inactive and one disabled
     for (int i = 0; i < 6; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
       // sub-cluster 3 is not active
@@ -207,6 +207,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     getPolicyInfo().setHeadroomAlpha(1.0f);
 
     initializePolicy();
+    addHomeSubClusterAsActive();
 
     int numRR = 1000;
     List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
@@ -324,14 +325,11 @@ public class TestLocalityMulticastAMRMProxyPolicy
         null, Collections.<NMToken> emptyList());
   }
 
-  @Test
-  public void testSplitAllocateRequest() throws Exception {
-
-    // Test a complex List<ResourceRequest> is split correctly
-    initializePolicy();
-
-    // modify default initialization to include a "homesubcluster"
-    // which we will use as the default for when nodes or racks are unknown
+  /**
+   * modify default initialization to include a "homesubcluster" which we will
+   * use as the default for when nodes or racks are unknown.
+   */
+  private void addHomeSubClusterAsActive() {
     SubClusterInfo sci = mock(SubClusterInfo.class);
     when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
     when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
@@ -340,6 +338,14 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
     getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // Test a complex List<ResourceRequest> is split correctly
+    initializePolicy();
+    addHomeSubClusterAsActive();
 
     FederationPoliciesTestUtil.initializePolicyContext(
         getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
@@ -502,7 +508,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     // Test target Ids
     for (SubClusterId targetId : split.keySet()) {
-      Assert.assertTrue("Target subclusters should be in the active set",
+      Assert.assertTrue(
+          "Target subcluster " + targetId + " should be in the active set",
           getActiveSubclusters().containsKey(targetId));
       Assert.assertTrue(
           "Target subclusters (" + targetId + ") should have weight >0 in "
@@ -787,4 +794,28 @@ public class TestLocalityMulticastAMRMProxyPolicy
     checkTotalContainerAllocation(response, 100);
   }
 
+  /**
+   * A testable version of LocalityMulticastAMRMProxyPolicy that
+   * deterministically falls back to home sub-cluster for unresolved requests.
+   */
+  private class TestableLocalityMulticastAMRMProxyPolicy
+      extends LocalityMulticastAMRMProxyPolicy {
+    @Override
+    protected SubClusterId getSubClusterForUnResolvedRequest(
+        AllocationBookkeeper bookkeeper, long allocationId) {
+      SubClusterId originalResult =
+          super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId);
+      Map<SubClusterId, SubClusterInfo> activeClusters = null;
+      try {
+        activeClusters = getActiveSubclusters();
+      } catch (YarnException e) {
+        throw new RuntimeException(e);
+      }
+      // The randomly selected sub-cluster should at least be active
+      Assert.assertTrue(activeClusters.containsKey(originalResult));
+
+      // Always use home sub-cluster so that unit test is deterministic
+      return getHomeSubCluster();
+    }
+  }
 }
\ No newline at end of file
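
[Editor's note] The test-side change is a standard seam for taming randomness: getSubClusterForUnResolvedRequest is protected, so the test subclass above checks that the random choice is at least an active sub-cluster and then pins the result to the home sub-cluster for deterministic assertions. A generic, hedged illustration of that override pattern; the names below are made up for the example:

import java.util.Random;

public class OverridableRandomnessSketch {

  static class Policy {
    private static final Random RAND = new Random();

    // Protected seam: production picks randomly, tests may override.
    protected int pickIndex(int size) {
      return RAND.nextInt(size);
    }

    String route(String[] targets) {
      return targets[pickIndex(targets.length)];
    }
  }

  // Deterministic subclass used only by tests.
  static class TestablePolicy extends Policy {
    @Override
    protected int pickIndex(int size) {
      return 0; // always the first target, so assertions are stable
    }
  }

  public static void main(String[] args) {
    String[] targets = {"homecluster", "subcluster1", "subcluster2"};
    System.out.println(new Policy().route(targets));         // some random target
    System.out.println(new TestablePolicy().route(targets)); // always "homecluster"
  }
}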

