Repository: hadoop
Updated Branches:
  refs/heads/YARN-2915 365201fd9 -> 0c1b79dd4 (forced update)


YARN-6190. Validation and synchronization fixes in 
LocalityMulticastAMRMProxyPolicy. (Botong Huang via curino)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9a4fe582
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9a4fe582
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9a4fe582

Branch: refs/heads/YARN-2915
Commit: 9a4fe58213280ebbff9c9aed0f71ffdab8665706
Parents: 3b77ded
Author: Carlo Curino <cur...@apache.org>
Authored: Tue Feb 28 17:04:20 2017 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Tue Jun 20 16:07:34 2017 -0700

----------------------------------------------------------------------
 .../LocalityMulticastAMRMProxyPolicy.java       | 63 +++++++++++++-------
 .../TestLocalityMulticastAMRMProxyPolicy.java   | 21 ++++++-
 .../policies/manager/BasePolicyManagerTest.java |  3 -
 .../resolver/TestDefaultSubClusterResolver.java |  9 ++-
 .../utils/FederationPoliciesTestUtil.java       |  6 +-
 5 files changed, 73 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 283f89e..6f97a51 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@@ -143,10 +144,9 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
     Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
     boolean allInactive = true;
     WeightedPolicyInfo policy = getPolicyInfo();
-    if (policy.getAMRMPolicyWeights() == null
-        || policy.getAMRMPolicyWeights().size() == 0) {
-      allInactive = false;
-    } else {
+
+    if (policy.getAMRMPolicyWeights() != null
+        && policy.getAMRMPolicyWeights().size() > 0) {
       for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights()
           .entrySet()) {
         if (e.getValue() > 0) {
@@ -180,7 +180,6 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
 
     this.federationFacade =
         policyContext.getFederationStateStoreFacade();
-    this.bookkeeper = new AllocationBookkeeper();
     this.homeSubcluster = policyContext.getHomeSubcluster();
 
   }
@@ -197,7 +196,9 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
       List<ResourceRequest> resourceRequests) throws YarnException {
 
     // object used to accumulate statistics about the answer, initialize with
-    // active subclusters.
+    // active subclusters. Create a new instance per call because this method
+    // can be called concurrently.
+    bookkeeper = new AllocationBookkeeper();
     bookkeeper.reinitialize(federationFacade.getSubClusters(true));
 
     List<ResourceRequest> nonLocalizedRequests =
@@ -238,12 +239,16 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
         // we log altogether later
       }
       if (targetIds != null && targetIds.size() > 0) {
+        boolean hasActive = false;
         for (SubClusterId tid : targetIds) {
           if (bookkeeper.isActiveAndEnabled(tid)) {
             bookkeeper.addRackRR(tid, rr);
+            hasActive = true;
           }
         }
-        continue;
+        if (hasActive) {
+          continue;
+        }
       }
 
       // Handle node/rack requests that the SubClusterResolver cannot map to
@@ -347,7 +352,7 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
                 originalResourceRequest.getExecutionTypeRequest());
         out.setAllocationRequestId(allocationId);
         out.setNumContainers((int) Math.ceil(numContainer));
-        if (out.isAnyLocation(out.getResourceName())) {
+        if (ResourceRequest.isAnyLocation(out.getResourceName())) {
           allocationBookkeeper.addAnyRR(targetId, out);
         } else {
           allocationBookkeeper.addRackRR(targetId, out);
@@ -362,7 +367,7 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
    */
   private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
       AllocationBookkeeper allocationBookkeeper) {
-    float totWeight = allocationBookkeeper.getTotNumLocalizedContainers();
+    float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId);
     float localWeight =
         allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
     return totWeight > 0 ? localWeight / totWeight : 0;
@@ -375,7 +380,7 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
   private float getPolicyConfigWeighting(SubClusterId targetId,
       AllocationBookkeeper allocationBookkeeper) {
     float totWeight = allocationBookkeeper.totPolicyWeight;
-    Float localWeight = weights.get(targetId);
+    Float localWeight = allocationBookkeeper.policyWeights.get(targetId);
     return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 
0;
   }
 
@@ -424,29 +429,36 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
     // asks, used to correctly "spread" the corresponding ANY
     private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
         new HashMap<>();
+    private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
 
     private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
-    private long totNumLocalizedContainers = 0;
     private float totHeadroomMemory = 0;
     private int totHeadRoomEnabledRMs = 0;
+    private Map<SubClusterId, Float> policyWeights;
     private float totPolicyWeight = 0;
 
     private void reinitialize(
         Map<SubClusterId, SubClusterInfo> activeSubclusters)
         throws YarnException {
+      if (activeSubclusters == null) {
+        throw new YarnRuntimeException("null activeSubclusters received");
+      }
 
       // reset data structures
       answer.clear();
       countContainersPerRM.clear();
+      totNumLocalizedContainers.clear();
       activeAndEnabledSC.clear();
-      totNumLocalizedContainers = 0;
       totHeadroomMemory = 0;
       totHeadRoomEnabledRMs = 0;
+      // save the reference locally in case the weights get reinitialized
+      // concurrently
+      policyWeights = weights;
       totPolicyWeight = 0;
 
       // pre-compute the set of subclusters that are both active and enabled by
       // the policy weights, and accumulate their total weight
-      for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
+      for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
         if (entry.getValue() > 0
             && activeSubclusters.containsKey(entry.getKey())) {
           activeAndEnabledSC.add(entry.getKey());
@@ -467,7 +479,6 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
           totHeadRoomEnabledRMs++;
         }
       }
-
     }
 
     /**
@@ -475,7 +486,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
      * on a per-allocation-id and per-subcluster bases.
      */
     private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) 
{
-      Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+      Preconditions
+          .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
 
       if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
         countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
@@ -488,7 +500,12 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
       countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
           .addAndGet(rr.getNumContainers());
 
-      totNumLocalizedContainers += rr.getNumContainers();
+      if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) 
{
+        totNumLocalizedContainers.put(rr.getAllocationRequestId(),
+            new AtomicLong(0));
+      }
+      totNumLocalizedContainers.get(rr.getAllocationRequestId())
+          .addAndGet(rr.getNumContainers());
 
       internalAddToAnswer(targetId, rr);
     }
