YARN-6281. Cleanup when AMRMProxy fails to initialize a new interceptor chain. (Botong Huang via Subru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3d5b4cd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3d5b4cd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3d5b4cd Branch: refs/heads/YARN-2915 Commit: d3d5b4cd1abce52fe2f1196012230fa5b66591b4 Parents: beaca37 Author: Subru Krishnan <su...@apache.org> Authored: Fri Mar 10 18:13:29 2017 -0800 Committer: Subru Krishnan <su...@apache.org> Committed: Mon Apr 24 18:58:53 2017 -0700 ---------------------------------------------------------------------- .../nodemanager/amrmproxy/AMRMProxyService.java | 25 ++++++++++------ .../amrmproxy/BaseAMRMProxyTest.java | 21 ++++++++----- .../amrmproxy/TestAMRMProxyService.java | 31 ++++++++++++++++++++ 3 files changed, 61 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 5e91a20..c17d8ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -297,11 +297,16 @@ public class AMRMProxyService extends AbstractService implements + " ApplicationId:" + applicationAttemptId + " for the user: " + user); - RequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext( - applicationAttemptId, user, amrmToken, localToken)); - chainWrapper.init(interceptorChain, applicationAttemptId); + try { + RequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(createApplicationMasterContext(this.nmContext, + applicationAttemptId, user, amrmToken, localToken)); + chainWrapper.init(interceptorChain, applicationAttemptId); + } catch (Exception e) { + this.applPipelineMap.remove(applicationAttemptId.getApplicationId()); + throw e; + } } /** @@ -317,8 +322,10 @@ public class AMRMProxyService extends AbstractService implements this.applPipelineMap.remove(applicationId); if (pipeline == null) { - LOG.info("Request to stop an application that does not exist. Id:" - + applicationId); + LOG.info( + "No interceptor pipeline for application {}," + + " likely because its AM is not run in this node.", + applicationId); } else { LOG.info("Stopping the request processing pipeline for application: " + applicationId); @@ -387,11 +394,11 @@ public class AMRMProxyService extends AbstractService implements } private AMRMProxyApplicationContext createApplicationMasterContext( - ApplicationAttemptId applicationAttemptId, String user, + Context context, ApplicationAttemptId applicationAttemptId, String user, Token<AMRMTokenIdentifier> amrmToken, Token<AMRMTokenIdentifier> localToken) { AMRMProxyApplicationContextImpl appContext = - new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(), + new AMRMProxyApplicationContextImpl(context, getConfig(), applicationAttemptId, user, amrmToken, localToken); return appContext; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 7f96947..6f5009e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -121,9 +121,9 @@ public abstract class BaseAMRMProxyTest { + MockRequestInterceptor.class.getName()); this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(conf); + this.dispatcher.init(this.conf); this.dispatcher.start(); - this.amrmProxyService = createAndStartAMRMProxyService(); + createAndStartAMRMProxyService(this.conf); } @After @@ -137,12 +137,19 @@ public abstract class BaseAMRMProxyTest { return threadpool; } - protected MockAMRMProxyService createAndStartAMRMProxyService() { - MockAMRMProxyService svc = + protected Configuration getConf() { + return this.conf; + } + + protected void createAndStartAMRMProxyService(Configuration config) { + // Stop the existing instance first if not null + if (this.amrmProxyService != null) { + this.amrmProxyService.stop(); + } + this.amrmProxyService = new MockAMRMProxyService(new NullContext(), dispatcher); - svc.init(conf); - svc.start(); - return svc; + this.amrmProxyService.init(config); + this.amrmProxyService.start(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 7fffddf..6ac9d84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -20,17 +20,22 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -90,6 +95,32 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { } /** + * Tests the case when interceptor pipeline initialization fails. + */ + @Test + public void testInterceptorInitFailure() { + Configuration conf = this.getConf(); + // Override with a bad interceptor configuration + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + "class.that.does.not.exist"); + + // Reinitialize instance with the new config + createAndStartAMRMProxyService(conf); + int testAppId = 1; + try { + registerApplicationMaster(testAppId); + Assert.fail("Should not reach here. Expecting an exception thrown"); + } catch (Exception e) { + Map<ApplicationId, RequestInterceptorChainWrapper> pipelines = + getAMRMProxyService().getPipelines(); + ApplicationId id = getApplicationId(testAppId); + Assert.assertTrue( + "The interceptor pipeline should be removed if initializtion fails", + pipelines.get(id) == null); + } + } + + /** * Tests the registration of multiple application master serially one at a * time. * --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org