YARN-5325. Stateless AMRMProxy policies implementation. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db923830 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db923830 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db923830 Branch: refs/heads/YARN-2915 Commit: db923830f7878fcd7b5cf4ef6c6e98abed9d21fc Parents: 0cb42a0 Author: Subru Krishnan <su...@apache.org> Authored: Thu Oct 13 17:59:13 2016 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Thu Jun 22 14:03:23 2017 -0700 ---------------------------------------------------------------------- .../AbstractConfigurableFederationPolicy.java | 155 +++++ .../policies/ConfigurableFederationPolicy.java | 9 +- .../FederationPolicyInitializationContext.java | 37 +- ...ionPolicyInitializationContextValidator.java | 28 +- .../policies/FederationPolicyManager.java | 59 +- .../amrmproxy/AbstractAMRMProxyPolicy.java | 47 ++ .../amrmproxy/BroadcastAMRMProxyPolicy.java | 85 +++ .../amrmproxy/FederationAMRMProxyPolicy.java | 25 +- .../LocalityMulticastAMRMProxyPolicy.java | 583 +++++++++++++++++++ .../policies/amrmproxy/package-info.java | 1 - .../policies/dao/WeightedPolicyInfo.java | 180 +++--- .../federation/policies/dao/package-info.java | 1 - .../policies/exceptions/package-info.java | 1 - .../federation/policies/package-info.java | 1 - .../policies/router/AbstractRouterPolicy.java | 47 ++ .../router/BaseWeightedRouterPolicy.java | 150 ----- .../policies/router/FederationRouterPolicy.java | 5 +- .../policies/router/LoadBasedRouterPolicy.java | 36 +- .../policies/router/PriorityRouterPolicy.java | 19 +- .../router/UniformRandomRouterPolicy.java | 28 +- .../router/WeightedRandomRouterPolicy.java | 32 +- .../policies/router/package-info.java | 1 - .../resolver/AbstractSubClusterResolver.java | 4 +- .../policies/BaseFederationPoliciesTest.java | 28 +- ...ionPolicyInitializationContextValidator.java | 25 +- .../TestBroadcastAMRMProxyFederationPolicy.java | 112 ++++ 
.../TestLocalityMulticastAMRMProxyPolicy.java | 566 ++++++++++++++++++ .../router/TestLoadBasedRouterPolicy.java | 18 +- .../router/TestPriorityRouterPolicy.java | 15 +- .../router/TestWeightedRandomRouterPolicy.java | 35 +- .../utils/FederationPoliciesTestUtil.java | 64 ++ .../src/test/resources/nodes | 6 +- 32 files changed, 1950 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java new file mode 100644 index 0000000..4cb9bbe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import java.util.Map; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * Base abstract class for a weighted {@link ConfigurableFederationPolicy}. 
+ */ +public abstract class AbstractConfigurableFederationPolicy + implements ConfigurableFederationPolicy { + + private WeightedPolicyInfo policyInfo = null; + private FederationPolicyInitializationContext policyContext; + private boolean isDirty; + + public AbstractConfigurableFederationPolicy() { + } + + @Override + public void reinitialize( + FederationPolicyInitializationContext initializationContext) + throws FederationPolicyInitializationException { + isDirty = true; + FederationPolicyInitializationContextValidator + .validate(initializationContext, this.getClass().getCanonicalName()); + + // perform consistency checks + WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer( + initializationContext.getSubClusterPolicyConfiguration().getParams()); + + // if nothing has changed skip the rest of initialization + // and signal to subclasses that the reinit is free via isDirty var. + if (policyInfo != null && policyInfo.equals(newPolicyInfo)) { + isDirty = false; + return; + } + + validate(newPolicyInfo); + setPolicyInfo(newPolicyInfo); + this.policyContext = initializationContext; + } + + /** + * Overridable validation step for the policy configuration. + * + * @param newPolicyInfo the configuration to test. + * + * @throws FederationPolicyInitializationException if the configuration is not + * valid. + */ + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { + if (newPolicyInfo == null) { + throw new FederationPolicyInitializationException( + "The policy to " + "validate should not be null."); + } + } + + /** + * Returns whether the last reinitialization requires actual changes, or + * was "free" as the weights have not changed. This is used by subclasses + * overriding reinitialize and calling super.reinitialize() to know whether to + * quit early. + * + * @return whether more work is needed to initialize. 
+ */ + public boolean getIsDirty() { + return isDirty; + } + + /** + * Getter method for the configuration weights. + * + * @return the {@link WeightedPolicyInfo} representing the policy + * configuration. + */ + public WeightedPolicyInfo getPolicyInfo() { + return policyInfo; + } + + /** + * Setter method for the configuration weights. + * + * @param policyInfo the {@link WeightedPolicyInfo} representing the policy + * configuration. + */ + public void setPolicyInfo(WeightedPolicyInfo policyInfo) { + this.policyInfo = policyInfo; + } + + /** + * Getter method for the {@link FederationPolicyInitializationContext}. + * + * @return the context for this policy. + */ + public FederationPolicyInitializationContext getPolicyContext() { + return policyContext; + } + + /** + * Setter method for the {@link FederationPolicyInitializationContext}. + * + * @param policyContext the context to assign to this policy. + */ + public void setPolicyContext( + FederationPolicyInitializationContext policyContext) { + this.policyContext = policyContext; + } + + /** + * This methods gets active subclusters map from the {@code + * FederationStateStoreFacade} and validate it not being null/empty. + * + * @return the map of ids to info for all active subclusters. + * + * @throws YarnException if we can't get the list. 
+ */ + protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters() + throws YarnException { + + Map<SubClusterId, SubClusterInfo> activeSubclusters = + getPolicyContext().getFederationStateStoreFacade().getSubClusters(true); + + if (activeSubclusters == null || activeSubclusters.size() < 1) { + throw new NoActiveSubclustersException( + "Zero active subclusters, cannot pick where to send job."); + } + return activeSubclusters; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java index fd6ceea..5245772 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java @@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy { * policies. The implementor should provide try-n-swap semantics, and retain * state if possible. * - * @param federationPolicyInitializationContext the new context to provide to - * implementor. + * @param policyContext the new context to provide to implementor. * * @throws FederationPolicyInitializationException in case the initialization - * fails. + * fails. 
*/ - void reinitialize( - FederationPolicyInitializationContext - federationPolicyInitializationContext) + void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java index 9347fd0..46dd6eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -30,6 +31,7 @@ public class FederationPolicyInitializationContext { private SubClusterPolicyConfiguration federationPolicyConfiguration; private SubClusterResolver federationSubclusterResolver; private FederationStateStoreFacade 
federationStateStoreFacade; + private SubClusterId homeSubcluster; public FederationPolicyInitializationContext() { federationPolicyConfiguration = null; @@ -37,20 +39,19 @@ public class FederationPolicyInitializationContext { federationStateStoreFacade = null; } - public FederationPolicyInitializationContext(SubClusterPolicyConfiguration - policy, SubClusterResolver resolver, FederationStateStoreFacade - storeFacade) { + public FederationPolicyInitializationContext( + SubClusterPolicyConfiguration policy, SubClusterResolver resolver, + FederationStateStoreFacade storeFacade) { this.federationPolicyConfiguration = policy; this.federationSubclusterResolver = resolver; this.federationStateStoreFacade = storeFacade; } - /** * Getter for the {@link SubClusterPolicyConfiguration}. * * @return the {@link SubClusterPolicyConfiguration} to be used for - * initialization. + * initialization. */ public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() { return federationPolicyConfiguration; @@ -59,8 +60,8 @@ public class FederationPolicyInitializationContext { /** * Setter for the {@link SubClusterPolicyConfiguration}. * - * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} - * to be used for initialization. + * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to + * be used for initialization. */ public void setSubClusterPolicyConfiguration( SubClusterPolicyConfiguration fedPolicyConfiguration) { @@ -80,7 +81,7 @@ public class FederationPolicyInitializationContext { * Setter for the {@link SubClusterResolver}. * * @param federationSubclusterResolver the {@link SubClusterResolver} to be - * used for initialization. + * used for initialization. 
*/ public void setFederationSubclusterResolver( SubClusterResolver federationSubclusterResolver) { @@ -105,4 +106,24 @@ public class FederationPolicyInitializationContext { FederationStateStoreFacade federationStateStoreFacade) { this.federationStateStoreFacade = federationStateStoreFacade; } + + /** + * Returns the current home sub-cluster. Useful for default policy behaviors. + * + * @return the home sub-cluster. + */ + public SubClusterId getHomeSubcluster() { + return homeSubcluster; + } + + /** + * Sets in the context the home sub-cluster. Useful for default policy + * behaviors. + * + * @param homeSubcluster value to set. + */ + public void setHomeSubcluster(SubClusterId homeSubcluster) { + this.homeSubcluster = homeSubcluster; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java index 31f83d4..1b83bbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java @@ -25,50 +25,44 @@ import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo public final class FederationPolicyInitializationContextValidator { private FederationPolicyInitializationContextValidator() { - //disable constructor per checkstyle + // disable constructor per checkstyle } public static void validate( - FederationPolicyInitializationContext - federationPolicyInitializationContext, - String myType) throws FederationPolicyInitializationException { + FederationPolicyInitializationContext policyContext, String myType) + throws FederationPolicyInitializationException { if (myType == null) { - throw new FederationPolicyInitializationException("The myType parameter" - + " should not be null."); + throw new FederationPolicyInitializationException( + "The myType parameter" + " should not be null."); } - if (federationPolicyInitializationContext == null) { + if (policyContext == null) { throw new FederationPolicyInitializationException( "The FederationPolicyInitializationContext provided is null. Cannot" - + " reinitalize " - + "successfully."); + + " reinitalize " + "successfully."); } - if (federationPolicyInitializationContext.getFederationStateStoreFacade() - == null) { + if (policyContext.getFederationStateStoreFacade() == null) { throw new FederationPolicyInitializationException( "The FederationStateStoreFacade provided is null. Cannot" + " reinitalize successfully."); } - if (federationPolicyInitializationContext.getFederationSubclusterResolver() - == null) { + if (policyContext.getFederationSubclusterResolver() == null) { throw new FederationPolicyInitializationException( "The FederationStateStoreFacase provided is null. Cannot" + " reinitalize successfully."); } - if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration() - == null) { + if (policyContext.getSubClusterPolicyConfiguration() == null) { throw new FederationPolicyInitializationException( "The FederationSubclusterResolver provided is null. 
Cannot " + "reinitalize successfully."); } String intendedType = - federationPolicyInitializationContext.getSubClusterPolicyConfiguration() - .getType(); + policyContext.getSubClusterPolicyConfiguration().getType(); if (!myType.equals(intendedType)) { throw new FederationPolicyInitializationException( http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java index e5dba63..39fdba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java @@ -25,19 +25,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo /** * * Implementors need to provide the ability to serliaze a policy and its - * configuration as a {@link SubClusterPolicyConfiguration}, as well as - * provide (re)initialization mechanics for the underlying + * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide + * (re)initialization mechanics for the underlying * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}. 
* - * The serialization aspects are used by admin APIs or a policy engine to - * store a serialized configuration in the {@code FederationStateStore}, - * while the getters methods are used to obtain a propertly inizialized - * policy in the {@code Router} and {@code AMRMProxy} respectively. + * The serialization aspects are used by admin APIs or a policy engine to store + * a serialized configuration in the {@code FederationStateStore}, while the + * getters methods are used to obtain a propertly inizialized policy in the + * {@code Router} and {@code AMRMProxy} respectively. * - * This interface by design binds together - * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and - * provide lifecycle support for serialization and deserialization, to reduce - * configuration mistakes (combining incompatible policies). + * This interface by design binds together {@link FederationAMRMProxyPolicy} and + * {@link FederationRouterPolicy} and provide lifecycle support for + * serialization and deserialization, to reduce configuration mistakes + * (combining incompatible policies). * */ public interface FederationPolicyManager { @@ -50,23 +50,17 @@ public interface FederationPolicyManager { * the implementors should attempt to reinitalize (retaining state). To affect * a complete policy reset oldInstance should be null. * - * @param federationPolicyInitializationContext the current context - * @param oldInstance the existing (possibly null) - * instance. + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. * - * @return an updated {@link FederationAMRMProxyPolicy - }. + * @return an updated {@link FederationAMRMProxyPolicy }. * * @throws FederationPolicyInitializationException if the initialization - * cannot be completed - * properly. The oldInstance - * should be still valid in - * case of failed - * initialization. + * cannot be completed properly. 
The oldInstance should be still + * valid in case of failed initialization. */ FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationAMRMProxyPolicy oldInstance) throws FederationPolicyInitializationException; @@ -78,21 +72,17 @@ public interface FederationPolicyManager { * implementors should attempt to reinitalize (retaining state). To affect a * complete policy reset oldInstance shoulb be set to null. * - * @param federationPolicyInitializationContext the current context - * @param oldInstance the existing (possibly null) - * instance. + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. * * @return an updated {@link FederationRouterPolicy}. * * @throws FederationPolicyInitializationException if the initalization cannot - * be completed properly. The - * oldInstance should be still - * valid in case of failed - * initialization. + * be completed properly. The oldInstance should be still valid in + * case of failed initialization. */ FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationRouterPolicy oldInstance) throws FederationPolicyInitializationException; @@ -102,23 +92,24 @@ public interface FederationPolicyManager { * store. * * @return a valid policy configuration representing this object - * parametrization. + * parametrization. * * @throws FederationPolicyInitializationException if the current state cannot - * be serialized properly + * be serialized properly */ SubClusterPolicyConfiguration serializeConf() throws FederationPolicyInitializationException; - /** * This method returns the queue this policy is configured for. + * * @return the name of the queue. 
*/ String getQueue(); /** * This methods provides a setter for the queue this policy is specified for. + * * @param queue the name of the queue. */ void setQueue(String queue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java new file mode 100644 index 0000000..e853744 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * Base abstract class for {@link FederationAMRMProxyPolicy} implementations, + * that provides common validation for reinitialization. + */ +public abstract class AbstractAMRMProxyPolicy extends + AbstractConfigurableFederationPolicy implements FederationAMRMProxyPolicy { + + @Override + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { + super.validate(newPolicyInfo); + Map<SubClusterIdInfo, Float> newWeights = + newPolicyInfo.getAMRMPolicyWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java new file mode 100644 index 0000000..679f4d5 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * An implementation of the {@link FederationAMRMProxyPolicy} that simply + * broadcasts each {@link ResourceRequest} to all the available sub-clusters. + */ +public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + + private Set<SubClusterId> knownClusterIds = new HashSet<>(); + + @Override + public void reinitialize( + FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + // overrides initialize to avoid weight checks that do no apply for + // this policy. 
+ FederationPolicyInitializationContextValidator + .validate(policyContext, this.getClass().getCanonicalName()); + setPolicyContext(policyContext); + } + + @Override + public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( + List<ResourceRequest> resourceRequests) throws YarnException { + + Map<SubClusterId, SubClusterInfo> activeSubclusters = + getActiveSubclusters(); + + Map<SubClusterId, List<ResourceRequest>> answer = new HashMap<>(); + + // simply broadcast the resource request to all sub-clusters + for (SubClusterId subClusterId : activeSubclusters.keySet()) { + answer.put(subClusterId, resourceRequests); + knownClusterIds.add(subClusterId); + } + + return answer; + } + + @Override + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + if (!knownClusterIds.contains(subClusterId)) { + throw new UnknownSubclusterException( + "The response is received from a subcluster that is unknown to this " + + "policy."); + } + // stateless policy does not care about responses + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java index 4a3305c..0541df4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java @@ -17,18 +17,18 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import java.util.List; -import java.util.Map; - /** - * Implementors of this interface provide logic to split the list of {@link - * ResourceRequest}s received by the AM among various RMs. + * Implementors of this interface provide logic to split the list of + * {@link ResourceRequest}s received by the AM among various RMs. */ public interface FederationAMRMProxyPolicy extends ConfigurableFederationPolicy { @@ -37,18 +37,17 @@ public interface FederationAMRMProxyPolicy * Splits the {@link ResourceRequest}s from the client across one or more * sub-clusters based on the policy semantics (e.g., broadcast, load-based). * - * @param resourceRequests the list of {@link ResourceRequest}s from the - * AM to be split + * @param resourceRequests the list of {@link ResourceRequest}s from the AM to + * be split * * @return map of sub-cluster as identified by {@link SubClusterId} to the - * list of {@link ResourceRequest}s that should be forwarded to it + * list of {@link ResourceRequest}s that should be forwarded to it * * @throws YarnException in case the request is malformed or no viable - * sub-clusters can be found. + * sub-clusters can be found. 
*/ Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) - throws YarnException; + List<ResourceRequest> resourceRequests) throws YarnException; /** * This method should be invoked to notify the policy about responses being @@ -60,7 +59,7 @@ public interface FederationAMRMProxyPolicy * * @throws YarnException in case the response is not valid */ - void notifyOfResponse(SubClusterId subClusterId, - AllocateResponse response) throws YarnException; + void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) + throws YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java new file mode 100644 index 0000000..283f89e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * An implementation of the {@link FederationAMRMProxyPolicy} interface that + * 
carefully multicasts the requests with the following behavior: + * + * <p> + * Host localized {@link ResourceRequest}s are always forwarded to the RM that + * owns the corresponding node, based on the feedback of a + * {@link SubClusterResolver}. If the {@link SubClusterResolver} cannot resolve + * this node we default to forwarding the {@link ResourceRequest} to the home + * sub-cluster. + * </p> + * + * <p> + * Rack localized {@link ResourceRequest}s are forwarded to the RMs that own + * the corresponding rack. Note that in some deployments each rack could be + * striped across multiple RMs. This policy respects that. If the + * {@link SubClusterResolver} cannot resolve this rack we default to forwarding + * the {@link ResourceRequest} to the home sub-cluster. + * </p> + * + * <p> + * ANY requests corresponding to node/rack local requests are forwarded only to + * the set of RMs that own the corresponding localized requests. The number of + * containers listed in each ANY is proportional to the number of localized + * container requests (associated to this ANY via the same allocateRequestId). + * </p> + * + * <p> + * ANY that are not associated to node/rack local requests are split among RMs + * based on the "weights" in the {@link WeightedPolicyInfo} configuration *and* + * headroom information. The {@code headroomAlpha} parameter of the policy + * configuration indicates how much headroom contributes to the splitting + * choice. Value of 1.0f indicates the weights are interpreted only as 0/1 + * boolean but all splitting is based on the advertised headroom (fallback to + * 1/N for RMs that we don't have headroom info from). An {@code headroomAlpha} + * value of 0.0f means headroom is ignored and all splitting decisions are + * proportional to the "weights" in the configuration of the policy. 
+ * </p> + * + * <p> + * ANY of zero size are forwarded to all known subclusters (i.e., subclusters + * where we scheduled containers before), as they may represent a user attempt + * to cancel a previous request (and we are mostly stateless now, so should + * forward to all known RMs). + * </p> + * + * <p> + * Invariants: + * </p> + * + * <p> + * The policy always excludes non-active RMs. + * </p> + * + * <p> + * The policy always excludes RMs that do not appear in the policy configuration + * weights, or have a weight of 0 (even if localized resources explicit refer to + * it). + * </p> + * + * <p> + * (Bar rounding to closest ceiling of fractional containers) The sum of + * requests made to multiple RMs at the ANY level "adds-up" to the user request. + * The maximum possible excess in a given request is a number of containers less + * or equal to number of sub-clusters in the federation. + * </p> + */ +public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + + public static final Logger LOG = + LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); + + private Map<SubClusterId, Float> weights; + private SubClusterResolver resolver; + + private Map<SubClusterId, Resource> headroom; + private float hrAlpha; + private FederationStateStoreFacade federationFacade; + private AllocationBookkeeper bookkeeper; + private SubClusterId homeSubcluster; + + @Override + public void reinitialize( + FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + + // save reference to old weights + WeightedPolicyInfo tempPolicy = getPolicyInfo(); + + super.reinitialize(policyContext); + if (!getIsDirty()) { + return; + } + + Map<SubClusterId, Float> newWeightsConverted = new HashMap<>(); + boolean allInactive = true; + WeightedPolicyInfo policy = getPolicyInfo(); + if (policy.getAMRMPolicyWeights() == null + || policy.getAMRMPolicyWeights().size() == 0) { + allInactive = false; + } else { + for 
(Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights() + .entrySet()) { + if (e.getValue() > 0) { + allInactive = false; + } + newWeightsConverted.put(e.getKey().toId(), e.getValue()); + } + } + if (allInactive) { + // reset the policyInfo and throw + setPolicyInfo(tempPolicy); + throw new FederationPolicyInitializationException( + "The weights used to configure " + + "this policy are all set to zero! (no ResourceRequest could be " + + "forwarded with this setting.)"); + } + + if (policyContext.getHomeSubcluster() == null) { + setPolicyInfo(tempPolicy); + throw new FederationPolicyInitializationException("The homeSubcluster " + + "filed in the context must be initialized to use this policy"); + } + + weights = newWeightsConverted; + resolver = policyContext.getFederationSubclusterResolver(); + + if (headroom == null) { + headroom = new ConcurrentHashMap<>(); + } + hrAlpha = policy.getHeadroomAlpha(); + + this.federationFacade = + policyContext.getFederationStateStoreFacade(); + this.bookkeeper = new AllocationBookkeeper(); + this.homeSubcluster = policyContext.getHomeSubcluster(); + + } + + @Override + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + // stateless policy does not care about responses except tracking headroom + headroom.put(subClusterId, response.getAvailableResources()); + } + + @Override + public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( + List<ResourceRequest> resourceRequests) throws YarnException { + + // object used to accumulate statistics about the answer, initialize with + // active subclusters. 
+ bookkeeper.reinitialize(federationFacade.getSubClusters(true)); + + List<ResourceRequest> nonLocalizedRequests = + new ArrayList<ResourceRequest>(); + + SubClusterId targetId = null; + Set<SubClusterId> targetIds = null; + + // if the RR is resolved to a local subcluster add it directly (node and + // resolvable racks) + for (ResourceRequest rr : resourceRequests) { + targetId = null; + targetIds = null; + + // Handle: ANY (accumulated for later) + if (ResourceRequest.isAnyLocation(rr.getResourceName())) { + nonLocalizedRequests.add(rr); + continue; + } + + // Handle "node" requests + try { + targetId = resolver.getSubClusterForNode(rr.getResourceName()); + } catch (YarnException e) { + // this might happen as we can't differentiate node from rack names + // we log altogether later + } + if (bookkeeper.isActiveAndEnabled(targetId)) { + bookkeeper.addLocalizedNodeRR(targetId, rr); + continue; + } + + // Handle "rack" requests + try { + targetIds = resolver.getSubClustersForRack(rr.getResourceName()); + } catch (YarnException e) { + // this might happen as we can't differentiate node from rack names + // we log altogether later + } + if (targetIds != null && targetIds.size() > 0) { + for (SubClusterId tid : targetIds) { + if (bookkeeper.isActiveAndEnabled(tid)) { + bookkeeper.addRackRR(tid, rr); + } + } + continue; + } + + // Handle node/rack requests that the SubClusterResolver cannot map to + // any cluster. Defaulting to home subcluster. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("ERROR resolving sub-cluster for resourceName: " + + rr.getResourceName() + " we are falling back to homeSubCluster:" + + homeSubcluster); + } + + // If home-subcluster is not active, ignore node/rack request + if (bookkeeper.isActiveAndEnabled(homeSubcluster)) { + bookkeeper.addLocalizedNodeRR(homeSubcluster, rr); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are " + + "defaulting to is not active, the ResourceRequest " + + "will be ignored."); + } + } + } + + // handle all non-localized requests (ANY) + splitAnyRequests(nonLocalizedRequests, bookkeeper); + + return bookkeeper.getAnswer(); + } + + /** + * It splits a list of non-localized resource requests among sub-clusters. + */ + private void splitAnyRequests(List<ResourceRequest> originalResourceRequests, + AllocationBookkeeper allocationBookkeeper) throws YarnException { + + for (ResourceRequest resourceRequest : originalResourceRequests) { + + // FIRST: pick the target set of subclusters (based on whether this RR + // is associated with other localized requests via an allocationId) + Long allocationId = resourceRequest.getAllocationRequestId(); + Set<SubClusterId> targetSubclusters; + if (allocationBookkeeper.getSubClustersForId(allocationId) != null) { + targetSubclusters = + allocationBookkeeper.getSubClustersForId(allocationId); + } else { + targetSubclusters = allocationBookkeeper.getActiveAndEnabledSC(); + } + + // SECOND: pick how much to ask to each RM for each request + splitIndividualAny(resourceRequest, targetSubclusters, + allocationBookkeeper); + } + } + + /** + * Return a projection of this ANY {@link ResourceRequest} that belongs to + * this sub-cluster. This is done based on the "count" of the containers that + * require locality in each sublcuster (if any) or based on the "weights" and + * headroom. 
+ */ + private void splitIndividualAny(ResourceRequest originalResourceRequest, + Set<SubClusterId> targetSubclusters, + AllocationBookkeeper allocationBookkeeper) { + + long allocationId = originalResourceRequest.getAllocationRequestId(); + + for (SubClusterId targetId : targetSubclusters) { + float numContainer = originalResourceRequest.getNumContainers(); + + // If the ANY request has 0 containers to begin with we must forward it to + // any RM we have previously contacted (this might be the user way + // to cancel a previous request). + if (numContainer == 0 && headroom.containsKey(targetId)) { + allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); + } + + // If ANY is associated with localized asks, split based on their ratio + if (allocationBookkeeper.getSubClustersForId(allocationId) != null) { + float localityBasedWeight = getLocalityBasedWeighting(allocationId, + targetId, allocationBookkeeper); + numContainer = numContainer * localityBasedWeight; + } else { + // split ANY based on load and policy configuration + float headroomWeighting = + getHeadroomWeighting(targetId, allocationBookkeeper); + float policyWeighting = + getPolicyConfigWeighting(targetId, allocationBookkeeper); + // hrAlpha controls how much headroom influencing decision + numContainer = numContainer + * (hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting); + } + + // if the calculated request is non-empty add it to the answer + if (numContainer > 0) { + ResourceRequest out = + ResourceRequest.newInstance(originalResourceRequest.getPriority(), + originalResourceRequest.getResourceName(), + originalResourceRequest.getCapability(), + originalResourceRequest.getNumContainers(), + originalResourceRequest.getRelaxLocality(), + originalResourceRequest.getNodeLabelExpression(), + originalResourceRequest.getExecutionTypeRequest()); + out.setAllocationRequestId(allocationId); + out.setNumContainers((int) Math.ceil(numContainer)); + if 
(out.isAnyLocation(out.getResourceName())) { + allocationBookkeeper.addAnyRR(targetId, out); + } else { + allocationBookkeeper.addRackRR(targetId, out); + } + } + } + } + + /** + * Compute the weight to assign to a subcluster based on how many local + * requests a subcluster is target of. + */ + private float getLocalityBasedWeighting(long reqId, SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(); + float localWeight = + allocationBookkeeper.getNumLocalizedContainers(reqId, targetId); + return totWeight > 0 ? localWeight / totWeight : 0; + } + + /** + * Compute the "weighting" to give to a sublcuster based on the configured + * policy weights (for the active subclusters). + */ + private float getPolicyConfigWeighting(SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + float totWeight = allocationBookkeeper.totPolicyWeight; + Float localWeight = weights.get(targetId); + return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0; + } + + /** + * Compute the weighting based on available headroom. This is proportional to + * the available headroom memory announced by RM, or to 1/N for RMs we have + * not seen yet. If all RMs report zero headroom, we fallback to 1/N again. 
+ */ + private float getHeadroomWeighting(SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + + // baseline weight for all RMs + float headroomWeighting = + 1 / (float) allocationBookkeeper.getActiveAndEnabledSC().size(); + + // if we have headroom information for this sub-cluster (and we are safe + // from /0 issues) + if (headroom.containsKey(targetId) + && allocationBookkeeper.totHeadroomMemory > 0) { + // compute which portion of the RMs that are active/enabled have reported + // their headroom (needed as adjustment factor) + // (note: getActiveAndEnabledSC should never be null/zero) + float ratioHeadroomKnown = allocationBookkeeper.totHeadRoomEnabledRMs + / (float) allocationBookkeeper.getActiveAndEnabledSC().size(); + + // headroomWeighting is the ratio of headroom memory in the targetId + // cluster / total memory. The ratioHeadroomKnown factor is applied to + // adjust for missing information and ensure sum of allocated containers + // closely approximates what the user asked (small excess). + headroomWeighting = (headroom.get(targetId).getMemorySize() + / allocationBookkeeper.totHeadroomMemory) * (ratioHeadroomKnown); + } + return headroomWeighting; + } + + /** + * This helper class is used to book-keep the requests made to each + * subcluster, and maintain useful statistics to split ANY requests. 
+ */ + private final class AllocationBookkeeper { + + // the answer being accumulated + private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>(); + + // stores how many containers we have allocated in each RM for localized + // asks, used to correctly "spread" the corresponding ANY + private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM = + new HashMap<>(); + + private Set<SubClusterId> activeAndEnabledSC = new HashSet<>(); + private long totNumLocalizedContainers = 0; + private float totHeadroomMemory = 0; + private int totHeadRoomEnabledRMs = 0; + private float totPolicyWeight = 0; + + private void reinitialize( + Map<SubClusterId, SubClusterInfo> activeSubclusters) + throws YarnException { + + // reset data structures + answer.clear(); + countContainersPerRM.clear(); + activeAndEnabledSC.clear(); + totNumLocalizedContainers = 0; + totHeadroomMemory = 0; + totHeadRoomEnabledRMs = 0; + totPolicyWeight = 0; + + // pre-compute the set of subclusters that are both active and enabled by + // the policy weights, and accumulate their total weight + for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) { + if (entry.getValue() > 0 + && activeSubclusters.containsKey(entry.getKey())) { + activeAndEnabledSC.add(entry.getKey()); + totPolicyWeight += entry.getValue(); + } + } + + if (activeAndEnabledSC.size() < 1) { + throw new NoActiveSubclustersException( + "None of the subclusters enabled in this policy (weight>0) are " + + "currently active we cannot forward the ResourceRequest(s)"); + } + + // pre-compute headroom-based weights for active/enabled subclusters + for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) { + if (activeAndEnabledSC.contains(r.getKey())) { + totHeadroomMemory += r.getValue().getMemorySize(); + totHeadRoomEnabledRMs++; + } + } + + } + + /** + * Add to the answer a localized node request, and keeps track of statistics + * on a per-allocation-id and per-subcluster bases. 
+ */ + private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + + if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { + countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>()); + } + if (!countContainersPerRM.get(rr.getAllocationRequestId()) + .containsKey(targetId)) { + countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, + new AtomicLong(0)); + } + countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) + .addAndGet(rr.getNumContainers()); + + totNumLocalizedContainers += rr.getNumContainers(); + + internalAddToAnswer(targetId, rr); + } + + /** + * Add a rack-local request to the final asnwer. + */ + public void addRackRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + internalAddToAnswer(targetId, rr); + } + + /** + * Add an ANY request to the final answer. + */ + private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName())); + internalAddToAnswer(targetId, rr); + } + + private void internalAddToAnswer(SubClusterId targetId, + ResourceRequest partialRR) { + if (!answer.containsKey(targetId)) { + answer.put(targetId, new ArrayList<ResourceRequest>()); + } + answer.get(targetId).add(partialRR); + } + + /** + * Return all known subclusters associated with an allocation id. + * + * @param allocationId the allocation id considered + * + * @return the list of {@link SubClusterId}s associated with this allocation + * id + */ + private Set<SubClusterId> getSubClustersForId(long allocationId) { + if (countContainersPerRM.get(allocationId) == null) { + return null; + } + return countContainersPerRM.get(allocationId).keySet(); + } + + /** + * Return the answer accumulated so far. 
+ * + * @return the answer + */ + private Map<SubClusterId, List<ResourceRequest>> getAnswer() { + return answer; + } + + /** + * Return the set of sub-clusters that are both active and allowed by our + * policy (weight > 0). + * + * @return a set of active and enabled {@link SubClusterId}s + */ + private Set<SubClusterId> getActiveAndEnabledSC() { + return activeAndEnabledSC; + } + + /** + * Return the total number of container coming from localized requests. + */ + private long getTotNumLocalizedContainers() { + return totNumLocalizedContainers; + } + + /** + * Returns the number of containers matching an allocation Id that are + * localized in the targetId subcluster. + */ + private long getNumLocalizedContainers(long allocationId, + SubClusterId targetId) { + AtomicLong c = countContainersPerRM.get(allocationId).get(targetId); + return c == null ? 0 : c.get(); + } + + /** + * Returns true is the subcluster request is both active and enabled. + */ + private boolean isActiveAndEnabled(SubClusterId targetId) { + if (targetId == null) { + return false; + } else { + return getActiveAndEnabledSC().contains(targetId); + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java index 99da20b..ef72647 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java @@ -17,4 +17,3 @@ */ /** AMRMPRoxy policies. **/ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; - http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java index a0fa37f..62eb03b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java @@ -17,10 +17,19 @@ package org.apache.hadoop.yarn.server.federation.policies.dao; -import com.sun.jersey.api.json.JSONConfiguration; -import com.sun.jersey.api.json.JSONJAXBContext; -import com.sun.jersey.api.json.JSONMarshaller; -import com.sun.jersey.api.json.JSONUnmarshaller; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import javax.xml.bind.JAXBException; 
+import javax.xml.bind.Marshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -29,24 +38,16 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlRootElement; -import java.io.StringReader; -import java.io.StringWriter; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.api.json.JSONUnmarshaller; /** * This is a DAO class for the configuration of parameteres for federation * policies. This generalizes several possible configurations as two lists of - * {@link SubClusterIdInfo} and corresponding weights as a - * {@link Float}. The interpretation of the weight is left to the logic in - * the policy. + * {@link SubClusterIdInfo} and corresponding weights as a {@link Float}. The + * interpretation of the weight is left to the logic in the policy. 
*/ @InterfaceAudience.Private @@ -57,12 +58,14 @@ public class WeightedPolicyInfo { private static final Logger LOG = LoggerFactory.getLogger(WeightedPolicyInfo.class); - + private static JSONJAXBContext jsonjaxbContext = initContext(); private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>(); private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>(); private float headroomAlpha; - private static JSONJAXBContext jsonjaxbContext = initContext(); + public WeightedPolicyInfo() { + // JAXB needs this + } private static JSONJAXBContext initContext() { try { @@ -74,46 +77,6 @@ public class WeightedPolicyInfo { return null; } - public WeightedPolicyInfo() { - //JAXB needs this - } - - /** - * Setter method for Router weights. - * - * @param policyWeights the router weights. - */ - public void setRouterPolicyWeights( - Map<SubClusterIdInfo, Float> policyWeights) { - this.routerPolicyWeights = policyWeights; - } - - /** - * Setter method for ARMRMProxy weights. - * - * @param policyWeights the amrmproxy weights. - */ - public void setAMRMPolicyWeights( - Map<SubClusterIdInfo, Float> policyWeights) { - this.amrmPolicyWeights = policyWeights; - } - - /** - * Getter of the router weights. - * @return the router weights. - */ - public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() { - return routerPolicyWeights; - } - - /** - * Getter for AMRMProxy weights. - * @return the AMRMProxy weights. - */ - public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() { - return amrmPolicyWeights; - } - /** * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON * representation. @@ -123,14 +86,14 @@ public class WeightedPolicyInfo { * @return the {@link WeightedPolicyInfo} represented. * * @throws FederationPolicyInitializationException if a deserializaiton error - * occurs. + * occurs. 
*/ public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb) throws FederationPolicyInitializationException { if (jsonjaxbContext == null) { - throw new FederationPolicyInitializationException("JSONJAXBContext should" - + " not be null."); + throw new FederationPolicyInitializationException( + "JSONJAXBContext should" + " not be null."); } try { @@ -139,9 +102,8 @@ public class WeightedPolicyInfo { bb.get(bytes); String params = new String(bytes, Charset.forName("UTF-8")); - WeightedPolicyInfo weightedPolicyInfo = unmarshaller - .unmarshalFromJSON(new StringReader(params), - WeightedPolicyInfo.class); + WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON( + new StringReader(params), WeightedPolicyInfo.class); return weightedPolicyInfo; } catch (JAXBException j) { throw new FederationPolicyInitializationException(j); @@ -149,19 +111,56 @@ public class WeightedPolicyInfo { } /** - * Converts the policy into a byte array representation in the input {@link - * ByteBuffer}. + * Getter of the router weights. + * + * @return the router weights. + */ + public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() { + return routerPolicyWeights; + } + + /** + * Setter method for Router weights. + * + * @param policyWeights the router weights. + */ + public void setRouterPolicyWeights( + Map<SubClusterIdInfo, Float> policyWeights) { + this.routerPolicyWeights = policyWeights; + } + + /** + * Getter for AMRMProxy weights. + * + * @return the AMRMProxy weights. + */ + public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() { + return amrmPolicyWeights; + } + + /** + * Setter method for ARMRMProxy weights. + * + * @param policyWeights the amrmproxy weights. + */ + public void setAMRMPolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) { + this.amrmPolicyWeights = policyWeights; + } + + /** + * Converts the policy into a byte array representation in the input + * {@link ByteBuffer}. 
* * @return byte array representation of this policy configuration. * * @throws FederationPolicyInitializationException if a serialization error - * occurs. + * occurs. */ public ByteBuffer toByteBuffer() throws FederationPolicyInitializationException { if (jsonjaxbContext == null) { - throw new FederationPolicyInitializationException("JSONJAXBContext should" - + " not be null."); + throw new FederationPolicyInitializationException( + "JSONJAXBContext should" + " not be null."); } try { String s = toJSONString(); @@ -186,22 +185,21 @@ public class WeightedPolicyInfo { return false; } - WeightedPolicyInfo otherPolicy = - (WeightedPolicyInfo) other; + WeightedPolicyInfo otherPolicy = (WeightedPolicyInfo) other; Map<SubClusterIdInfo, Float> otherAMRMWeights = otherPolicy.getAMRMPolicyWeights(); Map<SubClusterIdInfo, Float> otherRouterWeights = otherPolicy.getRouterPolicyWeights(); - boolean amrmWeightsMatch = otherAMRMWeights != null && - getAMRMPolicyWeights() != null && - CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(), - getAMRMPolicyWeights().entrySet()); + boolean amrmWeightsMatch = + otherAMRMWeights != null && getAMRMPolicyWeights() != null + && CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(), + getAMRMPolicyWeights().entrySet()); - boolean routerWeightsMatch = otherRouterWeights != null && - getRouterPolicyWeights() != null && - CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(), - getRouterPolicyWeights().entrySet()); + boolean routerWeightsMatch = + otherRouterWeights != null && getRouterPolicyWeights() != null + && CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(), + getRouterPolicyWeights().entrySet()); return amrmWeightsMatch && routerWeightsMatch; } @@ -215,10 +213,10 @@ public class WeightedPolicyInfo { * Return the parameter headroomAlpha, used by policies that balance * weight-based and load-based considerations in their decisions. 
* - * For policies that use this parameter, values close to 1 indicate that - * most of the decision should be based on currently observed headroom from - * various sub-clusters, values close to zero, indicate that the decision - * should be mostly based on weights and practically ignore current load. + * For policies that use this parameter, values close to 1 indicate that most + * of the decision should be based on currently observed headroom from various + * sub-clusters; values close to zero indicate that the decision should be + * mostly based on weights and practically ignore current load. * * @return the value of headroomAlpha. */ @@ -227,13 +225,13 @@ public class WeightedPolicyInfo { } /** - * Set the parameter headroomAlpha, used by policies that balance - * weight-based and load-based considerations in their decisions. + * Set the parameter headroomAlpha, used by policies that balance weight-based + * and load-based considerations in their decisions. * - * For policies that use this parameter, values close to 1 indicate that - * most of the decision should be based on currently observed headroom from - * various sub-clusters, values close to zero, indicate that the decision - * should be mostly based on weights and practically ignore current load. + * For policies that use this parameter, values close to 1 indicate that most + * of the decision should be based on currently observed headroom from various + * sub-clusters; values close to zero indicate that the decision should be + * mostly based on weights and practically ignore current load. * * @param headroomAlpha the value to use for balancing. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java index 43f5b83..c292e52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java @@ -17,4 +17,3 @@ */ /** DAO objects for serializing/deserializing policy configurations. 
**/ package org.apache.hadoop.yarn.server.federation.policies.dao; - http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java index 3318da9..ad2d543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java @@ -17,4 +17,3 @@ */ /** Exceptions for policies. 
**/ package org.apache.hadoop.yarn.server.federation.policies.exceptions; - http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java index 7d9a121..fa3fcc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java @@ -17,4 +17,3 @@ */ /** Federation Policies. 
**/ package org.apache.hadoop.yarn.server.federation.policies; - http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java new file mode 100644 index 0000000..f49af1d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * Base abstract class for {@link FederationRouterPolicy} implementations, that + * provides common validation for reinitialization. + */ +public abstract class AbstractRouterPolicy extends + AbstractConfigurableFederationPolicy implements FederationRouterPolicy { + + @Override + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { + super.validate(newPolicyInfo); + Map<SubClusterIdInfo, Float> newWeights = + newPolicyInfo.getRouterPolicyWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java deleted file mode 100644 index e888979..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.federation.policies.router; - -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; -import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; - -import java.util.Map; - -/** - * Abstract class provides common validation of reinitialize(), for all - * policies that are "weight-based". 
- */ -public abstract class BaseWeightedRouterPolicy - implements FederationRouterPolicy { - - private WeightedPolicyInfo policyInfo = null; - private FederationPolicyInitializationContext policyContext; - - public BaseWeightedRouterPolicy() { - } - - @Override - public void reinitialize(FederationPolicyInitializationContext - federationPolicyContext) - throws FederationPolicyInitializationException { - FederationPolicyInitializationContextValidator - .validate(federationPolicyContext, this.getClass().getCanonicalName()); - - // perform consistency checks - WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo - .fromByteBuffer( - federationPolicyContext.getSubClusterPolicyConfiguration() - .getParams()); - - // if nothing has changed skip the rest of initialization - if (policyInfo != null && policyInfo.equals(newPolicyInfo)) { - return; - } - - validate(newPolicyInfo); - setPolicyInfo(newPolicyInfo); - this.policyContext = federationPolicyContext; - } - - /** - * Overridable validation step for the policy configuration. - * @param newPolicyInfo the configuration to test. - * @throws FederationPolicyInitializationException if the configuration is - * not valid. - */ - public void validate(WeightedPolicyInfo newPolicyInfo) throws - FederationPolicyInitializationException { - if (newPolicyInfo == null) { - throw new FederationPolicyInitializationException("The policy to " - + "validate should not be null."); - } - Map<SubClusterIdInfo, Float> newWeights = - newPolicyInfo.getRouterPolicyWeights(); - if (newWeights == null || newWeights.size() < 1) { - throw new FederationPolicyInitializationException( - "Weight vector cannot be null/empty."); - } - } - - - /** - * Getter method for the configuration weights. - * - * @return the {@link WeightedPolicyInfo} representing the policy - * configuration. - */ - public WeightedPolicyInfo getPolicyInfo() { - return policyInfo; - } - - /** - * Setter method for the configuration weights. 
- * - * @param policyInfo the {@link WeightedPolicyInfo} representing the policy - * configuration. - */ - public void setPolicyInfo( - WeightedPolicyInfo policyInfo) { - this.policyInfo = policyInfo; - } - - /** - * Getter method for the {@link FederationPolicyInitializationContext}. - * @return the context for this policy. - */ - public FederationPolicyInitializationContext getPolicyContext() { - return policyContext; - } - - /** - * Setter method for the {@link FederationPolicyInitializationContext}. - * @param policyContext the context to assign to this policy. - */ - public void setPolicyContext( - FederationPolicyInitializationContext policyContext) { - this.policyContext = policyContext; - } - - /** - * This methods gets active subclusters map from the {@code - * FederationStateStoreFacade} and validate it not being null/empty. - * - * @return the map of ids to info for all active subclusters. - * @throws YarnException if we can't get the list. - */ - protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters() - throws YarnException { - - Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext() - .getFederationStateStoreFacade().getSubClusters(true); - - if (activeSubclusters == null || activeSubclusters.size() < 1) { - throw new NoActiveSubclustersException( - "Zero active subclusters, cannot pick where to send job."); - } - return activeSubclusters; - } - - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index 42c86cc..90ea0a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -35,11 +35,10 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy { * @param appSubmissionContext the context for the app being submitted. * * @return the sub-cluster as identified by {@link SubClusterId} to route the - * request to. + * request to. * * @throws YarnException if the policy cannot determine a viable subcluster. */ SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException; + ApplicationSubmissionContext appSubmissionContext) throws YarnException; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org