http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java new file mode 100644 index 0000000..12b933b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the AbstractRequestInterceptorClient class and provides an + * implementation that simply forwards the client requests to the cluster + * resource manager. + * + */ +public class DefaultClientRequestInterceptor + extends AbstractClientRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); + private ApplicationClientProtocol clientRMProxy; + private UserGroupInformation user = null; + + @Override + public void init(String userName) { + super.init(userName); + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + + final Configuration conf = this.getConf(); + + clientRMProxy = + user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() { + @Override + public ApplicationClientProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } + }); + } catch (IOException e) { + String message = "Error while creating Router ClientRM Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public void setNextInterceptor(ClientRequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return clientRMProxy.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return clientRMProxy.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return clientRMProxy.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return clientRMProxy.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return clientRMProxy.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return clientRMProxy.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return clientRMProxy.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return clientRMProxy.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return clientRMProxy.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return clientRMProxy.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return clientRMProxy.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return clientRMProxy.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return clientRMProxy.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationTimeouts(request); + } + + @VisibleForTesting + public void setRMClient(ApplicationClientProtocol clientRM) { + this.clientRMProxy = clientRM; + + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java new file mode 100644 index 0000000..00016dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RouterClientRMService is a service that runs on each router that can be used + * to intercept and inspect ApplicationClientProtocol messages from client to + * the cluster resource manager. It listens ApplicationClientProtocol messages + * from the client and creates a request intercepting pipeline instance for each + * client. The pipeline is a chain of intercepter instances that can inspect and + * modify the request/response as needed. The main difference with + * AMRMProxyService is the protocol they implement. + */ +public class RouterClientRMService extends AbstractService + implements ApplicationClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterClientRMService.class); + + private Server server; + private InetSocketAddress listenerEndpoint; + + // For each user we store an interceptors' pipeline. + // For performance issue we use LRU cache to keep in memory the newest ones + // and remove the oldest used ones. + private Map<String, RequestInterceptorChainWrapper> userPipelineMap; + + public RouterClientRMService() { + super(RouterClientRMService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting Router ClientRMService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT); + + int maxCacheSize = + conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap<String, RequestInterceptorChainWrapper>( + maxCacheSize, true)); + + Configuration serverConf = new Configuration(conf); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); + + this.server = rpc.getServer(ApplicationClientProtocol.class, this, + listenerEndpoint, serverConf, null, numWorkerThreads); + + this.server.start(); + LOG.info("Router ClientRMService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping Router ClientRMService"); + if (this.server != null) { + this.server.stop(); + } + userPipelineMap.clear(); + super.serviceStop(); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List<String> getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = + conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS); + + List<String> interceptorClassNames = new ArrayList<String>(); + Collection<String> tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationTimeouts(request); + } + + private RequestInterceptorChainWrapper getInterceptorChain() + throws IOException { + String user = UserGroupInformation.getCurrentUser().getUserName(); + if (!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + return userPipelineMap.get(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + @VisibleForTesting + protected Map<String, RequestInterceptorChainWrapper> getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + @VisibleForTesting + protected ClientRequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List<String> interceptorClassNames = getInterceptorClassNames(conf); + + ClientRequestInterceptor pipeline = null; + ClientRequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class<?> interceptorClass = conf.getClassByName(interceptorClassName); + if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) { + ClientRequestInterceptor interceptorInstance = + (ClientRequestInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + ClientRequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationClientRequestInterceptor: " + + interceptorClassName, + e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified application. + * + * @param user + */ + private void initializePipeline(String user) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.userPipelineMap.put(user, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + + try { + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + synchronized (this.userPipelineMap) { + this.userPipelineMap.remove(user); + } + throw e; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + * + */ + @Private + public static class RequestInterceptorChainWrapper { + private ClientRequestInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param interceptor the first interceptor in the pipeline + */ + public synchronized void init(ClientRequestInterceptor interceptor) { + this.rootInterceptor = interceptor; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized ClientRequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed. + */ + @Override + protected void finalize() { + rootInterceptor.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java new file mode 100644 index 0000000..7d1dadd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router ClientRM Proxy Service package. **/ +package org.apache.hadoop.yarn.server.router.clientrm; http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java deleted file mode 100644 index a31d6b9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.router; - -/** - * Test class for YARN Router. - */ -public class TestRouter { - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java new file mode 100644 index 0000000..a283a62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the RouterClientRMService test cases. It provides utility + * methods that can be used by the concrete test case classes. + * + */ +public abstract class BaseRouterClientRMTest { + + /** + * The RouterClientRMService instance that will be used by all the test cases. + */ + private MockRouterClientRMService clientrmService; + /** + * Thread pool used for asynchronous operations. + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + public final static int TEST_MAX_CACHE_SIZE = 10; + + protected MockRouterClientRMService getRouterClientRMService() { + Assert.assertNotNull(this.clientrmService); + return this.clientrmService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockClientRequestInterceptor.class.getName()); + + this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + TEST_MAX_CACHE_SIZE); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.clientrmService = createAndStartRouterClientRMService(); + } + + @After + public void tearDown() { + if (clientrmService != null) { + clientrmService.stop(); + clientrmService = null; + } + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockRouterClientRMService createAndStartRouterClientRMService() { + MockRouterClientRMService svc = new MockRouterClientRMService(); + svc.init(conf); + svc.start(); + return svc; + } + + protected static class MockRouterClientRMService + extends RouterClientRMService { + public MockRouterClientRMService() { + super(); + } + } + + protected GetNewApplicationResponse getNewApplication(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetNewApplicationResponse>() { + @Override + public GetNewApplicationResponse run() throws Exception { + GetNewApplicationRequest req = + GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = + getRouterClientRMService().getNewApplication(req); + return response; + } + }); + } + + protected SubmitApplicationResponse submitApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<SubmitApplicationResponse>() { + @Override + public SubmitApplicationResponse run() throws Exception { + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "", "", null, + null, false, false, -1, null, null); + SubmitApplicationRequest req = + SubmitApplicationRequest.newInstance(context); + SubmitApplicationResponse response = + getRouterClientRMService().submitApplication(req); + return response; + } + }); + } + + protected KillApplicationResponse forceKillApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<KillApplicationResponse>() { + @Override + public KillApplicationResponse run() throws Exception { + KillApplicationRequest req = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse response = + getRouterClientRMService().forceKillApplication(req); + return response; + } + }); + } + + protected GetClusterMetricsResponse getClusterMetrics(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetClusterMetricsResponse>() { + @Override + public GetClusterMetricsResponse run() throws Exception { + GetClusterMetricsRequest req = + GetClusterMetricsRequest.newInstance(); + GetClusterMetricsResponse response = + getRouterClientRMService().getClusterMetrics(req); + return response; + } + }); + } + + protected GetClusterNodesResponse getClusterNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetClusterNodesResponse>() { + @Override + public GetClusterNodesResponse run() throws Exception { + GetClusterNodesRequest req = GetClusterNodesRequest.newInstance(); + GetClusterNodesResponse response = + getRouterClientRMService().getClusterNodes(req); + return response; + } + }); + } + + protected GetQueueInfoResponse getQueueInfo(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetQueueInfoResponse>() { + @Override + public GetQueueInfoResponse run() throws Exception { + GetQueueInfoRequest req = + GetQueueInfoRequest.newInstance("default", false, false, false); + GetQueueInfoResponse response = + getRouterClientRMService().getQueueInfo(req); + return response; + } + }); + } + + protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetQueueUserAclsInfoResponse>() { + @Override + public GetQueueUserAclsInfoResponse run() throws Exception { + GetQueueUserAclsInfoRequest req = + GetQueueUserAclsInfoRequest.newInstance(); + GetQueueUserAclsInfoResponse response = + getRouterClientRMService().getQueueUserAcls(req); + return response; + } + }); + } + + protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + String user, final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse>() { + @Override + public MoveApplicationAcrossQueuesResponse run() throws Exception { + + MoveApplicationAcrossQueuesRequest req = + MoveApplicationAcrossQueuesRequest.newInstance(appId, + "newQueue"); + MoveApplicationAcrossQueuesResponse response = + getRouterClientRMService().moveApplicationAcrossQueues(req); + return response; + } + }); + } + + public GetNewReservationResponse getNewReservation(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetNewReservationResponse>() { + @Override + public GetNewReservationResponse run() throws Exception { + GetNewReservationResponse response = getRouterClientRMService() + .getNewReservation(GetNewReservationRequest.newInstance()); + return response; + } + }); + } + + protected ReservationSubmissionResponse submitReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<ReservationSubmissionResponse>() { + @Override + public ReservationSubmissionResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ReservationSubmissionRequest req = createSimpleReservationRequest(1, + arrival, deadline, duration, reservationId); + ReservationSubmissionResponse response = + getRouterClientRMService().submitReservation(req); + return response; + } + }); + } + + protected ReservationUpdateResponse updateReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<ReservationUpdateResponse>() { + @Override + public ReservationUpdateResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationDefinition rDef = + createSimpleReservationRequest(1, arrival, deadline, duration, + reservationId).getReservationDefinition(); + + ReservationUpdateRequest req = + ReservationUpdateRequest.newInstance(rDef, reservationId); + ReservationUpdateResponse response = + getRouterClientRMService().updateReservation(req); + return response; + } + }); + } + + protected ReservationDeleteResponse deleteReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<ReservationDeleteResponse>() { + @Override + public ReservationDeleteResponse run() throws Exception { + ReservationDeleteRequest req = + ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse response = + getRouterClientRMService().deleteReservation(req); + return response; + } + }); + } + + protected GetNodesToLabelsResponse getNodeToLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetNodesToLabelsResponse>() { + @Override + public GetNodesToLabelsResponse run() throws Exception { + GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance(); + GetNodesToLabelsResponse response = + getRouterClientRMService().getNodeToLabels(req); + return response; + } + }); + } + + protected GetLabelsToNodesResponse getLabelsToNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetLabelsToNodesResponse>() { + @Override + public GetLabelsToNodesResponse run() throws Exception { + GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance(); + GetLabelsToNodesResponse response = + getRouterClientRMService().getLabelsToNodes(req); + return response; + } + }); + } + + protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetClusterNodeLabelsResponse>() { + @Override + public GetClusterNodeLabelsResponse run() throws Exception { + GetClusterNodeLabelsRequest req = + GetClusterNodeLabelsRequest.newInstance(); + GetClusterNodeLabelsResponse response = + getRouterClientRMService().getClusterNodeLabels(req); + return response; + } + }); + } + + protected GetApplicationReportResponse getApplicationReport(String user, + final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetApplicationReportResponse>() { + @Override + public GetApplicationReportResponse run() throws Exception { + GetApplicationReportRequest req = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + getRouterClientRMService().getApplicationReport(req); + return response; + } + }); + } + + protected GetApplicationsResponse getApplications(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetApplicationsResponse>() { + @Override + public GetApplicationsResponse run() throws Exception { + GetApplicationsRequest req = GetApplicationsRequest.newInstance(); + GetApplicationsResponse response = + getRouterClientRMService().getApplications(req); + return response; + } + }); + } + + protected GetApplicationAttemptReportResponse getApplicationAttemptReport( + String user, final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction<GetApplicationAttemptReportResponse>() { + @Override + public GetApplicationAttemptReportResponse run() throws Exception { + GetApplicationAttemptReportRequest req = + GetApplicationAttemptReportRequest.newInstance(appAttemptId); + GetApplicationAttemptReportResponse response = + getRouterClientRMService().getApplicationAttemptReport(req); + return response; + } + }); + } + + protected GetApplicationAttemptsResponse getApplicationAttempts(String user, + final ApplicationId applicationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetApplicationAttemptsResponse>() { + @Override + public GetApplicationAttemptsResponse run() throws Exception { + GetApplicationAttemptsRequest req = + GetApplicationAttemptsRequest.newInstance(applicationId); + GetApplicationAttemptsResponse response = + getRouterClientRMService().getApplicationAttempts(req); + return response; + } + }); + } + + protected GetContainerReportResponse getContainerReport(String user, + final ContainerId containerId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetContainerReportResponse>() { + @Override + public GetContainerReportResponse run() throws Exception { + GetContainerReportRequest req = + GetContainerReportRequest.newInstance(containerId); + GetContainerReportResponse response = + getRouterClientRMService().getContainerReport(req); + return response; + } + }); + } + + protected GetContainersResponse getContainers(String user, + final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetContainersResponse>() { + @Override + public GetContainersResponse run() throws Exception { + GetContainersRequest req = + GetContainersRequest.newInstance(appAttemptId); + GetContainersResponse response = + getRouterClientRMService().getContainers(req); + return response; + } + }); + } + + protected GetDelegationTokenResponse getDelegationToken(final String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest req = + GetDelegationTokenRequest.newInstance(user); + GetDelegationTokenResponse response = + getRouterClientRMService().getDelegationToken(req); + return response; + } + }); + } + + protected RenewDelegationTokenResponse renewDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest req = + RenewDelegationTokenRequest.newInstance(token); + RenewDelegationTokenResponse response = + getRouterClientRMService().renewDelegationToken(req); + return response; + } + }); + } + + protected CancelDelegationTokenResponse cancelDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest req = + CancelDelegationTokenRequest.newInstance(token); + CancelDelegationTokenResponse response = + getRouterClientRMService().cancelDelegationToken(req); + return response; + } + }); + } + + private ReservationSubmissionRequest createSimpleReservationRequest( + int numContainers, long arrival, long deadline, long duration, + ReservationId reservationId) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testRouterClientRMService#reservation"); + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(rDef, "dedicated", reservationId); + return request; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java new file mode 100644 index 0000000..b38703f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; + +/** + * This class mocks the ClientRequestInterceptor. + */ +public class MockClientRequestInterceptor + extends DefaultClientRequestInterceptor { + + public void init(String user) { + MockResourceManagerFacade mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + super.setRMClient(mockRM); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/564f5578/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java new file mode 100644 index 0000000..c403bd5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain. + */ +public class PassThroughClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return getNextInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return getNextInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return getNextInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return getNextInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return getNextInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return getNextInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return getNextInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return getNextInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return getNextInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return getNextInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return getNextInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return getNextInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationTimeouts(request); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org