YARN-6247. Share a single instance of SubClusterResolver instead of instantiating one per AM. (Botong Huang via Subru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8db7515c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8db7515c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8db7515c Branch: refs/heads/YARN-2915 Commit: 8db7515ced61195378f83abf76cd639e9a98f9a4 Parents: 12cd0ac Author: Subru Krishnan <su...@apache.org> Authored: Thu Mar 2 18:54:53 2017 -0800 Committer: Subru Krishnan <su...@apache.org> Committed: Thu Jun 22 14:03:23 2017 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 6 +++ .../src/main/resources/yarn-default.xml | 7 +++ .../resolver/AbstractSubClusterResolver.java | 6 +-- .../federation/resolver/SubClusterResolver.java | 4 +- .../utils/FederationStateStoreFacade.java | 48 +++++++++++++++++--- 5 files changed, 59 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3436481..b103d2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2576,6 +2576,12 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_MACHINE_LIST = FEDERATION_PREFIX + "machine-list"; + public static final String FEDERATION_CLUSTER_RESOLVER_CLASS = + FEDERATION_PREFIX + "subcluster-resolver.class"; + public static final String DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS = + "org.apache.hadoop.yarn.server.federation.resolver." + + "DefaultSubClusterResolverImpl"; + public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0d40482..75b32ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2683,6 +2683,13 @@ </description> <name>yarn.federation.machine-list</name> </property> + <property> + <description> + Class name for SubClusterResolver + </description> + <name>yarn.federation.subcluster-resolver.class</name> + <value>org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl</value> + </property> <property> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java index 6b4f60c..bccff2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java @@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.federation.resolver; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import java.util.HashMap; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.Map; /** @@ -31,9 +31,9 @@ import java.util.Map; */ public abstract class AbstractSubClusterResolver implements SubClusterResolver { private Map<String, SubClusterId> nodeToSubCluster = - new HashMap<String, SubClusterId>(); + new ConcurrentHashMap<String, SubClusterId>(); private Map<String, Set<SubClusterId>> rackToSubClusters = - new HashMap<String, Set<SubClusterId>>(); + new ConcurrentHashMap<String, Set<SubClusterId>>(); @Override public SubClusterId getSubClusterForNode(String nodename) http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java index c6adfa6..612d396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java @@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; /** - * An utility that helps to determine the sub-cluster that a specified node - * belongs to. + * An utility that helps to determine the sub-cluster that a specified node or + * rack belongs to. All implementing classes should be thread-safe. */ public interface SubClusterResolver extends Configurable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 9b794de..e8f245e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -90,6 +91,7 @@ public final class FederationStateStoreFacade { private int cacheTimeToLive; private Configuration conf; private Cache<Object, Object> cache; + private SubClusterResolver subclusterResolver; private FederationStateStoreFacade() { initializeFacadeInternal(new Configuration()); @@ -104,6 +106,12 @@ public final class FederationStateStoreFacade { FederationStateStore.class, createRetryPolicy(conf)); this.stateStore.init(conf); + this.subclusterResolver = createInstance(conf, + YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS, + SubClusterResolver.class); + this.subclusterResolver.load(); + initCache(); } catch (YarnException ex) { @@ -348,6 +356,15 @@ public final class FederationStateStoreFacade { } /** + * Get the singleton instance of SubClusterResolver. + * + * @return SubClusterResolver instance + */ + public SubClusterResolver getSubClusterResolver() { + return this.subclusterResolver; + } + + /** * Helper method to create instances of Object using the class name defined in * the configuration object. The instances creates {@link RetryProxy} using * the specific {@link RetryPolicy}. @@ -359,23 +376,40 @@ public final class FederationStateStoreFacade { * @param retryPolicy the policy for retrying method call failures * @return a retry proxy for the specified interface */ - @SuppressWarnings("unchecked") public static <T> Object createRetryInstance(Configuration conf, String configuredClassName, String defaultValue, Class<T> type, RetryPolicy retryPolicy) { + return RetryProxy.create(type, + createInstance(conf, configuredClassName, defaultValue, type), + retryPolicy); + } + + /** + * Helper method to create instances of Object using the class name specified + * in the configuration object. + * + * @param conf the yarn configuration + * @param configuredClassName the configuration provider key + * @param defaultValue the default implementation class + * @param type the required interface/base class + * @param <T> The type of the instance to create + * @return the instances created + */ + @SuppressWarnings("unchecked") + public static <T> T createInstance(Configuration conf, + String configuredClassName, String defaultValue, Class<T> type) { + String className = conf.get(configuredClassName, defaultValue); try { Class<?> clusterResolverClass = conf.getClassByName(className); if (type.isAssignableFrom(clusterResolverClass)) { - return RetryProxy.create(type, - (T) ReflectionUtils.newInstance(clusterResolverClass, conf), - retryPolicy); + return (T) ReflectionUtils.newInstance(clusterResolverClass, conf); } else { - throw new YarnRuntimeException( - "Class: " + className + " not instance of " + type.getSimpleName()); + throw new YarnRuntimeException("Class: " + className + + " not instance of " + type.getCanonicalName()); } - } catch (Exception e) { + } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate : " + className, e); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org