YARN-6247. Share a single instance of SubClusterResolver instead of 
instantiating one per AM. (Botong Huang via Subru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8db7515c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8db7515c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8db7515c

Branch: refs/heads/YARN-2915
Commit: 8db7515ced61195378f83abf76cd639e9a98f9a4
Parents: 12cd0ac
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Mar 2 18:54:53 2017 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Jun 22 14:03:23 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  6 +++
 .../src/main/resources/yarn-default.xml         |  7 +++
 .../resolver/AbstractSubClusterResolver.java    |  6 +--
 .../federation/resolver/SubClusterResolver.java |  4 +-
 .../utils/FederationStateStoreFacade.java       | 48 +++++++++++++++++---
 5 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3436481..b103d2f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2576,6 +2576,12 @@ public class YarnConfiguration extends Configuration {
   public static final String FEDERATION_MACHINE_LIST =
       FEDERATION_PREFIX + "machine-list";
 
+  public static final String FEDERATION_CLUSTER_RESOLVER_CLASS =
+      FEDERATION_PREFIX + "subcluster-resolver.class";
+  public static final String DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS =
+      "org.apache.hadoop.yarn.server.federation.resolver."
+          + "DefaultSubClusterResolverImpl";
+
   public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
 
   public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0d40482..75b32ce 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2683,6 +2683,13 @@
     </description>
     <name>yarn.federation.machine-list</name>
   </property>
+  <property>
+    <description>
+      Class name for SubClusterResolver
+    </description>
+    <name>yarn.federation.subcluster-resolver.class</name>
+    
<value>org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl</value>
+  </property>
 
   <property>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
index 6b4f60c..bccff2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.federation.resolver;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 
-import java.util.HashMap;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map;
 
 /**
@@ -31,9 +31,9 @@ import java.util.Map;
  */
 public abstract class AbstractSubClusterResolver implements SubClusterResolver 
{
   private Map<String, SubClusterId> nodeToSubCluster =
-      new HashMap<String, SubClusterId>();
+      new ConcurrentHashMap<String, SubClusterId>();
   private Map<String, Set<SubClusterId>> rackToSubClusters =
-      new HashMap<String, Set<SubClusterId>>();
+      new ConcurrentHashMap<String, Set<SubClusterId>>();
 
   @Override
   public SubClusterId getSubClusterForNode(String nodename)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
index c6adfa6..612d396 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/SubClusterResolver.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 
 /**
- * An utility that helps to determine the sub-cluster that a specified node
- * belongs to.
+ * An utility that helps to determine the sub-cluster that a specified node or
+ * rack belongs to. All implementing classes should be thread-safe.
  */
 public interface SubClusterResolver extends Configurable {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8db7515c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 9b794de..e8f245e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -90,6 +91,7 @@ public final class FederationStateStoreFacade {
   private int cacheTimeToLive;
   private Configuration conf;
   private Cache<Object, Object> cache;
+  private SubClusterResolver subclusterResolver;
 
   private FederationStateStoreFacade() {
     initializeFacadeInternal(new Configuration());
@@ -104,6 +106,12 @@ public final class FederationStateStoreFacade {
           FederationStateStore.class, createRetryPolicy(conf));
       this.stateStore.init(conf);
 
+      this.subclusterResolver = createInstance(conf,
+          YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS,
+          YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS,
+          SubClusterResolver.class);
+      this.subclusterResolver.load();
+
       initCache();
 
     } catch (YarnException ex) {
@@ -348,6 +356,15 @@ public final class FederationStateStoreFacade {
   }
 
   /**
+   * Get the singleton instance of SubClusterResolver.
+   *
+   * @return SubClusterResolver instance
+   */
+  public SubClusterResolver getSubClusterResolver() {
+    return this.subclusterResolver;
+  }
+
+  /**
    * Helper method to create instances of Object using the class name defined 
in
    * the configuration object. The instances creates {@link RetryProxy} using
    * the specific {@link RetryPolicy}.
@@ -359,23 +376,40 @@ public final class FederationStateStoreFacade {
    * @param retryPolicy the policy for retrying method call failures
    * @return a retry proxy for the specified interface
    */
-  @SuppressWarnings("unchecked")
   public static <T> Object createRetryInstance(Configuration conf,
       String configuredClassName, String defaultValue, Class<T> type,
       RetryPolicy retryPolicy) {
 
+    return RetryProxy.create(type,
+        createInstance(conf, configuredClassName, defaultValue, type),
+        retryPolicy);
+  }
+
+  /**
+   * Helper method to create instances of Object using the class name specified
+   * in the configuration object.
+   *
+   * @param conf the yarn configuration
+   * @param configuredClassName the configuration provider key
+   * @param defaultValue the default implementation class
+   * @param type the required interface/base class
+   * @param <T> The type of the instance to create
+   * @return the instances created
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T createInstance(Configuration conf,
+      String configuredClassName, String defaultValue, Class<T> type) {
+
     String className = conf.get(configuredClassName, defaultValue);
     try {
       Class<?> clusterResolverClass = conf.getClassByName(className);
       if (type.isAssignableFrom(clusterResolverClass)) {
-        return RetryProxy.create(type,
-            (T) ReflectionUtils.newInstance(clusterResolverClass, conf),
-            retryPolicy);
+        return (T) ReflectionUtils.newInstance(clusterResolverClass, conf);
       } else {
-        throw new YarnRuntimeException(
-            "Class: " + className + " not instance of " + 
type.getSimpleName());
+        throw new YarnRuntimeException("Class: " + className
+            + " not instance of " + type.getCanonicalName());
       }
-    } catch (Exception e) {
+    } catch (ClassNotFoundException e) {
       throw new YarnRuntimeException("Could not instantiate : " + className, 
e);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to