YARN-8933. [AMRMProxy] Fix potential empty fields in allocation response, move SubClusterTimeout to FederationInterceptor. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5ec85d9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5ec85d9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5ec85d9 Branch: refs/heads/YARN-7402 Commit: b5ec85d96615e8214c14b57f8980a1dee6197ffa Parents: 2664248 Author: Botong Huang <bot...@apache.org> Authored: Sun Nov 11 11:12:53 2018 -0800 Committer: Botong Huang <bot...@apache.org> Committed: Sun Nov 11 11:12:53 2018 -0800 ---------------------------------------------------------------------- .../amrmproxy/BroadcastAMRMProxyPolicy.java | 4 +- .../amrmproxy/FederationAMRMProxyPolicy.java | 8 +- .../policies/amrmproxy/HomeAMRMProxyPolicy.java | 5 +- .../LocalityMulticastAMRMProxyPolicy.java | 52 ++------ .../amrmproxy/RejectAMRMProxyPolicy.java | 4 +- .../policies/BaseFederationPoliciesTest.java | 5 +- .../TestBroadcastAMRMProxyFederationPolicy.java | 10 +- .../amrmproxy/TestHomeAMRMProxyPolicy.java | 10 +- .../TestLocalityMulticastAMRMProxyPolicy.java | 41 +++--- .../amrmproxy/TestRejectAMRMProxyPolicy.java | 5 +- .../amrmproxy/FederationInterceptor.java | 129 ++++++++++++++++--- .../amrmproxy/TestFederationInterceptor.java | 63 ++++++++- 12 files changed, 236 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java index eb83baa..643bfa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -49,7 +50,8 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { @Override public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) throws YarnException { + List<ResourceRequest> resourceRequests, + Set<SubClusterId> timedOutSubClusters) throws YarnException { Map<SubClusterId, SubClusterInfo> activeSubclusters = getActiveSubclusters(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java 
index 0541df4..3d39d72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,15 +40,16 @@ public interface FederationAMRMProxyPolicy * * @param resourceRequests the list of {@link ResourceRequest}s from the AM to * be split - * + * @param timedOutSubClusters the set of sub-clusters that haven't had a + * successful heart-beat response for a while. * @return map of sub-cluster as identified by {@link SubClusterId} to the * list of {@link ResourceRequest}s that should be forwarded to it - * * @throws YarnException in case the request is malformed or no viable * sub-clusters can be found. 
*/ Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) throws YarnException; + List<ResourceRequest> resourceRequests, + Set<SubClusterId> timedOutSubClusters) throws YarnException; /** * This method should be invoked to notify the policy about responses being http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java index 5dd5c53..acb7e0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -55,8 +56,8 @@ public class HomeAMRMProxyPolicy extends AbstractAMRMProxyPolicy { @Override public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) throws YarnException { - + List<ResourceRequest> resourceRequests, + Set<SubClusterId> timedOutSubClusters) throws YarnException { if (homeSubcluster 
== null) { throw new FederationPolicyException("No home subcluster available"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index e5f26d8..47d23e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -132,11 +131,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private SubClusterResolver resolver; private Map<SubClusterId, Resource> headroom; - private Map<SubClusterId, Long> lastHeartbeatTimeStamp; - private long 
subClusterTimeOut; private float hrAlpha; private FederationStateStoreFacade federationFacade; - private AllocationBookkeeper bookkeeper; private SubClusterId homeSubcluster; @Override @@ -186,26 +182,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { if (headroom == null) { headroom = new ConcurrentHashMap<>(); - lastHeartbeatTimeStamp = new ConcurrentHashMap<>(); } hrAlpha = policy.getHeadroomAlpha(); this.federationFacade = policyContext.getFederationStateStoreFacade(); this.homeSubcluster = policyContext.getHomeSubcluster(); - - this.subClusterTimeOut = this.federationFacade.getConf().getLong( - YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, - YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); - if (this.subClusterTimeOut <= 0) { - LOG.info( - "{} configured to be {}, should be positive. Using default of {}.", - YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, - this.subClusterTimeOut, - YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); - this.subClusterTimeOut = - YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT; - } } @Override @@ -216,18 +198,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { LOG.info("Subcluster {} updated with {} memory headroom", subClusterId, response.getAvailableResources().getMemorySize()); } - lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis()); } @Override public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) throws YarnException { + List<ResourceRequest> resourceRequests, + Set<SubClusterId> timedOutSubClusters) throws YarnException { // object used to accumulate statistics about the answer, initialize with // active subclusters. Create a new instance per call because this method // can be called concurrently. 
- bookkeeper = new AllocationBookkeeper(); - bookkeeper.reinitialize(federationFacade.getSubClusters(true)); + AllocationBookkeeper bookkeeper = new AllocationBookkeeper(); + bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters); List<ResourceRequest> nonLocalizedRequests = new ArrayList<ResourceRequest>(); @@ -298,15 +280,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // handle all non-localized requests (ANY) splitAnyRequests(nonLocalizedRequests, bookkeeper); - for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper - .getAnswer().entrySet()) { - // A new-cluster here will trigger new UAM luanch, which might take a long - // time. We don't want too many requests stuck in this UAM before it is - // ready and starts heartbeating - if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) { - lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis()); - } - } return bookkeeper.getAnswer(); } @@ -540,8 +513,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private float totPolicyWeight = 0; private void reinitialize( - Map<SubClusterId, SubClusterInfo> activeSubclusters) - throws YarnException { + Map<SubClusterId, SubClusterInfo> activeSubclusters, + Set<SubClusterId> timedOutSubClusters) throws YarnException { if (activeSubclusters == null) { throw new YarnRuntimeException("null activeSubclusters received"); } @@ -573,17 +546,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC); - for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp - .entrySet()) { - long duration = System.currentTimeMillis() - entry.getValue(); - if (duration > subClusterTimeOut) { - LOG.warn( - "Subcluster {} does not have a success heartbeat for {}s, " - + "skip routing asks there for this request", - entry.getKey(), (double) duration / 1000); - 
tmpSCSet.remove(entry.getKey()); - } - } + tmpSCSet.removeAll(timedOutSubClusters); + if (tmpSCSet.size() < 1) { LOG.warn("All active and enabled subclusters have expired last " + "heartbeat time. Ignore the expiry check for this request"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java index bed037e..a21234e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -47,7 +48,8 @@ public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy { @Override public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( - List<ResourceRequest> resourceRequests) throws YarnException { + List<ResourceRequest> resourceRequests, + Set<SubClusterId> timedOutSubClusters) throws YarnException { throw new FederationPolicyException("The policy configured for this 
queue " + "rejects all routing requests by construction."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 23978ed..57d3c67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -109,8 +110,8 @@ public abstract class BaseFederationPoliciesTest { String[] hosts = new String[] {"host1", "host2"}; List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); - ((FederationAMRMProxyPolicy) localPolicy) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java index df5da85..52f36a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -28,7 +29,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -71,8 +71,8 @@ public class TestBroadcastAMRMProxyFederationPolicy .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); Assert.assertTrue(response.size() == 2); for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response .entrySet()) { @@ -94,8 +94,8 @@ public class TestBroadcastAMRMProxyFederationPolicy List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( SubClusterId.newInstance("sc3"), mock(AllocateResponse.class)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java index 90a6aeb..1f57c1f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,9 +81,9 @@ public class TestHomeAMRMProxyPolicy extends BaseFederationPoliciesTest { hosts, 2 * 1024, 2, 1, 3, null, false); HomeAMRMProxyPolicy federationPolicy = - (HomeAMRMProxyPolicy)getPolicy(); - Map<SubClusterId, List<ResourceRequest>> response = - federationPolicy.splitResourceRequests(resourceRequests); + (HomeAMRMProxyPolicy) getPolicy(); + Map<SubClusterId, List<ResourceRequest>> response = federationPolicy + .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>()); assertEquals(1, response.size()); assertNotNull(response.get(HOME_SC_ID)); assertEquals(9, response.get(HOME_SC_ID).size()); @@ -101,7 +102,8 @@ public class TestHomeAMRMProxyPolicy extends BaseFederationPoliciesTest { List<ResourceRequest> resourceRequests = createResourceRequests( hosts, 2 * 1024, 2, 1, 3, null, false); HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy(); - federationPolicy.splitResourceRequests(resourceRequests); + federationPolicy.splitResourceRequests(resourceRequests, + new HashSet<SubClusterId>()); fail("It should fail when the home subcluster is not active"); } catch(FederationPolicyException e) { GenericTestUtils.assertExceptionContains("is not active", e); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index c49ab60..10359e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -154,8 +154,8 @@ public class TestLocalityMulticastAMRMProxyPolicy prepPolicyWithHeadroom(true); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); // pretty print requests LOG.info("Initial headroom"); @@ -180,7 +180,7 @@ public class TestLocalityMulticastAMRMProxyPolicy ((FederationAMRMProxyPolicy) getPolicy()) .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); response = ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>()); LOG.info("After headroom update"); prettyPrintRequests(response); @@ -218,8 +218,8 @@ 
public class TestLocalityMulticastAMRMProxyPolicy long tstart = System.currentTimeMillis(); for (int i = 0; i < numIterations; i++) { Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); validateSplit(response, resourceRequests); } long tend = System.currentTimeMillis(); @@ -243,8 +243,8 @@ public class TestLocalityMulticastAMRMProxyPolicy prepPolicyWithHeadroom(true); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); // we expect all three to appear for a zero-sized ANY @@ -279,8 +279,8 @@ public class TestLocalityMulticastAMRMProxyPolicy prepPolicyWithHeadroom(true); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); // pretty print requests prettyPrintRequests(response); @@ -354,8 +354,8 @@ public class TestLocalityMulticastAMRMProxyPolicy List<ResourceRequest> resourceRequests = createComplexRequest(); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); validateSplit(response, resourceRequests); prettyPrintRequests(response); @@ -697,8 +697,8 @@ public class TestLocalityMulticastAMRMProxyPolicy ResourceRequest.ANY, 1024, 1, 1, 0, null, false)); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - 
.splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); checkExpectedAllocation(response, "subcluster0", 3, 1); checkExpectedAllocation(response, "subcluster1", 1, 0); @@ -717,7 +717,7 @@ public class TestLocalityMulticastAMRMProxyPolicy ResourceRequest.ANY, 1024, 1, 1, 100, null, false)); response = ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>()); /* * Since node request is a cancel, it should not be considered associated @@ -750,12 +750,13 @@ public class TestLocalityMulticastAMRMProxyPolicy initializePolicy(conf); List<ResourceRequest> resourceRequests = createSimpleRequest(); - // Update the response timestamp for the first time prepPolicyWithHeadroom(true); + // For first time, no sub-cluster expired + Set<SubClusterId> expiredSCList = new HashSet<>(); Map<SubClusterId, List<ResourceRequest>> response = ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + .splitResourceRequests(resourceRequests, expiredSCList); // pretty print requests prettyPrintRequests(response); @@ -776,11 +777,11 @@ public class TestLocalityMulticastAMRMProxyPolicy Thread.sleep(800); - // Update the response timestamp for the second time, skipping sc0 and sc5 - prepPolicyWithHeadroom(false); - + // For the second time, sc0 and sc5 expired + expiredSCList.add(SubClusterId.newInstance("subcluster0")); + expiredSCList.add(SubClusterId.newInstance("subcluster5")); response = ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + .splitResourceRequests(resourceRequests, expiredSCList); // pretty print requests prettyPrintRequests(response); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java index 41e7fed..6555829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -69,8 +70,8 @@ public class TestRejectAMRMProxyPolicy .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); Map<SubClusterId, List<ResourceRequest>> response = - ((FederationAMRMProxyPolicy) getPolicy()) - .splitResourceRequests(resourceRequests); + ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests( + resourceRequests, new HashSet<SubClusterId>()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index ae9f78d..dc10f95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; @@ -163,10 +165,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * Stores the AllocateResponses that are received asynchronously from all the - * sub-cluster resource managers, including home RM. + * sub-cluster resource managers, including home RM, but not merged and + * returned back to AM yet. 
*/ private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink; + /** + * Remembers the last allocate response from all known sub-clusters. This is + * used together with sub-cluster timeout to assemble entries about + * cluster-wide info (e.g. AvailableResource, NumClusterNodes) in the allocate + * response back to AM. + */ + private Map<SubClusterId, AllocateResponse> lastSCResponse; + + /** + * The async UAM registration result that is not consumed yet. + */ private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations; // For unit test synchronization @@ -216,6 +230,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private SubClusterResolver subClusterResolver; + /** + * Records the last time a successful heartbeat response was received from a known + * sub-cluster. lastSCResponseTime.keySet() should be in sync with + * uamPool.getAllUAMIds(). + */ + private Map<SubClusterId, Long> lastSCResponseTime; + private long subClusterTimeOut; + + private long lastAMHeartbeatTime; + /** The policy used to split requests among sub-clusters. 
*/ private FederationAMRMProxyPolicy policyInterpreter; @@ -232,6 +256,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public FederationInterceptor() { this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>(); this.asyncResponseSink = new ConcurrentHashMap<>(); + this.lastSCResponse = new ConcurrentHashMap<>(); this.uamRegistrations = new ConcurrentHashMap<>(); this.uamRegisterFutures = new ConcurrentHashMap<>(); this.threadpool = Executors.newCachedThreadPool(); @@ -241,6 +266,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationResponse = null; this.justRecovered = false; this.finishAMCalled = false; + this.lastSCResponseTime = new ConcurrentHashMap<>(); + this.lastAMHeartbeatTime = this.clock.getTime(); } /** @@ -310,6 +337,19 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.heartbeatMaxWaitTimeMs = conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS, YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS); + + this.subClusterTimeOut = + conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); + if (this.subClusterTimeOut <= 0) { + LOG.info( + "{} configured to be {}, should be positive. 
Using default of {}.", + YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + this.subClusterTimeOut, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); + this.subClusterTimeOut = + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT; + } } @Override @@ -394,6 +434,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.registerApplicationMaster(subClusterId.getId(), this.amRegistrationRequest); + // Set sub-cluster to be timed out initially + lastSCResponseTime.put(subClusterId, + clock.getTime() - subClusterTimeOut); + // Running containers from secondary RMs for (Container container : response .getContainersFromPreviousAttempts()) { @@ -580,6 +624,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { throws YarnException, IOException { Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster"); + this.lastAMHeartbeatTime = this.clock.getTime(); if (this.justRecovered) { throw new ApplicationMasterNotRegisteredException( @@ -644,8 +689,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } // Prepare the response to AM - AllocateResponse response = - RECORD_FACTORY.newRecordInstance(AllocateResponse.class); + AllocateResponse response = generateBaseAllocationResponse(); // Merge all responses from response sink mergeAllocateResponses(response); @@ -970,6 +1014,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { response = uamPool.registerApplicationMaster( subClusterId.getId(), amRegistrationRequest); + // Set sub-cluster to be timed out initially + lastSCResponseTime.put(subClusterId, + clock.getTime() - subClusterTimeOut); + if (response != null && response.getContainersFromPreviousAttempts() != null) { cacheAllocatedContainers( @@ -1172,6 +1220,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if 
(!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(subClusterId.getId())) { newSubClusters.add(subClusterId); + + // Set sub-cluster to be timed out initially + lastSCResponseTime.put(subClusterId, + clock.getTime() - subClusterTimeOut); } } @@ -1245,6 +1297,38 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** + * Prepare the base allocation response. Use lastSCResponse and + * lastSCResponseTime to assemble entries about cluster-wide info, e.g. + * AvailableResource, NumClusterNodes. + */ + protected AllocateResponse generateBaseAllocationResponse() { + AllocateResponse baseResponse = + RECORD_FACTORY.newRecordInstance(AllocateResponse.class); + + baseResponse.setAvailableResources(Resource.newInstance(0, 0)); + baseResponse.setNumClusterNodes(0); + + Set<SubClusterId> expiredSC = getTimedOutSCs(false); + for (Entry<SubClusterId, AllocateResponse> entry : lastSCResponse + .entrySet()) { + if (expiredSC.contains(entry.getKey())) { + // Skip expired sub-clusters + continue; + } + AllocateResponse response = entry.getValue(); + + if (response.getAvailableResources() != null) { + baseResponse.setAvailableResources( + Resources.add(baseResponse.getAvailableResources(), + response.getAvailableResources())); + } + baseResponse.setNumClusterNodes( + baseResponse.getNumClusterNodes() + response.getNumClusterNodes()); + } + return baseResponse; + } + + /** * Merge the responses from all sub-clusters that we received asynchronously * and keeps track of the containers received from each sub-cluster resource * managers. 
@@ -1345,17 +1429,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - if (otherResponse.getAvailableResources() != null) { - if (homeResponse.getAvailableResources() != null) { - homeResponse.setAvailableResources( - Resources.add(homeResponse.getAvailableResources(), - otherResponse.getAvailableResources())); - } else { - homeResponse - .setAvailableResources(otherResponse.getAvailableResources()); - } - } - if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) { if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) { homeResponse.getCompletedContainersStatuses() @@ -1520,6 +1593,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return request; } + protected Set<SubClusterId> getTimedOutSCs(boolean verbose) { + Set<SubClusterId> timedOutSCs = new HashSet<>(); + for (Map.Entry<SubClusterId, Long> entry : this.lastSCResponseTime + .entrySet()) { + if (entry.getValue() > this.lastAMHeartbeatTime) { + // AM hasn't sent a heartbeat to us (and thus we to all SCs) for a long time, + // should not consider the SC as timed out + continue; + } + long duration = this.clock.getTime() - entry.getValue(); + if (duration > this.subClusterTimeOut) { + if (verbose) { + LOG.warn( + "Subcluster {} doesn't have a successful heartbeat" + + " for {} seconds for {}", + entry.getKey(), (double) duration / 1000, this.attemptId); + } + timedOutSCs.add(entry.getKey()); + } + } + return timedOutSCs; + } + /** * Check to see if the specified containerId exists in the cache and log an * error if not found. 
@@ -1553,7 +1649,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( List<ResourceRequest> askList) throws YarnException { - return this.policyInterpreter.splitResourceRequests(askList); + return policyInterpreter.splitResourceRequests(askList, + getTimedOutSCs(true)); } @VisibleForTesting @@ -1602,6 +1699,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Notify main thread about the response arrival asyncResponseSink.notifyAll(); } + lastSCResponse.put(subClusterId, response); + lastSCResponseTime.put(subClusterId, clock.getTime()); // Notify policy of allocate response try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5ec85d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index ec75cfd..48b7bf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -160,6 +160,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { // Disable StateStoreFacade cache conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + // Set 
sub-cluster timeout to 500ms + conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + 500); + return conf; } @@ -568,6 +572,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // SC1 should be initialized to be timed out + Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); // The first allocate call expects a fail-over exception and re-register try { @@ -741,6 +747,60 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { } @Test + public void testSubClusterTimeOut() throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + // Register the application first time + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + registerSubCluster(SubClusterId.newInstance("SC-1")); + + getContainersAndAssert(1, 1); + + AllocateResponse allocateResponse = + interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); + + // Let all SC timeout (home and SC-1), without an allocate from AM + Thread.sleep(800); + + // Should not be considered timeout, because there's no recent AM + // heartbeat + allocateResponse = interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); + + // Generate 
a duplicate heartbeat from AM, so that it won't really + // trigger a heartbeat to all SCs + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + // Set to lastResponseId - 1 so that it will be considered a duplicate + // heartbeat and thus not forwarded to all SCs + allocateRequest.setResponseId(lastResponseId - 1); + interceptor.allocate(allocateRequest); + + // Should be considered timeout + allocateResponse = interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(0, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size()); + return null; + } + }); + } + + @Test public void testSecondAttempt() throws Exception { final RegisterApplicationMasterRequest registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); @@ -803,6 +863,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { int numberOfContainers = 3; // Should re-attach secondaries and get the three running containers Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // SC1 should be initialized to be timed out + Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); Assert.assertEquals(numberOfContainers, registerResponse.getContainersFromPreviousAttempts().size()); @@ -831,5 +893,4 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { } }); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org