YARN-5634. Simplify initialization/use of RouterPolicy via a 
RouterPolicyFacade. (Carlo Curino via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5579b223
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5579b223
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5579b223

Branch: refs/heads/YARN-2915
Commit: 5579b2234d5732ab93004854a76d169f24070e23
Parents: f5fafdb
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Nov 16 19:39:25 2016 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Tue Nov 22 18:37:54 2016 -0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   9 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  13 +
 .../yarn/conf/TestYarnConfigurationFields.java  |  12 +
 ...ionPolicyInitializationContextValidator.java |   2 +-
 .../PriorityBroadcastPolicyManager.java         |  66 +++++
 .../federation/policies/RouterPolicyFacade.java | 266 +++++++++++++++++++
 .../policies/dao/WeightedPolicyInfo.java        |   6 +-
 .../utils/FederationStateStoreFacade.java       |  16 +-
 .../TestPriorityBroadcastPolicyManager.java     |  72 +++++
 .../policies/TestRouterPolicyFacade.java        | 220 +++++++++++++++
 .../utils/FederationStateStoreTestUtil.java     |  22 +-
 11 files changed, 693 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml 
b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0e94b70..148ecc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -314,6 +314,15 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
   </Match>
 
+  <Match>
+    <Class 
name="org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade"/>
+    <Or>
+      <Field name="globalConfMap"/>
+      <Field name="globalPolicyMap"/>
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+  </Match>
+
   <!-- Don't care if putIfAbsent value is ignored -->
   <Match>
     <Package name="org.apache.hadoop.yarn.factories.impl.pb" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index eeb8258..6def749 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2522,6 +2522,19 @@ public class YarnConfiguration extends Configuration {
   public static final String FEDERATION_MACHINE_LIST =
       FEDERATION_PREFIX + "machine-list";
 
+  public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
+
+  public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+      + "policy-manager";
+
+  public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
+      + 
".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
+
+  public static final String FEDERATION_POLICY_MANAGER_PARAMS =
+      FEDERATION_PREFIX + "policy-manager-params";
+
+  public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 5958d4d..b3230ac 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -102,6 +102,18 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.RM_EPOCH);
 
+    // Federation policies configs to be ignored
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_POLICY_MANAGER);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+
     // Ignore blacklisting nodes for AM failures feature since it is still a
     // "work in progress"
     configurationPropsToSkipCompare.add(YarnConfiguration.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 1b83bbc..3c44e7e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -57,7 +57,7 @@ public final class 
FederationPolicyInitializationContextValidator {
 
     if (policyContext.getSubClusterPolicyConfiguration() == null) {
       throw new FederationPolicyInitializationException(
-          "The FederationSubclusterResolver provided is null. Cannot "
+          "The SubClusterPolicyConfiguration provided is null. Cannot "
               + "reinitalize successfully.");
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
new file mode 100644
index 0000000..ebdcf42
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link PriorityRouterPolicy} for the router and a
+ * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
+
+  private WeightedPolicyInfo weightedPolicyInfo;
+
+  public PriorityBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = PriorityRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+    weightedPolicyInfo = new WeightedPolicyInfo();
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+    return SubClusterPolicyConfiguration.newInstance(getQueue(),
+        this.getClass().getCanonicalName(), buf);
+  }
+
+  @VisibleForTesting
+  public WeightedPolicyInfo getWeightedPolicyInfo() {
+    return weightedPolicyInfo;
+  }
+
+  @VisibleForTesting
+  public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
+    this.weightedPolicyInfo = weightedPolicyInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
new file mode 100644
index 0000000..a3fd15a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class provides a facade to the policy subsystem, and handles the
+ * lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
+ */
+public class RouterPolicyFacade {
+
+  private static final Log LOG =
+      LogFactory.getLog(RouterPolicyFacade.class);
+
+  private final SubClusterResolver subClusterResolver;
+  private final FederationStateStoreFacade federationFacade;
+  private Map<String, SubClusterPolicyConfiguration> globalConfMap;
+
+  @VisibleForTesting
+  Map<String, FederationRouterPolicy> globalPolicyMap;
+
+  public RouterPolicyFacade(YarnConfiguration conf,
+      FederationStateStoreFacade facade, SubClusterResolver resolver,
+      SubClusterId homeSubcluster)
+      throws FederationPolicyInitializationException {
+
+    this.federationFacade = facade;
+    this.subClusterResolver = resolver;
+    this.globalConfMap = new ConcurrentHashMap<>();
+    this.globalPolicyMap = new ConcurrentHashMap<>();
+
+    // load default behavior from store if possible
+    String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+    SubClusterPolicyConfiguration configuration = null;
+    try {
+      configuration = federationFacade.getPolicyConfiguration(defaulKey);
+    } catch (YarnException e) {
+      LOG.warn("No fallback behavior defined in store, defaulting to XML "
+          + "configuration fallback behavior.");
+    }
+
+    // or from XML conf otherwise.
+    if (configuration == null) {
+      String defaultFederationPolicyManager =
+          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+      String defaultPolicyParamString =
+          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
+              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+      ByteBuffer defaultPolicyParam = ByteBuffer
+          .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
+
+      configuration = SubClusterPolicyConfiguration.newInstance(defaulKey,
+          defaultFederationPolicyManager, defaultPolicyParam);
+    }
+
+    // construct the required policy manager
+    FederationPolicyInitializationContext fallbackContext =
+        new FederationPolicyInitializationContext(configuration,
+            subClusterResolver, federationFacade, homeSubcluster);
+    FederationPolicyManager fallbackPolicyManager =
+        instantiatePolicyManager(configuration.getType());
+    fallbackPolicyManager.setQueue(defaulKey);
+
+    // add to the cache the fallback behavior
+    globalConfMap.put(defaulKey,
+        fallbackContext.getSubClusterPolicyConfiguration());
+    globalPolicyMap.put(defaulKey,
+        fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
+
+  }
+
+  /**
+   * This method provides a wrapper of all policy functionalities for routing.
+   * Internally it manages configuration changes, and policy init/reinit.
+   *
+   * @param appSubmissionContext the application to route.
+   *
+   * @return the id of the subcluster that will be the "home" for this
+   *         application.
+   *
+   * @throws YarnException if there are issues initializing policies, or no
+   *           valid sub-cluster id could be found for this app.
+   */
+  public SubClusterId getHomeSubcluster(
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+
+    // the maps are concurrent, but we need to protect from reset()
+    // reinitialization mid-execution by creating a new reference local to this
+    // method.
+    Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
+    Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;
+
+    if (appSubmissionContext == null) {
+      throw new FederationPolicyException(
+          "The ApplicationSubmissionContext " + "cannot be null.");
+    }
+
+    String queue = appSubmissionContext.getQueue();
+
+    // respecting YARN behavior we assume default queue if the queue is not
+    // specified. This also ensures that "null" can be used as a key to get the
+    // default behavior.
+    if (queue == null) {
+      queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+    }
+
+    // the facade might cache this request, based on its parameterization
+    SubClusterPolicyConfiguration configuration = null;
+
+    try {
+      configuration = federationFacade.getPolicyConfiguration(queue);
+    } catch (YarnException e) {
+      LOG.debug(e);
+    }
+
+    // If there is no policy configured for this queue, fallback to the baseline
+    // policy that is configured either in the store or via XML config (and
+    // cached)
+    if (configuration == null) {
+      try {
+        LOG.warn("There is no policies configured for queue: " + queue + " we"
+            + " fallback to default policy for: "
+            + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+
+        queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+        configuration = federationFacade.getPolicyConfiguration(
+            YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+      } catch (YarnException e) {
+        // the fallback is not configured via store, but via XML, using
+        // previously loaded configuration.
+        configuration =
+            cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+      }
+    }
+
+    // if the configuration has changed since last loaded, reinit the policy
+    // based on current configuration
+    if (!cachedConfs.containsKey(queue)
+        || !cachedConfs.get(queue).equals(configuration)) {
+      singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
+    }
+
+    FederationRouterPolicy policy = policyMap.get(queue);
+    if (policy == null) {
+      // this should never happen, as the two maps are updated together
+      throw new FederationPolicyException("No FederationRouterPolicy found "
+          + "for queue: " + appSubmissionContext.getQueue() + " (for "
+          + "application: " + appSubmissionContext.getApplicationId() + ") "
+          + "and no default specified.");
+    }
+
+    return policy.getHomeSubcluster(appSubmissionContext);
+  }
+
+  /**
+   * This method reinitializes a policy and loads it in the policyMap.
+   *
+   * @param queue the queue to initialize a policy for.
+   * @param conf the configuration to use for initialization.
+   *
+   * @throws FederationPolicyInitializationException if initialization fails.
+   */
+  private void singlePolicyReinit(Map<String, FederationRouterPolicy> 
policyMap,
+      Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
+      SubClusterPolicyConfiguration conf)
+      throws FederationPolicyInitializationException {
+
+    FederationPolicyInitializationContext context =
+        new FederationPolicyInitializationContext(conf, subClusterResolver,
+            federationFacade, null);
+    String newType = context.getSubClusterPolicyConfiguration().getType();
+    FederationRouterPolicy routerPolicy = policyMap.get(queue);
+
+    FederationPolicyManager federationPolicyManager =
+        instantiatePolicyManager(newType);
+    // set queue, reinit policy if required (implementation lazily checks
+    // content of conf), and cache it
+    federationPolicyManager.setQueue(queue);
+    routerPolicy =
+        federationPolicyManager.getRouterPolicy(context, routerPolicy);
+
+    // we need the two put to be atomic (across multiple threads invoking
+    // this and reset operations)
+    synchronized (this) {
+      policyMap.put(queue, routerPolicy);
+      cachedConfs.put(queue, conf);
+    }
+  }
+
+  private static FederationPolicyManager instantiatePolicyManager(
+      String newType) throws FederationPolicyInitializationException {
+    FederationPolicyManager federationPolicyManager = null;
+    try {
+      // create policy instance and set queue
+      Class c = Class.forName(newType);
+      federationPolicyManager = (FederationPolicyManager) c.newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new FederationPolicyInitializationException(e);
+    } catch (InstantiationException e) {
+      throw new FederationPolicyInitializationException(e);
+    } catch (IllegalAccessException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+    return federationPolicyManager;
+  }
+
+  /**
+   * This method flushes all cached configurations and policies. This should be
+   * invoked if the facade remains active after very large churn of queues in
+   * the system.
+   */
+  public synchronized void reset() {
+
+    // remember the fallBack
+    SubClusterPolicyConfiguration conf =
+        globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+    FederationRouterPolicy policy =
+        globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+
+    globalConfMap = new ConcurrentHashMap<>();
+    globalPolicyMap = new ConcurrentHashMap<>();
+
+    // re-add the remembered fallback under the default policy key
+    globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf);
+    globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY,
+        policy);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
index 62eb03b..e7b8afe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.federation.policies.dao;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -100,7 +100,7 @@ public class WeightedPolicyInfo {
       JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
       final byte[] bytes = new byte[bb.remaining()];
       bb.get(bytes);
-      String params = new String(bytes, Charset.forName("UTF-8"));
+      String params = new String(bytes, StandardCharsets.UTF_8);
 
       WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
           new StringReader(params), WeightedPolicyInfo.class);
@@ -164,7 +164,7 @@ public class WeightedPolicyInfo {
     }
     try {
       String s = toJSONString();
-      return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8")));
+      return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
     } catch (JAXBException j) {
       throw new FederationPolicyInitializationException(j);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 66a0b60..9b794de 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoR
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -262,12 +263,17 @@ public final class FederationStateStoreFacade {
     if (isCachingEnabled()) {
       return getPoliciesConfigurations().get(queue);
     } else {
-      return stateStore
-          .getPolicyConfiguration(
-              GetSubClusterPolicyConfigurationRequest.newInstance(queue))
-          .getPolicyConfiguration();
-    }
 
+      GetSubClusterPolicyConfigurationResponse response =
+          stateStore.getPolicyConfiguration(
+              GetSubClusterPolicyConfigurationRequest.newInstance(queue));
+      if (response == null) {
+        throw new YarnException("The stateStore returned a null for "
+            + "GetSubClusterPolicyConfigurationResponse for queue " + queue);
+      } else {
+        return response.getPolicyConfiguration();
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
new file mode 100644
index 0000000..5e5bc83
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test of {@link PriorityBroadcastPolicyManager}.
+ */
+public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest {
+
+  private WeightedPolicyInfo policyInfo;
+
+  @Before
+  public void setup() {
+    // configure a policy
+
+    wfp = new PriorityBroadcastPolicyManager();
+    wfp.setQueue("queue1");
+    SubClusterId sc1 = SubClusterId.newInstance("sc1");
+    SubClusterId sc2 = SubClusterId.newInstance("sc2");
+    policyInfo = new WeightedPolicyInfo();
+
+    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+    routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+    policyInfo.setRouterPolicyWeights(routerWeights);
+
+    ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo);
+
+    // set expected params that the base test class will use for tests
+    expectedPolicyManager = PriorityBroadcastPolicyManager.class;
+    expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
+    expectedRouterPolicy = PriorityRouterPolicy.class;
+  }
+
+  @Test
+  public void testPolicyInfoSetCorrectly() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+        expectedAMRMProxyPolicy, expectedRouterPolicy);
+
+    // check the policyInfo propagates through ser/der correctly
+    Assert.assertEquals(
+        ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(),
+        policyInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
new file mode 100644
index 0000000..4975a9f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test of {@link RouterPolicyFacade}.
+ */
+public class TestRouterPolicyFacade {
+
+  private RouterPolicyFacade routerFacade;
+  private List<SubClusterId> subClusterIds;
+  private FederationStateStore store;
+  private String queue1 = "queue1";
+  private String defQueueKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+
+  @Before
+  public void setup() throws YarnException {
+
+    // setting up a store and its facade (with caching off)
+    FederationStateStoreFacade fedFacade =
+        FederationStateStoreFacade.getInstance();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");
+    store = new MemoryFederationStateStore();
+    store.init(conf);
+    fedFacade.reinitialize(store, conf);
+
+    FederationStateStoreTestUtil storeTestUtil =
+        new FederationStateStoreTestUtil(store);
+    storeTestUtil.registerSubClusters(10);
+
+    subClusterIds = storeTestUtil.getAllSubClusterIds(true);
+    store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+        .newInstance(getUniformPolicy(queue1)));
+
+    SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
+    routerFacade = new RouterPolicyFacade(new YarnConfiguration(), fedFacade,
+        resolver, subClusterIds.get(0));
+  }
+
+  @Test
+  public void testConfigurationUpdate() throws YarnException {
+
+    // in this test we see what happens when the configuration is changed
+    // between calls. We achieve this by changing what is in the store.
+
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
+
+    // first call runs using standard UniformRandomRouterPolicy
+    SubClusterId chosen =
+        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(subClusterIds.contains(chosen));
+    Assert.assertTrue(routerFacade.globalPolicyMap
+        .get(queue1) instanceof UniformRandomRouterPolicy);
+
+    // then the operator changes how queue1 is routed setting it to
+    // PriorityRouterPolicy with weights favoring the first subcluster in
+    // subClusterIds.
+    store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+        .newInstance(getPriorityPolicy(queue1)));
+
+    // second call is routed by new policy PriorityRouterPolicy
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
+    Assert.assertTrue(routerFacade.globalPolicyMap
+        .get(queue1) instanceof PriorityRouterPolicy);
+  }
+
+  @Test
+  public void testGetHomeSubcluster() throws YarnException {
+
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
+
+    // the facade only contains the fallback behavior
+    Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
+        && routerFacade.globalPolicyMap.size() == 1);
+
+    // when invoked it returns the expected SubClusterId.
+    SubClusterId chosen =
+        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(subClusterIds.contains(chosen));
+
+    // now the caching of policies must have added an entry for this queue
+    Assert.assertTrue(routerFacade.globalPolicyMap.size() == 2);
+
+    // after the facade is used the policyMap contains the expected policy type.
+    Assert.assertTrue(routerFacade.globalPolicyMap
+        .get(queue1) instanceof UniformRandomRouterPolicy);
+
+    // the facade is again empty after reset
+    routerFacade.reset();
+    // the facade only contains the fallback behavior
+    Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
+        && routerFacade.globalPolicyMap.size() == 1);
+
+  }
+
+  @Test
+  public void testFallbacks() throws YarnException {
+
+    // this tests the behavior of the system when the queue requested is
+    // not configured (or null) and there is no default policy configured
+    // for DEFAULT_FEDERATION_POLICY_KEY (*). This is our second line of
+    // defense.
+
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+
+    // The facade answers also for non-initialized policies (using the
+    // defaultPolicy)
+    String uninitQueue = "non-initialized-queue";
+    when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
+    SubClusterId chosen =
+        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(subClusterIds.contains(chosen));
+    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+    // empty string
+    when(applicationSubmissionContext.getQueue()).thenReturn("");
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(subClusterIds.contains(chosen));
+    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+    // null queue also falls back to default
+    when(applicationSubmissionContext.getQueue()).thenReturn(null);
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertTrue(subClusterIds.contains(chosen));
+    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+  }
+
+  public static SubClusterPolicyConfiguration getUniformPolicy(String queue)
+      throws FederationPolicyInitializationException {
+
+    // we go through standard lifecycle instantiating a policyManager and
+    // configuring it and serializing it to a conf.
+    UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager();
+    wfp.setQueue(queue);
+
+    SubClusterPolicyConfiguration fpc = wfp.serializeConf();
+
+    return fpc;
+  }
+
+  public SubClusterPolicyConfiguration getPriorityPolicy(String queue)
+      throws FederationPolicyInitializationException {
+
+    // we go through standard lifecycle instantiating a policyManager and
+    // configuring it and serializing it to a conf.
+    PriorityBroadcastPolicyManager wfp = new PriorityBroadcastPolicyManager();
+
+    // equal weight to all subclusters
+    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    for (SubClusterId s : subClusterIds) {
+      routerWeights.put(new SubClusterIdInfo(s), 0.9f / subClusterIds.size());
+    }
+
+    // except the first one, which gets more weight
+    SubClusterIdInfo favorite = new SubClusterIdInfo((subClusterIds.get(0)));
+    routerWeights.put(favorite, (0.1f + 0.9f / subClusterIds.size()));
+
+    WeightedPolicyInfo policyInfo = new WeightedPolicyInfo();
+    policyInfo.setRouterPolicyWeights(routerWeights);
+    wfp.setWeightedPolicyInfo(policyInfo);
+    wfp.setQueue(queue);
+
+    // serialize the manager into a policy configuration
+    SubClusterPolicyConfiguration fpc = wfp.serializeConf();
+
+    return fpc;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5579b223/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
index c179521..649a61b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.federation.utils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -29,6 +31,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHome
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -62,8 +65,8 @@ public class FederationStateStoreTestUtil {
     String webAppAddress = "1.2.3.4:4";
 
     return SubClusterInfo.newInstance(subClusterId, amRMAddress,
-        clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
-        CLOCK.getTime(), "capability");
+        clientRMAddress, rmAdminAddress, webAppAddress,
+        SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
   }
 
   private void registerSubCluster(SubClusterId subClusterId)
@@ -97,6 +100,21 @@ public class FederationStateStoreTestUtil {
     }
   }
 
+  public List<SubClusterId> getAllSubClusterIds(
+      boolean filterInactiveSubclusters) throws YarnException {
+
+    List<SubClusterInfo> infos = stateStore
+        .getSubClusters(
+            GetSubClustersInfoRequest.newInstance(filterInactiveSubclusters))
+        .getSubClusters();
+    List<SubClusterId> ids = new ArrayList<>();
+    for (SubClusterInfo s : infos) {
+      ids.add(s.getSubClusterId());
+    }
+
+    return ids;
+  }
+
   private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
       String policyType) {
     return SubClusterPolicyConfiguration.newInstance(queueName, policyType,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to