YARN-8933. [AMRMProxy] Fix potential empty fields in allocation response, move 
SubClusterTimeout to FederationInterceptor. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5ec85d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5ec85d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5ec85d9

Branch: refs/heads/YARN-7402
Commit: b5ec85d96615e8214c14b57f8980a1dee6197ffa
Parents: 2664248
Author: Botong Huang <bot...@apache.org>
Authored: Sun Nov 11 11:12:53 2018 -0800
Committer: Botong Huang <bot...@apache.org>
Committed: Sun Nov 11 11:12:53 2018 -0800

----------------------------------------------------------------------
 .../amrmproxy/BroadcastAMRMProxyPolicy.java     |   4 +-
 .../amrmproxy/FederationAMRMProxyPolicy.java    |   8 +-
 .../policies/amrmproxy/HomeAMRMProxyPolicy.java |   5 +-
 .../LocalityMulticastAMRMProxyPolicy.java       |  52 ++------
 .../amrmproxy/RejectAMRMProxyPolicy.java        |   4 +-
 .../policies/BaseFederationPoliciesTest.java    |   5 +-
 .../TestBroadcastAMRMProxyFederationPolicy.java |  10 +-
 .../amrmproxy/TestHomeAMRMProxyPolicy.java      |  10 +-
 .../TestLocalityMulticastAMRMProxyPolicy.java   |  41 +++---
 .../amrmproxy/TestRejectAMRMProxyPolicy.java    |   5 +-
 .../amrmproxy/FederationInterceptor.java        | 129 ++++++++++++++++---
 .../amrmproxy/TestFederationInterceptor.java    |  63 ++++++++-
 12 files changed, 236 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
index eb83baa..643bfa6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -49,7 +50,8 @@ public class BroadcastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
index 0541df4..3d39d72 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,15 +40,16 @@ public interface FederationAMRMProxyPolicy
    *
    * @param resourceRequests the list of {@link ResourceRequest}s from the AM 
to
    *          be split
-   *
+   * @param timedOutSubClusters the set of sub-clusters that haven't had a
+   *          successful heart-beat response for a while.
    * @return map of sub-cluster as identified by {@link SubClusterId} to the
    *         list of {@link ResourceRequest}s that should be forwarded to it
