YARN-6234. Support multiple attempts on the node when AMRMProxy is enabled. (Giovanni Matteo Fumarola via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd9ff27f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd9ff27f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd9ff27f

Branch: refs/heads/HDFS-9806
Commit: cd9ff27ffc9369820d0c39200a11bf00e6a767c8
Parents: 1769b12
Author: Subru Krishnan <su...@apache.org>
Authored: Mon May 8 16:41:30 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon May 8 16:41:30 2017 -0700

----------------------------------------------------------------------
 .../nodemanager/amrmproxy/AMRMProxyService.java | 32 ++++++++++++++---
 .../amrmproxy/TestAMRMProxyService.java         | 36 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd9ff27f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 9f2d9a1..2696bca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -270,18 +270,40 @@ public class AMRMProxyService extends AbstractService implements
    * @param user
    * @param amrmToken
    */
-  protected void initializePipeline(
-      ApplicationAttemptId applicationAttemptId, String user,
-      Token<AMRMTokenIdentifier> amrmToken,
+  protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
+      String user, Token<AMRMTokenIdentifier> amrmToken,
       Token<AMRMTokenIdentifier> localToken) {
     RequestInterceptorChainWrapper chainWrapper = null;
     synchronized (applPipelineMap) {
-      if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
+      if (applPipelineMap
+          .containsKey(applicationAttemptId.getApplicationId())) {
         LOG.warn("Request to start an already existing appId was received. "
             + " This can happen if an application failed and a new attempt "
             + "was created on this machine. ApplicationId: "
             + applicationAttemptId.toString());
-        return;
+
+        RequestInterceptorChainWrapper chainWrapperBackup =
+            this.applPipelineMap.get(applicationAttemptId.getApplicationId());
+        if (chainWrapperBackup != null
+            && chainWrapperBackup.getApplicationAttemptId() != null
+            && !chainWrapperBackup.getApplicationAttemptId()
+                .equals(applicationAttemptId)) {
+          // Remove the existing pipeline
+          LOG.info("Remove the previous pipeline for ApplicationId: "
+              + applicationAttemptId.toString());
+          RequestInterceptorChainWrapper pipeline =
+              applPipelineMap.remove(applicationAttemptId.getApplicationId());
+          try {
+            pipeline.getRootInterceptor().shutdown();
+          } catch (Throwable ex) {
+            LOG.warn(
+                "Failed to shutdown the request processing pipeline for app:"
+                    + applicationAttemptId.getApplicationId(),
+                ex);
+          }
+        } else {
+          return;
+        }
       }
 
       chainWrapper = new RequestInterceptorChainWrapper();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd9ff27f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index 7fffddf..837278c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,10 +28,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -380,6 +385,37 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     }
   }
 
+  @Test
+  public void testMultipleAttemptsSameNode()
+      throws YarnException, IOException, Exception {
+
+    String user = "hadoop";
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId applicationAttemptId;
+
+    // First Attempt
+
+    RegisterApplicationMasterResponse response1 =
+        registerApplicationMaster(appId.getId());
+    Assert.assertNotNull(response1);
+
+    AllocateResponse allocateResponse = allocate(appId.getId());
+    Assert.assertNotNull(allocateResponse);
+
+    // Second Attempt
+
+    applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
+    getAMRMProxyService().initializePipeline(applicationAttemptId, user, null,
+        null);
+
+    RequestInterceptorChainWrapper chain2 =
+        getAMRMProxyService().getPipelines().get(appId);
+    Assert.assertEquals(applicationAttemptId, chain2.getApplicationAttemptId());
+
+    allocateResponse = allocate(appId.getId());
+    Assert.assertNotNull(allocateResponse);
+  }
+
   private List<Container> getContainersAndAssert(int appId,
       int numberOfResourceRequests) throws Exception {
     AllocateRequest allocateRequest =

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org