YARN-6281. Cleanup when AMRMProxy fails to initialize a new interceptor chain. 
(Botong Huang via Subru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3d5b4cd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3d5b4cd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3d5b4cd

Branch: refs/heads/YARN-2915
Commit: d3d5b4cd1abce52fe2f1196012230fa5b66591b4
Parents: beaca37
Author: Subru Krishnan <su...@apache.org>
Authored: Fri Mar 10 18:13:29 2017 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Apr 24 18:58:53 2017 -0700

----------------------------------------------------------------------
 .../nodemanager/amrmproxy/AMRMProxyService.java | 25 ++++++++++------
 .../amrmproxy/BaseAMRMProxyTest.java            | 21 ++++++++-----
 .../amrmproxy/TestAMRMProxyService.java         | 31 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 5e91a20..c17d8ca 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -297,11 +297,16 @@ public class AMRMProxyService extends AbstractService 
implements
         + " ApplicationId:" + applicationAttemptId + " for the user: "
         + user);
 
-    RequestInterceptor interceptorChain =
-        this.createRequestInterceptorChain();
-    interceptorChain.init(createApplicationMasterContext(
-        applicationAttemptId, user, amrmToken, localToken));
-    chainWrapper.init(interceptorChain, applicationAttemptId);
+    try {
+      RequestInterceptor interceptorChain =
+          this.createRequestInterceptorChain();
+      interceptorChain.init(createApplicationMasterContext(this.nmContext,
+          applicationAttemptId, user, amrmToken, localToken));
+      chainWrapper.init(interceptorChain, applicationAttemptId);
+    } catch (Exception e) {
+      this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
+      throw e;
+    }
   }
 
   /**
@@ -317,8 +322,10 @@ public class AMRMProxyService extends AbstractService 
implements
         this.applPipelineMap.remove(applicationId);
 
     if (pipeline == null) {
-      LOG.info("Request to stop an application that does not exist. Id:"
-          + applicationId);
+      LOG.info(
+          "No interceptor pipeline for application {},"
+              + " likely because its AM is not run in this node.",
+          applicationId);
     } else {
       LOG.info("Stopping the request processing pipeline for application: "
           + applicationId);
@@ -387,11 +394,11 @@ public class AMRMProxyService extends AbstractService 
implements
   }
 
   private AMRMProxyApplicationContext createApplicationMasterContext(
-      ApplicationAttemptId applicationAttemptId, String user,
+      Context context, ApplicationAttemptId applicationAttemptId, String user,
       Token<AMRMTokenIdentifier> amrmToken,
       Token<AMRMTokenIdentifier> localToken) {
     AMRMProxyApplicationContextImpl appContext =
-        new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+        new AMRMProxyApplicationContextImpl(context, getConfig(),
             applicationAttemptId, user, amrmToken, localToken);
     return appContext;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 7f96947..6f5009e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -121,9 +121,9 @@ public abstract class BaseAMRMProxyTest {
             + MockRequestInterceptor.class.getName());
 
     this.dispatcher = new AsyncDispatcher();
-    this.dispatcher.init(conf);
+    this.dispatcher.init(this.conf);
     this.dispatcher.start();
-    this.amrmProxyService = createAndStartAMRMProxyService();
+    createAndStartAMRMProxyService(this.conf);
   }
 
   @After
@@ -137,12 +137,19 @@ public abstract class BaseAMRMProxyTest {
     return threadpool;
   }
 
-  protected MockAMRMProxyService createAndStartAMRMProxyService() {
-    MockAMRMProxyService svc =
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
+  protected void createAndStartAMRMProxyService(Configuration config) {
+    // Stop the existing instance first if not null
+    if (this.amrmProxyService != null) {
+      this.amrmProxyService.stop();
+    }
+    this.amrmProxyService =
         new MockAMRMProxyService(new NullContext(), dispatcher);
-    svc.init(conf);
-    svc.start();
-    return svc;
+    this.amrmProxyService.init(config);
+    this.amrmProxyService.start();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3d5b4cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index 7fffddf..6ac9d84 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -20,17 +20,22 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -90,6 +95,32 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
   }
 
   /**
+   * Tests the case when interceptor pipeline initialization fails.
+   */
+  @Test
+  public void testInterceptorInitFailure() {
+    Configuration conf = this.getConf();
+    // Override with a bad interceptor configuration
+    conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        "class.that.does.not.exist");
+
+    // Reinitialize instance with the new config
+    createAndStartAMRMProxyService(conf);
+    int testAppId = 1;
+    try {
+      registerApplicationMaster(testAppId);
+      Assert.fail("Should not reach here. Expecting an exception thrown");
+    } catch (Exception e) {
+      Map<ApplicationId, RequestInterceptorChainWrapper> pipelines =
+          getAMRMProxyService().getPipelines();
+      ApplicationId id = getApplicationId(testAppId);
+      Assert.assertTrue(
+          "The interceptor pipeline should be removed if initializtion fails",
+          pipelines.get(id) == null);
+    }
+  }
+
+  /**
    * Tests the registration of multiple application master serially one at a
    * time.
    * 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to