YARN-3666. Federation Intercepting and propagating AM- home RM communications. (Botong Huang via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c29cf65 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c29cf65 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c29cf65 Branch: refs/heads/YARN-2915 Commit: 7c29cf6505ab404a543422f9590ffb5efa0063e5 Parents: f37a7c3 Author: Subru Krishnan <su...@apache.org> Authored: Wed May 31 13:21:09 2017 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Thu Jun 22 14:09:47 2017 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 7 + .../amrmproxy/FederationInterceptor.java | 510 +++++++++++++++++++ .../amrmproxy/TestAMRMProxyService.java | 1 + .../amrmproxy/TestFederationInterceptor.java | 167 ++++++ .../TestableFederationInterceptor.java | 133 +++++ 5 files changed, 818 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index ee51094..034f03c 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -594,4 +594,11 @@ <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" /> </Match> + <!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE --> + <Match> + <Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" /> + <Method name="registerApplicationMaster" /> + <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" /> + </Match> + </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java new file mode 100644 index 0000000..5f82d69 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -0,0 +1,510 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the AbstractRequestInterceptor and provides an implementation for + * federation of YARN RM and scaling an application across multiple YARN + * sub-clusters. All the federation specific implementation is encapsulated in + * this class. This is always the last intercepter in the chain. + */ +public class FederationInterceptor extends AbstractRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(FederationInterceptor.class); + + /** + * The home sub-cluster is the sub-cluster where the AM container is running + * in. + */ + private ApplicationMasterProtocol homeRM; + private SubClusterId homeSubClusterId; + + /** + * Used to keep track of the container Id and the sub cluster RM that created + * the container, so that we know which sub-cluster to forward later requests + * about existing containers to. + */ + private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap; + + /** + * The original registration request that was sent by the AM. This instance is + * reused to register/re-register with all the sub-cluster RMs. + */ + private RegisterApplicationMasterRequest amRegistrationRequest; + + /** + * The original registration response from home RM. This instance is reused + * for duplicate register request from AM, triggered by timeout between AM and + * AMRMProxy. + */ + private RegisterApplicationMasterResponse amRegistrationResponse; + + /** The proxy ugi used to talk to home RM. */ + private UserGroupInformation appOwner; + + /** + * Creates an instance of the FederationInterceptor class. + */ + public FederationInterceptor() { + this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>(); + this.amRegistrationResponse = null; + } + + /** + * Initializes the instance using specified context. + */ + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + LOG.info("Initializing Federation Interceptor"); + + // Update the conf if available + Configuration conf = appContext.getConf(); + if (conf == null) { + conf = getConf(); + } else { + setConf(conf); + } + + try { + this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), + UserGroupInformation.getCurrentUser()); + } catch (Exception ex) { + throw new YarnRuntimeException(ex); + } + + this.homeSubClusterId = + SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); + this.homeRM = createHomeRMProxy(appContext); + } + + /** + * Sends the application master's registration request to the home RM. + * + * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior, + * so that when AM registers more than once, it returns the same register + * success response instead of throwing + * {@link InvalidApplicationMasterRequestException}. Furthermore, we present + * to AM as if we are the RM that never fails over. When actual RM fails over, + * we always re-register automatically. + * + * We did this because FederationInterceptor can receive concurrent register + * requests from AM because of timeout between AM and AMRMProxy, which is + * shorter than the timeout + failOver between FederationInterceptor + * (AMRMProxy) and RM. + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + // If AM is calling with a different request, complain + if (this.amRegistrationRequest != null + && !this.amRegistrationRequest.equals(request)) { + throw new YarnException("A different request body recieved. AM should" + + " not call registerApplicationMaster with different request body"); + } + + // Save the registration request. This will be used for registering with + // secondary sub-clusters using UAMs, as well as re-register later + this.amRegistrationRequest = request; + + /* + * Present to AM as if we are the RM that never fails over. When actual RM + * fails over, we always re-register automatically. + * + * We did this because it is possible for AM to send duplicate register + * request because of timeout. When it happens, it is fine to simply return + * the success message. Out of all outstanding register threads, only the + * last one will still have an unbroken RPC connection and successfully + * return the response. + */ + if (this.amRegistrationResponse != null) { + return this.amRegistrationResponse; + } + + /* + * Send a registration request to the home resource manager. Note that here + * we don't register with other sub-cluster resource managers because that + * will prevent us from using new sub-clusters that get added while the AM + * is running and will breaks the elasticity feature. The registration with + * the other sub-cluster RM will be done lazily as needed later. + */ + try { + this.amRegistrationResponse = + this.homeRM.registerApplicationMaster(request); + } catch (InvalidApplicationMasterRequestException e) { + if (e.getMessage() + .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) { + // Some other register thread might have succeeded in the meantime + if (this.amRegistrationResponse != null) { + LOG.info("Other concurrent thread registered successfully, " + + "simply return the same success register response"); + return this.amRegistrationResponse; + } + } + // This is a real issue, throw back to AM + throw e; + } + + // the queue this application belongs will be used for getting + // AMRMProxy policy from state store. + String queue = this.amRegistrationResponse.getQueue(); + if (queue == null) { + LOG.warn("Received null queue for application " + + getApplicationContext().getApplicationAttemptId().getApplicationId() + + " from home subcluster. Will use default queue name " + + YarnConfiguration.DEFAULT_QUEUE_NAME + + " for getting AMRMProxyPolicy"); + } else { + LOG.info("Application " + + getApplicationContext().getApplicationAttemptId().getApplicationId() + + " belongs to queue " + queue); + } + + return this.amRegistrationResponse; + } + + /** + * Sends the heart beats to the home RM and the secondary sub-cluster RMs that + * are being used by the application. + */ + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException { + + try { + // Split the heart beat request into multiple requests, one for each + // sub-cluster RM that is used by this application. + Map<SubClusterId, AllocateRequest> requests = + splitAllocateRequest(request); + + // Send the request to the home RM and get the response + AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( + requests.get(this.homeSubClusterId), this.homeRM, + this.amRegistrationRequest, + getApplicationContext().getApplicationAttemptId()); + + // If the resource manager sent us a new token, add to the current user + if (homeResponse.getAMRMToken() != null) { + LOG.debug("Received new AMRMToken"); + YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(), + this.appOwner, getConf()); + } + + // Merge the responses from home and secondary sub-cluster RMs + homeResponse = mergeAllocateResponses(homeResponse); + + // return the final response to the application master. + return homeResponse; + } catch (IOException ex) { + LOG.error("Exception encountered while processing heart beat", ex); + throw new YarnException(ex); + } + } + + /** + * Sends the finish application master request to all the resource managers + * used by the application. + */ + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + + FinishApplicationMasterResponse homeResponse = + AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, + this.amRegistrationRequest, + getApplicationContext().getApplicationAttemptId()); + return homeResponse; + } + + @Override + public void setNextInterceptor(RequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on FederationInterceptor. " + + "It should always be used as the last interceptor in the chain"); + } + + /** + * This is called when the application pipeline is being destroyed. We will + * release all the resources that we are holding in this call. + */ + @Override + public void shutdown() { + super.shutdown(); + } + + /** + * Returns instance of the ApplicationMasterProtocol proxy class that is used + * to connect to the Home resource manager. + * + * @param appContext AMRMProxyApplicationContext + * @return the proxy created + */ + protected ApplicationMasterProtocol createHomeRMProxy( + AMRMProxyApplicationContext appContext) { + try { + return FederationProxyProviderUtil.createRMProxy(appContext.getConf(), + ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner, + appContext.getAMRMToken()); + } catch (Exception ex) { + throw new YarnRuntimeException(ex); + } + } + + /** + * In federation, the heart beat request needs to be sent to all the sub + * clusters from which the AM has requested containers. This method splits the + * specified AllocateRequest from the AM and creates a new request for each + * sub-cluster RM. + */ + private Map<SubClusterId, AllocateRequest> splitAllocateRequest( + AllocateRequest request) throws YarnException { + Map<SubClusterId, AllocateRequest> requestMap = + new HashMap<SubClusterId, AllocateRequest>(); + + // Create heart beat request for home sub-cluster resource manager + findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, + requestMap); + + if (!isNullOrEmpty(request.getAskList())) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getAskList().addAll(request.getAskList()); + } + + if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistAdditions())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistAdditions()) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistAdditions() + .add(resourceName); + } + } + + if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistRemovals())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistRemovals()) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + this.homeSubClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistRemovals() + .add(resourceName); + } + } + + if (!isNullOrEmpty(request.getReleaseList())) { + for (ContainerId cid : request.getReleaseList()) { + if (warnIfNotExists(cid, "release")) { + SubClusterId subClusterId = + this.containerIdToSubClusterIdMap.get(cid); + AllocateRequest newRequest = requestMap.get(subClusterId); + newRequest.getReleaseList().add(cid); + } + } + } + + if (!isNullOrEmpty(request.getUpdateRequests())) { + for (UpdateContainerRequest ucr : request.getUpdateRequests()) { + if (warnIfNotExists(ucr.getContainerId(), "update")) { + SubClusterId subClusterId = + this.containerIdToSubClusterIdMap.get(ucr.getContainerId()); + AllocateRequest newRequest = requestMap.get(subClusterId); + newRequest.getUpdateRequests().add(ucr); + } + } + } + + return requestMap; + } + + /** + * Merges the responses from other sub-clusters that we received + * asynchronously with the specified home cluster response and keeps track of + * the containers received from each sub-cluster resource managers. + */ + private AllocateResponse mergeAllocateResponses( + AllocateResponse homeResponse) { + // Timing issue, we need to remove the completed and then save the new ones. + if (LOG.isDebugEnabled()) { + LOG.debug("Remove containers: " + + homeResponse.getCompletedContainersStatuses()); + LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers()); + } + removeFinishedContainersFromCache( + homeResponse.getCompletedContainersStatuses()); + cacheAllocatedContainers(homeResponse.getAllocatedContainers(), + this.homeSubClusterId); + + return homeResponse; + } + + /** + * Removes the finished containers from the local cache. + */ + private void removeFinishedContainersFromCache( + List<ContainerStatus> finishedContainers) { + for (ContainerStatus container : finishedContainers) { + if (containerIdToSubClusterIdMap + .containsKey(container.getContainerId())) { + containerIdToSubClusterIdMap.remove(container.getContainerId()); + } + } + } + + /** + * Add allocated containers to cache mapping. + */ + private void cacheAllocatedContainers(List<Container> containers, + SubClusterId subClusterId) { + for (Container container : containers) { + if (containerIdToSubClusterIdMap.containsKey(container.getId())) { + SubClusterId existingSubClusterId = + containerIdToSubClusterIdMap.get(container.getId()); + if (existingSubClusterId.equals(subClusterId)) { + // When RM fails over, the new RM master might send out the same + // container allocation more than once. Just move on in this case. + LOG.warn( + "Duplicate containerID: {} found in the allocated containers" + + " from same subcluster: {}, so ignoring.", + container.getId(), subClusterId); + } else { + // The same container allocation from different subclusters, + // something is wrong. + // TODO: YARN-6667 if some subcluster RM is configured wrong, we + // should not fail the entire heartbeat. + throw new YarnRuntimeException( + "Duplicate containerID found in the allocated containers. This" + + " can happen if the RM epoch is not configured properly." + + " ContainerId: " + container.getId().toString() + + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId() + + " From RM: " + subClusterId + + " . Previous container was from subcluster: " + + existingSubClusterId); + } + } + + containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + } + } + + /** + * Check to see if an AllocateRequest exists in the Map for the specified sub + * cluster. If not found, create a new one, copy the value of responseId and + * progress from the orignialAMRequest, save it in the specified Map and + * return the new instance. If found, just return the old instance. + */ + private static AllocateRequest findOrCreateAllocateRequestForSubCluster( + SubClusterId subClusterId, AllocateRequest originalAMRequest, + Map<SubClusterId, AllocateRequest> requestMap) { + AllocateRequest newRequest = null; + if (requestMap.containsKey(subClusterId)) { + newRequest = requestMap.get(subClusterId); + } else { + newRequest = createAllocateRequest(); + newRequest.setResponseId(originalAMRequest.getResponseId()); + newRequest.setProgress(originalAMRequest.getProgress()); + requestMap.put(subClusterId, newRequest); + } + + return newRequest; + } + + /** + * Create an empty AllocateRequest instance. + */ + private static AllocateRequest createAllocateRequest() { + AllocateRequest request = + AllocateRequest.newInstance(0, 0, null, null, null); + request.setAskList(new ArrayList<ResourceRequest>()); + request.setReleaseList(new ArrayList<ContainerId>()); + ResourceBlacklistRequest blackList = + ResourceBlacklistRequest.newInstance(null, null); + blackList.setBlacklistAdditions(new ArrayList<String>()); + blackList.setBlacklistRemovals(new ArrayList<String>()); + request.setResourceBlacklistRequest(blackList); + request.setUpdateRequests(new ArrayList<UpdateContainerRequest>()); + return request; + } + + /** + * Check to see if the specified containerId exists in the cache and log an + * error if not found. + * + * @param containerId the container id + * @param actionName the name of the action + * @return true if the container exists in the map, false otherwise + */ + private boolean warnIfNotExists(ContainerId containerId, String actionName) { + if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) { + LOG.error("AM is trying to {} a container {} that does not exist. ", + actionName, containerId.toString()); + return false; + } + return true; + } + + /** + * Utility method to check if the specified Collection is null or empty + * + * @param c the collection object + * @param <T> element type of the collection + * @return whether is it is null or empty + */ + public static <T> boolean isNullOrEmpty(Collection<T> c) { + return (c == null || c.size() == 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index e734bdd..72e5f53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java new file mode 100644 index 0000000..3b564f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the TestAMRMProxyService and overrides methods in order to use the + * AMRMProxyService's pipeline test cases for testing the FederationInterceptor + * class. The tests for AMRMProxyService has been written cleverly so that it + * can be reused to validate different request intercepter chains. + */ +public class TestFederationInterceptor extends BaseAMRMProxyTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationInterceptor.class); + + public static final String HOME_SC_ID = "SC-home"; + + private TestableFederationInterceptor interceptor; + + private int testAppId; + private ApplicationAttemptId attemptId; + + @Override + public void setUp() throws IOException { + super.setUp(); + interceptor = new TestableFederationInterceptor(); + + testAppId = 1; + attemptId = getApplicationAttemptId(testAppId); + interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), + attemptId, "test-user", null, null)); + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain is the federation intercepter that calls the mock resource manager. + // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationInterceptor.class.getName()); + + conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID); + + return conf; + } + + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RequestInterceptor root = + super.getAMRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 2: + Assert.assertEquals(TestableFederationInterceptor.class.getName(), + root.getClass().getName()); + break; + default: + Assert.fail(); + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", + Integer.toString(3), Integer.toString(index)); + } + + /** + * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior, + * so that when AM registers more than once, it returns the same register + * success response instead of throwing + * {@link InvalidApplicationMasterRequestException} + * + * We did this because FederationInterceptor can receive concurrent register + * requests from AM because of timeout between AM and AMRMProxy. This can + * possible since the timeout between FederationInterceptor and RM longer + * because of performFailover + timeout. + */ + @Test + public void testTwoIdenticalRegisterRequest() throws Exception { + // Register the application twice + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + for (int i = 0; i < 2; i++) { + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + } + } + + @Test + public void testTwoDifferentRegisterRequest() throws Exception { + // Register the application first time + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + // Register the application second time with a different request obj + registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl("different"); + try { + registerResponse = interceptor.registerApplicationMaster(registerReq); + Assert.fail("Should throw if a different request obj is used"); + } catch (YarnException e) { + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c29cf65/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java new file mode 100644 index 0000000..0ca7488 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; +import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; + +/** + * Extends the FederationInterceptor and overrides methods to provide a testable + * implementation of FederationInterceptor. + */ +public class TestableFederationInterceptor extends FederationInterceptor { + private ConcurrentHashMap<String, MockResourceManagerFacade> + secondaryResourceManagers = new ConcurrentHashMap<>(); + private AtomicInteger runningIndex = new AtomicInteger(0); + private MockResourceManagerFacade mockRm; + + @Override + protected ApplicationMasterProtocol createHomeRMProxy( + AMRMProxyApplicationContext appContext) { + synchronized (this) { + if (mockRm == null) { + mockRm = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + } + } + return mockRm; + } + + @SuppressWarnings("unchecked") + protected <T> T createSecondaryRMProxy(Class<T> proxyClass, + Configuration conf, String subClusterId) throws IOException { + // We create one instance of the mock resource manager per sub cluster. Keep + // track of the instances of the RMs in the map keyed by the sub cluster id + synchronized (this.secondaryResourceManagers) { + if (this.secondaryResourceManagers.contains(subClusterId)) { + return (T) this.secondaryResourceManagers.get(subClusterId); + } else { + // The running index here is used to simulate different RM_EPOCH to + // generate unique container identifiers in a federation environment + MockResourceManagerFacade rm = new MockResourceManagerFacade( + new Configuration(conf), runningIndex.addAndGet(10000)); + this.secondaryResourceManagers.put(subClusterId, rm); + return (T) rm; + } + } + } + + protected void setShouldReRegisterNext() { + if (mockRm != null) { + mockRm.setShouldReRegisterNext(); + } + for (MockResourceManagerFacade subCluster : secondaryResourceManagers + .values()) { + subCluster.setShouldReRegisterNext(); + } + } + + /** + * Extends the UnmanagedAMPoolManager and overrides methods to provide a + * testable implementation of UnmanagedAMPoolManager. + */ + protected class TestableUnmanagedAMPoolManager + extends UnmanagedAMPoolManager { + public TestableUnmanagedAMPoolManager(ExecutorService threadpool) { + super(threadpool); + } + + @Override + public UnmanagedApplicationManager createUAM(Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix) { + return new TestableUnmanagedApplicationManager(conf, appId, queueName, + submitter, appNameSuffix); + } + } + + /** + * Extends the UnmanagedApplicationManager and overrides methods to provide a + * testable implementation. + */ + protected class TestableUnmanagedApplicationManager + extends UnmanagedApplicationManager { + + public TestableUnmanagedApplicationManager(Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix) { + super(conf, appId, queueName, submitter, appNameSuffix); + } + + /** + * We override this method here to return a mock RM instances. The base + * class returns the proxy to the real RM which will not work in case of + * stand alone test cases. + */ + @Override + protected <T> T createRMProxy(Class<T> protocol, Configuration config, + UserGroupInformation user, Token<AMRMTokenIdentifier> token) + throws IOException { + return createSecondaryRMProxy(protocol, config, + YarnConfiguration.getClusterId(config)); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org