Repository: hadoop Updated Branches: refs/heads/YARN-2915 365201fd9 -> 0c1b79dd4 (forced update)
YARN-6190. Validation and synchronization fixes in LocalityMulticastAMRMProxyPolicy. (Botong Huang via curino) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9a4fe582 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9a4fe582 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9a4fe582 Branch: refs/heads/YARN-2915 Commit: 9a4fe58213280ebbff9c9aed0f71ffdab8665706 Parents: 3b77ded Author: Carlo Curino <cur...@apache.org> Authored: Tue Feb 28 17:04:20 2017 -0800 Committer: Subru Krishnan <su...@apache.org> Committed: Tue Jun 20 16:07:34 2017 -0700 ---------------------------------------------------------------------- .../LocalityMulticastAMRMProxyPolicy.java | 63 +++++++++++++------- .../TestLocalityMulticastAMRMProxyPolicy.java | 21 ++++++- .../policies/manager/BasePolicyManagerTest.java | 3 - .../resolver/TestDefaultSubClusterResolver.java | 9 ++- .../utils/FederationPoliciesTestUtil.java | 6 +- 5 files changed, 73 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index 283f89e..6f97a51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -143,10 +144,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { Map<SubClusterId, Float> newWeightsConverted = new HashMap<>(); boolean allInactive = true; WeightedPolicyInfo policy = getPolicyInfo(); - if (policy.getAMRMPolicyWeights() == null - || policy.getAMRMPolicyWeights().size() == 0) { - allInactive = false; - } else { + + if (policy.getAMRMPolicyWeights() != null + && policy.getAMRMPolicyWeights().size() > 0) { for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights() .entrySet()) { if (e.getValue() > 0) { @@ -180,7 +180,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { this.federationFacade = policyContext.getFederationStateStoreFacade(); - this.bookkeeper = new AllocationBookkeeper(); this.homeSubcluster = policyContext.getHomeSubcluster(); } @@ -197,7 +196,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { List<ResourceRequest> resourceRequests) throws YarnException { // object used to accumulate statistics about the answer, initialize with - // active subclusters. + // active subclusters. Create a new instance per call because this method + // can be called concurrently. + bookkeeper = new AllocationBookkeeper(); bookkeeper.reinitialize(federationFacade.getSubClusters(true)); List<ResourceRequest> nonLocalizedRequests = @@ -238,12 +239,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // we log altogether later } if (targetIds != null && targetIds.size() > 0) { + boolean hasActive = false; for (SubClusterId tid : targetIds) { if (bookkeeper.isActiveAndEnabled(tid)) { bookkeeper.addRackRR(tid, rr); + hasActive = true; } } - continue; + if (hasActive) { + continue; + } } // Handle node/rack requests that the SubClusterResolver cannot map to @@ -347,7 +352,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { originalResourceRequest.getExecutionTypeRequest()); out.setAllocationRequestId(allocationId); out.setNumContainers((int) Math.ceil(numContainer)); - if (out.isAnyLocation(out.getResourceName())) { + if (ResourceRequest.isAnyLocation(out.getResourceName())) { allocationBookkeeper.addAnyRR(targetId, out); } else { allocationBookkeeper.addRackRR(targetId, out); @@ -362,7 +367,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { */ private float getLocalityBasedWeighting(long reqId, SubClusterId targetId, AllocationBookkeeper allocationBookkeeper) { - float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(); + float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId); float localWeight = allocationBookkeeper.getNumLocalizedContainers(reqId, targetId); return totWeight > 0 ? localWeight / totWeight : 0; @@ -375,7 +380,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private float getPolicyConfigWeighting(SubClusterId targetId, AllocationBookkeeper allocationBookkeeper) { float totWeight = allocationBookkeeper.totPolicyWeight; - Float localWeight = weights.get(targetId); + Float localWeight = allocationBookkeeper.policyWeights.get(targetId); return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0; } @@ -424,29 +429,36 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // asks, used to correctly "spread" the corresponding ANY private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM = new HashMap<>(); + private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>(); private Set<SubClusterId> activeAndEnabledSC = new HashSet<>(); - private long totNumLocalizedContainers = 0; private float totHeadroomMemory = 0; private int totHeadRoomEnabledRMs = 0; + private Map<SubClusterId, Float> policyWeights; private float totPolicyWeight = 0; private void reinitialize( Map<SubClusterId, SubClusterInfo> activeSubclusters) throws YarnException { + if (activeSubclusters == null) { + throw new YarnRuntimeException("null activeSubclusters received"); + } // reset data structures answer.clear(); countContainersPerRM.clear(); + totNumLocalizedContainers.clear(); activeAndEnabledSC.clear(); - totNumLocalizedContainers = 0; totHeadroomMemory = 0; totHeadRoomEnabledRMs = 0; + // save the reference locally in case the weights get reinitialized + // concurrently + policyWeights = weights; totPolicyWeight = 0; // pre-compute the set of subclusters that are both active and enabled by // the policy weights, and accumulate their total weight - for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) { + for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) { if (entry.getValue() > 0 && activeSubclusters.containsKey(entry.getKey())) { activeAndEnabledSC.add(entry.getKey()); @@ -467,7 +479,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { totHeadRoomEnabledRMs++; } } - } /** @@ -475,7 +486,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * on a per-allocation-id and per-subcluster bases. */ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>()); @@ -488,7 +500,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) .addAndGet(rr.getNumContainers()); - totNumLocalizedContainers += rr.getNumContainers(); + if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) { + totNumLocalizedContainers.put(rr.getAllocationRequestId(), + new AtomicLong(0)); + } + totNumLocalizedContainers.get(rr.getAllocationRequestId()) + .addAndGet(rr.getNumContainers()); internalAddToAnswer(targetId, rr); } @@ -497,7 +514,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * Add a rack-local request to the final asnwer. */ public void addRackRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); internalAddToAnswer(targetId, rr); } @@ -505,7 +523,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * Add an ANY request to the final answer. */ private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { - Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName())); + Preconditions + .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName())); internalAddToAnswer(targetId, rr); } @@ -552,10 +571,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } /** - * Return the total number of container coming from localized requests. + * Return the total number of container coming from localized requests + * matching an allocation Id. */ - private long getTotNumLocalizedContainers() { - return totNumLocalizedContainers; + private long getTotNumLocalizedContainers(long allocationId) { + AtomicLong c = totNumLocalizedContainers.get(allocationId); + return c == null ? 0 : c.get(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index 2654a06..5b3cf74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -117,6 +119,21 @@ public class TestLocalityMulticastAMRMProxyPolicy getActiveSubclusters()); } + @Test(expected = FederationPolicyInitializationException.class) + public void testNullWeights() throws Exception { + getPolicyInfo().setAMRMPolicyWeights(null); + initializePolicy(); + fail(); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testEmptyWeights() throws Exception { + getPolicyInfo() + .setAMRMPolicyWeights(new HashMap<SubClusterIdInfo, Float>()); + initializePolicy(); + fail(); + } + @Test public void testSplitBasedOnHeadroom() throws Exception { @@ -154,7 +171,7 @@ public class TestLocalityMulticastAMRMProxyPolicy AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100); ((FederationAMRMProxyPolicy) getPolicy()) .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); - ((FederationAMRMProxyPolicy) getPolicy()) + response = ((FederationAMRMProxyPolicy) getPolicy()) .splitResourceRequests(resourceRequests); LOG.info("After headroom update"); @@ -332,7 +349,7 @@ public class TestLocalityMulticastAMRMProxyPolicy // we expect 5 entry for subcluster1 (4 from request-id 1, and part // of the broadcast of request-id 2 - checkExpectedAllocation(response, "subcluster1", 5, 25); + checkExpectedAllocation(response, "subcluster1", 5, 26); // sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the // broadcast of request-id 2, and no request-id 0 http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java index 3cf73b6..bd99cb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java @@ -89,9 +89,6 @@ public abstract class BasePolicyManagerTest { FederationAMRMProxyPolicy federationAMRMProxyPolicy = wfp2.getAMRMPolicy(context, null); - // needed only for tests (getARMRMPolicy change the "type" in conf) - fpc.setType(wfp.getClass().getCanonicalName()); - FederationRouterPolicy federationRouterPolicy = wfp2.getRouterPolicy(context, null); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java index 7396942..25d246e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.resolver; +import java.io.File; import java.net.URL; import java.util.HashSet; import java.util.Set; @@ -46,8 +47,10 @@ public class TestDefaultSubClusterResolver { throw new RuntimeException( "Could not find 'nodes' dummy file in classpath"); } + // This will get rid of the beginning '/' in the url in Windows env + File file = new File(url.getPath()); - conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath()); resolver.setConf(conf); resolver.load(); } @@ -62,8 +65,10 @@ public class TestDefaultSubClusterResolver { throw new RuntimeException( "Could not find 'nodes-malformed' dummy file in classpath"); } + // This will get rid of the beginning '/' in the url in Windows env + File file = new File(url.getPath()); - conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath()); resolver.setConf(conf); resolver.load(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a4fe582/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index 85fdc96..acc14dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.*; import org.apache.hadoop.yarn.util.Records; +import java.io.File; import java.net.URL; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -162,7 +163,10 @@ public final class FederationPoliciesTestUtil { throw new RuntimeException( "Could not find 'nodes' dummy file in classpath"); } - conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath()); + // This will get rid of the beginning '/' in the url in Windows env + File file = new File(url.getPath()); + + conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath()); resolver.setConf(conf); resolver.load(); return resolver; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org