YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8180ab43 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8180ab43 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8180ab43 Branch: refs/heads/branch-2 Commit: 8180ab436a19b8e253c3b6c4f392daa32680e187 Parents: 0f20434 Author: Inigo Goiri <inigo...@apache.org> Authored: Wed Nov 1 13:21:15 2017 -0700 Committer: Inigo Goiri <inigo...@apache.org> Committed: Wed Nov 1 13:21:15 2017 -0700 ---------------------------------------------------------------------- .../hadoop-yarn-server-router/pom.xml | 6 + .../hadoop/yarn/server/router/Router.java | 6 + .../webapp/DefaultRequestInterceptorREST.java | 12 +- .../webapp/FederationInterceptorREST.java | 256 ++-- .../router/webapp/RouterWebServiceUtil.java | 62 +- .../server/router/webapp/RouterWebServices.java | 97 +- .../webapp/BaseRouterWebServicesTest.java | 400 ++---- .../yarn/server/router/webapp/JavaProcess.java | 15 +- .../webapp/TestRouterWebServicesREST.java | 1158 ++++++++++-------- 9 files changed, 1021 insertions(+), 991 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index bcdf3d9..5b3ee43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -86,6 +86,12 @@ </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 121e534..76050d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -74,6 +75,8 @@ public class Router extends CompositeService { */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final String METRICS_NAME = "Router"; + public Router() { super(Router.class.getName()); } @@ -95,6 +98,8 @@ public class Router extends CompositeService { webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.ROUTER_BIND_HOST, WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf)); + // Metrics + DefaultMetricsSystem.initialize(METRICS_NAME); super.serviceInit(conf); } @@ -118,6 +123,7 @@ public class Router extends CompositeService { return; } super.serviceStop(); + DefaultMetricsSystem.shutdown(); } protected void shutDown() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index abd8ca6..72ed02f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -129,7 +129,9 @@ public class DefaultRequestInterceptorREST public NodesInfo getNodes(String states) { // states will be part of additionalParam Map<String, String[]> additionalParam = new HashMap<String, String[]>(); - additionalParam.put(RMWSConsts.STATES, new String[] {states}); + if (states != null && !states.isEmpty()) { + additionalParam.put(RMWSConsts.STATES, new String[] {states}); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, NodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null, @@ -226,9 +228,11 @@ public class DefaultRequestInterceptorREST public LabelsToNodesInfo getLabelsToNodes(Set<String> labels) throws IOException { // labels will be part of additionalParam - Map<String, String[]> additionalParam = new HashMap<String, String[]>(); - additionalParam.put(RMWSConsts.LABELS, - labels.toArray(new String[labels.size()])); + Map<String, String[]> additionalParam = new HashMap<>(); + if (labels != null && !labels.isEmpty()) { + additionalParam.put(RMWSConsts.LABELS, + labels.toArray(new String[labels.size()])); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, LabelsToNodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null, http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 6ba8ade..2860d10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,12 +27,15 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public void init(String user) { federationFacade = FederationStateStoreFacade.getInstance(); - rand = new Random(System.currentTimeMillis()); + rand = new Random(); final Configuration conf = this.getConf(); try { - policyFacade = new RouterPolicyFacade(conf, federationFacade, - this.federationFacade.getSubClusterResolver(), null); + SubClusterResolver subClusterResolver = + this.federationFacade.getSubClusterResolver(); + policyFacade = new RouterPolicyFacade( + conf, federationFacade, subClusterResolver, null); } catch (FederationPolicyInitializationException e) { - LOG.error(e.getMessage()); + throw new YarnRuntimeException(e); } - numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + numSubmitRetries = conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>(); + interceptors = new HashMap<>(); routerMetrics = RouterMetrics.getMetrics(); - threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("FederationInterceptorREST #%d").build()); - - returnPartialReport = - conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); + threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("FederationInterceptorREST #%d") + .build()); + + returnPartialReport = conf.getBoolean( + YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); } private SubClusterId getRandomActiveSubCluster( @@ -156,8 +165,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); - FederationPolicyUtils.validateSubClusterAvailability(list, - blackListSubClusters); + FederationPolicyUtils.validateSubClusterAvailability( + list, blackListSubClusters); if (blackListSubClusters != null) { @@ -176,8 +185,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (interceptors.containsKey(subClusterId)) { return interceptors.get(subClusterId); } else { - LOG.error("The interceptor for SubCluster " + subClusterId - + " does not exist in the cache."); + LOG.error( + "The interceptor for SubCluster {} does not exist in the cache.", + subClusterId); return null; } } @@ -187,9 +197,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { final Configuration conf = this.getConf(); - String interceptorClassName = - conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); + String interceptorClassName = conf.get( + YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); DefaultRequestInterceptorREST interceptorInstance = null; try { Class<?> interceptorClass = conf.getClassByName(interceptorClassName); @@ -210,7 +220,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { e); } - interceptorInstance.setWebAppAddress(webAppAddress); + interceptorInstance.setWebAppAddress("http://" + webAppAddress); interceptorInstance.setSubClusterId(subClusterId); interceptors.put(subClusterId, interceptorInstance); return interceptorInstance; @@ -272,8 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { .entity(e.getLocalizedMessage()).build(); } - LOG.debug( - "getNewApplication try #" + i + " on SubCluster " + subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, @@ -282,11 +291,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { response = interceptor.createNewApplication(hsr); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); + LOG.warn("Unable to create a new ApplicationId in SubCluster {}", + subClusterId.getId(), e); } - if (response != null && response.getStatus() == 200) { + if (response != null && + response.getStatus() == HttpServletResponse.SC_OK) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); @@ -302,7 +312,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Fail to create a new application."; LOG.error(errMsg); routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + return Response + .status(Status.INTERNAL_SERVER_ERROR) + .entity(errMsg) + .build(); } /** @@ -381,7 +394,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Missing ApplicationSubmissionContextInfo or " + "applicationSubmissionContex information."; - return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + return Response + .status(Status.BAD_REQUEST) + .entity(errMsg) + .build(); } ApplicationId applicationId = null; @@ -389,7 +405,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(newApp.getApplicationId()); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -405,11 +423,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterId = policyFacade.getHomeSubcluster(context, blacklist); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } - LOG.info("submitApplication appId" + applicationId + " try #" + i - + " on SubCluster " + subClusterId); + LOG.info("submitApplication appId {} try #{} on SubCluster {}", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -424,8 +444,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Unable to insert the ApplicationId " + applicationId + " into the FederationStateStore"; - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg + " " + e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg + " " + e.getLocalizedMessage()) + .build(); } } else { try { @@ -441,15 +463,19 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { federationFacade.getApplicationHomeSubCluster(applicationId); } catch (YarnException e1) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e1.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e1.getLocalizedMessage()) + .build(); } if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application " + applicationId - + " already submitted on SubCluster " + subClusterId); + LOG.info("Application {} already submitted on SubCluster {}", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg) + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) .build(); } } @@ -460,8 +486,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } Response response = null; @@ -470,13 +498,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, hsr); } catch (Exception e) { - LOG.warn("Unable to submit the application " + applicationId - + "to SubCluster " + subClusterId.getId(), e); + LOG.warn("Unable to submit the application {} to SubCluster {}", + applicationId, subClusterId.getId(), e); } - if (response != null && response.getStatus() == 202) { - LOG.info("Application " + context.getApplicationName() + " with appId " - + applicationId + " submitted on " + subClusterId); + if (response != null && + response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Application {} with appId {} submitted on {}", + context.getApplicationName(), applicationId, subClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); @@ -493,7 +522,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Application " + newApp.getApplicationName() + " with appId " + applicationId + " failed to be submitted."; LOG.error(errMsg); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) + .build(); } /** @@ -541,9 +573,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { return null; } - AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId, - unselectedFields); + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterId, subClusterInfo.getRMWebServiceAddress()); + AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); @@ -579,7 +612,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -591,7 +626,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -644,26 +681,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService<AppsInfo> compSvc = - new ExecutorCompletionService<AppsInfo>(this.threadpool); + CompletionService<AppsInfo> compSvc = + new ExecutorCompletionService<>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { + // HttpServletRequest does not work with ExecutorCompletionService. + // Create a duplicate hsr. + final HttpServletRequest hsrCopy = clone(hsr); compSvc.submit(new Callable<AppsInfo>() { @Override public AppsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); - AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery, - finalStatusQuery, userQuery, queueQuery, count, startedBegin, - startedEnd, finishBegin, finishEnd, applicationTypes, - applicationTags, unselectedFields); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, + statesQuery, finalStatusQuery, userQuery, queueQuery, count, + startedBegin, startedEnd, finishBegin, finishEnd, + applicationTypes, applicationTags, unselectedFields); if (rmApps == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return appReport."); + LOG.error("Subcluster {} failed to return appReport.", + info.getSubClusterId()); return null; } return rmApps; @@ -672,8 +711,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Collect all the responses in parallel - - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future<AppsInfo> future = compSvc.take(); AppsInfo appsResponse = future.get(); @@ -686,7 +724,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } } catch (Throwable e) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.warn("Failed to get application report ", e); + LOG.warn("Failed to get application report", e); } } @@ -695,9 +733,42 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Merge all the application reports got from all the available Yarn RMs + return RouterWebServiceUtil.mergeAppsInfo( + apps.getApps(), returnPartialReport); + } - return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), - returnPartialReport); + /** + * Get a copy of a HTTP request. This is for thread safety. + * @param hsr HTTP servlet request to copy. + * @return Copy of the HTTP request. + */ + private HttpServletRequestWrapper clone(final HttpServletRequest hsr) { + if (hsr == null) { + return null; + } + return new HttpServletRequestWrapper(hsr) { + @SuppressWarnings("unchecked") + public Map<String, String[]> getParameterMap() { + return (Map<String, String[]>) hsr.getParameterMap(); + } + public String getPathInfo() { + return hsr.getPathInfo(); + } + public String getRemoteUser() { + return hsr.getRemoteUser(); + } + public Principal getUserPrincipal() { + return hsr.getUserPrincipal(); + } + public String getHeader(String value) { + // we override only Accept + if (value.equals(HttpHeaders.ACCEPT)) { + return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( + hsr, AppsInfo.class); + } + return null; + } + }; } /** @@ -731,8 +802,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService<NodeInfo> compSvc = + CompletionService<NodeInfo> compSvc = new ExecutorCompletionService<NodeInfo>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -740,14 +810,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodeInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodeInfo nodeInfo = interceptor.getNode(nodeId); return nodeInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodeInfo."); + LOG.error("Subcluster {} failed to return nodeInfo.", + info.getSubClusterId()); return null; } } @@ -756,7 +826,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel NodeInfo nodeInfo = null; - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future<NodeInfo> future = compSvc.take(); NodeInfo nodeResponse = future.get(); @@ -765,8 +835,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (nodeResponse != null) { // Check if the node was already found in a different SubCluster and // it has an old health report - if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse - .getLastHealthUpdate()) { + if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < + nodeResponse.getLastHealthUpdate()) { nodeInfo = nodeResponse; } } @@ -808,13 +878,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { subClustersActive = federationFacade.getSubClusters(true); } catch (YarnException e) { - LOG.error(e.getMessage()); + LOG.error("Cannot get nodes: {}", e.getMessage()); return new NodesInfo(); } // Send the requests in parallel - - ExecutorCompletionService<NodesInfo> compSvc = + CompletionService<NodesInfo> compSvc = new ExecutorCompletionService<NodesInfo>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -822,14 +891,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodesInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodesInfo nodesInfo = interceptor.getNodes(states); return nodesInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodesInfo."); + LOG.error("Subcluster {} failed to return nodesInfo.", + info.getSubClusterId()); return null; } } @@ -838,7 +907,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future<NodesInfo> future = compSvc.take(); NodesInfo nodesResponse = future.get(); @@ -872,8 +941,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService<ClusterMetricsInfo> compSvc = + CompletionService<ClusterMetricsInfo> compSvc = new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -881,14 +949,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public ClusterMetricsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); return metrics; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return Cluster Metrics."); + LOG.error("Subcluster {} failed to return Cluster Metrics.", + info.getSubClusterId()); return null; } } @@ -897,7 +965,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future<ClusterMetricsInfo> future = compSvc.take(); ClusterMetricsInfo metricsResponse = future.get(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index efc3ea3..40bdbd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -28,13 +31,12 @@ import java.util.Map; import java.util.Map.Entry; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; @@ -47,6 +49,8 @@ import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.jersey.api.ConflictException; import com.sun.jersey.api.client.Client; @@ -62,8 +66,8 @@ public final class RouterWebServiceUtil { private static String user = "YarnRouter"; - private static final Log LOG = - LogFactory.getLog(RouterWebServiceUtil.class.getName()); + private static final Logger LOG = + LoggerFactory.getLogger(RouterWebServiceUtil.class.getName()); private final static String PARTIAL_REPORT = "Partial Report "; @@ -85,9 +89,10 @@ public final class RouterWebServiceUtil { * call in case the call has no servlet request * @return the retrieved entity from the REST call */ - protected static <T> T genericForward(final String webApp, - final HttpServletRequest hsr, final Class<T> returnType, - final HTTPMethods method, final String targetPath, final Object formParam, + protected static <T> T genericForward( + final String webApp, final HttpServletRequest hsr, + final Class<T> returnType, final HTTPMethods method, + final String targetPath, final Object formParam, final Map<String, String[]> additionalParam) { UserGroupInformation callerUGI = null; @@ -122,14 +127,22 @@ public final class RouterWebServiceUtil { ClientResponse response = RouterWebServiceUtil.invokeRMWebService( webApp, targetPath, method, - (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam); + (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam, + getMediaTypeFromHttpServletRequest(hsr, returnType)); if (Response.class.equals(returnType)) { return (T) RouterWebServiceUtil.clientResponseToResponse(response); } // YARN RM can answer with Status.OK or it throws an exception - if (response.getStatus() == 200) { + if (response.getStatus() == SC_OK) { return response.getEntity(returnType); } + if (response.getStatus() == SC_NO_CONTENT) { + try { + return returnType.getConstructor().newInstance(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error("Cannot create empty entity for {}", returnType, e); + } + } RouterWebServiceUtil.retrieveException(response); return null; } @@ -148,7 +161,7 @@ public final class RouterWebServiceUtil { */ private static ClientResponse invokeRMWebService(String webApp, String path, HTTPMethods method, String additionalPath, - Map<String, String[]> queryParams, Object formParam) { + Map<String, String[]> queryParams, Object formParam, String mediaType) { Client client = Client.create(); WebResource webResource = client.resource(webApp).path(path); @@ -169,14 +182,12 @@ public final class RouterWebServiceUtil { webResource = webResource.queryParams(paramMap); } - // I can forward the call in JSON or XML since the Router will convert it - // again in Object before send it back to the client Builder builder = null; if (formParam != null) { - builder = webResource.entity(formParam, MediaType.APPLICATION_XML); - builder = builder.accept(MediaType.APPLICATION_XML); + builder = webResource.entity(formParam, mediaType); + builder = builder.accept(mediaType); } else { - builder = webResource.accept(MediaType.APPLICATION_XML); + builder = webResource.accept(mediaType); } ClientResponse response = null; @@ -429,4 +440,25 @@ public final class RouterWebServiceUtil { + metricsResponse.getShutdownNodes()); } + /** + * Extract from HttpServletRequest the MediaType in output. + */ + protected static <T> String getMediaTypeFromHttpServletRequest( + HttpServletRequest request, final Class<T> returnType) { + if (request == null) { + // By default we return XML for REST call without HttpServletRequest + return MediaType.APPLICATION_XML; + } + // TODO + if (!returnType.equals(Response.class)) { + return MediaType.APPLICATION_XML; + } + String header = request.getHeader(HttpHeaders.ACCEPT); + if (header == null || header.equals("*")) { + // By default we return JSON + return MediaType.APPLICATION_JSON; + } + return header; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 4bb6271..14e7b3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -157,12 +157,19 @@ public class RouterWebServices implements RMWebServiceProtocol { } @VisibleForTesting - protected RequestInterceptorChainWrapper getInterceptorChain() { + protected RequestInterceptorChainWrapper getInterceptorChain( + final HttpServletRequest hsr) { String user = ""; + if (hsr != null) { + user = hsr.getRemoteUser(); + } try { - user = UserGroupInformation.getCurrentUser().getUserName(); + if (user == null || user.equals("")) { + // Yarn Router user + user = UserGroupInformation.getCurrentUser().getUserName(); + } } catch (IOException e) { - LOG.error("IOException " + e.getMessage()); + LOG.error("Cannot get user: {}", e.getMessage()); } if (!userPipelineMap.containsKey(user)) { initializePipeline(user); @@ -313,7 +320,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterInfo getClusterInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterInfo(); } @@ -323,7 +330,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterMetricsInfo getClusterMetricsInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterMetricsInfo(); } @@ -333,7 +340,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public SchedulerTypeInfo getSchedulerInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getSchedulerInfo(); } @@ -344,7 +351,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, @Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr); } @@ -354,7 +361,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNodes(states); } @@ -364,7 +371,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNode(nodeId); } @@ -387,7 +394,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags, @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags, @@ -401,7 +408,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getActivities(hsr, nodeId); } @@ -413,7 +420,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APP_ID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time); } @@ -426,7 +433,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.STATES) Set<String> stateQueries, @QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries, typeQueries); } @@ -439,7 +446,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields); } @@ -450,7 +457,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppState getAppState(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppState(hsr, appId); } @@ -463,7 +470,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppState(targetState, hsr, appId); } @@ -475,7 +482,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getNodeToLabels(hsr); } @@ -486,7 +493,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public LabelsToNodesInfo getLabelsToNodes( @QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getLabelsToNodes(labels); } @@ -498,7 +505,7 @@ public class RouterWebServices implements RMWebServiceProtocol { final NodeToLabelsEntryList newNodeToLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels, hsr); } @@ -512,7 +519,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName, hsr, nodeId); } @@ -524,7 +531,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getClusterNodeLabels(hsr); } @@ -535,7 +542,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels, hsr); } @@ -548,7 +555,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .removeFromCluserNodeLabels(oldNodeLabels, hsr); } @@ -560,7 +567,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId); } @@ -571,7 +578,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppPriority getAppPriority(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppPriority(hsr, appId); } @@ -584,7 +591,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .updateApplicationPriority(targetPriority, hsr, appId); } @@ -596,7 +603,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppQueue getAppQueue(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppQueue(hsr, appId); } @@ -609,7 +616,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr, appId); } @@ -621,7 +628,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewApplication(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewApplication(hsr); } @@ -633,7 +640,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitApplication(newApp, hsr); } @@ -645,7 +652,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr); } @@ -656,7 +663,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr); } @@ -668,7 +675,7 @@ public class RouterWebServices implements RMWebServiceProtocol { throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().cancelDelegationToken(hsr); } @@ -679,7 +686,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewReservation(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewReservation(hsr); } @@ -691,7 +698,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitReservation(resContext, hsr); } @@ -703,7 +710,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateReservation(resContext, hsr); } @@ -715,7 +722,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().deleteReservation(resContext, hsr); } @@ -731,7 +738,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().listReservation(queue, reservationId, startTime, endTime, includeResourceAllocations, hsr); } @@ -744,7 +751,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type); } @@ -755,7 +762,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId); } @@ -768,7 +775,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout, hsr, appId); } @@ -780,7 +787,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppAttempts(hsr, appId); } @@ -792,7 +799,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getAppAttempt(req, res, appId, appAttemptId); } @@ -805,7 +812,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainers(req, res, appId, appAttemptId); } @@ -819,7 +826,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId, @PathParam(RMWSConsts.CONTAINERID) String containerId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainer(req, res, appId, appAttemptId, containerId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 7d42084..9480850 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.security.PrivilegedExceptionAction; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -128,487 +128,263 @@ public abstract class BaseRouterWebServicesTest { protected ClusterInfo get(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ClusterInfo>() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.get(); - } - }); + // HSR is not used here + return routerWebService.get(); } protected ClusterInfo getClusterInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ClusterInfo>() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.getClusterInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterInfo(); } protected ClusterMetricsInfo getClusterMetricsInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ClusterMetricsInfo>() { - @Override - public ClusterMetricsInfo run() throws Exception { - return routerWebService.getClusterMetricsInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterMetricsInfo(); } protected SchedulerTypeInfo getSchedulerInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<SchedulerTypeInfo>() { - @Override - public SchedulerTypeInfo run() throws Exception { - return routerWebService.getSchedulerInfo(); - } - }); + // HSR is not used here + return routerWebService.getSchedulerInfo(); } protected String dumpSchedulerLogs(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<String>() { - @Override - public String run() throws Exception { - return routerWebService.dumpSchedulerLogs(null, null); - } - }); + return routerWebService.dumpSchedulerLogs(null, + createHttpServletRequest(user)); } protected NodesInfo getNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<NodesInfo>() { - @Override - public NodesInfo run() throws Exception { - return routerWebService.getNodes(null); - } - }); + return routerWebService.getNodes(null); } protected NodeInfo getNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<NodeInfo>() { - @Override - public NodeInfo run() throws Exception { - return routerWebService.getNode(null); - } - }); + return routerWebService.getNode(null); } protected AppsInfo getApps(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppsInfo>() { - @Override - public AppsInfo run() throws Exception { - return routerWebService.getApps(null, null, null, null, null, null, - null, null, null, null, null, null, null, null); - } - }); + return routerWebService.getApps(createHttpServletRequest(user), null, null, + null, null, null, null, null, null, null, null, null, null, null); } protected ActivitiesInfo getActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ActivitiesInfo>() { - @Override - public ActivitiesInfo run() throws Exception { - return routerWebService.getActivities(null, null); - } - }); + return routerWebService.getActivities( + createHttpServletRequest(user), null); } protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppActivitiesInfo>() { - @Override - public AppActivitiesInfo run() throws Exception { - return routerWebService.getAppActivities(null, null, null); - } - }); + return routerWebService.getAppActivities( + createHttpServletRequest(user), null, null); } protected ApplicationStatisticsInfo getAppStatistics(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ApplicationStatisticsInfo>() { - @Override - public ApplicationStatisticsInfo run() throws Exception { - return routerWebService.getAppStatistics(null, null, null); - } - }); + return routerWebService.getAppStatistics( + createHttpServletRequest(user), null, null); } protected AppInfo getApp(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppInfo>() { - @Override - public AppInfo run() throws Exception { - return routerWebService.getApp(null, null, null); - } - }); + return routerWebService.getApp(createHttpServletRequest(user), null, null); } protected AppState getAppState(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppState>() { - @Override - public AppState run() throws Exception { - return routerWebService.getAppState(null, null); - } - }); + return routerWebService.getAppState(createHttpServletRequest(user), null); } protected Response updateAppState(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppState(null, null, null); - } - }); + return routerWebService.updateAppState( + null, createHttpServletRequest(user), null); } protected NodeToLabelsInfo getNodeToLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<NodeToLabelsInfo>() { - @Override - public NodeToLabelsInfo run() throws Exception { - return routerWebService.getNodeToLabels(null); - } - }); + return routerWebService.getNodeToLabels(createHttpServletRequest(user)); } protected LabelsToNodesInfo getLabelsToNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<LabelsToNodesInfo>() { - @Override - public LabelsToNodesInfo run() throws Exception { - return routerWebService.getLabelsToNodes(null); - } - }); + return routerWebService.getLabelsToNodes(null); } protected Response replaceLabelsOnNodes(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNodes(null, null); - } - }); + return routerWebService.replaceLabelsOnNodes( + null, createHttpServletRequest(user)); } protected Response replaceLabelsOnNode(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNode(null, null, null); - } - }); + return routerWebService.replaceLabelsOnNode( + null, createHttpServletRequest(user), null); } protected NodeLabelsInfo getClusterNodeLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getClusterNodeLabels(null); - } - }); + return routerWebService.getClusterNodeLabels( + createHttpServletRequest(user)); } protected Response addToClusterNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.addToClusterNodeLabels(null, null); - } - }); + return routerWebService.addToClusterNodeLabels( + null, createHttpServletRequest(user)); } protected Response removeFromCluserNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.removeFromCluserNodeLabels(null, null); - } - }); + return routerWebService.removeFromCluserNodeLabels( + null, createHttpServletRequest(user)); } protected NodeLabelsInfo getLabelsOnNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getLabelsOnNode(null, null); - } - }); + return routerWebService.getLabelsOnNode( + createHttpServletRequest(user), null); } protected AppPriority getAppPriority(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppPriority>() { - @Override - public AppPriority run() throws Exception { - return routerWebService.getAppPriority(null, null); - } - }); + return routerWebService.getAppPriority( + createHttpServletRequest(user), null); } protected Response updateApplicationPriority(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationPriority(null, null, null); - } - }); + return routerWebService.updateApplicationPriority( + null, createHttpServletRequest(user), null); } protected AppQueue getAppQueue(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppQueue>() { - @Override - public AppQueue run() throws Exception { - return routerWebService.getAppQueue(null, null); - } - }); + return routerWebService.getAppQueue(createHttpServletRequest(user), null); } protected Response updateAppQueue(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppQueue(null, null, null); - } - }); + return routerWebService.updateAppQueue( + null, createHttpServletRequest(user), null); } protected Response createNewApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.createNewApplication(null); - } - }); + return routerWebService.createNewApplication( + createHttpServletRequest(user)); } protected Response submitApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.submitApplication(null, null); - } - }); + return routerWebService.submitApplication( + null, createHttpServletRequest(user)); } protected Response postDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationToken(null, null); - } - }); + return routerWebService.postDelegationToken( + null, createHttpServletRequest(user)); } protected Response postDelegationTokenExpiration(String user) throws AuthorizationException, IOException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationTokenExpiration(null); - } - }); + return routerWebService.postDelegationTokenExpiration( + createHttpServletRequest(user)); } protected Response cancelDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.cancelDelegationToken(null); - } - }); + return routerWebService.cancelDelegationToken( + createHttpServletRequest(user)); } protected Response createNewReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.createNewReservation(null); - } - }); + return routerWebService.createNewReservation( + createHttpServletRequest(user)); } protected Response submitReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.submitReservation(null, null); - } - }); + return routerWebService.submitReservation( + null, createHttpServletRequest(user)); } protected Response updateReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.updateReservation(null, null); - } - }); + return routerWebService.updateReservation( + null, createHttpServletRequest(user)); } protected Response deleteReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.deleteReservation(null, null); - } - }); + return routerWebService.deleteReservation( + null, createHttpServletRequest(user)); } protected Response listReservation(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.listReservation(null, null, 0, 0, false, - null); - } - }); + return routerWebService.listReservation( + null, null, 0, 0, false, createHttpServletRequest(user)); } protected AppTimeoutInfo getAppTimeout(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppTimeoutInfo>() { - @Override - public AppTimeoutInfo run() throws Exception { - return routerWebService.getAppTimeout(null, null, null); - } - }); + return routerWebService.getAppTimeout( + createHttpServletRequest(user), null, null); } protected AppTimeoutsInfo getAppTimeouts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppTimeoutsInfo>() { - @Override - public AppTimeoutsInfo run() throws Exception { - return routerWebService.getAppTimeouts(null, null); - } - }); + return routerWebService.getAppTimeouts( + createHttpServletRequest(user), null); } protected Response updateApplicationTimeout(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<Response>() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationTimeout(null, null, null); - } - }); + return routerWebService.updateApplicationTimeout( + null, createHttpServletRequest(user), null); } protected AppAttemptsInfo getAppAttempts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppAttemptsInfo>() { - @Override - public AppAttemptsInfo run() throws Exception { - return routerWebService.getAppAttempts(null, null); - } - }); + return routerWebService.getAppAttempts( + createHttpServletRequest(user), null); } protected AppAttemptInfo getAppAttempt(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<AppAttemptInfo>() { - @Override - public AppAttemptInfo run() throws Exception { - return routerWebService.getAppAttempt(null, null, null, null); - } - }); + return routerWebService.getAppAttempt( + createHttpServletRequest(user), null, null, null); } protected ContainersInfo getContainers(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ContainersInfo>() { - @Override - public ContainersInfo run() throws Exception { - return routerWebService.getContainers(null, null, null, null); - } - }); + return routerWebService.getContainers( + createHttpServletRequest(user), null, null, null); } protected ContainerInfo getContainer(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<ContainerInfo>() { - @Override - public ContainerInfo run() throws Exception { - return routerWebService.getContainer(null, null, null, null, null); - } - }); + return routerWebService.getContainer( + createHttpServletRequest(user), null, null, null, null); } protected RequestInterceptorChainWrapper getInterceptorChain(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction<RequestInterceptorChainWrapper>() { - @Override - public RequestInterceptorChainWrapper run() throws Exception { - return routerWebService.getInterceptorChain(); - } - }); + HttpServletRequest request = createHttpServletRequest(user); + return routerWebService.getInterceptorChain(request); } + private HttpServletRequest createHttpServletRequest(String user) { + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(user); + return request; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8180ab43/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java index d32013f..6c0938c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.File; import java.io.IOException; +import java.util.List; /** * Helper class to start a new process. @@ -28,13 +29,23 @@ public class JavaProcess { private Process process = null; - public JavaProcess(Class<?> klass) throws IOException, InterruptedException { + public JavaProcess(Class<?> clazz) throws IOException, InterruptedException { + this(clazz, null); + } + + public JavaProcess(Class<?> clazz, List<String> addClasspaths) + throws IOException, InterruptedException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); classpath = classpath.concat("./src/test/resources"); - String className = klass.getCanonicalName(); + if (addClasspaths != null) { + for (String addClasspath : addClasspaths) { + classpath = classpath.concat(File.pathSeparatorChar + addClasspath); + } + } + String className = clazz.getCanonicalName(); ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className); builder.inheritIO(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org