http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index e57709f..5de749f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,8 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -30,34 +30,27 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; -import java.util.Map; - /** * This implements a simple load-balancing policy. The policy "weights" are * binary 0/1 values that enable/disable each sub-cluster, and the policy peaks * the sub-cluster with the least load to forward this application. */ -public class LoadBasedRouterPolicy - extends BaseWeightedRouterPolicy { - - private static final Log LOG = - LogFactory.getLog(LoadBasedRouterPolicy.class); +public class LoadBasedRouterPolicy extends AbstractRouterPolicy { @Override - public void reinitialize(FederationPolicyInitializationContext - federationPolicyContext) + public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { // remember old policyInfo WeightedPolicyInfo tempPolicy = getPolicyInfo(); - //attempt new initialization - super.reinitialize(federationPolicyContext); + // attempt new initialization + super.reinitialize(policyContext); - //check extra constraints + // check extra constraints for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) { if (weight != 0 && weight != 1) { - //reset to old policyInfo if check fails + // reset to old policyInfo if check fails setPolicyInfo(tempPolicy); throw new FederationPolicyInitializationException( this.getClass().getCanonicalName() @@ -69,18 +62,16 @@ public class LoadBasedRouterPolicy @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map<SubClusterId, SubClusterInfo> activeSubclusters = getActiveSubclusters(); - Map<SubClusterIdInfo, Float> weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map<SubClusterIdInfo, Float> weights = + getPolicyInfo().getRouterPolicyWeights(); SubClusterIdInfo chosen = null; long currBestMem = -1; - for (Map.Entry<SubClusterId, SubClusterInfo> entry : - activeSubclusters + for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters .entrySet()) { SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); if (weights.containsKey(id) && weights.get(id) > 0) { @@ -95,8 +86,7 @@ public class LoadBasedRouterPolicy return chosen.toId(); } - private long getAvailableMemory(SubClusterInfo value) - throws YarnException { + private long getAvailableMemory(SubClusterInfo value) throws YarnException { try { long mem = -1; JSONObject obj = new JSONObject(value.getCapability());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index a8ac5f7..bc3a1f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,39 +17,32 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import java.util.Map; - /** * This implements a policy that interprets "weights" as a ordered list of * preferences among sub-clusters. Highest weight among active subclusters is * chosen. */ -public class PriorityRouterPolicy - extends BaseWeightedRouterPolicy { - - private static final Log LOG = - LogFactory.getLog(PriorityRouterPolicy.class); +public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map<SubClusterId, SubClusterInfo> activeSubclusters = getActiveSubclusters(); // This finds the sub-cluster with the highest weight among the // currently active ones. - Map<SubClusterIdInfo, Float> weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map<SubClusterIdInfo, Float> weights = + getPolicyInfo().getRouterPolicyWeights(); SubClusterId chosen = null; Float currentBest = Float.MIN_VALUE; for (SubClusterId id : activeSubclusters.keySet()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index 1774961..b8f9cc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -17,6 +17,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -25,11 +30,6 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; - /** * This simple policy picks at uniform random among any of the currently active * subclusters. This policy is easy to use and good for testing. @@ -39,7 +39,7 @@ import java.util.Random; * of the "weights", in which case the {@link UniformRandomRouterPolicy} send * load to them, while {@code WeightedRandomRouterPolicy} does not. */ -public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy { +public class UniformRandomRouterPolicy extends AbstractRouterPolicy { private Random rand; @@ -49,14 +49,14 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy { @Override public void reinitialize( - FederationPolicyInitializationContext federationPolicyContext) + FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { FederationPolicyInitializationContextValidator - .validate(federationPolicyContext, this.getClass().getCanonicalName()); + .validate(policyContext, this.getClass().getCanonicalName()); - //note: this overrides BaseWeighterRouterPolicy and ignores the weights + // note: this overrides AbstractRouterPolicy and ignores the weights - setPolicyContext(federationPolicyContext); + setPolicyContext(policyContext); } /** @@ -64,21 +64,19 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy { * depend on the weights in the policy). * * @param appSubmissionContext the context for the app being submitted - * (ignored). + * (ignored). * * @return a randomly chosen subcluster. * * @throws YarnException if there are no active subclusters. */ public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map<SubClusterId, SubClusterInfo> activeSubclusters = getActiveSubclusters(); - List<SubClusterId> list = - new ArrayList<>(activeSubclusters.keySet()); + List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); return list.get(rand.nextInt(list.size())); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index 0777677..ac75ae9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,32 +18,30 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; - -import java.util.Map; -import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This policy implements a weighted random sample among currently active * sub-clusters. */ -public class WeightedRandomRouterPolicy - extends BaseWeightedRouterPolicy { +public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { - private static final Log LOG = - LogFactory.getLog(WeightedRandomRouterPolicy.class); + private static final Logger LOG = + LoggerFactory.getLogger(WeightedRandomRouterPolicy.class); private Random rand = new Random(System.currentTimeMillis()); @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map<SubClusterId, SubClusterInfo> activeSubclusters = getActiveSubclusters(); @@ -52,13 +50,13 @@ public class WeightedRandomRouterPolicy // changes dynamically (and this would unfairly spread the load to // sub-clusters adjacent to an inactive one), hence we need to count/scan // the list and based on weight pick the next sub-cluster. - Map<SubClusterIdInfo, Float> weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map<SubClusterIdInfo, Float> weights = + getPolicyInfo().getRouterPolicyWeights(); float totActiveWeight = 0; - for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){ - if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey() - .toId())){ + for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) { + if (entry.getKey() != null + && activeSubclusters.containsKey(entry.getKey().toId())) { totActiveWeight += entry.getValue(); } } @@ -73,7 +71,7 @@ public class WeightedRandomRouterPolicy return id; } } - //should never happen + // should never happen return null; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java index 5d0fcb6..e445ac3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java @@ -17,4 +17,3 @@ */ /** Router policies. **/ package org.apache.hadoop.yarn.server.federation.policies.router; - http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java index 8238633..6b4f60c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java @@ -57,11 +57,11 @@ public abstract class AbstractSubClusterResolver implements SubClusterResolver { return rackToSubClusters.get(rackname); } - protected Map<String, SubClusterId> getNodeToSubCluster() { + public Map<String, SubClusterId> getNodeToSubCluster() { return nodeToSubCluster; } - protected Map<String, Set<SubClusterId>> getRackToSubClusters() { + public Map<String, Set<SubClusterId>> getRackToSubClusters() { return rackToSubClusters; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 8da92b9..ba897da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -22,14 +22,17 @@ import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -49,6 +52,7 @@ public abstract class BaseFederationPoliciesTest { private ApplicationSubmissionContext applicationSubmissionContext = mock(ApplicationSubmissionContext.class); private Random rand = new Random(); + private SubClusterId homeSubCluster; @Test public void testReinitilialize() throws YarnException { @@ -88,16 +92,22 @@ public abstract class BaseFederationPoliciesTest { getPolicy().reinitialize(fpc); } - @Test(expected = NoActiveSubclustersException.class) + @Test(expected = FederationPolicyException.class) public void testNoSubclusters() throws YarnException { // empty the activeSubclusters map FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), getPolicyInfo(), new HashMap<>()); - ConfigurableFederationPolicy currentPolicy = getPolicy(); - if (currentPolicy instanceof FederationRouterPolicy) { - ((FederationRouterPolicy) currentPolicy) + ConfigurableFederationPolicy localPolicy = getPolicy(); + if (localPolicy instanceof FederationRouterPolicy) { + ((FederationRouterPolicy) localPolicy) .getHomeSubcluster(getApplicationSubmissionContext()); + } else { + String[] hosts = new String[] {"host1", "host2" }; + List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + ((FederationAMRMProxyPolicy) localPolicy) + .splitResourceRequests(resourceRequests); } } @@ -152,4 +162,12 @@ public abstract class BaseFederationPoliciesTest { this.rand = rand; } + public SubClusterId getHomeSubCluster() { + return homeSubCluster; + } + + public void setHomeSubCluster(SubClusterId homeSubCluster) { + this.homeSubCluster = homeSubCluster; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index e840b3f..c79fd2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -16,22 +16,20 @@ * limitations under the License. */ - package org.apache.hadoop.yarn.server.federation.policies; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.junit.Before; import org.junit.Test; -import java.nio.ByteBuffer; - /** * Test class for {@link FederationPolicyInitializationContextValidator}. */ @@ -45,11 +43,10 @@ public class TestFederationPolicyInitializationContextValidator { @Before public void setUp() throws Exception { goodFacade = FederationPoliciesTestUtil.initFacade(); - goodConfig = - new MockPolicyManager().serializeConf(); - goodSR =FederationPoliciesTestUtil.initResolver(); - context = new - FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade); + goodConfig = new MockPolicyManager().serializeConf(); + goodSR = FederationPoliciesTestUtil.initResolver(); + context = new FederationPolicyInitializationContext(goodConfig, goodSR, + goodFacade); } @Test @@ -100,8 +97,7 @@ public class TestFederationPolicyInitializationContextValidator { @Override public FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationAMRMProxyPolicy oldInstance) throws FederationPolicyInitializationException { return null; @@ -109,8 +105,7 @@ public class TestFederationPolicyInitializationContextValidator { @Override public FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationRouterPolicy oldInstance) throws FederationPolicyInitializationException { return null; @@ -120,8 +115,8 @@ public class TestFederationPolicyInitializationContextValidator { public SubClusterPolicyConfiguration serializeConf() throws FederationPolicyInitializationException { ByteBuffer buf = ByteBuffer.allocate(0); - return SubClusterPolicyConfiguration - .newInstance("queue1", this.getClass().getCanonicalName(), buf); + return SubClusterPolicyConfiguration.newInstance("queue1", + this.getClass().getCanonicalName(), buf); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java new file mode 100644 index 0000000..a21f53d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test class for the {@link BroadcastAMRMProxyPolicy}. + */ +public class TestBroadcastAMRMProxyFederationPolicy + extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new BroadcastAMRMProxyPolicy()); + // needed for base test to work + setPolicyInfo(mock(WeightedPolicyInfo.class)); + + for (int i = 1; i <= 2; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + mock(WeightedPolicyInfo.class), getActiveSubclusters()); + + } + + @Test + public void testSplitAllocateRequest() throws Exception { + // verify the request is broadcasted to all subclusters + String[] hosts = new String[] {"host1", "host2" }; + List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + Assert.assertTrue(response.size() == 2); + for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response + .entrySet()) { + Assert.assertTrue(getActiveSubclusters().get(entry.getKey()) != null); + for (ResourceRequest r : entry.getValue()) { + Assert.assertTrue(resourceRequests.contains(r)); + } + } + for (SubClusterId subClusterId : getActiveSubclusters().keySet()) { + for (ResourceRequest r : response.get(subClusterId)) { + Assert.assertTrue(resourceRequests.contains(r)); + } + } + } + + @Test + public void testNotifyOfResponse() throws Exception { + String[] hosts = new String[] {"host1", "host2" }; + List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + try { + ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( + SubClusterId.newInstance("sc3"), mock(AllocateResponse.class)); + Assert.fail(); + } catch (FederationPolicyException f) { + System.out.println("Expected: " + f.getMessage()); + } + + ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( + SubClusterId.newInstance("sc1"), mock(AllocateResponse.class)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java new file mode 100644 index 0000000..2654a06 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}. + */ +public class TestLocalityMulticastAMRMProxyPolicy + extends BaseFederationPoliciesTest { + + public static final Logger LOG = + LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class); + + @Before + public void setUp() throws Exception { + setPolicy(new LocalityMulticastAMRMProxyPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); + Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>(); + + // simulate 20 subclusters with a 5% chance of being inactive + for (int i = 0; i < 6; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i); + // sub-cluster 3 is not active + if (i != 3) { + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + float weight = 1 / 10f; + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + // sub-cluster 4 is "disabled" in the weights + if (i == 4) { + routerWeights.put(sc, 0f); + amrmWeights.put(sc, 0f); + } + } + + getPolicyInfo().setRouterPolicyWeights(routerWeights); + getPolicyInfo().setAMRMPolicyWeights(amrmWeights); + getPolicyInfo().setHeadroomAlpha(0.5f); + setHomeSubCluster(SubClusterId.newInstance("homesubcluster")); + + } + + @Test + public void testReinitilialize() throws YarnException { + initializePolicy(); + } + + private void initializePolicy() throws YarnException { + setFederationPolicyContext(new FederationPolicyInitializationContext()); + SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); + getFederationPolicyContext().setFederationSubclusterResolver(resolver); + ByteBuffer buf = getPolicyInfo().toByteBuffer(); + getFederationPolicyContext().setSubClusterPolicyConfiguration( + SubClusterPolicyConfiguration.newInstance("queue1", + getPolicy().getClass().getCanonicalName(), buf)); + getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); + FederationPoliciesTestUtil.initializePolicyContext( + getFederationPolicyContext(), getPolicy(), getPolicyInfo(), + getActiveSubclusters()); + } + + @Test + public void testSplitBasedOnHeadroom() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + List<ResourceRequest> resourceRequests = createSimpleRequest(); + + prepPolicyWithHeadroom(); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + LOG.info("Initial headroom"); + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // based on headroom, we expect 75 containers to got to subcluster0, + // as it advertise lots of headroom (100), no containers for sublcuster1 + // as it advertise zero headroom, 1 to subcluster 2 (as it advertise little + // headroom (1), and 25 to subcluster5 which has unknown headroom, and so + // it gets 1/4th of the load + checkExpectedAllocation(response, "subcluster0", 1, 75); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 1); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + // notify a change in headroom and try again + AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + LOG.info("After headroom update"); + prettyPrintRequests(response); + validateSplit(response, resourceRequests); + + // we simulated a change in headroom for subcluster2, which will now + // have the same headroom of subcluster0 and so it splits the requests + // note that the total is still less or equal to (userAsk + numSubClusters) + checkExpectedAllocation(response, "subcluster0", 1, 38); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 38); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + } + + @Test(timeout = 5000) + public void testStressPolicy() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + + int numRR = 1000; + List<ResourceRequest> resourceRequests = createLargeRandomList(numRR); + + prepPolicyWithHeadroom(); + + int numIterations = 1000; + long tstart = System.currentTimeMillis(); + for (int i = 0; i < numIterations; i++) { + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + validateSplit(response, resourceRequests); + } + long tend = System.currentTimeMillis(); + + LOG.info("Performed " + numIterations + " policy invocations (and " + + "validations) in " + (tend - tstart) + "ms"); + } + + @Test + public void testFWDAllZeroANY() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(0.5f); + + initializePolicy(); + List<ResourceRequest> resourceRequests = createZeroSizedANYRequest(); + + // this receives responses from sc0,sc1,sc2 + prepPolicyWithHeadroom(); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // we expect all three to appear for a zero-sized ANY + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // we expect the zero size request to be sent to the first 3 rm (due to + // the fact that we received responses only from these 3 sublcusters) + checkExpectedAllocation(response, "subcluster0", 1, 0); + checkExpectedAllocation(response, "subcluster1", 1, 0); + checkExpectedAllocation(response, "subcluster2", 1, 0); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", -1, -1); + } + + @Test + public void testSplitBasedOnHeadroomAndWeights() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + + // Configure policy to be 50% headroom based and 50% weight based + getPolicyInfo().setHeadroomAlpha(0.5f); + + initializePolicy(); + List<ResourceRequest> resourceRequests = createSimpleRequest(); + + prepPolicyWithHeadroom(); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // in this case the headroom allocates 50 containers, while weights allocate + // the rest. due to weights we have 12.5 (round to 13) containers for each + // sublcuster, the rest is due to headroom. + checkExpectedAllocation(response, "subcluster0", 1, 50); + checkExpectedAllocation(response, "subcluster1", 1, 13); + checkExpectedAllocation(response, "subcluster2", 1, 13); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + } + + private void prepPolicyWithHeadroom() throws YarnException { + AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar); + + ar = getAllocateResponseWithTargetHeadroom(0); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar); + + ar = getAllocateResponseWithTargetHeadroom(1); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); + } + + private AllocateResponse getAllocateResponseWithTargetHeadroom( + int numContainers) { + return AllocateResponse.newInstance(0, null, null, + Collections.<NodeReport> emptyList(), + Resource.newInstance(numContainers * 1024, numContainers), null, 10, + null, Collections.<NMToken> emptyList()); + } + + @Test + public void testSplitAllocateRequest() throws Exception { + + // Test a complex List<ResourceRequest> is split correctly + initializePolicy(); + + // modify default initialization to include a "homesubcluster" + // which we will use as the default for when nodes or racks are unknown + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(getHomeSubCluster()); + getActiveSubclusters().put(getHomeSubCluster(), sci); + SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId()); + + getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f); + getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f); + + FederationPoliciesTestUtil.initializePolicyContext( + getFederationPolicyContext(), getPolicy(), getPolicyInfo(), + getActiveSubclusters()); + + List<ResourceRequest> resourceRequests = createComplexRequest(); + + Map<SubClusterId, List<ResourceRequest>> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + validateSplit(response, resourceRequests); + prettyPrintRequests(response); + + // we expect 4 entry for home subcluster (3 for request-id 4, and a part + // of the broadcast of request-id 2 + checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23); + + // for subcluster0 we expect 3 entry from request-id 0, and 3 from + // request-id 3, as well as part of the request-id 2 broadast + checkExpectedAllocation(response, "subcluster0", 7, 26); + + // we expect 5 entry for subcluster1 (4 from request-id 1, and part + // of the broadcast of request-id 2 + checkExpectedAllocation(response, "subcluster1", 5, 25); + + // sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the + // broadcast of request-id 2, and no request-id 0 + checkExpectedAllocation(response, "subcluster2", 4, 23); + + // subcluster id 3, 4 should not appear (due to weights or active/inactive) + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + + // subcluster5 should get only part of the request-id 2 broadcast + checkExpectedAllocation(response, "subcluster5", 1, 20); + + // check that the allocations that show up are what expected + for (ResourceRequest rr : response.get(getHomeSubCluster())) { + Assert.assertTrue(rr.getAllocationRequestId() == 4L + || rr.getAllocationRequestId() == 2L); + } + + for (ResourceRequest rr : response.get(getHomeSubCluster())) { + Assert.assertTrue(rr.getAllocationRequestId() != 1L); + } + + List<ResourceRequest> rrs = + response.get(SubClusterId.newInstance("subcluster0")); + for (ResourceRequest rr : rrs) { + Assert.assertTrue(rr.getAllocationRequestId() != 1L); + } + + for (ResourceRequest rr : response + .get(SubClusterId.newInstance("subcluster2"))) { + Assert.assertTrue(rr.getAllocationRequestId() != 0L); + } + + for (ResourceRequest rr : response + .get(SubClusterId.newInstance("subcluster5"))) { + Assert.assertTrue(rr.getAllocationRequestId() >= 2); + Assert.assertTrue(rr.getRelaxLocality()); + } + } + + // check that the number of containers in the first ResourceRequest in + // response for this sub-cluster matches expectations. -1 indicate the + // response should be null + private void checkExpectedAllocation( + Map<SubClusterId, List<ResourceRequest>> response, String subCluster, + long totResourceRequests, long totContainers) { + if (totContainers == -1) { + Assert.assertNull(response.get(SubClusterId.newInstance(subCluster))); + } else { + SubClusterId sc = SubClusterId.newInstance(subCluster); + Assert.assertEquals(totResourceRequests, response.get(sc).size()); + + long actualContCount = 0; + for (ResourceRequest rr : response.get(sc)) { + actualContCount += rr.getNumContainers(); + } + Assert.assertEquals(totContainers, actualContCount); + } + } + + private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split, + List<ResourceRequest> original) throws YarnException { + + SubClusterResolver resolver = + getFederationPolicyContext().getFederationSubclusterResolver(); + + // Apply general validation rules + int numUsedSubclusters = split.size(); + + Set<Long> originalIds = new HashSet<>(); + Set<Long> splitIds = new HashSet<>(); + + int originalContainers = 0; + for (ResourceRequest rr : original) { + originalContainers += rr.getNumContainers(); + originalIds.add(rr.getAllocationRequestId()); + } + + int splitContainers = 0; + for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split + .entrySet()) { + for (ResourceRequest rr : rrs.getValue()) { + splitContainers += rr.getNumContainers(); + splitIds.add(rr.getAllocationRequestId()); + // check node-local asks are sent to right RM (only) + SubClusterId fid = null; + try { + fid = resolver.getSubClusterForNode(rr.getResourceName()); + } catch (YarnException e) { + // ignore code will handle + } + if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null + && !fid.equals(rrs.getKey())) { + Assert.fail("A node-local (or resolvable rack-local) RR should not " + + "be send to an RM other than what it resolves to."); + } + } + } + + // check we are not inventing Allocation Ids + Assert.assertEquals(originalIds, splitIds); + + // check we are not exceedingly replicating the container asks among + // RMs (a little is allowed due to rounding of fractional splits) + Assert.assertTrue( + " Containers requested (" + splitContainers + ") should " + + "not exceed the original count of containers (" + + originalContainers + ") by more than the number of subclusters (" + + numUsedSubclusters + ")", + originalContainers + numUsedSubclusters >= splitContainers); + + // Test target Ids + for (SubClusterId targetId : split.keySet()) { + Assert.assertTrue("Target subclusters should be in the active set", + getActiveSubclusters().containsKey(targetId)); + Assert.assertTrue( + "Target subclusters (" + targetId + ") should have weight >0 in " + + "the policy ", + getPolicyInfo().getRouterPolicyWeights() + .get(new SubClusterIdInfo(targetId)) > 0); + } + } + + private void prettyPrintRequests( + Map<SubClusterId, List<ResourceRequest>> response) { + for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response + .entrySet()) { + String str = ""; + for (ResourceRequest rr : entry.getValue()) { + str += " [id:" + rr.getAllocationRequestId() + " loc:" + + rr.getResourceName() + " numCont:" + rr.getNumContainers() + + "], "; + } + LOG.info(entry.getKey() + " --> " + str); + } + } + + private List<ResourceRequest> createLargeRandomList(int numRR) + throws Exception { + + List<ResourceRequest> out = new ArrayList<>(); + Random rand = new Random(1); + DefaultSubClusterResolverImpl resolver = + (DefaultSubClusterResolverImpl) getFederationPolicyContext() + .getFederationSubclusterResolver(); + + List<String> nodes = + new ArrayList<>(resolver.getNodeToSubCluster().keySet()); + + for (int i = 0; i < numRR; i++) { + String nodeName = nodes.get(rand.nextInt(nodes.size())); + long allocationId = (long) rand.nextInt(20); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId, + nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean())); + } + return out; + } + + private List<ResourceRequest> createSimpleRequest() throws Exception { + + List<ResourceRequest> out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, true)); + return out; + } + + private List<ResourceRequest> createZeroSizedANYRequest() throws Exception { + + List<ResourceRequest> out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 0, null, true)); + return out; + } + + private List<ResourceRequest> createComplexRequest() throws Exception { + + List<ResourceRequest> out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, false)); + + // create a single container request with 3 alternative hosts across sc1,sc2 + // where we want 2 containers in sc1 and 1 in sc2 + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1-host1", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1-host2", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster2-rack3-host3", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1", 1024, 1, 1, 2, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster2-rack3", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + ResourceRequest.ANY, 1024, 1, 1, 2, null, false)); + + // create a non-local ANY request that can span anything + out.add(FederationPoliciesTestUtil.createResourceRequest(2L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, true)); + + // create a single container request in sc0 with relaxed locality + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, true)); + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + "subcluster0-rack0", 1024, 1, 1, 1, null, true)); + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, true)); + + // create a request of an unknown node/rack and expect this to show up + // in homesubcluster + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode", + 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack", + 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, false)); + + return out; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index 9e94f72..906e35f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -17,6 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -29,12 +32,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; - /** - * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the - * load is properly considered for allocation. + * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load + * is properly considered for allocation. */ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { @@ -47,12 +47,10 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { // simulate 20 active subclusters for (int i = 0; i < 20; i++) { - SubClusterIdInfo sc = - new SubClusterIdInfo(String.format("sc%02d", i)); + SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i)); SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1, - SubClusterState.SC_RUNNING, -1, - generateClusterMetricsInfo(i)); + SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i)); getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); float weight = getRand().nextInt(2); if (i == 5) { @@ -76,7 +74,7 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { private String generateClusterMetricsInfo(int id) { long mem = 1024 * getRand().nextInt(277 * 100 - 1); - //plant a best cluster + // plant a best cluster if (id == 5) { mem = 1024 * 277 * 100; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index ff5175d..eefcfd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -16,6 +16,12 @@ */ package org.apache.hadoop.yarn.server.federation.policies.router; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -28,12 +34,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Simple test class for the {@link PriorityRouterPolicy}. Tests that the * weights are correctly used for ordering the choice of sub-clusters. @@ -72,8 +72,7 @@ public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest { getPolicyInfo().setRouterPolicyWeights(routerWeights); getPolicyInfo().setAMRMPolicyWeights(amrmWeights); FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), - getActiveSubclusters()); + getPolicyInfo(), getActiveSubclusters()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index a612685..78967d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -17,6 +17,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -29,13 +36,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large * number of randomized tests to check we are weighiting correctly even if @@ -71,8 +71,7 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { getPolicyInfo().setAMRMPolicyWeights(amrmWeights); FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), - getActiveSubclusters()); + getPolicyInfo(), getActiveSubclusters()); } @@ -88,8 +87,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { float numberOfDraws = 1000000; for (float i = 0; i < numberOfDraws; i++) { - SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()). - getHomeSubcluster(getApplicationSubmissionContext()); + SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(getApplicationSubmissionContext()); counter.get(chosenId).incrementAndGet(); } @@ -113,13 +112,15 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { if (getActiveSubclusters().containsKey(counterEntry.getKey())) { Assert.assertTrue( "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight - + " expected weight: " + expectedWeight, expectedWeight == 0 || - (actualWeight / expectedWeight) < 1.1 - && (actualWeight / expectedWeight) > 0.9); + + " expected weight: " + expectedWeight, + expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1 + && (actualWeight / expectedWeight) > 0.9); } else { - Assert.assertTrue( - "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight - + " expected weight: " + expectedWeight, actualWeight == 0); + Assert + .assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + + actualWeight + " expected weight: " + expectedWeight, + actualWeight == 0); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index f901329..87ed8d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.utils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolv import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.*; +import org.apache.hadoop.yarn.util.Records; import java.net.URL; import java.nio.ByteBuffer; @@ -48,6 +50,68 @@ public final class FederationPoliciesTestUtil { // disabled. } + private static final String FEDR_NODE_PREFIX = "fedr-test-node-"; + + + public static List<ResourceRequest> createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + List<ResourceRequest> reqs = new ArrayList<ResourceRequest>(); + for (String host : hosts) { + ResourceRequest hostReq = + createResourceRequest(host, memory, vCores, priority, containers, + labelExpression, relaxLocality); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(rackReq); + } + + ResourceRequest offRackReq = + createResourceRequest(ResourceRequest.ANY, memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(offRackReq); + return reqs; + } + + protected static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + boolean relaxLocality) throws YarnException { + return createResourceRequest(resource, memory, vCores, priority, containers, + null, relaxLocality); + } + + @SuppressWarnings("checkstyle:parameternumber") + public static ResourceRequest createResourceRequest(long id, String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + ResourceRequest out = + createResourceRequest(resource, memory, vCores, priority, containers, + labelExpression, relaxLocality); + out.setAllocationRequestId(id); + return out; + } + + public static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setResourceName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemorySize(memory); + capability.setVirtualCores(vCores); + req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } + req.setRelaxLocality(relaxLocality); + return req; + } public static void initializePolicyContext( FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b615145/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes index e4d6112..2b7e237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes @@ -1,4 +1,8 @@ node1,subcluster1,rack1 node2 , subcluster2, RACK1 noDE3,subcluster3, rack2 -node4, subcluster3, rack2 \ No newline at end of file +node4, subcluster3, rack2 +subcluster0-rack0-host0,subcluster0, subcluster0-rack0 +Subcluster1-RACK1-HOST1,subcluster1, subCluster1-RACK1 +SUBCLUSTER1-RACK1-HOST2,subcluster1, subCluster1-RACK1 +SubCluster2-RACK3-HOST3,subcluster2, subcluster2-rack3 --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org