@@ -497,7 +514,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
      * Add a rack-local request to the final asnwer.
      */
     public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
-      Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+      Preconditions
+          .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
       internalAddToAnswer(targetId, rr);
     }
 
@@ -505,7 +523,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
      * Add an ANY request to the final answer.
      */
     private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
-      Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
+      Preconditions
+          .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
       internalAddToAnswer(targetId, rr);
     }
 
@@ -552,10 +571,12 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
     }
 
     /**
-     * Return the total number of container coming from localized requests.
+     * Return the total number of containers coming from localized requests
+     * matching an allocation Id.
      */
-    private long getTotNumLocalizedContainers() {
-      return totNumLocalizedContainers;
+    private long getTotNumLocalizedContainers(long allocationId) {
+      AtomicLong c = totNumLocalizedContainers.get(allocationId);
+      return c == null ? 0 : c.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index 2654a06..5b3cf74 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import 
org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import 
org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -117,6 +119,21 @@ public class TestLocalityMulticastAMRMProxyPolicy
         getActiveSubclusters());
   }
 
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testNullWeights() throws Exception {
+    getPolicyInfo().setAMRMPolicyWeights(null);
+    initializePolicy();
+    fail();
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testEmptyWeights() throws Exception {
+    getPolicyInfo()
+        .setAMRMPolicyWeights(new HashMap<SubClusterIdInfo, Float>());
+    initializePolicy();
+    fail();
+  }
+
   @Test
   public void testSplitBasedOnHeadroom() throws Exception {
 
@@ -154,7 +171,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
-    ((FederationAMRMProxyPolicy) getPolicy())
+    response = ((FederationAMRMProxyPolicy) getPolicy())
         .splitResourceRequests(resourceRequests);
 
     LOG.info("After headroom update");
@@ -332,7 +349,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     // we expect 5 entry for subcluster1 (4 from request-id 1, and part
     // of the broadcast of request-id 2
-    checkExpectedAllocation(response, "subcluster1", 5, 25);
+    checkExpectedAllocation(response, "subcluster1", 5, 26);
 
     // sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the
     // broadcast of request-id 2, and no request-id 0

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
index 3cf73b6..bd99cb5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
@@ -89,9 +89,6 @@ public abstract class BasePolicyManagerTest {
     FederationAMRMProxyPolicy federationAMRMProxyPolicy =
         wfp2.getAMRMPolicy(context, null);
 
-    // needed only for tests (getARMRMPolicy change the "type" in conf)
-    fpc.setType(wfp.getClass().getCanonicalName());
-
     FederationRouterPolicy federationRouterPolicy =
         wfp2.getRouterPolicy(context, null);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
index 7396942..25d246e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.federation.resolver;
 
+import java.io.File;
 import java.net.URL;
 import java.util.HashSet;
 import java.util.Set;
@@ -46,8 +47,10 @@ public class TestDefaultSubClusterResolver {
       throw new RuntimeException(
           "Could not find 'nodes' dummy file in classpath");
     }
+    // This will get rid of the beginning '/' in the url in Windows env
+    File file = new File(url.getPath());
 
-    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
     resolver.setConf(conf);
     resolver.load();
   }
@@ -62,8 +65,10 @@ public class TestDefaultSubClusterResolver {
       throw new RuntimeException(
           "Could not find 'nodes-malformed' dummy file in classpath");
     }
+    // This will get rid of the beginning '/' in the url in Windows env
+    File file = new File(url.getPath());
 
-    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
     resolver.setConf(conf);
     resolver.load();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index 85fdc96..acc14dd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.*;
 import org.apache.hadoop.yarn.util.Records;
 
+import java.io.File;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -162,7 +163,10 @@ public final class FederationPoliciesTestUtil {
       throw new RuntimeException(
           "Could not find 'nodes' dummy file in classpath");
     }
-    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+    // This will get rid of the beginning '/' in the url in Windows env
+    File file = new File(url.getPath());
+
+    conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
     resolver.setConf(conf);
     resolver.load();
     return resolver;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to