YARN-6511. Federation: transparently spanning application across multiple sub-clusters. (Botong Huang via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b71ac1e4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b71ac1e4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b71ac1e4 Branch: refs/heads/YARN-2915 Commit: b71ac1e42bc3887b25660fea5f26195f9a154ac1 Parents: 7c29cf6 Author: Subru Krishnan <su...@apache.org> Authored: Wed Jun 7 14:45:51 2017 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Thu Jun 22 14:11:06 2017 -0700 ---------------------------------------------------------------------- .../policies/FederationPolicyUtils.java | 168 +++++ .../federation/policies/RouterPolicyFacade.java | 21 +- .../amrmproxy/FederationInterceptor.java | 685 ++++++++++++++++++- .../amrmproxy/TestFederationInterceptor.java | 251 +++++++ .../TestableFederationInterceptor.java | 6 + 5 files changed, 1095 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java new file mode 100644 index 0000000..37ce942 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for Federation policy. + */ +@Private +public final class FederationPolicyUtils { + private static final Logger LOG = + LoggerFactory.getLogger(FederationPolicyUtils.class); + + /** Disable constructor. */ + private FederationPolicyUtils() { + } + + /** + * A utilize method to instantiate a policy manager class given the type + * (class name) from {@link SubClusterPolicyConfiguration}. + * + * @param newType class name of the policy manager to create + * @return Policy manager + * @throws FederationPolicyInitializationException if fails + */ + public static FederationPolicyManager instantiatePolicyManager(String newType) + throws FederationPolicyInitializationException { + FederationPolicyManager federationPolicyManager = null; + try { + // create policy instance and set queue + Class<?> c = Class.forName(newType); + federationPolicyManager = (FederationPolicyManager) c.newInstance(); + } catch (ClassNotFoundException e) { + throw new FederationPolicyInitializationException(e); + } catch (InstantiationException e) { + throw new FederationPolicyInitializationException(e); + } catch (IllegalAccessException e) { + throw new FederationPolicyInitializationException(e); + } + return federationPolicyManager; + } + + /** + * Get Federation policy configuration from state store, using default queue + * and configuration as fallback. + * + * @param queue the queue of the application + * @param conf the Yarn configuration + * @param federationFacade state store facade + * @return SubClusterPolicyConfiguration recreated + */ + public static SubClusterPolicyConfiguration loadPolicyConfiguration( + String queue, Configuration conf, + FederationStateStoreFacade federationFacade) { + + // The facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + if (queue != null) { + try { + configuration = federationFacade.getPolicyConfiguration(queue); + } catch (YarnException e) { + LOG.warn("Failed to get policy from FederationFacade with queue " + + queue + ": " + e.getMessage()); + } + } + + // If there is no policy configured for this queue, fallback to the baseline + // policy that is configured either in the store or via XML config + if (configuration == null) { + LOG.info("No policy configured for queue {} in StateStore," + + " fallback to default queue", queue); + queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + try { + configuration = federationFacade.getPolicyConfiguration(queue); + } catch (YarnException e) { + LOG.warn("No fallback behavior defined in store, defaulting to XML " + + "configuration fallback behavior."); + } + } + + // or from XML conf otherwise. + if (configuration == null) { + LOG.info("No policy configured for default queue {} in StateStore," + + " fallback to local config", queue); + + String defaultFederationPolicyManager = + conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); + String defaultPolicyParamString = + conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + ByteBuffer defaultPolicyParam = ByteBuffer + .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8)); + + configuration = SubClusterPolicyConfiguration.newInstance(queue, + defaultFederationPolicyManager, defaultPolicyParam); + } + return configuration; + } + + /** + * Get AMRMProxy policy from state store, using default queue and + * configuration as fallback. + * + * @param queue the queue of the application + * @param oldPolicy the previous policy instance (can be null) + * @param conf the Yarn configuration + * @param federationFacade state store facade + * @param homeSubClusterId home sub-cluster id + * @return FederationAMRMProxyPolicy recreated + * @throws FederationPolicyInitializationException if fails + */ + public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue, + FederationAMRMProxyPolicy oldPolicy, Configuration conf, + FederationStateStoreFacade federationFacade, + SubClusterId homeSubClusterId) + throws FederationPolicyInitializationException { + + // Local policy and its configuration + SubClusterPolicyConfiguration configuration = + loadPolicyConfiguration(queue, conf, federationFacade); + + // Instantiate the policyManager and get policy + FederationPolicyInitializationContext context = + new FederationPolicyInitializationContext(configuration, + federationFacade.getSubClusterResolver(), federationFacade, + homeSubClusterId); + + LOG.info("Creating policy manager of type: " + configuration.getType()); + FederationPolicyManager federationPolicyManager = + instantiatePolicyManager(configuration.getType()); + // set queue, reinit policy if required (implementation lazily check + // content of conf), and cache it + federationPolicyManager.setQueue(configuration.getQueue()); + return federationPolicyManager.getAMRMPolicy(context, oldPolicy); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 8c22623..5e31a08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -95,7 +95,7 @@ public class RouterPolicyFacade { new FederationPolicyInitializationContext(configuration, subClusterResolver, federationFacade, homeSubcluster); FederationPolicyManager fallbackPolicyManager = - instantiatePolicyManager(configuration.getType()); + FederationPolicyUtils.instantiatePolicyManager(configuration.getType()); fallbackPolicyManager.setQueue(defaulKey); // add to the cache the fallback behavior @@ -209,7 +209,7 @@ public class RouterPolicyFacade { FederationRouterPolicy routerPolicy = policyMap.get(queue); FederationPolicyManager federationPolicyManager = - instantiatePolicyManager(newType); + FederationPolicyUtils.instantiatePolicyManager(newType); // set queue, reinit policy if required (implementation lazily check // content of conf), and cache it federationPolicyManager.setQueue(queue); @@ -224,23 +224,6 @@ public class RouterPolicyFacade { } } - private static FederationPolicyManager instantiatePolicyManager( - String newType) throws FederationPolicyInitializationException { - FederationPolicyManager federationPolicyManager = null; - try { - // create policy instance and set queue - Class c = Class.forName(newType); - federationPolicyManager = (FederationPolicyManager) c.newInstance(); - } catch (ClassNotFoundException e) { - throw new FederationPolicyInitializationException(e); - } catch (InstantiationException e) { - throw new FederationPolicyInitializationException(e); - } catch (IllegalAccessException e) { - throw new FederationPolicyInitializationException(e); - } - return federationPolicyManager; - } - /** * This method flushes all cached configurations and policies. This should be * invoked if the facade remains activity after very large churn of queues in http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 5f82d69..ffe47f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -24,7 +24,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -38,20 +45,35 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.PreemptionContract; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * Extends the AbstractRequestInterceptor and provides an implementation for * federation of YARN RM and scaling an application across multiple YARN @@ -70,6 +92,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private SubClusterId homeSubClusterId; /** + * UAM pool for secondary sub-clusters (ones other than home sub-cluster), + * using subClusterId as uamId. One UAM is created per sub-cluster RM except + * the home RM. + * + * Creation and register of UAM in secondary sub-clusters happen on-demand, + * when AMRMProxy policy routes resource request to these sub-clusters for the + * first time. AM heart beats to them are also handled asynchronously for + * performance reasons. + */ + private UnmanagedAMPoolManager uamPool; + + /** Thread pool used for asynchronous operations. */ + private ExecutorService threadpool; + + /** + * Stores the AllocateResponses that are received asynchronously from all the + * sub-cluster resource managers except the home RM. + */ + private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink; + + /** * Used to keep track of the container Id and the sub cluster RM that created * the container, so that we know which sub-cluster to forward later requests * about existing containers to. @@ -89,7 +132,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private RegisterApplicationMasterResponse amRegistrationResponse; - /** The proxy ugi used to talk to home RM. */ + private FederationStateStoreFacade federationFacade; + + private SubClusterResolver subClusterResolver; + + /** The policy used to split requests among sub-clusters. */ + private FederationAMRMProxyPolicy policyInterpreter; + + /** + * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken + * issued by home RM. + */ private UserGroupInformation appOwner; /** @@ -97,6 +150,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ public FederationInterceptor() { this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>(); + this.asyncResponseSink = new ConcurrentHashMap<>(); + this.threadpool = Executors.newCachedThreadPool(); + this.uamPool = createUnmanagedAMPoolManager(this.threadpool); + this.amRegistrationRequest = null; this.amRegistrationResponse = null; } @@ -126,6 +183,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); this.homeRM = createHomeRMProxy(appContext); + + this.federationFacade = FederationStateStoreFacade.getInstance(); + this.subClusterResolver = this.federationFacade.getSubClusterResolver(); + + // AMRMProxyPolicy will be initialized in registerApplicationMaster + this.policyInterpreter = null; + + this.uamPool.init(conf); + this.uamPool.start(); } /** @@ -202,7 +268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (queue == null) { LOG.warn("Received null queue for application " + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " from home subcluster. Will use default queue name " + + " from home sub-cluster. Will use default queue name " + YarnConfiguration.DEFAULT_QUEUE_NAME + " for getting AMRMProxyPolicy"); } else { @@ -211,6 +277,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + " belongs to queue " + queue); } + // Initialize the AMRMProxyPolicy + try { + this.policyInterpreter = + FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); + } catch (FederationPolicyInitializationException e) { + throw new YarnRuntimeException(e); + } return this.amRegistrationResponse; } @@ -221,6 +295,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException { + Preconditions.checkArgument(this.policyInterpreter != null, + "Allocate should be called after registerApplicationMaster"); try { // Split the heart beat request into multiple requests, one for each @@ -228,12 +304,28 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Map<SubClusterId, AllocateRequest> requests = splitAllocateRequest(request); + // Send the requests to the secondary sub-cluster resource managers. + // These secondary requests are send asynchronously and the responses will + // be collected and merged with the home response. In addition, it also + // return the newly registered Unmanaged AMs. + Registrations newRegistrations = + sendRequestsToSecondaryResourceManagers(requests); + // Send the request to the home RM and get the response AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, getApplicationContext().getApplicationAttemptId()); + // Notify policy of home response + try { + this.policyInterpreter.notifyOfResponse(this.homeSubClusterId, + homeResponse); + } catch (YarnException e) { + LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + + this.homeSubClusterId, e); + } + // If the resource manager sent us a new token, add to the current user if (homeResponse.getAMRMToken() != null) { LOG.debug("Received new AMRMToken"); @@ -244,6 +336,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Merge the responses from home and secondary sub-cluster RMs homeResponse = mergeAllocateResponses(homeResponse); + // Merge the containers and NMTokens from the new registrations into + // the homeResponse. + if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) { + homeResponse = mergeRegistrationResponses(homeResponse, + newRegistrations.getSuccessfulRegistrations()); + } + // return the final response to the application master. return homeResponse; } catch (IOException ex) { @@ -261,10 +360,83 @@ public class FederationInterceptor extends AbstractRequestInterceptor { FinishApplicationMasterRequest request) throws YarnException, IOException { + // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager + boolean failedToUnRegister = false; + ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc = + null; + + // Application master is completing operation. Send the finish + // application master request to all the registered sub-cluster resource + // managers in parallel, wait for the responses and aggregate the results. + Set<String> subClusterIds = this.uamPool.getAllUAMIds(); + if (subClusterIds.size() > 0) { + final FinishApplicationMasterRequest finishRequest = request; + compSvc = + new ExecutorCompletionService<FinishApplicationMasterResponseInfo>( + this.threadpool); + + LOG.info("Sending finish application request to {} sub-cluster RMs", + subClusterIds.size()); + for (final String subClusterId : subClusterIds) { + compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() { + @Override + public FinishApplicationMasterResponseInfo call() throws Exception { + LOG.info("Sending finish application request to RM {}", + subClusterId); + FinishApplicationMasterResponse uamResponse = null; + try { + uamResponse = + uamPool.finishApplicationMaster(subClusterId, finishRequest); + } catch (Throwable e) { + LOG.warn("Failed to finish unmanaged application master: " + + "RM address: " + subClusterId + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId(), e); + } + return new FinishApplicationMasterResponseInfo(uamResponse, + subClusterId); + } + }); + } + } + + // While the finish application request is being processed + // asynchronously by other sub-cluster resource managers, send the same + // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, this.amRegistrationRequest, getApplicationContext().getApplicationAttemptId()); + + if (subClusterIds.size() > 0) { + // Wait for other sub-cluster resource managers to return the + // response and merge it with the home response + LOG.info( + "Waiting for finish application response from {} sub-cluster RMs", + subClusterIds.size()); + for (int i = 0; i < subClusterIds.size(); ++i) { + try { + Future<FinishApplicationMasterResponseInfo> future = compSvc.take(); + FinishApplicationMasterResponseInfo uamResponse = future.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("Received finish application response from RM: " + + uamResponse.getSubClusterId()); + } + if (uamResponse.getResponse() == null + || !uamResponse.getResponse().getIsUnregistered()) { + failedToUnRegister = true; + } + } catch (Throwable e) { + failedToUnRegister = true; + LOG.warn("Failed to finish unmanaged application master: " + + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId(), e); + } + } + } + + if (failedToUnRegister) { + homeResponse.setIsUnregistered(false); + } return homeResponse; } @@ -281,10 +453,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ @Override public void shutdown() { + if (this.uamPool != null) { + this.uamPool.stop(); + } + if (threadpool != null) { + try { + threadpool.shutdown(); + } catch (Throwable ex) { + } + threadpool = null; + } super.shutdown(); } /** + * Create the UAM pool manager for secondary sub-clsuters. For unit test to + * override. + * + * @param threadPool the thread pool to use + * @return the UAM pool manager instance + */ + @VisibleForTesting + protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( + ExecutorService threadPool) { + return new UnmanagedAMPoolManager(threadPool); + } + + /** * Returns instance of the ApplicationMasterProtocol proxy class that is used * to connect to the Home resource manager. * @@ -302,6 +497,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } + private SubClusterId getSubClusterForNode(String nodeName) { + SubClusterId subClusterId = null; + try { + subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName); + } catch (YarnException e) { + LOG.error("Failed to resolve sub-cluster for node " + nodeName + + ", skipping this node", e); + return null; + } + if (subClusterId == null) { + LOG.error("Failed to resolve sub-cluster for node {}, skipping this node", + nodeName); + return null; + } + return subClusterId; + } + /** * In federation, the heart beat request needs to be sent to all the sub * clusters from which the AM has requested containers. This method splits the @@ -317,20 +529,39 @@ public class FederationInterceptor extends AbstractRequestInterceptor { findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, requestMap); + // Create heart beat request instances for all other already registered + // sub-cluster resource managers + Set<String> subClusterIds = this.uamPool.getAllUAMIds(); + for (String subClusterId : subClusterIds) { + findOrCreateAllocateRequestForSubCluster( + SubClusterId.newInstance(subClusterId), request, requestMap); + } + if (!isNullOrEmpty(request.getAskList())) { - AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( - this.homeSubClusterId, request, requestMap); - newRequest.getAskList().addAll(request.getAskList()); + // Ask the federation policy interpreter to split the ask list for + // sending it to all the sub-cluster resource managers. + Map<SubClusterId, List<ResourceRequest>> asks = + splitResourceRequests(request.getAskList()); + + // Add the askLists to the corresponding sub-cluster requests. + for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + entry.getKey(), request, requestMap); + newRequest.getAskList().addAll(entry.getValue()); + } } if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( request.getResourceBlacklistRequest().getBlacklistAdditions())) { for (String resourceName : request.getResourceBlacklistRequest() .getBlacklistAdditions()) { - AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( - this.homeSubClusterId, request, requestMap); - newRequest.getResourceBlacklistRequest().getBlacklistAdditions() - .add(resourceName); + SubClusterId subClusterId = getSubClusterForNode(resourceName); + if (subClusterId != null) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + subClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistAdditions() + .add(resourceName); + } } } @@ -338,10 +569,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { request.getResourceBlacklistRequest().getBlacklistRemovals())) { for (String resourceName : request.getResourceBlacklistRequest() .getBlacklistRemovals()) { - AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( - this.homeSubClusterId, request, requestMap); - newRequest.getResourceBlacklistRequest().getBlacklistRemovals() - .add(resourceName); + SubClusterId subClusterId = getSubClusterForNode(resourceName); + if (subClusterId != null) { + AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( + subClusterId, request, requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistRemovals() + .add(resourceName); + } } } @@ -371,6 +605,174 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** + * This methods sends the specified AllocateRequests to the appropriate + * sub-cluster resource managers. + * + * @param requests contains the heart beat requests to send to the resource + * manager keyed by the resource manager address + * @return the registration responses from the newly added sub-cluster + * resource managers + * @throws YarnException + * @throws IOException + */ + private Registrations sendRequestsToSecondaryResourceManagers( + Map<SubClusterId, AllocateRequest> requests) + throws YarnException, IOException { + + // Create new UAM instances for the sub-cluster that we have not seen + // before + Registrations registrations = registerWithNewSubClusters(requests.keySet()); + + // Now that all the registrations are done, send the allocation request + // to the sub-cluster RMs using the Unmanaged application masters + // asynchronously and don't wait for the response. The responses will + // arrive asynchronously and will be added to the response sink. These + // responses will be sent to the application master in some future heart + // beat response. + for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) { + final SubClusterId subClusterId = entry.getKey(); + + if (subClusterId.equals(this.homeSubClusterId)) { + // Skip the request for the home sub-cluster resource manager. + // It will be handled separately in the allocate() method + continue; + } + + if (!this.uamPool.hasUAMId(subClusterId.getId())) { + // TODO: This means that the registration for this sub-cluster RM + // failed. For now, we ignore the resource requests and continue + // but we need to fix this and handle this situation. One way would + // be to send the request to another RM by consulting the policy. + LOG.warn("Unmanaged AM registration not found for sub-cluster {}", + subClusterId); + continue; + } + + this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), + new AsyncCallback<AllocateResponse>() { + @Override + public void callback(AllocateResponse response) { + synchronized (asyncResponseSink) { + List<AllocateResponse> responses = null; + if (asyncResponseSink.containsKey(subClusterId)) { + responses = asyncResponseSink.get(subClusterId); + } else { + responses = new ArrayList<>(); + asyncResponseSink.put(subClusterId, responses); + } + responses.add(response); + } + + // Notify policy of secondary sub-cluster responses + try { + policyInterpreter.notifyOfResponse(subClusterId, response); + } catch (YarnException e) { + LOG.warn( + "notifyOfResponse for policy failed for home sub-cluster " + + subClusterId, + e); + } + } + }); + } + + return registrations; + } + + /** + * This method ensures that Unmanaged AMs are created for each of the + * specified sub-cluster specified in the input and registers with the + * corresponding resource managers. + */ + private Registrations registerWithNewSubClusters( + Set<SubClusterId> subClusterSet) throws IOException { + + List<SubClusterId> failedRegistrations = new ArrayList<>(); + Map<SubClusterId, RegisterApplicationMasterResponse> + successfulRegistrations = new HashMap<>(); + + // Check to see if there are any new sub-clusters in this request + // list and create and register Unmanaged AM instance for the new ones + List<String> newSubClusters = new ArrayList<>(); + for (SubClusterId subClusterId : subClusterSet) { + if (!subClusterId.equals(this.homeSubClusterId) + && !this.uamPool.hasUAMId(subClusterId.getId())) { + newSubClusters.add(subClusterId.getId()); + } + } + + if (newSubClusters.size() > 0) { + final RegisterApplicationMasterRequest registerRequest = + this.amRegistrationRequest; + final AMRMProxyApplicationContext appContext = getApplicationContext(); + ExecutorCompletionService<RegisterApplicationMasterResponseInfo> + completionService = new ExecutorCompletionService<>(threadpool); + + for (final String subClusterId : newSubClusters) { + completionService + .submit(new Callable<RegisterApplicationMasterResponseInfo>() { + @Override + public RegisterApplicationMasterResponseInfo call() + throws Exception { + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId); + + RegisterApplicationMasterResponse uamResponse = null; + try { + // For appNameSuffix, use subClusterId of the home sub-cluster + uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, + registerRequest, config, + appContext.getApplicationAttemptId().getApplicationId(), + amRegistrationResponse.getQueue(), appContext.getUser(), + homeSubClusterId.toString()); + } catch (Throwable e) { + LOG.error("Failed to register application master: " + + subClusterId + " Application: " + + appContext.getApplicationAttemptId(), e); + } + return new RegisterApplicationMasterResponseInfo(uamResponse, + SubClusterId.newInstance(subClusterId)); + } + }); + } + + // Wait for other sub-cluster resource managers to return the + // response and add it to the Map for returning to the caller + for (int i = 0; i < newSubClusters.size(); ++i) { + try { + Future<RegisterApplicationMasterResponseInfo> future = + completionService.take(); + RegisterApplicationMasterResponseInfo uamResponse = future.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("Received register application response from RM: " + + uamResponse.getSubClusterId()); + } + + if (uamResponse.getResponse() == null) { + failedRegistrations.add(uamResponse.getSubClusterId()); + } else { + LOG.info("Successfully registered unmanaged application master: " + + uamResponse.getSubClusterId() + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId()); + successfulRegistrations.put(uamResponse.getSubClusterId(), + uamResponse.getResponse()); + } + } catch (Exception e) { + LOG.warn("Failed to register unmanaged application master: " + + " ApplicationId: " + + getApplicationContext().getApplicationAttemptId(), e); + } + } + } + + return new Registrations(successfulRegistrations, failedRegistrations); + } + + /** * Merges the responses from other sub-clusters that we received * asynchronously with the specified home cluster response and keeps track of * the containers received from each sub-cluster resource managers. @@ -388,6 +790,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor { cacheAllocatedContainers(homeResponse.getAllocatedContainers(), this.homeSubClusterId); + synchronized (this.asyncResponseSink) { + for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink + .entrySet()) { + SubClusterId subClusterId = entry.getKey(); + List<AllocateResponse> responses = entry.getValue(); + if (responses.size() > 0) { + for (AllocateResponse response : responses) { + removeFinishedContainersFromCache( + response.getCompletedContainersStatuses()); + cacheAllocatedContainers(response.getAllocatedContainers(), + subClusterId); + mergeAllocateResponse(homeResponse, response, subClusterId); + } + responses.clear(); + } + } + } + return homeResponse; } @@ -405,6 +825,130 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** + * Helper method for merging the responses from the secondary sub cluster RMs + * with the home response to return to the AM. + */ + private AllocateResponse mergeRegistrationResponses( + AllocateResponse homeResponse, + Map<SubClusterId, RegisterApplicationMasterResponse> registrations) { + + for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry : + registrations.entrySet()) { + RegisterApplicationMasterResponse registration = entry.getValue(); + + if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) { + List<Container> tempContainers = homeResponse.getAllocatedContainers(); + if (!isNullOrEmpty(tempContainers)) { + tempContainers + .addAll(registration.getContainersFromPreviousAttempts()); + homeResponse.setAllocatedContainers(tempContainers); + } else { + homeResponse.setAllocatedContainers( + registration.getContainersFromPreviousAttempts()); + } + cacheAllocatedContainers( + registration.getContainersFromPreviousAttempts(), entry.getKey()); + } + + if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) { + List<NMToken> tempTokens = homeResponse.getNMTokens(); + if (!isNullOrEmpty(tempTokens)) { + tempTokens.addAll(registration.getNMTokensFromPreviousAttempts()); + homeResponse.setNMTokens(tempTokens); + } else { + homeResponse + .setNMTokens(registration.getNMTokensFromPreviousAttempts()); + } + } + } + + return homeResponse; + } + + private void mergeAllocateResponse(AllocateResponse homeResponse, + AllocateResponse otherResponse, SubClusterId otherRMAddress) { + + if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) { + if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) { + homeResponse.getAllocatedContainers() + .addAll(otherResponse.getAllocatedContainers()); + } else { + homeResponse + .setAllocatedContainers(otherResponse.getAllocatedContainers()); + } + } + + if (otherResponse.getAvailableResources() != null) { + if (homeResponse.getAvailableResources() != null) { + homeResponse.setAvailableResources( + Resources.add(homeResponse.getAvailableResources(), + otherResponse.getAvailableResources())); + } else { + homeResponse + .setAvailableResources(otherResponse.getAvailableResources()); + } + } + + if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) { + if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) { + homeResponse.getCompletedContainersStatuses() + .addAll(otherResponse.getCompletedContainersStatuses()); + } else { + homeResponse.setCompletedContainersStatuses( + otherResponse.getCompletedContainersStatuses()); + } + } + + if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) { + if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) { + homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes()); + } else { + homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes()); + } + } + + if (!isNullOrEmpty(otherResponse.getNMTokens())) { + if (!isNullOrEmpty(homeResponse.getNMTokens())) { + homeResponse.getNMTokens().addAll(otherResponse.getNMTokens()); + } else { + homeResponse.setNMTokens(otherResponse.getNMTokens()); + } + } + + PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage(); + PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage(); + + if (homePreempMessage == null && otherPreempMessage != null) { + homeResponse.setPreemptionMessage(otherPreempMessage); + } + + if (homePreempMessage != null && otherPreempMessage != null) { + PreemptionContract par1 = homePreempMessage.getContract(); + PreemptionContract par2 = otherPreempMessage.getContract(); + + if (par1 == null && par2 != null) { + homePreempMessage.setContract(par2); + } + + if (par1 != null && par2 != null) { + par1.getResourceRequest().addAll(par2.getResourceRequest()); + par2.getContainers().addAll(par2.getContainers()); + } + + StrictPreemptionContract spar1 = homePreempMessage.getStrictContract(); + StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract(); + + if (spar1 == null && spar2 != null) { + homePreempMessage.setStrictContract(spar2); + } + + if (spar1 != null && spar2 != null) { + spar1.getContainers().addAll(spar2.getContainers()); + } + } + } + + /** * Add allocated containers to cache mapping. */ private void cacheAllocatedContainers(List<Container> containers, @@ -418,10 +962,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // container allocation more than once. Just move on in this case. LOG.warn( "Duplicate containerID: {} found in the allocated containers" - + " from same subcluster: {}, so ignoring.", + + " from same sub-cluster: {}, so ignoring.", container.getId(), subClusterId); } else { - // The same container allocation from different subclusters, + // The same container allocation from different sub-clusters, // something is wrong. // TODO: YARN-6667 if some subcluster RM is configured wrong, we // should not fail the entire heartbeat. @@ -432,7 +976,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + " ApplicationId: " + getApplicationContext().getApplicationAttemptId() + " From RM: " + subClusterId - + " . Previous container was from subcluster: " + + " . Previous container was from sub-cluster: " + existingSubClusterId); } } @@ -498,7 +1042,102 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Utility method to check if the specified Collection is null or empty + * Splits the specified request to send it to different sub clusters. The + * splitting algorithm is very simple. If the request does not have a node + * preference, the policy decides the sub cluster. If the request has a node + * preference and if locality is required, then it is sent to the sub cluster + * that contains the requested node. If node preference is specified and + * locality is not required, then the policy decides the sub cluster. + * + * @param askList the ask list to split + * @return the split asks + * @throws YarnException if split fails + */ + protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests( + List<ResourceRequest> askList) throws YarnException { + return this.policyInterpreter.splitResourceRequests(askList); + } + + @VisibleForTesting + public int getUnmanagedAMPoolSize() { + return this.uamPool.getAllUAMIds().size(); + } + + /** + * Private structure for encapsulating SubClusterId and + * RegisterApplicationMasterResponse instances. + */ + private static class RegisterApplicationMasterResponseInfo { + private RegisterApplicationMasterResponse response; + private SubClusterId subClusterId; + + RegisterApplicationMasterResponseInfo( + RegisterApplicationMasterResponse response, SubClusterId subClusterId) { + this.response = response; + this.subClusterId = subClusterId; + } + + public RegisterApplicationMasterResponse getResponse() { + return response; + } + + public SubClusterId getSubClusterId() { + return subClusterId; + } + } + + /** + * Private structure for encapsulating SubClusterId and + * FinishApplicationMasterResponse instances. + */ + private static class FinishApplicationMasterResponseInfo { + private FinishApplicationMasterResponse response; + private String subClusterId; + + FinishApplicationMasterResponseInfo( + FinishApplicationMasterResponse response, String subClusterId) { + this.response = response; + this.subClusterId = subClusterId; + } + + public FinishApplicationMasterResponse getResponse() { + return response; + } + + public String getSubClusterId() { + return subClusterId; + } + } + + /** + * Private structure for encapsulating successful and failed application + * master registration responses. + */ + private static class Registrations { + private Map<SubClusterId, RegisterApplicationMasterResponse> + successfulRegistrations; + private List<SubClusterId> failedRegistrations; + + Registrations( + Map<SubClusterId, RegisterApplicationMasterResponse> + successfulRegistrations, + List<SubClusterId> failedRegistrations) { + this.successfulRegistrations = successfulRegistrations; + this.failedRegistrations = failedRegistrations; + } + + public Map<SubClusterId, RegisterApplicationMasterResponse> + getSuccessfulRegistrations() { + return this.successfulRegistrations; + } + + public List<SubClusterId> getFailedRegistrations() { + return this.failedRegistrations; + } + } + + /** + * Utility method to check if the specified Collection is null or empty. * * @param c the collection object * @param <T> element type of the collection @@ -507,4 +1146,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public static <T> boolean isNullOrEmpty(Collection<T> c) { return (c == null || c.size() == 0); } + + /** + * Utility method to check if the specified Collection is null or empty. + * + * @param c the map object + * @param <T1> key type of the map + * @param <T2> value type of the map + * @return whether is it is null or empty + */ + public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) { + return (c == null || c.size() == 0); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3b564f0..4e15323 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -19,13 +19,31 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -45,6 +63,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { public static final String HOME_SC_ID = "SC-home"; private TestableFederationInterceptor interceptor; + private MemoryFederationStateStore stateStore; private int testAppId; private ApplicationAttemptId attemptId; @@ -54,6 +73,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { super.setUp(); interceptor = new TestableFederationInterceptor(); + stateStore = new MemoryFederationStateStore(); + stateStore.init(getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, + getConf()); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), @@ -82,11 +106,238 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + TestableFederationInterceptor.class.getName()); + conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + UniformBroadcastPolicyManager.class.getName()); + conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID); + // Disable StateStoreFacade cache + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + return conf; } + private void registerSubCluster(SubClusterId subClusterId) + throws YarnException { + stateStore + .registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo + .newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", + "1.2.3.4:4", SubClusterState.SC_RUNNING, 0, "capacity"))); + } + + private void deRegisterSubCluster(SubClusterId subClusterId) + throws YarnException { + stateStore.deregisterSubCluster(SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED)); + } + + private List<Container> getContainersAndAssert(int numberOfResourceRequests, + int numberOfAllocationExcepted) throws Exception { + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List<Container> containers = + new ArrayList<Container>(numberOfResourceRequests); + List<ResourceRequest> askList = + new ArrayList<ResourceRequest>(numberOfResourceRequests); + for (int id = 0; id < numberOfResourceRequests; id++) { + askList.add(createResourceRequest("test-node-" + Integer.toString(id), + 6000, 2, id % 5, 1)); + } + + allocateRequest.setAskList(askList); + + AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); + Assert.assertNotNull("allocate() returned null response", allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + LOG.info("Number of allocated containers in the original request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containers.size() < numberOfAllocationExcepted + && numHeartbeat++ < 10) { + allocateResponse = + interceptor.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Number of allocated containers in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + LOG.info("Total number of allocated containers: " + + Integer.toString(containers.size())); + Thread.sleep(10); + } + Assert.assertEquals(numberOfAllocationExcepted, containers.size()); + return containers; + } + + private void releaseContainersAndAssert(List<Container> containers) + throws Exception { + Assert.assertTrue(containers.size() > 0); + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List<ContainerId> relList = new ArrayList<ContainerId>(containers.size()); + for (Container container : containers) { + relList.add(container.getId()); + } + + allocateRequest.setReleaseList(relList); + + AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); + Assert.assertNotNull(allocateResponse); + + // The way the mock resource manager is setup, it will return the containers + // that were released in the allocated containers. The release request will + // be split and handled by the corresponding UAM. The release containers + // returned by the mock resource managers will be aggregated and returned + // back to us and we can check if total request size and returned size are + // the same + List<Container> containersForReleasedContainerIds = + new ArrayList<Container>(); + containersForReleasedContainerIds + .addAll(allocateResponse.getAllocatedContainers()); + LOG.info("Number of containers received in the original request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containersForReleasedContainerIds.size() < relList.size() + && numHeartbeat++ < 10) { + allocateResponse = + interceptor.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(allocateResponse); + containersForReleasedContainerIds + .addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Number of containers received in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers().size())); + LOG.info("Total number of containers received: " + + Integer.toString(containersForReleasedContainerIds.size())); + Thread.sleep(10); + } + + Assert.assertEquals(relList.size(), + containersForReleasedContainerIds.size()); + } + + @Test + public void testMultipleSubClusters() throws Exception { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); + + int numberOfContainers = 3; + List<Container> containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the second batch of containers, with sc1 and sc3 active + deRegisterSubCluster(SubClusterId.newInstance("SC-2")); + registerSubCluster(SubClusterId.newInstance("SC-3")); + + numberOfContainers = 1; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the third batch of containers with only in home sub-cluster + // active + deRegisterSubCluster(SubClusterId.newInstance("SC-1")); + deRegisterSubCluster(SubClusterId.newInstance("SC-3")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + numberOfContainers = 2; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + /* + * Test re-register when RM fails over. + */ + @Test + public void testReregister() throws Exception { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + interceptor.setShouldReRegisterNext(); + + int numberOfContainers = 3; + List<Container> containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + interceptor.setShouldReRegisterNext(); + + // Release all containers + releaseContainersAndAssert(containers); + + interceptor.setShouldReRegisterNext(); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + @Test public void testRequestInterceptorChainCreation() throws Exception { RequestInterceptor root = http://git-wip-us.apache.org/repos/asf/hadoop/blob/b71ac1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 0ca7488..d4b8735 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -45,6 +45,12 @@ public class TestableFederationInterceptor extends FederationInterceptor { private MockResourceManagerFacade mockRm; @Override + protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( + ExecutorService threadPool) { + return new TestableUnmanagedAMPoolManager(threadPool); + } + + @Override protected ApplicationMasterProtocol createHomeRMProxy( AMRMProxyApplicationContext appContext) { synchronized (this) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org