-   *
    * @throws YarnException in case the request is malformed or no viable
    *           sub-clusters can be found.
    */
   Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException;
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException;
 
   /**
    * This method should be invoked to notify the policy about responses being

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
index 5dd5c53..acb7e0a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -55,8 +56,8 @@ public class HomeAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
-
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
     if (homeSubcluster == null) {
       throw new FederationPolicyException("No home subcluster available");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index e5f26d8..47d23e0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -132,11 +131,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
   private SubClusterResolver resolver;
 
   private Map<SubClusterId, Resource> headroom;
-  private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
-  private long subClusterTimeOut;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
-  private AllocationBookkeeper bookkeeper;
   private SubClusterId homeSubcluster;
 
   @Override
@@ -186,26 +182,12 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
 
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
-      lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
     }
     hrAlpha = policy.getHeadroomAlpha();
 
     this.federationFacade =
         policyContext.getFederationStateStoreFacade();
     this.homeSubcluster = policyContext.getHomeSubcluster();
-
-    this.subClusterTimeOut = this.federationFacade.getConf().getLong(
-        YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
-        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
-    if (this.subClusterTimeOut <= 0) {
-      LOG.info(
-          "{} configured to be {}, should be positive. Using default of {}.",
-          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
-          this.subClusterTimeOut,
-          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
-      this.subClusterTimeOut =
-          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
-    }
   }
 
   @Override
@@ -216,18 +198,18 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
       LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
           response.getAvailableResources().getMemorySize());
     }
-    lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
   }
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
 
     // object used to accumulate statistics about the answer, initialize with
     // active subclusters. Create a new instance per call because this method
     // can be called concurrently.
-    bookkeeper = new AllocationBookkeeper();
-    bookkeeper.reinitialize(federationFacade.getSubClusters(true));
+    AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
+    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters);
 
     List<ResourceRequest> nonLocalizedRequests =
         new ArrayList<ResourceRequest>();
@@ -298,15 +280,6 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
     // handle all non-localized requests (ANY)
     splitAnyRequests(nonLocalizedRequests, bookkeeper);
 
-    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
-        .getAnswer().entrySet()) {
-      // A new-cluster here will trigger new UAM luanch, which might take a 
long
-      // time. We don't want too many requests stuck in this UAM before it is
-      // ready and starts heartbeating
-      if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) {
-        lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis());
-      }
-    }
     return bookkeeper.getAnswer();
   }
 
@@ -540,8 +513,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
     private float totPolicyWeight = 0;
 
     private void reinitialize(
-        Map<SubClusterId, SubClusterInfo> activeSubclusters)
-        throws YarnException {
+        Map<SubClusterId, SubClusterInfo> activeSubclusters,
+        Set<SubClusterId> timedOutSubClusters) throws YarnException {
       if (activeSubclusters == null) {
         throw new YarnRuntimeException("null activeSubclusters received");
       }
@@ -573,17 +546,8 @@ public class LocalityMulticastAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
       }
 
       Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
-      for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
-          .entrySet()) {
-        long duration = System.currentTimeMillis() - entry.getValue();
-        if (duration > subClusterTimeOut) {
-          LOG.warn(
-              "Subcluster {} does not have a success heartbeat for {}s, "
-                  + "skip routing asks there for this request",
-              entry.getKey(), (double) duration / 1000);
-          tmpSCSet.remove(entry.getKey());
-        }
-      }
+      tmpSCSet.removeAll(timedOutSubClusters);
+
       if (tmpSCSet.size() < 1) {
         LOG.warn("All active and enabled subclusters have expired last "
             + "heartbeat time. Ignore the expiry check for this request");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
index bed037e..a21234e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
@@ -20,6 +20,7 @@ package 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -47,7 +48,8 @@ public class RejectAMRMProxyPolicy extends 
AbstractAMRMProxyPolicy {
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
     throw new FederationPolicyException("The policy configured for this queue "
         + "rejects all routing requests by construction.");
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 23978ed..57d3c67 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -109,8 +110,8 @@ public abstract class BaseFederationPoliciesTest {
       String[] hosts = new String[] {"host1", "host2"};
       List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
           .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
-      ((FederationAMRMProxyPolicy) localPolicy)
-          .splitResourceRequests(resourceRequests);
+      ((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests(
+          resourceRequests, new HashSet<SubClusterId>());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
index df5da85..52f36a4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -28,7 +29,6 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import 
org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -71,8 +71,8 @@ public class TestBroadcastAMRMProxyFederationPolicy
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
     Assert.assertTrue(response.size() == 2);
     for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
         .entrySet()) {
@@ -94,8 +94,8 @@ public class TestBroadcastAMRMProxyFederationPolicy
     List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
         SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
index 90a6aeb..1f57c1f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -80,9 +81,9 @@ public class TestHomeAMRMProxyPolicy extends 
BaseFederationPoliciesTest {
         hosts, 2 * 1024, 2, 1, 3, null, false);
 
     HomeAMRMProxyPolicy federationPolicy =
-        (HomeAMRMProxyPolicy)getPolicy();
-    Map<SubClusterId, List<ResourceRequest>> response =
-        federationPolicy.splitResourceRequests(resourceRequests);
+        (HomeAMRMProxyPolicy) getPolicy();
+    Map<SubClusterId, List<ResourceRequest>> response = federationPolicy
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
     assertEquals(1, response.size());
     assertNotNull(response.get(HOME_SC_ID));
     assertEquals(9, response.get(HOME_SC_ID).size());
@@ -101,7 +102,8 @@ public class TestHomeAMRMProxyPolicy extends 
BaseFederationPoliciesTest {
       List<ResourceRequest> resourceRequests = createResourceRequests(
           hosts, 2 * 1024, 2, 1, 3, null, false);
       HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy();
-      federationPolicy.splitResourceRequests(resourceRequests);
+      federationPolicy.splitResourceRequests(resourceRequests,
+          new HashSet<SubClusterId>());
       fail("It should fail when the home subcluster is not active");
     } catch(FederationPolicyException e) {
       GenericTestUtils.assertExceptionContains("is not active", e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index c49ab60..10359e4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -154,8 +154,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // pretty print requests
     LOG.info("Initial headroom");
@@ -180,7 +180,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
 
     LOG.info("After headroom update");
     prettyPrintRequests(response);
@@ -218,8 +218,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
     long tstart = System.currentTimeMillis();
     for (int i = 0; i < numIterations; i++) {
       Map<SubClusterId, List<ResourceRequest>> response =
-          ((FederationAMRMProxyPolicy) getPolicy())
-              .splitResourceRequests(resourceRequests);
+          ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+              resourceRequests, new HashSet<SubClusterId>());
       validateSplit(response, resourceRequests);
     }
     long tend = System.currentTimeMillis();
@@ -243,8 +243,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // we expect all three to appear for a zero-sized ANY
 
@@ -279,8 +279,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // pretty print requests
     prettyPrintRequests(response);
@@ -354,8 +354,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
     List<ResourceRequest> resourceRequests = createComplexRequest();
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     validateSplit(response, resourceRequests);
     prettyPrintRequests(response);
@@ -697,8 +697,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
         ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     checkExpectedAllocation(response, "subcluster0", 3, 1);
     checkExpectedAllocation(response, "subcluster1", 1, 0);
@@ -717,7 +717,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
         ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
 
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
 
     /*
      * Since node request is a cancel, it should not be considered associated
@@ -750,12 +750,13 @@ public class TestLocalityMulticastAMRMProxyPolicy
     initializePolicy(conf);
     List<ResourceRequest> resourceRequests = createSimpleRequest();
 
-    // Update the response timestamp for the first time
     prepPolicyWithHeadroom(true);
 
+    // For first time, no sub-cluster expired
+    Set<SubClusterId> expiredSCList = new HashSet<>();
     Map<SubClusterId, List<ResourceRequest>> response =
         ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+            .splitResourceRequests(resourceRequests, expiredSCList);
 
     // pretty print requests
     prettyPrintRequests(response);
@@ -776,11 +777,11 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     Thread.sleep(800);
 
-    // Update the response timestamp for the second time, skipping sc0 and sc5
-    prepPolicyWithHeadroom(false);
-
+    // For the second time, sc0 and sc5 expired
+    expiredSCList.add(SubClusterId.newInstance("subcluster0"));
+    expiredSCList.add(SubClusterId.newInstance("subcluster5"));
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, expiredSCList);
 
     // pretty print requests
     prettyPrintRequests(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
index 41e7fed..6555829 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -69,8 +70,8 @@ public class TestRejectAMRMProxyPolicy
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ae9f78d..dc10f95 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
@@ -163,10 +165,22 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
 
   /**
    * Stores the AllocateResponses that are received asynchronously from all the
-   * sub-cluster resource managers, including home RM.
+   * sub-cluster resource managers, including home RM, but not merged and
+   * returned back to AM yet.
    */
   private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
 
