YARN-6511. Federation: transparently spanning application across multiple sub-clusters. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b71ac1e4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b71ac1e4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b71ac1e4

Branch: refs/heads/YARN-2915
Commit: b71ac1e42bc3887b25660fea5f26195f9a154ac1
Parents: 7c29cf6
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Jun 7 14:45:51 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Jun 22 14:11:06 2017 -0700

----------------------------------------------------------------------
 .../policies/FederationPolicyUtils.java         | 168 +++++
 .../federation/policies/RouterPolicyFacade.java |  21 +-
 .../amrmproxy/FederationInterceptor.java        | 685 ++++++++++++++++++-
 .../amrmproxy/TestFederationInterceptor.java    | 251 +++++++
 .../TestableFederationInterceptor.java          |   6 +
 5 files changed, 1095 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
new file mode 100644
index 0000000..37ce942
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for Federation policy.
+ */
+@Private
+public final class FederationPolicyUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationPolicyUtils.class);
+
+  /** Disable constructor. */
+  private FederationPolicyUtils() {
+  }
+
+  /**
+   * A utility method to instantiate a policy manager class given the type
+   * (class name) from {@link SubClusterPolicyConfiguration}.
+   *
+   * @param newType class name of the policy manager to create
+   * @return Policy manager
+   * @throws FederationPolicyInitializationException if instantiation fails
+   */
+  public static FederationPolicyManager instantiatePolicyManager(String newType)
+      throws FederationPolicyInitializationException {
+    FederationPolicyManager federationPolicyManager = null;
+    try {
+      // create policy instance and set queue
+      Class<?> c = Class.forName(newType);
+      federationPolicyManager = (FederationPolicyManager) c.newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new FederationPolicyInitializationException(e);
+    } catch (InstantiationException e) {
+      throw new FederationPolicyInitializationException(e);
+    } catch (IllegalAccessException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+    return federationPolicyManager;
+  }
+
+  /**
+   * Get Federation policy configuration from state store, using default queue
+   * and configuration as fallback.
+   *
+   * @param queue the queue of the application
+   * @param conf the Yarn configuration
+   * @param federationFacade state store facade
+   * @return the retrieved or recreated SubClusterPolicyConfiguration
+   */
+  public static SubClusterPolicyConfiguration loadPolicyConfiguration(
+      String queue, Configuration conf,
+      FederationStateStoreFacade federationFacade) {
+
+    // The facade might cache this request, based on its parameterization
+    SubClusterPolicyConfiguration configuration = null;
+    if (queue != null) {
+      try {
+        configuration = federationFacade.getPolicyConfiguration(queue);
+      } catch (YarnException e) {
+        LOG.warn("Failed to get policy from FederationFacade with queue "
+            + queue + ": " + e.getMessage());
+      }
+    }
+
+    // If there is no policy configured for this queue, fall back to the
+    // baseline policy that is configured either in the store or via XML config
+    if (configuration == null) {
+      LOG.info("No policy configured for queue {} in StateStore,"
+          + " fallback to default queue", queue);
+      queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+      try {
+        configuration = federationFacade.getPolicyConfiguration(queue);
+      } catch (YarnException e) {
+        LOG.warn("No fallback behavior defined in store, defaulting to XML "
+            + "configuration fallback behavior.");
+      }
+    }
+
+    // or from XML conf otherwise.
+    if (configuration == null) {
+      LOG.info("No policy configured for default queue {} in StateStore,"
+          + " fallback to local config", queue);
+
+      String defaultFederationPolicyManager =
+          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+      String defaultPolicyParamString =
+          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
+              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+      ByteBuffer defaultPolicyParam = ByteBuffer
+          .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
+
+      configuration = SubClusterPolicyConfiguration.newInstance(queue,
+          defaultFederationPolicyManager, defaultPolicyParam);
+    }
+    return configuration;
+  }
+
+  /**
+   * Get AMRMProxy policy from state store, using default queue and
+   * configuration as fallback.
+   *
+   * @param queue the queue of the application
+   * @param oldPolicy the previous policy instance (can be null)
+   * @param conf the Yarn configuration
+   * @param federationFacade state store facade
+   * @param homeSubClusterId home sub-cluster id
+   * @return the newly created or re-initialized FederationAMRMProxyPolicy
+   * @throws FederationPolicyInitializationException if policy initialization fails
+   */
+  public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
+      FederationAMRMProxyPolicy oldPolicy, Configuration conf,
+      FederationStateStoreFacade federationFacade,
+      SubClusterId homeSubClusterId)
+      throws FederationPolicyInitializationException {
+
+    // Local policy and its configuration
+    SubClusterPolicyConfiguration configuration =
+        loadPolicyConfiguration(queue, conf, federationFacade);
+
+    // Instantiate the policyManager and get policy
+    FederationPolicyInitializationContext context =
+        new FederationPolicyInitializationContext(configuration,
+            federationFacade.getSubClusterResolver(), federationFacade,
+            homeSubClusterId);
+
+    LOG.info("Creating policy manager of type: " + configuration.getType());
+    FederationPolicyManager federationPolicyManager =
+        instantiatePolicyManager(configuration.getType());
+    // set queue, reinit policy if required (implementation lazily checks
+    // content of conf), and cache it
+    federationPolicyManager.setQueue(configuration.getQueue());
+    return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
+  }
+
+}
\ No newline at end of file
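
For context, a minimal sketch of how a caller can obtain an AMRMProxy policy
through the new utility class (the queue name and home sub-cluster id below are
illustrative assumptions, not part of this patch):

    FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
    SubClusterId homeId = SubClusterId.newInstance("SC-home"); // hypothetical id
    Configuration conf = new YarnConfiguration();
    // Resolves the policy for the queue; falls back to the default queue in the
    // store, and finally to the XML-configured policy manager.
    FederationAMRMProxyPolicy policy = FederationPolicyUtils.loadAMRMPolicy(
        "root.test", null /* no previous policy instance */, conf, facade, homeId);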

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 8c22623..5e31a08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -95,7 +95,7 @@ public class RouterPolicyFacade {
         new FederationPolicyInitializationContext(configuration,
             subClusterResolver, federationFacade, homeSubcluster);
     FederationPolicyManager fallbackPolicyManager =
-        instantiatePolicyManager(configuration.getType());
+        FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
     fallbackPolicyManager.setQueue(defaulKey);
 
     // add to the cache the fallback behavior
@@ -209,7 +209,7 @@ public class RouterPolicyFacade {
     FederationRouterPolicy routerPolicy = policyMap.get(queue);
 
     FederationPolicyManager federationPolicyManager =
-        instantiatePolicyManager(newType);
+        FederationPolicyUtils.instantiatePolicyManager(newType);
     // set queue, reinit policy if required (implementation lazily check
     // content of conf), and cache it
     federationPolicyManager.setQueue(queue);
@@ -224,23 +224,6 @@ public class RouterPolicyFacade {
     }
   }
 
-  private static FederationPolicyManager instantiatePolicyManager(
-      String newType) throws FederationPolicyInitializationException {
-    FederationPolicyManager federationPolicyManager = null;
-    try {
-      // create policy instance and set queue
-      Class c = Class.forName(newType);
-      federationPolicyManager = (FederationPolicyManager) c.newInstance();
-    } catch (ClassNotFoundException e) {
-      throw new FederationPolicyInitializationException(e);
-    } catch (InstantiationException e) {
-      throw new FederationPolicyInitializationException(e);
-    } catch (IllegalAccessException e) {
-      throw new FederationPolicyInitializationException(e);
-    }
-    return federationPolicyManager;
-  }
-
   /**
    * This method flushes all cached configurations and policies. This should be
    * invoked if the facade remains active after very large churn of queues in
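
Note that the private instantiatePolicyManager above is removed in favor of the
shared FederationPolicyUtils version, so RouterPolicyFacade and the new
FederationInterceptor use a single reflective factory. A minimal usage sketch
(UniformBroadcastPolicyManager is the manager configured in the tests below;
the queue name is an illustrative assumption):

    FederationPolicyManager manager = FederationPolicyUtils
        .instantiatePolicyManager(UniformBroadcastPolicyManager.class.getName());
    manager.setQueue("default");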

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 5f82d69..ffe47f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -24,7 +24,14 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -38,20 +45,35 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * Extends the AbstractRequestInterceptor and provides an implementation for
  * federation of YARN RM and scaling an application across multiple YARN
@@ -70,6 +92,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private SubClusterId homeSubClusterId;
 
   /**
+   * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
+   * using subClusterId as uamId. One UAM is created per sub-cluster RM except
+   * the home RM.
+   *
+   * Creation and registration of UAMs in secondary sub-clusters happen
+   * on-demand, when the AMRMProxy policy routes a resource request to these
+   * sub-clusters for the first time. AM heart beats to them are also handled
+   * asynchronously for performance reasons.
+   */
+  private UnmanagedAMPoolManager uamPool;
+
+  /** Thread pool used for asynchronous operations. */
+  private ExecutorService threadpool;
+
+  /**
+   * Stores the AllocateResponses that are received asynchronously from all the
+   * sub-cluster resource managers except the home RM.
+   */
+  private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
+
+  /**
    * Used to keep track of the container Id and the sub cluster RM that created
    * the container, so that we know which sub-cluster to forward later requests
    * about existing containers to.
@@ -89,7 +132,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private RegisterApplicationMasterResponse amRegistrationResponse;
 
-  /** The proxy ugi used to talk to home RM. */
+  private FederationStateStoreFacade federationFacade;
+
+  private SubClusterResolver subClusterResolver;
+
+  /** The policy used to split requests among sub-clusters. */
+  private FederationAMRMProxyPolicy policyInterpreter;
+
+  /**
+   * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
+   * issued by home RM.
+   */
   private UserGroupInformation appOwner;
 
   /**
@@ -97,6 +150,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   public FederationInterceptor() {
     this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
+    this.asyncResponseSink = new ConcurrentHashMap<>();
+    this.threadpool = Executors.newCachedThreadPool();
+    this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+    this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
   }
 
@@ -126,6 +183,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
     this.homeRM = createHomeRMProxy(appContext);
+
+    this.federationFacade = FederationStateStoreFacade.getInstance();
+    this.subClusterResolver = this.federationFacade.getSubClusterResolver();
+
+    // AMRMProxyPolicy will be initialized in registerApplicationMaster
+    this.policyInterpreter = null;
+
+    this.uamPool.init(conf);
+    this.uamPool.start();
   }
 
   /**
@@ -202,7 +268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     if (queue == null) {
       LOG.warn("Received null queue for application "
           + getApplicationContext().getApplicationAttemptId().getApplicationId()
-          + " from home subcluster. Will use default queue name "
+          + " from home sub-cluster. Will use default queue name "
           + YarnConfiguration.DEFAULT_QUEUE_NAME
           + " for getting AMRMProxyPolicy");
     } else {
@@ -211,6 +277,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           + " belongs to queue " + queue);
     }
 
+    // Initialize the AMRMProxyPolicy
+    try {
+      this.policyInterpreter =
+          FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+              getConf(), this.federationFacade, this.homeSubClusterId);
+    } catch (FederationPolicyInitializationException e) {
+      throw new YarnRuntimeException(e);
+    }
     return this.amRegistrationResponse;
   }
 
@@ -221,6 +295,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException {
+    Preconditions.checkArgument(this.policyInterpreter != null,
+        "Allocate should be called after registerApplicationMaster");
 
     try {
       // Split the heart beat request into multiple requests, one for each
@@ -228,12 +304,28 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       Map<SubClusterId, AllocateRequest> requests =
           splitAllocateRequest(request);
 
+      // Send the requests to the secondary sub-cluster resource managers.
+      // These secondary requests are sent asynchronously and the responses
+      // will be collected and merged with the home response. In addition, this
+      // call also returns the newly registered Unmanaged AMs.
+      Registrations newRegistrations =
+          sendRequestsToSecondaryResourceManagers(requests);
+
       // Send the request to the home RM and get the response
       AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
           requests.get(this.homeSubClusterId), this.homeRM,
           this.amRegistrationRequest,
           getApplicationContext().getApplicationAttemptId());
 
+      // Notify policy of home response
+      try {
+        this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
+            homeResponse);
+      } catch (YarnException e) {
+        LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+            + this.homeSubClusterId, e);
+      }
+
       // If the resource manager sent us a new token, add to the current user
       if (homeResponse.getAMRMToken() != null) {
         LOG.debug("Received new AMRMToken");
@@ -244,6 +336,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       // Merge the responses from home and secondary sub-cluster RMs
       homeResponse = mergeAllocateResponses(homeResponse);
 
+      // Merge the containers and NMTokens from the new registrations into
+      // the homeResponse.
+      if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
+        homeResponse = mergeRegistrationResponses(homeResponse,
+            newRegistrations.getSuccessfulRegistrations());
+      }
+
       // return the final response to the application master.
       return homeResponse;
     } catch (IOException ex) {
@@ -261,10 +360,83 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       FinishApplicationMasterRequest request)
       throws YarnException, IOException {
 
+    // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
+    boolean failedToUnRegister = false;
+    ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
+        null;
+
+    // Application master is completing operation. Send the finish
+    // application master request to all the registered sub-cluster resource
+    // managers in parallel, wait for the responses and aggregate the results.
+    Set<String> subClusterIds = this.uamPool.getAllUAMIds();
+    if (subClusterIds.size() > 0) {
+      final FinishApplicationMasterRequest finishRequest = request;
+      compSvc =
+          new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
+              this.threadpool);
+
+      LOG.info("Sending finish application request to {} sub-cluster RMs",
+          subClusterIds.size());
+      for (final String subClusterId : subClusterIds) {
+        compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
+          @Override
+          public FinishApplicationMasterResponseInfo call() throws Exception {
+            LOG.info("Sending finish application request to RM {}",
+                subClusterId);
+            FinishApplicationMasterResponse uamResponse = null;
+            try {
+              uamResponse =
+                  uamPool.finishApplicationMaster(subClusterId, finishRequest);
+            } catch (Throwable e) {
+              LOG.warn("Failed to finish unmanaged application master: "
+                  + "RM address: " + subClusterId + " ApplicationId: "
+                  + getApplicationContext().getApplicationAttemptId(), e);
+            }
+            return new FinishApplicationMasterResponseInfo(uamResponse,
+                subClusterId);
+          }
+        });
+      }
+    }
+
+    // While the finish application request is being processed
+    // asynchronously by other sub-cluster resource managers, send the same
+    // request to the home resource manager on this thread.
     FinishApplicationMasterResponse homeResponse =
         AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
             this.amRegistrationRequest,
             getApplicationContext().getApplicationAttemptId());
+
+    if (subClusterIds.size() > 0) {
+      // Wait for other sub-cluster resource managers to return the
+      // response and merge it with the home response
+      LOG.info(
+          "Waiting for finish application response from {} sub-cluster RMs",
+          subClusterIds.size());
+      for (int i = 0; i < subClusterIds.size(); ++i) {
+        try {
+          Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
+          FinishApplicationMasterResponseInfo uamResponse = future.get();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received finish application response from RM: "
+                + uamResponse.getSubClusterId());
+          }
+          if (uamResponse.getResponse() == null
+              || !uamResponse.getResponse().getIsUnregistered()) {
+            failedToUnRegister = true;
+          }
+        } catch (Throwable e) {
+          failedToUnRegister = true;
+          LOG.warn("Failed to finish unmanaged application master: "
+              + " ApplicationId: "
+              + getApplicationContext().getApplicationAttemptId(), e);
+        }
+      }
+    }
+
+    if (failedToUnRegister) {
+      homeResponse.setIsUnregistered(false);
+    }
     return homeResponse;
   }
 
@@ -281,10 +453,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   @Override
   public void shutdown() {
+    if (this.uamPool != null) {
+      this.uamPool.stop();
+    }
+    if (threadpool != null) {
+      try {
+        threadpool.shutdown();
+      } catch (Throwable ex) {
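+        // Best effort: ignore failures while shutting down the thread pool.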
+      }
+      threadpool = null;
+    }
     super.shutdown();
   }
 
   /**
+   * Create the UAM pool manager for secondary sub-clusters. For unit tests to
+   * override.
+   *
+   * @param threadPool the thread pool to use
+   * @return the UAM pool manager instance
+   */
+  @VisibleForTesting
+  protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
+      ExecutorService threadPool) {
+    return new UnmanagedAMPoolManager(threadPool);
+  }
+
+  /**
    * Returns instance of the ApplicationMasterProtocol proxy class that is used
    * to connect to the Home resource manager.
    *
@@ -302,6 +497,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     }
   }
 
+  private SubClusterId getSubClusterForNode(String nodeName) {
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
+    } catch (YarnException e) {
+      LOG.error("Failed to resolve sub-cluster for node " + nodeName
+          + ", skipping this node", e);
+      return null;
+    }
+    if (subClusterId == null) {
+      LOG.error("Failed to resolve sub-cluster for node {}, skipping this node",
+          nodeName);
+      return null;
+    }
+    return subClusterId;
+  }
+
   /**
    * In federation, the heart beat request needs to be sent to all the sub
   * clusters from which the AM has requested containers. This method splits the
@@ -317,20 +529,39 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
         requestMap);
 
+    // Create heart beat request instances for all other already registered
+    // sub-cluster resource managers
+    Set<String> subClusterIds = this.uamPool.getAllUAMIds();
+    for (String subClusterId : subClusterIds) {
+      findOrCreateAllocateRequestForSubCluster(
+          SubClusterId.newInstance(subClusterId), request, requestMap);
+    }
+
     if (!isNullOrEmpty(request.getAskList())) {
-      AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-          this.homeSubClusterId, request, requestMap);
-      newRequest.getAskList().addAll(request.getAskList());
+      // Ask the federation policy interpreter to split the ask list for
+      // sending it to all the sub-cluster resource managers.
+      Map<SubClusterId, List<ResourceRequest>> asks =
+          splitResourceRequests(request.getAskList());
+
+      // Add the askLists to the corresponding sub-cluster requests.
+      for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
+        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+            entry.getKey(), request, requestMap);
+        newRequest.getAskList().addAll(entry.getValue());
+      }
     }
 
     if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
         request.getResourceBlacklistRequest().getBlacklistAdditions())) {
       for (String resourceName : request.getResourceBlacklistRequest()
           .getBlacklistAdditions()) {
-        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-            this.homeSubClusterId, request, requestMap);
-        newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
-            .add(resourceName);
+        SubClusterId subClusterId = getSubClusterForNode(resourceName);
+        if (subClusterId != null) {
+          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+              subClusterId, request, requestMap);
+          newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+              .add(resourceName);
+        }
       }
     }
 
@@ -338,10 +569,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         request.getResourceBlacklistRequest().getBlacklistRemovals())) {
       for (String resourceName : request.getResourceBlacklistRequest()
           .getBlacklistRemovals()) {
-        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-            this.homeSubClusterId, request, requestMap);
-        newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
-            .add(resourceName);
+        SubClusterId subClusterId = getSubClusterForNode(resourceName);
+        if (subClusterId != null) {
+          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+              subClusterId, request, requestMap);
+          newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+              .add(resourceName);
+        }
       }
     }
 
@@ -371,6 +605,174 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
+   * This method sends the specified AllocateRequests to the appropriate
+   * sub-cluster resource managers.
+   *
+   * @param requests contains the heart beat requests to send to the resource
+   *          manager keyed by the resource manager address
+   * @return the registration responses from the newly added sub-cluster
+   *         resource managers
+   * @throws YarnException
+   * @throws IOException
+   */
+  private Registrations sendRequestsToSecondaryResourceManagers(
+      Map<SubClusterId, AllocateRequest> requests)
+      throws YarnException, IOException {
+
+    // Create new UAM instances for the sub-clusters that we have not seen
+    // before
+    Registrations registrations = registerWithNewSubClusters(requests.keySet());
+
+    // Now that all the registrations are done, send the allocation request
+    // to the sub-cluster RMs using the Unmanaged application masters
+    // asynchronously and don't wait for the response. The responses will
+    // arrive asynchronously and will be added to the response sink. These
+    // responses will be sent to the application master in some future heart
+    // beat response.
+    for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
+      final SubClusterId subClusterId = entry.getKey();
+
+      if (subClusterId.equals(this.homeSubClusterId)) {
+        // Skip the request for the home sub-cluster resource manager.
+        // It will be handled separately in the allocate() method
+        continue;
+      }
+
+      if (!this.uamPool.hasUAMId(subClusterId.getId())) {
+        // TODO: This means that the registration for this sub-cluster RM
+        // failed. For now, we ignore the resource requests and continue
+        // but we need to fix this and handle this situation. One way would
+        // be to send the request to another RM by consulting the policy.
+        LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
+            subClusterId);
+        continue;
+      }
+
+      this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
+          new AsyncCallback<AllocateResponse>() {
+            @Override
+            public void callback(AllocateResponse response) {
+              synchronized (asyncResponseSink) {
+                List<AllocateResponse> responses = null;
+                if (asyncResponseSink.containsKey(subClusterId)) {
+                  responses = asyncResponseSink.get(subClusterId);
+                } else {
+                  responses = new ArrayList<>();
+                  asyncResponseSink.put(subClusterId, responses);
+                }
+                responses.add(response);
+              }
+
+              // Notify policy of secondary sub-cluster responses
+              try {
+                policyInterpreter.notifyOfResponse(subClusterId, response);
+              } catch (YarnException e) {
+                LOG.warn(
+                    "notifyOfResponse for policy failed for home sub-cluster "
+                        + subClusterId,
+                    e);
+              }
+            }
+          });
+    }
+
+    return registrations;
+  }
+
+  /**
+   * This method ensures that Unmanaged AMs are created for each of the
+   * sub-clusters specified in the input and registers them with the
+   * corresponding resource managers.
+   */
+  private Registrations registerWithNewSubClusters(
+      Set<SubClusterId> subClusterSet) throws IOException {
+
+    List<SubClusterId> failedRegistrations = new ArrayList<>();
+    Map<SubClusterId, RegisterApplicationMasterResponse>
+        successfulRegistrations = new HashMap<>();
+
+    // Check to see if there are any new sub-clusters in this request
+    // list and create and register Unmanaged AM instance for the new ones
+    List<String> newSubClusters = new ArrayList<>();
+    for (SubClusterId subClusterId : subClusterSet) {
+      if (!subClusterId.equals(this.homeSubClusterId)
+          && !this.uamPool.hasUAMId(subClusterId.getId())) {
+        newSubClusters.add(subClusterId.getId());
+      }
+    }
+
+    if (newSubClusters.size() > 0) {
+      final RegisterApplicationMasterRequest registerRequest =
+          this.amRegistrationRequest;
+      final AMRMProxyApplicationContext appContext = getApplicationContext();
+      ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
+          completionService = new ExecutorCompletionService<>(threadpool);
+
+      for (final String subClusterId : newSubClusters) {
+        completionService
+            .submit(new Callable<RegisterApplicationMasterResponseInfo>() {
+              @Override
+              public RegisterApplicationMasterResponseInfo call()
+                  throws Exception {
+
+                // Create a config loaded with federation on and subclusterId
+                // for each UAM
+                YarnConfiguration config = new YarnConfiguration(getConf());
+                FederationProxyProviderUtil.updateConfForFederation(config,
+                    subClusterId);
+
+                RegisterApplicationMasterResponse uamResponse = null;
+                try {
+                  // For appNameSuffix, use subClusterId of the home sub-cluster
+                  uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
+                      registerRequest, config,
+                      appContext.getApplicationAttemptId().getApplicationId(),
+                      amRegistrationResponse.getQueue(), appContext.getUser(),
+                      homeSubClusterId.toString());
+                } catch (Throwable e) {
+                  LOG.error("Failed to register application master: "
+                      + subClusterId + " Application: "
+                      + appContext.getApplicationAttemptId(), e);
+                }
+                return new RegisterApplicationMasterResponseInfo(uamResponse,
+                    SubClusterId.newInstance(subClusterId));
+              }
+            });
+      }
+
+      // Wait for other sub-cluster resource managers to return the
+      // response and add it to the Map for returning to the caller
+      for (int i = 0; i < newSubClusters.size(); ++i) {
+        try {
+          Future<RegisterApplicationMasterResponseInfo> future =
+              completionService.take();
+          RegisterApplicationMasterResponseInfo uamResponse = future.get();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received register application response from RM: "
+                + uamResponse.getSubClusterId());
+          }
+
+          if (uamResponse.getResponse() == null) {
+            failedRegistrations.add(uamResponse.getSubClusterId());
+          } else {
+            LOG.info("Successfully registered unmanaged application master: "
+                + uamResponse.getSubClusterId() + " ApplicationId: "
+                + getApplicationContext().getApplicationAttemptId());
+            successfulRegistrations.put(uamResponse.getSubClusterId(),
+                uamResponse.getResponse());
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to register unmanaged application master: "
+              + " ApplicationId: "
+              + getApplicationContext().getApplicationAttemptId(), e);
+        }
+      }
+    }
+
+    return new Registrations(successfulRegistrations, failedRegistrations);
+  }
+
+  /**
    * Merges the responses from other sub-clusters that we received
    * asynchronously with the specified home cluster response and keeps track of
    * the containers received from each sub-cluster resource manager.
@@ -388,6 +790,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
         this.homeSubClusterId);
 
+    synchronized (this.asyncResponseSink) {
+      for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
+          .entrySet()) {
+        SubClusterId subClusterId = entry.getKey();
+        List<AllocateResponse> responses = entry.getValue();
+        if (responses.size() > 0) {
+          for (AllocateResponse response : responses) {
+            removeFinishedContainersFromCache(
+                response.getCompletedContainersStatuses());
+            cacheAllocatedContainers(response.getAllocatedContainers(),
+                subClusterId);
+            mergeAllocateResponse(homeResponse, response, subClusterId);
+          }
+          responses.clear();
+        }
+      }
+    }
+
     return homeResponse;
   }
 
@@ -405,6 +825,130 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
+   * Helper method for merging the responses from the secondary sub-cluster RMs
+   * with the home response to return to the AM.
+   */
+  private AllocateResponse mergeRegistrationResponses(
+      AllocateResponse homeResponse,
+      Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
+
+    for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
+        registrations.entrySet()) {
+      RegisterApplicationMasterResponse registration = entry.getValue();
+
+      if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
+        List<Container> tempContainers = homeResponse.getAllocatedContainers();
+        if (!isNullOrEmpty(tempContainers)) {
+          tempContainers
+              .addAll(registration.getContainersFromPreviousAttempts());
+          homeResponse.setAllocatedContainers(tempContainers);
+        } else {
+          homeResponse.setAllocatedContainers(
+              registration.getContainersFromPreviousAttempts());
+        }
+        cacheAllocatedContainers(
+            registration.getContainersFromPreviousAttempts(), entry.getKey());
+      }
+
+      if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
+        List<NMToken> tempTokens = homeResponse.getNMTokens();
+        if (!isNullOrEmpty(tempTokens)) {
+          tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
+          homeResponse.setNMTokens(tempTokens);
+        } else {
+          homeResponse
+              .setNMTokens(registration.getNMTokensFromPreviousAttempts());
+        }
+      }
+    }
+
+    return homeResponse;
+  }
+
+  private void mergeAllocateResponse(AllocateResponse homeResponse,
+      AllocateResponse otherResponse, SubClusterId otherRMAddress) {
+
+    if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
+      if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
+        homeResponse.getAllocatedContainers()
+            .addAll(otherResponse.getAllocatedContainers());
+      } else {
+        homeResponse
+            .setAllocatedContainers(otherResponse.getAllocatedContainers());
+      }
+    }
+
+    if (otherResponse.getAvailableResources() != null) {
+      if (homeResponse.getAvailableResources() != null) {
+        homeResponse.setAvailableResources(
+            Resources.add(homeResponse.getAvailableResources(),
+                otherResponse.getAvailableResources()));
+      } else {
+        homeResponse
+            .setAvailableResources(otherResponse.getAvailableResources());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
+      if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
+        homeResponse.getCompletedContainersStatuses()
+            .addAll(otherResponse.getCompletedContainersStatuses());
+      } else {
+        homeResponse.setCompletedContainersStatuses(
+            otherResponse.getCompletedContainersStatuses());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
+      if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) {
+        homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
+      } else {
+        homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+      } else {
+        homeResponse.setNMTokens(otherResponse.getNMTokens());
+      }
+    }
+
+    PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
+    PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
+
+    if (homePreempMessage == null && otherPreempMessage != null) {
+      homeResponse.setPreemptionMessage(otherPreempMessage);
+    }
+
+    if (homePreempMessage != null && otherPreempMessage != null) {
+      PreemptionContract par1 = homePreempMessage.getContract();
+      PreemptionContract par2 = otherPreempMessage.getContract();
+
+      if (par1 == null && par2 != null) {
+        homePreempMessage.setContract(par2);
+      }
+
+      if (par1 != null && par2 != null) {
+        par1.getResourceRequest().addAll(par2.getResourceRequest());
+        par1.getContainers().addAll(par2.getContainers());
+      }
+
+      StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
+      StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();
+
+      if (spar1 == null && spar2 != null) {
+        homePreempMessage.setStrictContract(spar2);
+      }
+
+      if (spar1 != null && spar2 != null) {
+        spar1.getContainers().addAll(spar2.getContainers());
+      }
+    }
+  }
+
+  /**
    * Add allocated containers to cache mapping.
    */
   private void cacheAllocatedContainers(List<Container> containers,
@@ -418,10 +962,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           // container allocation more than once. Just move on in this case.
           LOG.warn(
               "Duplicate containerID: {} found in the allocated containers"
-                  + " from same subcluster: {}, so ignoring.",
+                  + " from same sub-cluster: {}, so ignoring.",
               container.getId(), subClusterId);
         } else {
-          // The same container allocation from different subclusters,
+          // The same container allocation from different sub-clusters,
           // something is wrong.
           // TODO: YARN-6667 if some subcluster RM is configured wrong, we
           // should not fail the entire heartbeat.
@@ -432,7 +976,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   + " ApplicationId: "
                   + getApplicationContext().getApplicationAttemptId()
                   + " From RM: " + subClusterId
-                  + " . Previous container was from subcluster: "
+                  + " . Previous container was from sub-cluster: "
                   + existingSubClusterId);
         }
       }
@@ -498,7 +1042,102 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
-   * Utility method to check if the specified Collection is null or empty
+   * Splits the specified request to send it to different sub-clusters. The
+   * splitting algorithm is very simple. If the request does not have a node
+   * preference, the policy decides the sub-cluster. If the request has a node
+   * preference and if locality is required, then it is sent to the sub-cluster
+   * that contains the requested node. If node preference is specified and
+   * locality is not required, then the policy decides the sub-cluster.
+   *
+   * @param askList the ask list to split
+   * @return the split asks
+   * @throws YarnException if split fails
+   */
+  protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> askList) throws YarnException {
+    return this.policyInterpreter.splitResourceRequests(askList);
+  }
+
+  @VisibleForTesting
+  public int getUnmanagedAMPoolSize() {
+    return this.uamPool.getAllUAMIds().size();
+  }
+
+  /**
+   * Private structure for encapsulating SubClusterId and
+   * RegisterApplicationMasterResponse instances.
+   */
+  private static class RegisterApplicationMasterResponseInfo {
+    private RegisterApplicationMasterResponse response;
+    private SubClusterId subClusterId;
+
+    RegisterApplicationMasterResponseInfo(
+        RegisterApplicationMasterResponse response, SubClusterId subClusterId) {
+      this.response = response;
+      this.subClusterId = subClusterId;
+    }
+
+    public RegisterApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public SubClusterId getSubClusterId() {
+      return subClusterId;
+    }
+  }
+
+  /**
+   * Private structure for encapsulating SubClusterId and
+   * FinishApplicationMasterResponse instances.
+   */
+  private static class FinishApplicationMasterResponseInfo {
+    private FinishApplicationMasterResponse response;
+    private String subClusterId;
+
+    FinishApplicationMasterResponseInfo(
+        FinishApplicationMasterResponse response, String subClusterId) {
+      this.response = response;
+      this.subClusterId = subClusterId;
+    }
+
+    public FinishApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public String getSubClusterId() {
+      return subClusterId;
+    }
+  }
+
+  /**
+   * Private structure for encapsulating successful and failed application
+   * master registration responses.
+   */
+  private static class Registrations {
+    private Map<SubClusterId, RegisterApplicationMasterResponse>
+        successfulRegistrations;
+    private List<SubClusterId> failedRegistrations;
+
+    Registrations(
+        Map<SubClusterId, RegisterApplicationMasterResponse>
+            successfulRegistrations,
+        List<SubClusterId> failedRegistrations) {
+      this.successfulRegistrations = successfulRegistrations;
+      this.failedRegistrations = failedRegistrations;
+    }
+
+    public Map<SubClusterId, RegisterApplicationMasterResponse>
+        getSuccessfulRegistrations() {
+      return this.successfulRegistrations;
+    }
+
+    public List<SubClusterId> getFailedRegistrations() {
+      return this.failedRegistrations;
+    }
+  }
+
+  /**
+   * Utility method to check if the specified Collection is null or empty.
    *
    * @param c the collection object
    * @param <T> element type of the collection
@@ -507,4 +1146,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public static <T> boolean isNullOrEmpty(Collection<T> c) {
     return (c == null || c.size() == 0);
   }
+
+  /**
+   * Utility method to check if the specified Map is null or empty.
+   *
+   * @param c the map object
+   * @param <T1> key type of the map
+   * @param <T2> value type of the map
+   * @return whether it is null or empty
+   */
+  public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
+    return (c == null || c.size() == 0);
+  }
 }
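
To summarize the allocate flow that FederationInterceptor now implements, a
schematic sketch of one AM heart beat (method and field names are those from
the diff above; the body is simplified and omits error handling):

    // 1. Split the AM heart beat by sub-cluster, as decided by the policy.
    Map<SubClusterId, AllocateRequest> requests = splitAllocateRequest(request);
    // 2. Fire the secondary requests asynchronously through the UAM pool; their
    //    responses are parked in asyncResponseSink by the registered callbacks.
    Registrations newRegistrations =
        sendRequestsToSecondaryResourceManagers(requests);
    // 3. Heart beat the home RM synchronously on the calling thread.
    AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
        requests.get(homeSubClusterId), homeRM, amRegistrationRequest,
        getApplicationContext().getApplicationAttemptId());
    // 4. Drain the sink and fold secondary responses and any new registrations
    //    into the home response before returning it to the AM.
    homeResponse = mergeAllocateResponses(homeResponse);
    homeResponse = mergeRegistrationResponses(homeResponse,
        newRegistrations.getSuccessfulRegistrations());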

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 3b564f0..4e15323 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -19,13 +19,31 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -45,6 +63,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
   public static final String HOME_SC_ID = "SC-home";
 
   private TestableFederationInterceptor interceptor;
+  private MemoryFederationStateStore stateStore;
 
   private int testAppId;
   private ApplicationAttemptId attemptId;
@@ -54,6 +73,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     super.setUp();
     interceptor = new TestableFederationInterceptor();
 
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        getConf());
+
     testAppId = 1;
     attemptId = getApplicationAttemptId(testAppId);
     interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
@@ -82,11 +106,238 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
             + "," + TestableFederationInterceptor.class.getName());
 
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
     conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
 
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
     return conf;
   }
 
+  private void registerSubCluster(SubClusterId subClusterId)
+      throws YarnException {
+    stateStore
+        .registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo
+            .newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3",
+                "1.2.3.4:4", SubClusterState.SC_RUNNING, 0, "capacity")));
+  }
+
+  private void deRegisterSubCluster(SubClusterId subClusterId)
+      throws YarnException {
+    stateStore.deregisterSubCluster(SubClusterDeregisterRequest
+        .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+  }
+
+  private List<Container> getContainersAndAssert(int numberOfResourceRequests,
+      int numberOfAllocationExpected) throws Exception {
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<Container> containers =
+        new ArrayList<Container>(numberOfResourceRequests);
+    List<ResourceRequest> askList =
+        new ArrayList<ResourceRequest>(numberOfResourceRequests);
+    for (int id = 0; id < numberOfResourceRequests; id++) {
+      askList.add(createResourceRequest("test-node-" + Integer.toString(id),
+          6000, 2, id % 5, 1));
+    }
+
+    allocateRequest.setAskList(askList);
+
+    AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
+    Assert.assertNotNull("allocate() returned null response", allocateResponse);
+
+    containers.addAll(allocateResponse.getAllocatedContainers());
+    LOG.info("Number of allocated containers in the original request: "
+        + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containers.size() < numberOfAllocationExcepted
+        && numHeartbeat++ < 10) {
+      allocateResponse =
+          interceptor.allocate(Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull("allocate() returned null response",
+          allocateResponse);
+
+      containers.addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Number of allocated containers in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+      LOG.info("Total number of allocated containers: "
+          + Integer.toString(containers.size()));
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(numberOfAllocationExpected, containers.size());
+    return containers;
+  }
+
+  private void releaseContainersAndAssert(List<Container> containers)
+      throws Exception {
+    Assert.assertTrue(containers.size() > 0);
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
+    for (Container container : containers) {
+      relList.add(container.getId());
+    }
+
+    allocateRequest.setReleaseList(relList);
+
+    AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
+    Assert.assertNotNull(allocateResponse);
+
+    // The mock resource manager is set up to return the released containers
+    // in the allocated-containers list. The release request is split and
+    // handled by the corresponding UAMs, and the containers returned by the
+    // mock resource managers are aggregated and returned back to us, so we
+    // can check that the request size and the returned size match
+    List<Container> containersForReleasedContainerIds =
+        new ArrayList<Container>();
+    containersForReleasedContainerIds
+        .addAll(allocateResponse.getAllocatedContainers());
+    LOG.info("Number of containers received in the original request: "
+        + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+
+    // Send at most 10 heartbeats to receive all the containers; if they have
+    // not all arrived by then, fail the test
+    int numHeartbeat = 0;
+    while (containersForReleasedContainerIds.size() < relList.size()
+        && numHeartbeat++ < 10) {
+      allocateResponse =
+          interceptor.allocate(Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull(allocateResponse);
+      containersForReleasedContainerIds
+          .addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Number of containers received in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+      LOG.info("Total number of containers received: "
+          + Integer.toString(containersForReleasedContainerIds.size()));
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(relList.size(),
+        containersForReleasedContainerIds.size());
+  }
+
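
The echo behavior relied on above is provided by MockResourceManagerFacade;
a minimal sketch of the idea, not the actual mock implementation:

    // Echo the released container IDs back as "allocated" containers so the
    // caller can verify that every release was processed.
    private AllocateResponse echoReleases(AllocateRequest request) {
      List<Container> echoed = new ArrayList<Container>();
      for (ContainerId id : request.getReleaseList()) {
        Container container = Records.newRecord(Container.class);
        container.setId(id);
        echoed.add(container);
      }
      AllocateResponse response = Records.newRecord(AllocateResponse.class);
      response.setAllocatedContainers(echoed);
      return response;
    }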
+  @Test
+  public void testMultipleSubClusters() throws Exception {
+
+    // Register the application
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // Allocate the first batch of containers, with sc1 and sc2 active
+    registerSubCluster(SubClusterId.newInstance("SC-1"));
+    registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
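+    // One UAM is created for each active remote sub-cluster (SC-1 and SC-2)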
+    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+
+    // Allocate the second batch of containers, with sc1 and sc3 active
+    deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
+    registerSubCluster(SubClusterId.newInstance("SC-3"));
+
+    numberOfContainers = 1;
+    containers.addAll(
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
+    Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+    // Allocate the third batch of containers, with only the home sub-cluster
+    // active
+    deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
+    deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
+    registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+    numberOfContainers = 2;
+    containers.addAll(
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
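+    // The UAM pool only grows: deregistering SC-1 and SC-3 does not remove
+    // their UAMs, so the pool size stays at three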
+    Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+    // Release all containers
+    releaseContainersAndAssert(containers);
+
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertTrue(finishResponse.getIsUnregistered());
+  }
+
+  /**
+   * Test re-registration when the RM fails over.
+   */
+  @Test
+  public void testReregister() throws Exception {
+
+    // Register the application
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // Allocate the first batch of containers
+    registerSubCluster(SubClusterId.newInstance("SC-1"));
+    registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+    interceptor.setShouldReRegisterNext();
+
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
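+    // Both active sub-clusters serve the broadcast asks, but only SC-1 needs
+    // a UAM; the home sub-cluster is reached through the home RM directly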
+    Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+    interceptor.setShouldReRegisterNext();
+
+    // Release all containers
+    releaseContainersAndAssert(containers);
+
+    interceptor.setShouldReRegisterNext();
+
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertTrue(finishResponse.getIsUnregistered());
+  }
+
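
The setShouldReRegisterNext() calls above arm a hook in
TestableFederationInterceptor that makes the next call towards a mock RM
fail as it would after an RM failover. A sketch of such a hook; apart from
setShouldReRegisterNext() itself, the field and method names here are
hypothetical:

    // The next RM call throws once, forcing the interceptor down its
    // re-register path, just as a real RM failover would.
    private volatile boolean shouldReRegisterNext;

    public void setShouldReRegisterNext() {
      shouldReRegisterNext = true;
    }

    private void maybeFailOnce() throws YarnException {
      if (shouldReRegisterNext) {
        shouldReRegisterNext = false;
        throw new ApplicationMasterNotRegisteredException(
            "Simulating RM failover");
      }
    }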
   @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 0ca7488..d4b8735 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -45,6 +45,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
   private MockResourceManagerFacade mockRm;
 
   @Override
+  protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
+      ExecutorService threadPool) {
+    return new TestableUnmanagedAMPoolManager(threadPool);
+  }
+
+  @Override
   protected ApplicationMasterProtocol createHomeRMProxy(
       AMRMProxyApplicationContext appContext) {
     synchronized (this) {
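
The createUnmanagedAMPoolManager override above lets the test substitute
UAM clients that talk to the mock RM instead of a real one. A minimal
sketch of such a subclass; only the class name and the ExecutorService
constructor argument appear in this patch, the body is illustrative:

    // A pool manager subclass whose UAMs can be wired to the shared
    // MockResourceManagerFacade used by these tests.
    public class TestableUnmanagedAMPoolManager extends UnmanagedAMPoolManager {
      public TestableUnmanagedAMPoolManager(ExecutorService threadPool) {
        super(threadPool);
      }
      // Override the UAM-creation hook here so that each UAM's RM proxy is
      // the mock resource manager.
    }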

