YARN-5634. Simplify initialization/use of RouterPolicy via a RouterPolicyFacade. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5579b223 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5579b223 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5579b223 Branch: refs/heads/YARN-2915 Commit: 5579b2234d5732ab93004854a76d169f24070e23 Parents: f5fafdb Author: Subru Krishnan <su...@apache.org> Authored: Wed Nov 16 19:39:25 2016 -0800 Committer: Subru Krishnan <su...@apache.org> Committed: Tue Nov 22 18:37:54 2016 -0800 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 9 + .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../yarn/conf/TestYarnConfigurationFields.java | 12 + ...ionPolicyInitializationContextValidator.java | 2 +- .../PriorityBroadcastPolicyManager.java | 66 +++++ .../federation/policies/RouterPolicyFacade.java | 266 +++++++++++++++++++ .../policies/dao/WeightedPolicyInfo.java | 6 +- .../utils/FederationStateStoreFacade.java | 16 +- .../TestPriorityBroadcastPolicyManager.java | 72 +++++ .../policies/TestRouterPolicyFacade.java | 220 +++++++++++++++ .../utils/FederationStateStoreTestUtil.java | 22 +- 11 files changed, 693 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 0e94b70..148ecc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -314,6 +314,15 @@ <Bug pattern="IS2_INCONSISTENT_SYNC"/> </Match> + <Match> + <Class name="org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade"/> + 
<Or> + <Field name="globalConfMap"/> + <Field name="globalPolicyMap"/> + </Or> + <Bug pattern="IS2_INCONSISTENT_SYNC"/> + </Match> + <!-- Don't care if putIfAbsent value is ignored --> <Match> <Package name="org.apache.hadoop.yarn.factories.impl.pb" /> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index eeb8258..6def749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2522,6 +2522,19 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; + public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; + + public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX + + "policy-manager"; + + public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache" + + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager"; + + public static final String FEDERATION_POLICY_MANAGER_PARAMS = + FEDERATION_PREFIX + "policy-manager-params"; + + public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + //////////////////////////////// // Other Configs //////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 5958d4d..b3230ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -102,6 +102,18 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPropsToSkipCompare .add(YarnConfiguration.RM_EPOCH); + // Federation policies configs to be ignored + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_POLICY_MANAGER); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" configurationPropsToSkipCompare.add(YarnConfiguration. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java index 1b83bbc..3c44e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java @@ -57,7 +57,7 @@ public final class FederationPolicyInitializationContextValidator { if (policyContext.getSubClusterPolicyConfiguration() == null) { throw new FederationPolicyInitializationException( - "The FederationSubclusterResolver provided is null. Cannot " + "The SubClusterPolicyConfiguration provided is null. 
Cannot " + "reinitalize successfully."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java new file mode 100644 index 0000000..ebdcf42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link PriorityRouterPolicy} for the router and a + * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to + * work together. + */ +public class PriorityBroadcastPolicyManager extends AbstractPolicyManager { + + private WeightedPolicyInfo weightedPolicyInfo; + + public PriorityBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. 
+ routerFederationPolicy = PriorityRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + weightedPolicyInfo = new WeightedPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration.newInstance(getQueue(), + this.getClass().getCanonicalName(), buf); + } + + @VisibleForTesting + public WeightedPolicyInfo getWeightedPolicyInfo() { + return weightedPolicyInfo; + } + + @VisibleForTesting + public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) { + this.weightedPolicyInfo = weightedPolicyInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java new file mode 100644 index 0000000..a3fd15a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class provides a facade to the policy subsystem, and handles the + * lifecycle of policies (e.g., refresh from remote, default behaviors etc.). 
+ */ +public class RouterPolicyFacade { + + private static final Log LOG = + LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class); + + private final SubClusterResolver subClusterResolver; + private final FederationStateStoreFacade federationFacade; + private Map<String, SubClusterPolicyConfiguration> globalConfMap; + + @VisibleForTesting + Map<String, FederationRouterPolicy> globalPolicyMap; + + public RouterPolicyFacade(YarnConfiguration conf, + FederationStateStoreFacade facade, SubClusterResolver resolver, + SubClusterId homeSubcluster) + throws FederationPolicyInitializationException { + + this.federationFacade = facade; + this.subClusterResolver = resolver; + this.globalConfMap = new ConcurrentHashMap<>(); + this.globalPolicyMap = new ConcurrentHashMap<>(); + + // load default behavior from store if possible + String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + SubClusterPolicyConfiguration configuration = null; + try { + configuration = federationFacade.getPolicyConfiguration(defaulKey); + } catch (YarnException e) { + LOG.warn("No fallback behavior defined in store, defaulting to XML " + + "configuration fallback behavior."); + } + + // or from XML conf otherwise. 
+ if (configuration == null) { + String defaultFederationPolicyManager = + conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); + String defaultPolicyParamString = + conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + ByteBuffer defaultPolicyParam = ByteBuffer + .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8)); + + configuration = SubClusterPolicyConfiguration.newInstance(defaulKey, + defaultFederationPolicyManager, defaultPolicyParam); + } + + // construct the required policy manager + FederationPolicyInitializationContext fallbackContext = + new FederationPolicyInitializationContext(configuration, + subClusterResolver, federationFacade, homeSubcluster); + FederationPolicyManager fallbackPolicyManager = + instantiatePolicyManager(configuration.getType()); + fallbackPolicyManager.setQueue(defaulKey); + + // add to the cache the fallback behavior + globalConfMap.put(defaulKey, + fallbackContext.getSubClusterPolicyConfiguration()); + globalPolicyMap.put(defaulKey, + fallbackPolicyManager.getRouterPolicy(fallbackContext, null)); + + } + + /** + * This method provides a wrapper of all policy functionalities for routing. + * Internally it manages configuration changes, and policy init/reinit. + * + * @param appSubmissionContext the application to route. + * + * @return the id of the subcluster that will be the "home" for this + * application. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this app. + */ + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method.
+ Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap; + Map<String, FederationRouterPolicy> policyMap = globalPolicyMap; + + if (appSubmissionContext == null) { + throw new FederationPolicyException( + "The ApplicationSubmissionContext " + "cannot be null."); + } + + String queue = appSubmissionContext.getQueue(); + + // respecting YARN behavior we assume default queue if the queue is not + // specified. This also ensures that "null" can be used as a key to get the + // default behavior. + if (queue == null) { + queue = YarnConfiguration.DEFAULT_QUEUE_NAME; + } + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + + try { + configuration = federationFacade.getPolicyConfiguration(queue); + } catch (YarnException e) { + LOG.debug(e); + } + + // If there is no policy configured for this queue, fall back to the baseline + // policy that is configured either in the store or via XML config (and + // cached) + if (configuration == null) { + try { + LOG.warn("There is no policies configured for queue: " + queue + " we" + + " fallback to default policy for: " + + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + + queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + configuration = federationFacade.getPolicyConfiguration( + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } catch (YarnException e) { + // the fallback is not configured in the store but via XML: use the + // previously loaded configuration.
+ configuration = + cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } + } + + // if the configuration has changed since last loaded, reinit the policy + // based on current configuration + if (!cachedConfs.containsKey(queue) + || !cachedConfs.get(queue).equals(configuration)) { + singlePolicyReinit(policyMap, cachedConfs, queue, configuration); + } + + FederationRouterPolicy policy = policyMap.get(queue); + if (policy == null) { + // this should never happen, as the two maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + appSubmissionContext.getQueue() + " (for " + + "application: " + appSubmissionContext.getApplicationId() + ") " + + "and no default specified."); + } + + return policy.getHomeSubcluster(appSubmissionContext); + } + + /** + * This method reinitializes a policy and loads it in the policyMap. + * + * @param queue the queue to initialize a policy for. + * @param conf the configuration to use for initialization. + * + * @throws FederationPolicyInitializationException if initialization fails.
+ */ + private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap, + Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue, + SubClusterPolicyConfiguration conf) + throws FederationPolicyInitializationException { + + FederationPolicyInitializationContext context = + new FederationPolicyInitializationContext(conf, subClusterResolver, + federationFacade, null); + String newType = context.getSubClusterPolicyConfiguration().getType(); + FederationRouterPolicy routerPolicy = policyMap.get(queue); + + FederationPolicyManager federationPolicyManager = + instantiatePolicyManager(newType); + // set queue, reinit policy if required (implementation lazily checks + // content of conf), and cache it + federationPolicyManager.setQueue(queue); + routerPolicy = + federationPolicyManager.getRouterPolicy(context, routerPolicy); + + // we need the two puts to be atomic (across multiple threads invoking + // this and reset operations) + synchronized (this) { + policyMap.put(queue, routerPolicy); + cachedConfs.put(queue, conf); + } + } + + private static FederationPolicyManager instantiatePolicyManager( + String newType) throws FederationPolicyInitializationException { + FederationPolicyManager federationPolicyManager = null; + try { + // load the policy manager class and instantiate it reflectively + Class c = Class.forName(newType); + federationPolicyManager = (FederationPolicyManager) c.newInstance(); + } catch (ClassNotFoundException e) { + throw new FederationPolicyInitializationException(e); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + return federationPolicyManager; + } + + /** + * This method flushes all cached configurations and policies. This should be + * invoked if the facade remains active after a very large churn of queues in + * the system.
+ */ + public synchronized void reset() { + + // remember the fallback + SubClusterPolicyConfiguration conf = + globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + FederationRouterPolicy policy = + globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + + globalConfMap = new ConcurrentHashMap<>(); + globalPolicyMap = new ConcurrentHashMap<>(); + + // re-add the fallback to the fresh caches under the default policy key + globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf); + globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, + policy); + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java index 62eb03b..e7b8afe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.federation.policies.dao; import java.io.StringReader; import java.io.StringWriter; import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -100,7 +100,7 @@ public class WeightedPolicyInfo { JSONUnmarshaller
unmarshaller = jsonjaxbContext.createJSONUnmarshaller(); final byte[] bytes = new byte[bb.remaining()]; bb.get(bytes); - String params = new String(bytes, Charset.forName("UTF-8")); + String params = new String(bytes, StandardCharsets.UTF_8); WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON( new StringReader(params), WeightedPolicyInfo.class); @@ -164,7 +164,7 @@ public class WeightedPolicyInfo { } try { String s = toJSONString(); - return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8"))); + return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); } catch (JAXBException j) { throw new FederationPolicyInitializationException(j); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 66a0b60..9b794de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoR import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -262,12 +263,17 @@ public final class FederationStateStoreFacade { if (isCachingEnabled()) { return getPoliciesConfigurations().get(queue); } else { - return stateStore - .getPolicyConfiguration( - GetSubClusterPolicyConfigurationRequest.newInstance(queue)) - .getPolicyConfiguration(); - } + GetSubClusterPolicyConfigurationResponse response = + stateStore.getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest.newInstance(queue)); + if (response == null) { + throw new YarnException("The stateStore returned a null for " + + "GetSubClusterPolicyConfigurationResponse for queue " + queue); + } else { + return response.getPolicyConfiguration(); + } + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java new file mode 100644 index 0000000..5e5bc83 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test of {@link PriorityBroadcastPolicyManager}. 
+ */ +public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest { + + private WeightedPolicyInfo policyInfo; + + @Before + public void setup() { + // configure a policy + + wfp = new PriorityBroadcastPolicyManager(); + wfp.setQueue("queue1"); + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + SubClusterId sc2 = SubClusterId.newInstance("sc2"); + policyInfo = new WeightedPolicyInfo(); + + Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); + routerWeights.put(new SubClusterIdInfo(sc1), 0.2f); + routerWeights.put(new SubClusterIdInfo(sc2), 0.8f); + policyInfo.setRouterPolicyWeights(routerWeights); + + ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo); + + // set expected params that the base test class will use for tests + expectedPolicyManager = PriorityBroadcastPolicyManager.class; + expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class; + expectedRouterPolicy = PriorityRouterPolicy.class; + } + + @Test + public void testPolicyInfoSetCorrectly() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + expectedAMRMProxyPolicy, expectedRouterPolicy); + + // check the policyInfo propagates through ser/der correctly + Assert.assertEquals( + ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(), + policyInfo); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java new file mode 
100644 index 0000000..4975a9f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test of {@link RouterPolicyFacade}. 
+ */ +public class TestRouterPolicyFacade { + + private RouterPolicyFacade routerFacade; + private List<SubClusterId> subClusterIds; + private FederationStateStore store; + private String queue1 = "queue1"; + private String defQueueKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + + @Before + public void setup() throws YarnException { + + // setting up a store and its facade (with caching off) + FederationStateStoreFacade fedFacade = + FederationStateStoreFacade.getInstance(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0"); + store = new MemoryFederationStateStore(); + store.init(conf); + fedFacade.reinitialize(store, conf); + + FederationStateStoreTestUtil storeTestUtil = + new FederationStateStoreTestUtil(store); + storeTestUtil.registerSubClusters(10); + + subClusterIds = storeTestUtil.getAllSubClusterIds(true); + store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest + .newInstance(getUniformPolicy(queue1))); + + SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); + routerFacade = new RouterPolicyFacade(new YarnConfiguration(), fedFacade, + resolver, subClusterIds.get(0)); + } + + @Test + public void testConfigurationUpdate() throws YarnException { + + // in this test we see what happens when the configuration is changed + // between calls. We achieve this by changing what is in the store. 
+ + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(applicationSubmissionContext.getQueue()).thenReturn(queue1); + + // first call runs using standard UniformRandomRouterPolicy + SubClusterId chosen = + routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(subClusterIds.contains(chosen)); + Assert.assertTrue(routerFacade.globalPolicyMap + .get(queue1) instanceof UniformRandomRouterPolicy); + + // then the operator changes how queue1 is routed setting it to + // PriorityRouterPolicy with weights favoring the first subcluster in + // subClusterIds. + store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest + .newInstance(getPriorityPolicy(queue1))); + + // second call is routed by new policy PriorityRouterPolicy + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(chosen.equals(subClusterIds.get(0))); + Assert.assertTrue(routerFacade.globalPolicyMap + .get(queue1) instanceof PriorityRouterPolicy); + } + + @Test + public void testGetHomeSubcluster() throws YarnException { + + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(applicationSubmissionContext.getQueue()).thenReturn(queue1); + + // the facade only contains the fallback behavior + Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey) + && routerFacade.globalPolicyMap.size() == 1); + + // when invoked it returns the expected SubClusterId. + SubClusterId chosen = + routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(subClusterIds.contains(chosen)); + + // now the caching of policies must have added an entry for this queue + Assert.assertTrue(routerFacade.globalPolicyMap.size() == 2); + + // after the facade is used the policyMap contains the expected policy type. 
+ Assert.assertTrue(routerFacade.globalPolicyMap + .get(queue1) instanceof UniformRandomRouterPolicy); + + // the facade is again empty after reset + routerFacade.reset(); + // the facade only contains the fallback behavior + Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey) + && routerFacade.globalPolicyMap.size() == 1); + + } + + @Test + public void testFallbacks() throws YarnException { + + // this tests the behavior of the system when the queue requested is + // not configured (or null) and there is no default policy configured + // for DEFAULT_FEDERATION_POLICY_KEY (*). This is our second line of + // defense. + + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + + // The facade answers also for non-initialized policies (using the + // defaultPolicy) + String uninitQueue = "non-initialized-queue"; + when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue); + SubClusterId chosen = + routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(subClusterIds.contains(chosen)); + Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); + + // empty string + when(applicationSubmissionContext.getQueue()).thenReturn(""); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(subClusterIds.contains(chosen)); + Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); + + // null queue also falls back to default + when(applicationSubmissionContext.getQueue()).thenReturn(null); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + Assert.assertTrue(subClusterIds.contains(chosen)); + Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); + + } + + public static SubClusterPolicyConfiguration getUniformPolicy(String queue) + throws FederationPolicyInitializationException { + + // we go through standard lifecycle instantiating a policyManager and + // 
configuring it and serializing it to a conf. + UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager(); + wfp.setQueue(queue); + + SubClusterPolicyConfiguration fpc = wfp.serializeConf(); + + return fpc; + } + + public SubClusterPolicyConfiguration getPriorityPolicy(String queue) + throws FederationPolicyInitializationException { + + // we go through standard lifecycle instantiating a policyManager and + // configuring it and serializing it to a conf. + PriorityBroadcastPolicyManager wfp = new PriorityBroadcastPolicyManager(); + + // equal weight to all subcluster + Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); + for (SubClusterId s : subClusterIds) { + routerWeights.put(new SubClusterIdInfo(s), 0.9f / subClusterIds.size()); + } + + // beside the first one who gets more weight + SubClusterIdInfo favorite = new SubClusterIdInfo((subClusterIds.get(0))); + routerWeights.put(favorite, (0.1f + 0.9f / subClusterIds.size())); + + WeightedPolicyInfo policyInfo = new WeightedPolicyInfo(); + policyInfo.setRouterPolicyWeights(routerWeights); + wfp.setWeightedPolicyInfo(policyInfo); + wfp.setQueue(queue); + + // serializeConf it in a context + SubClusterPolicyConfiguration fpc = wfp.serializeConf(); + + return fpc; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java index c179521..649a61b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.federation.utils; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -29,6 +31,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHome import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -62,8 +65,8 @@ public class FederationStateStoreTestUtil { String webAppAddress = "1.2.3.4:4"; return SubClusterInfo.newInstance(subClusterId, amRMAddress, - clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, - CLOCK.getTime(), "capability"); + clientRMAddress, rmAdminAddress, webAppAddress, + SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability"); } private void registerSubCluster(SubClusterId subClusterId) @@ -97,6 +100,21 @@ public class FederationStateStoreTestUtil { } } + /** Returns the ids of the sub-clusters reported by the state store; filterInactiveSubclusters is forwarded unchanged to the store request (presumably it drops inactive sub-clusters when true - TODO confirm). Throws YarnException if the store lookup fails. */ public List<SubClusterId> getAllSubClusterIds( + boolean filterInactiveSubclusters) throws YarnException { + + 
/* query the store, passing the caller's filter flag through */ List<SubClusterInfo> infos = stateStore + .getSubClusters( + GetSubClustersInfoRequest.newInstance(filterInactiveSubclusters)) + .getSubClusters(); + List<SubClusterId> ids = new ArrayList<>(); + /* keep only the id of each returned SubClusterInfo, in the order returned */ for (SubClusterInfo s : infos) { + ids.add(s.getSubClusterId()); + } + + return ids; + } + private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, String policyType) { return SubClusterPolicyConfiguration.newInstance(queueName, policyType, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org