YARN-8435. Fix NPE when the same client simultaneously contacts the YARN Router for the first time. Contributed by Rang Jiaheng.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d9804dc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d9804dc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d9804dc Branch: refs/heads/HDDS-48 Commit: 0d9804dcef2eab5ebf84667d9ca49bb035d9a731 Parents: 71df8c2 Author: Giovanni Matteo Fumarola <gif...@apache.com> Authored: Thu Jul 5 10:54:31 2018 -0700 Committer: Giovanni Matteo Fumarola <gif...@apache.com> Committed: Thu Jul 5 10:54:31 2018 -0700 ---------------------------------------------------------------------- .../router/clientrm/RouterClientRMService.java | 53 ++++++++-------- .../router/rmadmin/RouterRMAdminService.java | 51 ++++++++------- .../server/router/webapp/RouterWebServices.java | 48 +++++++-------- .../clientrm/TestRouterClientRMService.java | 60 ++++++++++++++++++ .../rmadmin/TestRouterRMAdminService.java | 60 ++++++++++++++++++ .../router/webapp/TestRouterWebServices.java | 65 ++++++++++++++++++++ 6 files changed, 259 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 73cc185..bbb8047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -430,13 +430,15 @@ public class RouterClientRMService extends AbstractService return pipeline.getRootInterceptor().getResourceTypeInfo(request); } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -503,36 +505,33 @@ public class RouterClientRMService extends AbstractService * * @param user */ - private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); } - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for application " - + "for the user: {}", user); - - try { - ClientRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - 
chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init ClientRequestInterceptor error for user: " + user, e); + throw e; } - throw e; + + this.userPipelineMap.put(user, chainWrapper); + return chainWrapper; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index b8b7ad8..ef30613 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -165,13 +165,15 @@ public class RouterRMAdminService extends AbstractService return interceptorClassNames; } - private RequestInterceptorChainWrapper getInterceptorChain() + 
@VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -239,35 +241,32 @@ public class RouterRMAdminService extends AbstractService * * @param user */ - private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); } - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - - try { - RMAdminRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. 
+ LOG.info("Initializing request processing pipeline for user: {}", user); + + RMAdminRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e); + throw e; } - throw e; + + this.userPipelineMap.put(user, chainWrapper); + return chainWrapper; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index ae57f1c..49de588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -173,10 +173,11 @@ public class RouterWebServices implements RMWebServiceProtocol { } catch (IOException e) { LOG.error("Cannot get user: {}", e.getMessage()); } - if (!userPipelineMap.containsKey(user)) { - initializePipeline(user); + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; } - return userPipelineMap.get(user); + return initializePipeline(user); } /** @@ -242,35 +243,32 @@ public class RouterWebServices implements RMWebServiceProtocol { * * @param user */ - private void 
initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; + private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { if (this.userPipelineMap.containsKey(user)) { LOG.info("Request to start an already existing user: {}" + " was received, so ignoring.", user); - return; + return userPipelineMap.get(user); } - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - - try { - RESTRequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(user); - chainWrapper.init(interceptorChain); - } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. 
+ LOG.info("Initializing request processing pipeline for user: {}", user); + + RESTRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init RESTRequestInterceptor error for user: " + user, e); + throw e; } - throw e; + + this.userPipelineMap.put(user, chainWrapper); + return chainWrapper; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java index a9c3729..b03059d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; @@ -207,4 +209,62 @@ public class TestRouterClientRMService extends 
BaseRouterClientRMTest { Assert.assertNull("test2 should have been evicted", chain); } + /** + * This test validates if the ClientRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterClientRMService for the same user simultaneously. + */ + @Test + public void testClientPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * ClientRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private ClientRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private ClientRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction<ClientRequestInterceptor>() { + @Override + public ClientRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterClientRMService().getInterceptorChain(); + ClientRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init client interceptor success for user " + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. 
+ */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java index 11786e6..07ef73c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; @@ -216,4 +218,62 @@ public class TestRouterRMAdminService extends BaseRouterRMAdminTest { Assert.assertNull("test2 should have been evicted", chain); } + /** + * 
This test validates if the RMAdminRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterRMAdminService for the same user simultaneously. + */ + @Test + public void testRMAdminPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * RMAdminRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private RMAdminRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RMAdminRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction<RMAdminRequestInterceptor>() { + @Override + public RMAdminRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterRMAdminService().getInterceptorChain(); + RMAdminRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init rm admin interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. 
+ */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d9804dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java index c96575c..1465243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import javax.ws.rs.core.Response; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -49,12 +51,17 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import 
org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test class to validate the WebService interceptor model inside the Router. */ public class TestRouterWebServices extends BaseRouterWebServicesTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterWebServices.class); + private String user = "test1"; /** @@ -266,4 +273,62 @@ public class TestRouterWebServices extends BaseRouterWebServicesTest { Assert.assertNull("test2 should have been evicted", chain); } + /** + * This test validates if the RESTRequestInterceptor chain for the user + * can build and init correctly when a multi-client process begins to + * request RouterWebServices for the same user simultaneously. + */ + @Test + public void testWebPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client request to get a + * RESTRequestInterceptor for the user. + */ + class ClientTestThread extends Thread { + private RESTRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RESTRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction<RESTRequestInterceptor>() { + @Override + public RESTRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getInterceptorChain(user); + RESTRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init web interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * We start the first thread. It should not finish initing a chainWrapper + * before the other thread starts. In this way, the second thread can + * init at the same time of the first one. 
In the end, we validate that + * the 2 threads get the same chainWrapper without going into error. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertTrue(client1.interceptor == client2.interceptor); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org