YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27765f13 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27765f13 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27765f13 Branch: refs/heads/YARN-2915 Commit: 27765f13898655be69ff8be9d2b1930f78bb00d1 Parents: 27b133b Author: Subru Krishnan <su...@apache.org> Authored: Tue Nov 1 19:54:18 2016 -0700 Committer: Carlo Curino <cur...@apache.org> Committed: Tue May 16 08:52:37 2017 -0700 ---------------------------------------------------------------------- .../policies/AbstractPolicyManager.java | 175 +++++++++++++++++++ .../FederationPolicyInitializationContext.java | 3 +- .../policies/UniformBroadcastPolicyManager.java | 56 ++++++ .../policies/WeightedLocalityPolicyManager.java | 67 +++++++ .../records/SubClusterPolicyConfiguration.java | 13 ++ .../policies/BasePolicyManagerTest.java | 108 ++++++++++++ ...ionPolicyInitializationContextValidator.java | 5 +- .../TestUniformBroadcastPolicyManager.java | 40 +++++ .../TestWeightedLocalityPolicyManager.java | 79 +++++++++ .../utils/FederationPoliciesTestUtil.java | 2 +- 10 files changed, 545 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java new file mode 100644 index 0000000..e77f2e3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides basic implementation for common methods that multiple + * policies will need to implement. + */ +public abstract class AbstractPolicyManager implements + FederationPolicyManager { + + private String queue; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class routerFederationPolicy; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class amrmProxyFederationPolicy; + + public static final Logger LOG = + LoggerFactory.getLogger(AbstractPolicyManager.class); + /** + * This default implementation validates the + * {@link FederationPolicyInitializationContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + public FederationAMRMProxyPolicy getAMRMPolicy( + FederationPolicyInitializationContext federationPolicyContext, + FederationAMRMProxyPolicy oldInstance) + throws FederationPolicyInitializationException { + + if (amrmProxyFederationPolicy == null) { + throw new FederationPolicyInitializationException("The parameter " + + "amrmProxyFederationPolicy should be initialized in " + + this.getClass().getSimpleName() + " constructor."); + } + + try { + return (FederationAMRMProxyPolicy) internalPolicyGetter( + federationPolicyContext, oldInstance, amrmProxyFederationPolicy); + } catch (ClassCastException e) { + throw new FederationPolicyInitializationException(e); + } + + } + + /** + * This default implementation validates the + * {@link FederationPolicyInitializationContext}, + * then checks whether it needs to reinstantiate the class (null or + * mismatching type), and reinitialize the policy. + * + * @param federationPolicyContext the current context + * @param oldInstance the existing (possibly null) instance. + * + * @return a valid and fully reinitalized {@link FederationRouterPolicy} + * instance + * + * @throws FederationPolicyInitializationException if the reinitalization is + * not valid, and ensure + * previous state is preserved + */ + + public FederationRouterPolicy getRouterPolicy( + FederationPolicyInitializationContext federationPolicyContext, + FederationRouterPolicy oldInstance) + throws FederationPolicyInitializationException { + + //checks that sub-types properly initialize the types of policies + if (routerFederationPolicy == null) { + throw new FederationPolicyInitializationException("The policy " + + "type should be initialized in " + this.getClass().getSimpleName() + + " constructor."); + } + + try { + return (FederationRouterPolicy) internalPolicyGetter( + federationPolicyContext, oldInstance, routerFederationPolicy); + } catch (ClassCastException e) { + throw new FederationPolicyInitializationException(e); + } + } + + @Override + public String getQueue() { + return queue; + } + + @Override + public void setQueue(String queue) { + this.queue = queue; + } + + /** + * Common functionality to instantiate a reinitialize a {@link + * ConfigurableFederationPolicy}. + */ + private ConfigurableFederationPolicy internalPolicyGetter( + final FederationPolicyInitializationContext federationPolicyContext, + ConfigurableFederationPolicy oldInstance, Class policy) + throws FederationPolicyInitializationException { + + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + if (oldInstance == null || !oldInstance.getClass().equals(policy)) { + try { + oldInstance = (ConfigurableFederationPolicy) policy.newInstance(); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + } + + //copying the context to avoid side-effects + FederationPolicyInitializationContext modifiedContext = + updateContext(federationPolicyContext, + oldInstance.getClass().getCanonicalName()); + + oldInstance.reinitialize(modifiedContext); + return oldInstance; + } + + /** + * This method is used to copy-on-write the context, that will be passed + * downstream to the router/amrmproxy policies. + */ + private FederationPolicyInitializationContext updateContext( + FederationPolicyInitializationContext federationPolicyContext, + String type) { + // copying configuration and context to avoid modification of original + SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration + .newInstance(federationPolicyContext + .getSubClusterPolicyConfiguration()); + newConf.setType(type); + + return new FederationPolicyInitializationContext(newConf, + federationPolicyContext.getFederationSubclusterResolver(), + federationPolicyContext.getFederationStateStoreFacade(), + federationPolicyContext.getHomeSubcluster()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java index 46dd6eb..4d29a41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java @@ -41,10 +41,11 @@ public class FederationPolicyInitializationContext { public FederationPolicyInitializationContext( SubClusterPolicyConfiguration policy, SubClusterResolver resolver, - FederationStateStoreFacade storeFacade) { + FederationStateStoreFacade storeFacade, SubClusterId home) { this.federationPolicyConfiguration = policy; this.federationSubclusterResolver = resolver; this.federationStateStoreFacade = storeFacade; + this.homeSubcluster = home; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java new file mode 100644 index 0000000..a01f8fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * This class represents a simple implementation of a {@code + * FederationPolicyManager}. + * + * It combines the basic policies: {@link UniformRandomRouterPolicy} and + * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and + * "spread" the load among sub-clusters uniformly. + * + * This simple policy might impose heavy load on the RMs and return more + * containers than a job requested as all requests are (replicated and) + * broadcasted. + */ +public class UniformBroadcastPolicyManager + extends AbstractPolicyManager { + + public UniformBroadcastPolicyManager() { + //this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = UniformRandomRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = ByteBuffer.allocate(0); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java new file mode 100644 index 0000000..f3c6673 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link WeightedRandomRouterPolicy} for the router and a {@link + * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to + * work together. + */ +public class WeightedLocalityPolicyManager + extends AbstractPolicyManager { + + private WeightedPolicyInfo weightedPolicyInfo; + + public WeightedLocalityPolicyManager() { + //this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = WeightedRandomRouterPolicy.class; + amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class; + weightedPolicyInfo = new WeightedPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } + + @VisibleForTesting + public WeightedPolicyInfo getWeightedPolicyInfo() { + return weightedPolicyInfo; + } + + @VisibleForTesting + public void setWeightedPolicyInfo( + WeightedPolicyInfo weightedPolicyInfo) { + this.weightedPolicyInfo = weightedPolicyInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java index 2839139..52807d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java @@ -40,6 +40,7 @@ import java.nio.ByteBuffer; @Unstable public abstract class SubClusterPolicyConfiguration { + @Private @Unstable public static SubClusterPolicyConfiguration newInstance(String queue, @@ -52,6 +53,18 @@ public abstract class SubClusterPolicyConfiguration { return policy; } + @Private + @Unstable + public static SubClusterPolicyConfiguration newInstance( + SubClusterPolicyConfiguration conf) { + SubClusterPolicyConfiguration policy = + Records.newRecord(SubClusterPolicyConfiguration.class); + policy.setQueue(conf.getQueue()); + policy.setType(conf.getType()); + policy.setParams(conf.getParams()); + return policy; + } + /** * Get the name of the queue for which we are configuring a policy. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java new file mode 100644 index 0000000..c609886 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Test; + +/** + * This class provides common test methods for testing {@code + * FederationPolicyManager}s. + */ +public abstract class BasePolicyManagerTest { + + + @SuppressWarnings("checkstyle:visibilitymodifier") + protected FederationPolicyManager wfp = null; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class expectedPolicyManager; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class expectedAMRMProxyPolicy; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected Class expectedRouterPolicy; + + + @Test + public void testSerializeAndInstantiate() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + expectedAMRMProxyPolicy, + expectedRouterPolicy); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testSerializeAndInstantiateBad1() throws Exception { + serializeAndDeserializePolicyManager(wfp, String.class, + expectedAMRMProxyPolicy, expectedRouterPolicy); + } + + @Test(expected = AssertionError.class) + public void testSerializeAndInstantiateBad2() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + String.class, expectedRouterPolicy); + } + + @Test(expected = AssertionError.class) + public void testSerializeAndInstantiateBad3() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + expectedAMRMProxyPolicy, String.class); + } + + protected static void serializeAndDeserializePolicyManager( + FederationPolicyManager wfp, Class policyManagerType, + Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception { + + // serializeConf it in a context + SubClusterPolicyConfiguration fpc = + wfp.serializeConf(); + fpc.setType(policyManagerType.getCanonicalName()); + FederationPolicyInitializationContext context = new + FederationPolicyInitializationContext(); + context.setSubClusterPolicyConfiguration(fpc); + context + .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); + context.setFederationSubclusterResolver( + FederationPoliciesTestUtil.initResolver()); + context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster")); + + // based on the "context" created instantiate new class and use it + Class c = Class.forName(wfp.getClass().getCanonicalName()); + FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance(); + + FederationAMRMProxyPolicy federationAMRMProxyPolicy = + wfp2.getAMRMPolicy(context, null); + + //needed only for tests (getARMRMPolicy change the "type" in conf) + fpc.setType(wfp.getClass().getCanonicalName()); + + FederationRouterPolicy federationRouterPolicy = + wfp2.getRouterPolicy(context, null); + + Assert.assertEquals(federationAMRMProxyPolicy.getClass(), + expAMRMProxyPolicy); + + Assert.assertEquals(federationRouterPolicy.getClass(), + expRouterPolicy); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index c79fd2a..d906b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMR import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -38,6 +39,7 @@ public class TestFederationPolicyInitializationContextValidator { private SubClusterPolicyConfiguration goodConfig; private SubClusterResolver goodSR; private FederationStateStoreFacade goodFacade; + private SubClusterId goodHome; private FederationPolicyInitializationContext context; @Before @@ -45,8 +47,9 @@ public class TestFederationPolicyInitializationContextValidator { goodFacade = FederationPoliciesTestUtil.initFacade(); goodConfig = new MockPolicyManager().serializeConf(); goodSR = FederationPoliciesTestUtil.initResolver(); + goodHome = SubClusterId.newInstance("homesubcluster"); context = new FederationPolicyInitializationContext(goodConfig, goodSR, - goodFacade); + goodFacade, goodHome); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java new file mode 100644 index 0000000..542a5ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; +import org.junit.Before; + +/** + * Simple test of {@link UniformBroadcastPolicyManager}. + */ +public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest { + + @Before + public void setup() { + //config policy + wfp = new UniformBroadcastPolicyManager(); + wfp.setQueue("queue1"); + + //set expected params that the base test class will use for tests + expectedPolicyManager = UniformBroadcastPolicyManager.class; + expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class; + expectedRouterPolicy = UniformRandomRouterPolicy.class; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java new file mode 100644 index 0000000..ab9cec4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Simple test of {@link WeightedLocalityPolicyManager}. + */ +public class TestWeightedLocalityPolicyManager extends + BasePolicyManagerTest { + + private WeightedPolicyInfo policyInfo; + + @Before + public void setup() { + // configure a policy + + wfp = new WeightedLocalityPolicyManager(); + wfp.setQueue("queue1"); + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + SubClusterId sc2 = SubClusterId.newInstance("sc2"); + policyInfo = new WeightedPolicyInfo(); + + Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); + routerWeights.put(new SubClusterIdInfo(sc1), 0.2f); + routerWeights.put(new SubClusterIdInfo(sc2), 0.8f); + policyInfo.setRouterPolicyWeights(routerWeights); + + Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>(); + amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f); + amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f); + policyInfo.setAMRMPolicyWeights(amrmWeights); + + ((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo( + policyInfo); + + //set expected params that the base test class will use for tests + expectedPolicyManager = WeightedLocalityPolicyManager.class; + expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class; + expectedRouterPolicy = WeightedRandomRouterPolicy.class; + } + + @Test + public void testPolicyInfoSetCorrectly() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + expectedAMRMProxyPolicy, + expectedRouterPolicy); + + //check the policyInfo propagates through ser/der correctly + Assert.assertEquals(((WeightedLocalityPolicyManager) wfp) + .getWeightedPolicyInfo(), policyInfo); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index 87ed8d1..85fdc96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -143,7 +143,7 @@ public final class FederationPoliciesTestUtil { SubClusterInfo> activeSubclusters) throws YarnException { FederationPolicyInitializationContext context = new FederationPolicyInitializationContext(null, initResolver(), - initFacade()); + initFacade(), SubClusterId.newInstance("homesubcluster")); initializePolicyContext(context, policy, policyInfo, activeSubclusters); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org