+  /**
+   * Remembers the last allocate response received from each known sub-cluster.
+   * Together with the sub-cluster timeout, this is used to assemble the
+   * cluster-wide fields (e.g. AvailableResources, NumClusterNodes) of the
+   * allocate response returned to the AM.
+   */
+  private Map<SubClusterId, AllocateResponse> lastSCResponse;
+
+  /**
+   * The async UAM registration result that is not consumed yet.
+   */
   private Map<SubClusterId, RegisterApplicationMasterResponse> 
uamRegistrations;
 
   // For unit test synchronization
@@ -216,6 +230,16 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
 
   private SubClusterResolver subClusterResolver;
 
+  /**
+   * Records the last time a successful heartbeat response was received from
+   * each known sub-cluster, including the home sub-cluster. Entries for newly
+   * tracked secondary sub-clusters start out already timed out.
+   */
+  private Map<SubClusterId, Long> lastSCResponseTime;
+  private long subClusterTimeOut;
+
+  private long lastAMHeartbeatTime;
+
   /** The policy used to split requests among sub-clusters. */
   private FederationAMRMProxyPolicy policyInterpreter;
 
@@ -232,6 +256,7 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
   public FederationInterceptor() {
     this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
     this.asyncResponseSink = new ConcurrentHashMap<>();
+    this.lastSCResponse = new ConcurrentHashMap<>();
     this.uamRegistrations = new ConcurrentHashMap<>();
     this.uamRegisterFutures = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
@@ -241,6 +266,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     this.amRegistrationResponse = null;
     this.justRecovered = false;
     this.finishAMCalled = false;
+    this.lastSCResponseTime = new ConcurrentHashMap<>();
+    this.lastAMHeartbeatTime = this.clock.getTime();
   }
 
   /**
@@ -310,6 +337,19 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     this.heartbeatMaxWaitTimeMs =
         conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
             YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
+
+    this.subClusterTimeOut =
+        conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+            YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+    if (this.subClusterTimeOut <= 0) {
+      LOG.info(
+          "{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+          this.subClusterTimeOut,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+      this.subClusterTimeOut =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
+    }
   }
 
   @Override
@@ -394,6 +434,10 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
               this.uamPool.registerApplicationMaster(subClusterId.getId(),
                   this.amRegistrationRequest);
 
+          // Set sub-cluster to be timed out initially
+          lastSCResponseTime.put(subClusterId,
+              clock.getTime() - subClusterTimeOut);
+
           // Running containers from secondary RMs
           for (Container container : response
               .getContainersFromPreviousAttempts()) {
@@ -580,6 +624,7 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
       throws YarnException, IOException {
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
+    this.lastAMHeartbeatTime = this.clock.getTime();
 
     if (this.justRecovered) {
       throw new ApplicationMasterNotRegisteredException(
@@ -644,8 +689,7 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
       }
 
       // Prepare the response to AM
-      AllocateResponse response =
-          RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+      AllocateResponse response = generateBaseAllocationResponse();
 
       // Merge all responses from response sink
       mergeAllocateResponses(response);
@@ -970,6 +1014,10 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
                 response = uamPool.registerApplicationMaster(
                     subClusterId.getId(), amRegistrationRequest);
 
+                // Set sub-cluster to be timed out initially
+                lastSCResponseTime.put(subClusterId,
+                    clock.getTime() - subClusterTimeOut);
+
                 if (response != null
                     && response.getContainersFromPreviousAttempts() != null) {
                   cacheAllocatedContainers(
@@ -1172,6 +1220,10 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
       if (!subClusterId.equals(this.homeSubClusterId)
           && !this.uamPool.hasUAMId(subClusterId.getId())) {
         newSubClusters.add(subClusterId);
+
+        // Set sub-cluster to be timed out initially
+        lastSCResponseTime.put(subClusterId,
+            clock.getTime() - subClusterTimeOut);
       }
     }
 
@@ -1245,6 +1297,38 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
   }
 
   /**
+   * Prepare the base allocation response. Use lastSCResponse and
+   * lastSCResponseTime to fill in cluster-wide info (AvailableResources and
+   * NumClusterNodes), summed over all sub-clusters that have not timed out.
+   */
+  protected AllocateResponse generateBaseAllocationResponse() {
+    // Sub-clusters that have been silent for too long do not contribute to
+    // the cluster-wide totals reported back to the AM.
+    Set<SubClusterId> timedOut = getTimedOutSCs(false);
+
+    Resource totalAvailable = Resource.newInstance(0, 0);
+    int totalNodes = 0;
+    for (Entry<SubClusterId, AllocateResponse> entry
+        : lastSCResponse.entrySet()) {
+      if (timedOut.contains(entry.getKey())) {
+        continue;
+      }
+      AllocateResponse last = entry.getValue();
+      Resource scAvailable = last.getAvailableResources();
+      if (scAvailable != null) {
+        totalAvailable = Resources.add(totalAvailable, scAvailable);
+      }
+      totalNodes += last.getNumClusterNodes();
+    }
+
+    AllocateResponse baseResponse =
+        RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+    baseResponse.setAvailableResources(totalAvailable);
+    baseResponse.setNumClusterNodes(totalNodes);
+    return baseResponse;
+  }
+
+  /**
    * Merge the responses from all sub-clusters that we received asynchronously
    * and keeps track of the containers received from each sub-cluster resource
    * managers.
@@ -1345,17 +1429,6 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
       }
     }
 
-    if (otherResponse.getAvailableResources() != null) {
-      if (homeResponse.getAvailableResources() != null) {
-        homeResponse.setAvailableResources(
-            Resources.add(homeResponse.getAvailableResources(),
-                otherResponse.getAvailableResources()));
-      } else {
-        homeResponse
-            .setAvailableResources(otherResponse.getAvailableResources());
-      }
-    }
-
     if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
       if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
         homeResponse.getCompletedContainersStatuses()
@@ -1520,6 +1593,42 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return request;
   }
 
+  /**
+   * Compute the set of sub-clusters that are currently considered timed out.
+   * A sub-cluster is timed out when its last successful heartbeat response
+   * predates the AM's latest heartbeat and is at least subClusterTimeOut ms
+   * old. A sub-cluster that has responded at or after the AM's latest
+   * heartbeat is never reported, so an idle AM (which triggers no heartbeats
+   * to the sub-clusters) does not make every sub-cluster look expired.
+   *
+   * @param verbose whether to log a warning for each timed-out sub-cluster
+   * @return the timed-out sub-clusters, never null
+   */
+  protected Set<SubClusterId> getTimedOutSCs(boolean verbose) {
+    Set<SubClusterId> timedOutSCs = new HashSet<>();
+    for (Map.Entry<SubClusterId, Long> entry : this.lastSCResponseTime
+        .entrySet()) {
+      if (entry.getValue() >= this.lastAMHeartbeatTime) {
+        // SC responded at or after the latest AM heartbeat; while the AM is
+        // idle nothing is forwarded to SCs, so their silence is expected.
+        continue;
+      }
+      long duration = this.clock.getTime() - entry.getValue();
+      // >= so an SC seeded with (now - subClusterTimeOut) counts as timed
+      // out immediately, even if checked within the same millisecond.
+      if (duration >= this.subClusterTimeOut) {
+        if (verbose) {
+          LOG.warn(
+              "Subcluster {} doesn't have a successful heartbeat"
+                  + " for {} seconds for {}",
+              entry.getKey(), (double) duration / 1000, this.attemptId);
+        }
+        timedOutSCs.add(entry.getKey());
+      }
+    }
+    return timedOutSCs;
+  }
+
   /**
    * Check to see if the specified containerId exists in the cache and log an
    * error if not found.
@@ -1553,7 +1649,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
    */
   protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
       List<ResourceRequest> askList) throws YarnException {
-    return this.policyInterpreter.splitResourceRequests(askList);
+    Set<SubClusterId> timedOutSCs = getTimedOutSCs(true);
+    return this.policyInterpreter.splitResourceRequests(askList, timedOutSCs);
   }
 
   @VisibleForTesting
