YARN-5407. In-memory based implementation of the 
FederationApplicationStateStore/FederationPolicyStateStore. (Ellen Hui via 
Subru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/97944fe3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/97944fe3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/97944fe3

Branch: refs/heads/YARN-2915
Commit: 97944fe336a27b69db4ed28602372cf1c6d4f180
Parents: a46ceac
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Aug 9 16:07:55 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Oct 7 17:18:07 2016 -0700

----------------------------------------------------------------------
 .../store/impl/MemoryFederationStateStore.java  | 158 +++++++-
 ...SubClusterPoliciesConfigurationsRequest.java |   2 +-
 ...ubClusterPoliciesConfigurationsResponse.java |   2 +-
 ...GetSubClusterPolicyConfigurationRequest.java |   3 +-
 ...etSubClusterPolicyConfigurationResponse.java |   2 +-
 ...SetSubClusterPolicyConfigurationRequest.java |  20 +-
 ...etSubClusterPolicyConfigurationResponse.java |   2 +-
 .../records/SubClusterPolicyConfiguration.java  |  27 +-
 ...tApplicationHomeSubClusterRequestPBImpl.java |   4 +
 ...ClusterPolicyConfigurationRequestPBImpl.java |  17 -
 .../pb/SubClusterPolicyConfigurationPBImpl.java |  17 +
 .../proto/yarn_server_federation_protos.proto   |   8 +-
 .../impl/FederationStateStoreBaseTest.java      | 367 ++++++++++++++++++-
 .../impl/TestMemoryFederationStateStore.java    |   4 +-
 14 files changed, 558 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index cea4ac2..a540dff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -20,35 +20,72 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import 
org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 
 /**
- * In-memory implementation of FederationMembershipStateStore.
+ * In-memory implementation of {@link FederationStateStore}.
  */
-public class MemoryFederationStateStore
-    implements FederationMembershipStateStore {
+public class MemoryFederationStateStore implements FederationStateStore {
+
+  private Map<SubClusterId, SubClusterInfo> membership;
+  private Map<ApplicationId, SubClusterId> applications;
+  private Map<String, SubClusterPolicyConfiguration> policies;
 
-  private final Map<SubClusterId, SubClusterInfo> membership =
-      new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
   private final MonotonicClock clock = new MonotonicClock();
 
   @Override
+  public void init(Configuration conf) {
+    membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
+    applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
+    policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
+  }
+
+  @Override
+  public void close() {
+    membership = null;
+    applications = null;
+    policies = null;
+  }
+
+  @Override
   public SubClusterRegisterResponse registerSubCluster(
       SubClusterRegisterRequest request) throws YarnException {
     SubClusterInfo subClusterInfo = request.getSubClusterInfo();
@@ -116,4 +153,113 @@ public class MemoryFederationStateStore
     return GetSubClustersInfoResponse.newInstance(result);
   }
 
+  // FederationApplicationHomeSubClusterStore methods
+
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+    ApplicationId appId =
+        request.getApplicationHomeSubCluster().getApplicationId();
+    if (applications.containsKey(appId)) {
+      throw new YarnException("Application " + appId + " already exists");
+    }
+
+    applications.put(appId,
+        request.getApplicationHomeSubCluster().getHomeSubCluster());
+    return AddApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  @Override
+  public UpdateApplicationHomeSubClusterResponse 
updateApplicationHomeSubClusterMap(
+      UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+    ApplicationId appId =
+        request.getApplicationHomeSubCluster().getApplicationId();
+    if (!applications.containsKey(appId)) {
+      throw new YarnException("Application " + appId + " does not exist");
+    }
+
+    applications.put(appId,
+        request.getApplicationHomeSubCluster().getHomeSubCluster());
+    return UpdateApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+    ApplicationId appId = request.getApplicationId();
+    if (!applications.containsKey(appId)) {
+      throw new YarnException("Application " + appId + " does not exist");
+    }
+
+    return GetApplicationHomeSubClusterResponse.newInstance(
+        ApplicationHomeSubCluster.newInstance(appId, applications.get(appId)));
+  }
+
+  @Override
+  public GetApplicationsHomeSubClusterResponse 
getApplicationsHomeSubClusterMap(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    List<ApplicationHomeSubCluster> result =
+        new ArrayList<ApplicationHomeSubCluster>();
+    for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
+      result
+          .add(ApplicationHomeSubCluster.newInstance(e.getKey(), 
e.getValue()));
+    }
+
+    GetApplicationsHomeSubClusterResponse.newInstance(result);
+    return GetApplicationsHomeSubClusterResponse.newInstance(result);
+  }
+
+  @Override
+  public DeleteApplicationHomeSubClusterResponse 
deleteApplicationHomeSubClusterMap(
+      DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+    ApplicationId appId = request.getApplicationId();
+    if (!applications.containsKey(appId)) {
+      throw new YarnException("Application " + appId + " does not exist");
+    }
+
+    applications.remove(appId);
+    return DeleteApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    String queue = request.getQueue();
+    if (!policies.containsKey(queue)) {
+      throw new YarnException("Policy for queue " + queue + " does not exist");
+    }
+
+    return GetSubClusterPolicyConfigurationResponse
+        .newInstance(policies.get(queue));
+  }
+
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    policies.put(request.getPolicyConfiguration().getQueue(),
+        request.getPolicyConfiguration());
+    return SetSubClusterPolicyConfigurationResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException 
{
+    ArrayList<SubClusterPolicyConfiguration> result =
+        new ArrayList<SubClusterPolicyConfiguration>();
+    for (SubClusterPolicyConfiguration policy : policies.values()) {
+      result.add(policy);
+    }
+    return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
+  }
+
+  @Override
+  public Version getCurrentVersion() {
+    return null;
+  }
+
+  @Override
+  public Version loadVersion() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
index 404521b..8cb84f3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.Records;
 @Private
 @Unstable
 public abstract class GetSubClusterPoliciesConfigurationsRequest {
-  public GetSubClusterPoliciesConfigurationsRequest newInstance() {
+  public static GetSubClusterPoliciesConfigurationsRequest newInstance() {
     return Records.newRecord(GetSubClusterPoliciesConfigurationsRequest.class);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
index 6554d68..2eaeb51 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
@@ -36,7 +36,7 @@ public abstract class 
GetSubClusterPoliciesConfigurationsResponse {
 
   @Private
   @Unstable
-  public GetSubClusterPoliciesConfigurationsResponse newInstance(
+  public static GetSubClusterPoliciesConfigurationsResponse newInstance(
       List<SubClusterPolicyConfiguration> policyConfigurations) {
     GetSubClusterPoliciesConfigurationsResponse response =
         Records.newRecord(GetSubClusterPoliciesConfigurationsResponse.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
index 7b7d8c4..c3f49e1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
@@ -33,7 +33,8 @@ public abstract class GetSubClusterPolicyConfigurationRequest 
{
 
   @Private
   @Unstable
-  public GetSubClusterPolicyConfigurationRequest newInstance(String queueName) 
{
+  public static GetSubClusterPolicyConfigurationRequest newInstance(
+      String queueName) {
     GetSubClusterPolicyConfigurationRequest request =
         Records.newRecord(GetSubClusterPolicyConfigurationRequest.class);
     request.setQueue(queueName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
index 11a46e0..350b239 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
@@ -34,7 +34,7 @@ public abstract class 
GetSubClusterPolicyConfigurationResponse {
 
   @Private
   @Unstable
-  public GetSubClusterPolicyConfigurationResponse newInstance(
+  public static GetSubClusterPolicyConfigurationResponse newInstance(
       SubClusterPolicyConfiguration policy) {
     GetSubClusterPolicyConfigurationResponse response =
         Records.newRecord(GetSubClusterPolicyConfigurationResponse.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
index 06d5399..743ad0e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.util.Records;
 public abstract class SetSubClusterPolicyConfigurationRequest {
   @Private
   @Unstable
-  public SetSubClusterPolicyConfigurationRequest newInstance(
+  public static SetSubClusterPolicyConfigurationRequest newInstance(
       SubClusterPolicyConfiguration policy) {
     SetSubClusterPolicyConfigurationRequest request =
         Records.newRecord(SetSubClusterPolicyConfigurationRequest.class);
@@ -41,24 +41,6 @@ public abstract class 
SetSubClusterPolicyConfigurationRequest {
   }
 
   /**
-   * Get the name of the queue for which we are configuring a policy.
-   *
-   * @return the name of the queue
-   */
-  @Public
-  @Unstable
-  public abstract String getQueue();
-
-  /**
-   * Sets the name of the queue for which we are configuring a policy.
-   *
-   * @param queueName the name of the queue
-   */
-  @Private
-  @Unstable
-  public abstract void setQueue(String queueName);
-
-  /**
    * Get the policy configuration assigned to the queue.
    *
    * @return the policy for the specified queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
index 33c4043..401e984 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.util.Records;
 @Private
 @Unstable
 public abstract class SetSubClusterPolicyConfigurationResponse {
-  public SetSubClusterPolicyConfigurationResponse newInstance() {
+  public static SetSubClusterPolicyConfigurationResponse newInstance() {
     return Records.newRecord(SetSubClusterPolicyConfigurationResponse.class);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
index bc12acb..2839139 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
@@ -29,8 +29,8 @@ import java.nio.ByteBuffer;
 
 /**
  * {@link SubClusterPolicyConfiguration} is a class that represents a
- * configuration of a policy. It contains a policy type (resolve to a class
- * name) and its params as an opaque {@link ByteBuffer}.
+ * configuration of a policy. For a single queue, it contains a policy type
+ * (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
  *
  * Note: by design the params are an opaque ByteBuffer, this allows for enough
  * flexibility to evolve the policies without impacting the protocols to/from
@@ -42,16 +42,35 @@ public abstract class SubClusterPolicyConfiguration {
 
   @Private
   @Unstable
-  public static SubClusterPolicyConfiguration newInstance(String policyType,
-      ByteBuffer policyParams) {
+  public static SubClusterPolicyConfiguration newInstance(String queue,
+      String policyType, ByteBuffer policyParams) {
     SubClusterPolicyConfiguration policy =
         Records.newRecord(SubClusterPolicyConfiguration.class);
+    policy.setQueue(queue);
     policy.setType(policyType);
     policy.setParams(policyParams);
     return policy;
   }
 
   /**
+   * Get the name of the queue for which we are configuring a policy.
+   *
+   * @return the name of the queue
+   */
+  @Public
+  @Unstable
+  public abstract String getQueue();
+
+  /**
+   * Sets the name of the queue for which we are configuring a policy.
+   *
+   * @param queueName the name of the queue
+   */
+  @Private
+  @Unstable
+  public abstract void setQueue(String queueName);
+
+  /**
    * Get the type of the policy. This could be random, round-robin, load-based,
    * etc.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
index 865d0c4..585ba81 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
@@ -108,6 +108,10 @@ public class GetApplicationHomeSubClusterRequestPBImpl
   public ApplicationId getApplicationId() {
     GetApplicationHomeSubClusterRequestProtoOrBuilder p =
         viaProto ? proto : builder;
+    if (applicationId != null) {
+      return applicationId;
+    }
+
     if (!p.hasApplicationId()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
index 5e29bd5..7b7f89d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
@@ -107,23 +107,6 @@ public class SetSubClusterPolicyConfigurationRequestPBImpl
   }
 
   @Override
-  public String getQueue() {
-    SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
-        viaProto ? proto : builder;
-    return p.getQueue();
-  }
-
-  @Override
-  public void setQueue(String queueName) {
-    maybeInitBuilder();
-    if (queueName == null) {
-      builder.clearQueue();
-      return;
-    }
-    builder.setQueue(queueName);
-  }
-
-  @Override
   public SubClusterPolicyConfiguration getPolicyConfiguration() {
     SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
         viaProto ? proto : builder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
index fe9d9db..305a8d3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
@@ -87,6 +87,23 @@ public class SubClusterPolicyConfigurationPBImpl
   }
 
   @Override
+  public String getQueue() {
+    SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getQueue();
+  }
+
+  @Override
+  public void setQueue(String queueName) {
+    maybeInitBuilder();
+    if (queueName == null) {
+      builder.clearType();
+      return;
+    }
+    builder.setQueue(queueName);
+
+  }
+
+  @Override
   public String getType() {
     SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
     return p.getType();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
index 3f1cee9..11f786f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
@@ -136,8 +136,9 @@ message DeleteApplicationHomeSubClusterResponseProto {
 }
 
 message SubClusterPolicyConfigurationProto {
-  optional string type = 1;
-  optional bytes params = 2;
+  optional string queue = 1;
+  optional string type = 2;
+  optional bytes params = 3;
 }
 
 message GetSubClusterPolicyConfigurationRequestProto {
@@ -149,8 +150,7 @@ message GetSubClusterPolicyConfigurationResponseProto {
 }
 
 message SetSubClusterPolicyConfigurationRequestProto {
-  optional string queue = 1;
-  optional SubClusterPolicyConfigurationProto policy_configuration = 2;
+  optional SubClusterPolicyConfigurationProto policy_configuration = 1;
 }
 
 message SetSubClusterPolicyConfigurationResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index c76a485..165dd78 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -18,18 +18,39 @@
 package org.apache.hadoop.yarn.server.federation.store.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import 
org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.junit.After;
 import org.junit.Assert;
@@ -42,20 +63,21 @@ import org.junit.Test;
 public abstract class FederationStateStoreBaseTest {
 
   private static final MonotonicClock CLOCK = new MonotonicClock();
+  private FederationStateStore stateStore = createStateStore();
 
-  private FederationMembershipStateStore stateStore;
+  protected abstract FederationStateStore createStateStore();
 
   @Before
-  public void before() throws IOException {
-    stateStore = getCleanStateStore();
+  public void before() throws IOException, YarnException {
+    stateStore.init(new Configuration());
   }
 
   @After
-  public void after() {
-    stateStore = null;
+  public void after() throws Exception {
+    stateStore.close();
   }
 
-  protected abstract FederationMembershipStateStore getCleanStateStore();
+  // Test FederationMembershipStateStore
 
   @Test
   public void testRegisterSubCluster() throws Exception {
@@ -72,10 +94,7 @@ public abstract class FederationStateStoreBaseTest {
   @Test
   public void testDeregisterSubCluster() throws Exception {
     SubClusterId subClusterId = SubClusterId.newInstance("SC");
-    SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
-    stateStore.registerSubCluster(
-        SubClusterRegisterRequest.newInstance(subClusterInfo));
+    registerSubCluster(subClusterId);
 
     SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
         .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
@@ -105,9 +124,7 @@ public abstract class FederationStateStoreBaseTest {
 
     SubClusterId subClusterId = SubClusterId.newInstance("SC");
     SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
-    stateStore.registerSubCluster(
-        SubClusterRegisterRequest.newInstance(subClusterInfo));
+    registerSubCluster(subClusterId);
 
     GetSubClusterInfoRequest request =
         GetSubClusterInfoRequest.newInstance(subClusterId);
@@ -167,10 +184,7 @@ public abstract class FederationStateStoreBaseTest {
   @Test
   public void testSubClusterHeartbeat() throws Exception {
     SubClusterId subClusterId = SubClusterId.newInstance("SC");
-    SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
-    stateStore.registerSubCluster(
-        SubClusterRegisterRequest.newInstance(subClusterInfo));
+    registerSubCluster(subClusterId);
 
     SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
         .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
@@ -196,6 +210,271 @@ public abstract class FederationStateStoreBaseTest {
     }
   }
 
+  // Test FederationApplicationHomeSubClusterStore
+
+  @Test
+  public void testAddApplicationHomeSubClusterMap() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    ApplicationHomeSubCluster ahsc =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+
+    AddApplicationHomeSubClusterRequest request =
+        AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+    AddApplicationHomeSubClusterResponse response =
+        stateStore.addApplicationHomeSubClusterMap(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
+
+  }
+
+  @Test
+  public void testAddApplicationHomeSubClusterMapAppAlreadyExists()
+      throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+    addApplicationHomeSC(appId, subClusterId1);
+
+    SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+    ApplicationHomeSubCluster ahsc2 =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
+
+    try {
+      stateStore.addApplicationHomeSubClusterMap(
+          AddApplicationHomeSubClusterRequest.newInstance(ahsc2));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " already exists"));
+    }
+
+    Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId));
+
+  }
+
+  @Test
+  public void testDeleteApplicationHomeSubClusterMap() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    addApplicationHomeSC(appId, subClusterId);
+
+    DeleteApplicationHomeSubClusterRequest delRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+
+    DeleteApplicationHomeSubClusterResponse response =
+        stateStore.deleteApplicationHomeSubClusterMap(delRequest);
+
+    Assert.assertNotNull(response);
+    try {
+      queryApplicationHomeSC(appId);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId + " does not exist"));
+    }
+
+  }
+
+  @Test
+  public void testDeleteApplicationHomeSubClusterMapUnknownApp()
+      throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    DeleteApplicationHomeSubClusterRequest delRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+
+    try {
+      stateStore.deleteApplicationHomeSubClusterMap(delRequest);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
+    }
+  }
+
+  @Test
+  public void testGetApplicationHomeSubClusterMap() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    addApplicationHomeSC(appId, subClusterId);
+
+    GetApplicationHomeSubClusterRequest getRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+    GetApplicationHomeSubClusterResponse result =
+        stateStore.getApplicationHomeSubClusterMap(getRequest);
+
+    Assert.assertEquals(appId,
+        result.getApplicationHomeSubCluster().getApplicationId());
+    Assert.assertEquals(subClusterId,
+        result.getApplicationHomeSubCluster().getHomeSubCluster());
+  }
+
+  @Test
+  public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception 
{
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    GetApplicationHomeSubClusterRequest request =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+    try {
+      stateStore.getApplicationHomeSubClusterMap(request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
+    }
+  }
+
+  @Test
+  public void testGetApplicationsHomeSubClusterMap() throws Exception {
+    ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+    ApplicationHomeSubCluster ahsc1 =
+        ApplicationHomeSubCluster.newInstance(appId1, subClusterId1);
+
+    ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+    SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+    ApplicationHomeSubCluster ahsc2 =
+        ApplicationHomeSubCluster.newInstance(appId2, subClusterId2);
+
+    addApplicationHomeSC(appId1, subClusterId1);
+    addApplicationHomeSC(appId2, subClusterId2);
+
+    GetApplicationsHomeSubClusterRequest getRequest =
+        GetApplicationsHomeSubClusterRequest.newInstance();
+
+    GetApplicationsHomeSubClusterResponse result =
+        stateStore.getApplicationsHomeSubClusterMap(getRequest);
+
+    Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
+    Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
+    Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
+  }
+
+  @Test
+  public void testUpdateApplicationHomeSubClusterMap() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+    addApplicationHomeSC(appId, subClusterId1);
+
+    SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+    ApplicationHomeSubCluster ahscUpdate =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
+
+    UpdateApplicationHomeSubClusterRequest updateRequest =
+        UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate);
+
+    UpdateApplicationHomeSubClusterResponse response =
+        stateStore.updateApplicationHomeSubClusterMap(updateRequest);
+
+    Assert.assertNotNull(response);
+
+    Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId));
+  }
+
+  @Test
+  public void testUpdateApplicationHomeSubClusterMapUnknownApp()
+      throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+    ApplicationHomeSubCluster ahsc =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
+
+    UpdateApplicationHomeSubClusterRequest updateRequest =
+        UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);
+
+    try {
+      stateStore.updateApplicationHomeSubClusterMap((updateRequest));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
+    }
+  }
+
+  // Test FederationPolicyStore
+
+  @Test
+  public void testSetPolicyConfiguration() throws Exception {
+    SetSubClusterPolicyConfigurationRequest request =
+        SetSubClusterPolicyConfigurationRequest
+            .newInstance(createSCPolicyConf("Queue", "PolicyType"));
+
+    SetSubClusterPolicyConfigurationResponse result =
+        stateStore.setPolicyConfiguration(request);
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
+        queryPolicy("Queue"));
+
+  }
+
+  @Test
+  public void testSetPolicyConfigurationUpdateExisting() throws Exception {
+    setPolicyConf("Queue", "PolicyType1");
+
+    SetSubClusterPolicyConfigurationRequest request2 =
+        SetSubClusterPolicyConfigurationRequest
+            .newInstance(createSCPolicyConf("Queue", "PolicyType2"));
+    SetSubClusterPolicyConfigurationResponse result =
+        stateStore.setPolicyConfiguration(request2);
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"),
+        queryPolicy("Queue"));
+  }
+
+  @Test
+  public void testGetPolicyConfiguration() throws Exception {
+    setPolicyConf("Queue", "PolicyType");
+
+    GetSubClusterPolicyConfigurationRequest getRequest =
+        GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
+    GetSubClusterPolicyConfigurationResponse result =
+        stateStore.getPolicyConfiguration(getRequest);
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
+        result.getPolicyConfiguration());
+
+  }
+
+  @Test
+  public void testGetPolicyConfigurationUnknownQueue() throws Exception {
+
+    GetSubClusterPolicyConfigurationRequest request =
+        GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
+    try {
+      stateStore.getPolicyConfiguration(request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Policy for queue Queue does not exist"));
+    }
+  }
+
+  @Test
+  public void testGetPoliciesConfigurations() throws Exception {
+    setPolicyConf("Queue1", "PolicyType1");
+    setPolicyConf("Queue2", "PolicyType2");
+
+    GetSubClusterPoliciesConfigurationsResponse response =
+        stateStore.getPoliciesConfigurations(
+            GetSubClusterPoliciesConfigurationsRequest.newInstance());
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(response.getPoliciesConfigs());
+
+    Assert.assertEquals(2, response.getPoliciesConfigs().size());
+
+    Assert.assertTrue(response.getPoliciesConfigs()
+        .contains(createSCPolicyConf("Queue1", "PolicyType1")));
+    Assert.assertTrue(response.getPoliciesConfigs()
+        .contains(createSCPolicyConf("Queue2", "PolicyType2")));
+  }
+
+  // Convenience methods
+
   private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
 
     String amRMAddress = "1.2.3.4:1";
@@ -208,6 +487,37 @@ public abstract class FederationStateStoreBaseTest {
         CLOCK.getTime(), "cabability");
   }
 
+  private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
+      String policyType) {
+    return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+        ByteBuffer.allocate(1));
+  }
+
+  private void addApplicationHomeSC(ApplicationId appId,
+      SubClusterId subClusterId) throws YarnException {
+    ApplicationHomeSubCluster ahsc =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+    AddApplicationHomeSubClusterRequest request =
+        AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+    stateStore.addApplicationHomeSubClusterMap(request);
+  }
+
+  private void setPolicyConf(String queue, String policyType)
+      throws YarnException {
+    SetSubClusterPolicyConfigurationRequest request =
+        SetSubClusterPolicyConfigurationRequest
+            .newInstance(createSCPolicyConf(queue, policyType));
+    stateStore.setPolicyConfiguration(request);
+  }
+
+  private void registerSubCluster(SubClusterId subClusterId)
+      throws YarnException {
+
+    SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+    stateStore.registerSubCluster(
+        SubClusterRegisterRequest.newInstance(subClusterInfo));
+  }
+
   private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
       throws YarnException {
     GetSubClusterInfoRequest request =
@@ -215,4 +525,25 @@ public abstract class FederationStateStoreBaseTest {
     return stateStore.getSubCluster(request).getSubClusterInfo();
   }
 
+  private SubClusterId queryApplicationHomeSC(ApplicationId appId)
+      throws YarnException {
+    GetApplicationHomeSubClusterRequest request =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+    GetApplicationHomeSubClusterResponse response =
+        stateStore.getApplicationHomeSubClusterMap(request);
+
+    return response.getApplicationHomeSubCluster().getHomeSubCluster();
+  }
+
+  private SubClusterPolicyConfiguration queryPolicy(String queue)
+      throws YarnException {
+    GetSubClusterPolicyConfigurationRequest request =
+        GetSubClusterPolicyConfigurationRequest.newInstance(queue);
+
+    GetSubClusterPolicyConfigurationResponse result =
+        stateStore.getPolicyConfiguration(request);
+    return result.getPolicyConfiguration();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/97944fe3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 9396eda..74404c7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -17,7 +17,7 @@
 
 package org.apache.hadoop.yarn.server.federation.store.impl;
 
-import 
org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 
 /**
  * Unit tests for MemoryFederationStateStore.
@@ -26,7 +26,7 @@ public class TestMemoryFederationStateStore
     extends FederationStateStoreBaseTest {
 
   @Override
-  protected FederationMembershipStateStore getCleanStateStore() {
+  protected FederationStateStore createStateStore() {
     return new MemoryFederationStateStore();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to