@@ -1602,6 +1699,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
         // Notify main thread about the response arrival
         asyncResponseSink.notifyAll();
       }
+      lastSCResponse.put(subClusterId, response);
+      lastSCResponseTime.put(subClusterId, clock.getTime());
 
       // Notify policy of allocate response
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index ec75cfd..48b7bf5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -160,6 +160,10 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
     // Disable StateStoreFacade cache
     conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
 
+    // Set sub-cluster timeout to 500ms
+    conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+        500);
+
     return conf;
   }
 
@@ -568,6 +572,8 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        // SC1 should be initialized to be timed out
+        Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
 
         // The first allocate call expects a fail-over exception and 
re-register
         try {
@@ -741,6 +747,60 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
   }
 
   @Test
+  public void testSubClusterTimeOut() throws Exception {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // Register the application for the first time
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(0);
+        registerReq.setTrackingUrl("");
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
+
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+        getContainersAndAssert(1, 1);
+
+        AllocateResponse allocateResponse =
+            interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
+
+        // Let the timeout elapse for home and SC-1 with no AM allocate call
+        Thread.sleep(800);
+
+        // Not timed out yet: the AM has not heartbeated since the SCs last
+        // responded, so their silence does not count as a timeout
+        allocateResponse = interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
+
+        // Send a duplicate AM heartbeat. It refreshes the last AM heartbeat
+        // time, but is not forwarded, so the SCs get no chance to respond
+        AllocateRequest allocateRequest =
+            Records.newRecord(AllocateRequest.class);
+        // Use lastResponseId - 1 so the request is treated as a duplicate
+        // heartbeat and is therefore not forwarded to any SC
+        allocateRequest.setResponseId(lastResponseId - 1);
+        interceptor.allocate(allocateRequest);
+
+        // Both SCs now count as timed out and drop out of the totals
+        allocateResponse = interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(0, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size());
+        return null;
+      }
+    });
+  }
+
+  @Test
   public void testSecondAttempt() throws Exception {
     final RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
@@ -803,6 +863,8 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
         int numberOfContainers = 3;
         // Should re-attach secondaries and get the three running containers
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        // SC1 should be initialized to be timed out
+        Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(numberOfContainers,
             registerResponse.getContainersFromPreviousAttempts().size());
 
@@ -831,5 +893,4 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
       }
     });
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to