YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler when activating applications. Contributed by Craig Welch (cherry picked from commit c53420f58364b11fbda1dace7679d45534533382)
(cherry picked from commit 4931600030e13d9332d9a0e588487cb8684c667d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1b35ffd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1b35ffd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1b35ffd Branch: refs/heads/branch-2.6 Commit: f1b35ffd4ca680d76bf82b541357b8e5748f129e Parents: 3bd9b74 Author: Jian He <jia...@apache.org> Authored: Tue Jan 13 17:32:07 2015 -0800 Committer: Vinod Kumar Vavilapalli <vino...@apache.org> Committed: Thu Sep 3 17:40:24 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 1 + .../resourcemanager/RMActiveServiceContext.java | 4 +- .../server/resourcemanager/RMContextImpl.java | 32 ++- .../server/resourcemanager/rmapp/RMApp.java | 3 + .../server/resourcemanager/rmapp/RMAppImpl.java | 5 + .../scheduler/capacity/CSQueueUtils.java | 24 -- .../scheduler/capacity/LeafQueue.java | 252 ++++++++++++------ .../scheduler/common/fica/FiCaSchedulerApp.java | 15 ++ .../webapp/CapacitySchedulerPage.java | 4 +- .../dao/CapacitySchedulerLeafQueueInfo.java | 24 +- .../applicationsmanager/MockAsm.java | 7 + .../TestAMRMRPCNodeUpdates.java | 9 + .../TestCapacitySchedulerPlanFollower.java | 1 + .../server/resourcemanager/rmapp/MockRMApp.java | 7 + .../capacity/TestApplicationLimits.java | 256 +++++++++++++------ .../scheduler/capacity/TestLeafQueue.java | 79 +++--- .../scheduler/capacity/TestReservations.java | 34 +-- .../scheduler/fifo/TestFifoScheduler.java | 23 +- .../webapp/TestRMWebServicesCapacitySched.java | 12 - 20 files changed, 519 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 60ae3d0..a5b270e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -165,6 +165,9 @@ Release 2.6.1 - UNRELEASED YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when Node is connected/disconnected (Bibin A Chundatt via jlowe) + YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler + when activating applications. (Craig Welch via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 45d7294..971acea 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -173,6 +173,7 @@ <Field name="userLimit" /> <Field name="userLimitFactor" /> <Field name="maxAMResourcePerQueuePercent" /> + <Field name="lastClusterResource" /> </Or> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 3bc2e9b..03fc40e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -117,7 +117,8 @@ public class RMActiveServiceContext { RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setContainerAllocationExpirer(containerAllocationExpirer); this.setAMLivelinessMonitor(amLivelinessMonitor); @@ -128,6 +129,7 @@ public class RMActiveServiceContext { this.setNMTokenSecretManager(nmTokenSecretManager); this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + this.setScheduler(scheduler); RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 55d7667..32216e5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -84,18 +84,46 @@ public class RMContextImpl implements RMContext { RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, - RMApplicationHistoryWriter rmApplicationHistoryWriter) { + RMApplicationHistoryWriter rmApplicationHistoryWriter, + ResourceScheduler scheduler) { this(); this.setDispatcher(rmDispatcher); setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager, rmApplicationHistoryWriter)); + clientToAMTokenSecretManager, rmApplicationHistoryWriter, + scheduler)); ConfigurationProvider provider = new LocalConfigurationProvider(); setConfigurationProvider(provider); } + + @VisibleForTesting + // helper constructor for tests + public RMContextImpl(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer delegationTokenRenewer, + AMRMTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this( + rmDispatcher, + containerAllocationExpirer, + amLivelinessMonitor, + amFinishingMonitor, + delegationTokenRenewer, + appTokenSecretManager, + 
containerTokenSecretManager, + nmTokenSecretManager, + clientToAMTokenSecretManager, + rmApplicationHistoryWriter, + null); + } @Override public Dispatcher getDispatcher() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 624aa18..fbcaab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -239,4 +240,6 @@ public interface RMApp extends EventHandler<RMAppEvent> { RMAppMetrics getRMAppMetrics(); ReservationId getReservationId(); + + ResourceRequest getAMResourceRequest(); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 751dbe4..19f2193 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1337,6 +1337,11 @@ public class RMAppImpl implements RMApp, Recoverable { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } protected Credentials parseCredentials() throws IOException { Credentials credentials = new Credentials(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 0a2fa3a..f458057 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -109,30 +109,6 @@ class CSQueueUtils { } return absoluteMaxCapacityByNodeLabels; } - - public static int computeMaxActiveApplications( - ResourceCalculator calculator, - Resource clusterResource, Resource minimumAllocation, - float maxAMResourcePercent, float absoluteMaxCapacity) { - return - Math.max( - (int)Math.ceil( - Resources.ratio( - calculator, - clusterResource, - minimumAllocation) * - maxAMResourcePercent * absoluteMaxCapacity - ), - 1); - } - - public static int computeMaxActiveApplicationsPerUser( - int maxActiveApplications, int userLimit, float userLimitFactor) { - return Math.max( - (int)Math.ceil( - maxActiveApplications * (userLimit / 100.0f) * userLimitFactor), - 1); - } @Lock(CSQueue.class) public static void updateQueueStatistics( http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6ffc61a..60b5a59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -87,9 +87,6 @@ public class LeafQueue extends AbstractCSQueue { protected int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; - private int maxActiveApplications; // Based on absolute max capacity - private int maxActiveAppsUsingAbsCap; // Based on absolute capacity - private int maxActiveApplicationsPerUser; private int nodeLocalityDelay; @@ -113,9 +110,16 @@ public class LeafQueue extends AbstractCSQueue { // cache last cluster resource to compute actual capacity private Resource lastClusterResource = Resources.none(); + // absolute capacity as a resource (based on cluster resource) + private Resource absoluteCapacityResource = Resources.none(); + private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); private volatile float absoluteMaxAvailCapacity; + + // sum of resources used by application masters for applications + // running in this queue + private final Resource usedAMResources = Resource.newInstance(0, 0); public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -155,19 +159,6 @@ public class LeafQueue extends AbstractCSQueue { float maxAMResourcePerQueuePercent = cs.getConfiguration() .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); - int maxActiveApplications = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - cs.getClusterResource(), 
this.minimumAllocation, - maxAMResourcePerQueuePercent, absoluteMaxCapacity); - this.maxActiveAppsUsingAbsCap = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - cs.getClusterResource(), this.minimumAllocation, - maxAMResourcePerQueuePercent, absoluteCapacity); - int maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser( - maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); this.queueInfo.setChildQueues(new ArrayList<QueueInfo>()); @@ -179,8 +170,7 @@ public class LeafQueue extends AbstractCSQueue { setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs - .getConfiguration().getNodeLocalityDelay(), accessibleLabels, + state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels, defaultLabelExpression, this.capacitiyByNodeLabels, this.maxCapacityByNodeLabels, cs.getConfiguration().getReservationContinueLook()); @@ -208,8 +198,7 @@ public class LeafQueue extends AbstractCSQueue { float maximumCapacity, float absoluteMaxCapacity, int userLimit, float userLimitFactor, int maxApplications, float maxAMResourcePerQueuePercent, - int maxApplicationsPerUser, int maxActiveApplications, - int maxActiveApplicationsPerUser, QueueState state, + int maxApplicationsPerUser, QueueState state, Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay, Set<String> labels, String defaultLabelExpression, Map<String, Float> capacitieByLabel, @@ -224,6 +213,16 @@ public class LeafQueue extends AbstractCSQueue { float absCapacity = getParent().getAbsoluteCapacity() * capacity; CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, absoluteMaxCapacity); + + this.lastClusterResource = clusterResource; + updateAbsoluteCapacityResource(clusterResource); + + // Initialize headroom info, also used 
for calculating application + // master resource limits. Since this happens during queue initialization + // and all queues may not be realized yet, we'll use (optimistic) + // absoluteMaxCapacity (it will be replaced with the more accurate + // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) + updateHeadroomInfo(clusterResource, absoluteMaxCapacity); this.absoluteCapacity = absCapacity; @@ -234,9 +233,6 @@ public class LeafQueue extends AbstractCSQueue { this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; this.maxApplicationsPerUser = maxApplicationsPerUser; - this.maxActiveApplications = maxActiveApplications; - this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; - if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression)) { throw new IOException("Invalid default label expression of " @@ -288,21 +284,6 @@ public class LeafQueue extends AbstractCSQueue { "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " + "userLimitFactor) ]" + "\n" + - "maxActiveApplications = " + maxActiveApplications + - " [= max(" + - "(int)ceil((clusterResourceMemory / minimumAllocation) * " + - "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," + - "1) ]" + "\n" + - "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap + - " [= max(" + - "(int)ceil((clusterResourceMemory / minimumAllocation) *" + - "maxAMResourcePercent * absoluteCapacity)," + - "1) ]" + "\n" + - "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser + - " [= max(" + - "(int)(maxActiveApplications * (userLimit / 100.0f) * " + - "userLimitFactor)," + - "1) ]" + "\n" + "usedCapacity = " + usedCapacity + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + @@ -355,14 +336,6 @@ public class LeafQueue extends AbstractCSQueue { return maxApplicationsPerUser; } - public synchronized int getMaximumActiveApplications() { - return 
maxActiveApplications; - } - - public synchronized int getMaximumActiveApplicationsPerUser() { - return maxActiveApplicationsPerUser; - } - @Override public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; @@ -525,8 +498,6 @@ public class LeafQueue extends AbstractCSQueue { newlyParsedLeafQueue.maxApplications, newlyParsedLeafQueue.maxAMResourcePerQueuePercent, newlyParsedLeafQueue.getMaxApplicationsPerUser(), - newlyParsedLeafQueue.getMaximumActiveApplications(), - newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, newlyParsedLeafQueue.getNodeLocalityDelay(), newlyParsedLeafQueue.accessibleLabels, @@ -612,27 +583,115 @@ public class LeafQueue extends AbstractCSQueue { } } + + public synchronized Resource getAMResourceLimit() { + /* + * The limit to the amount of resources which can be consumed by + * application masters for applications running in the queue + * is calculated by taking the greater of the max resources currently + * available to the queue (see absoluteMaxAvailCapacity) and the absolute + * resources guaranteed for the queue and multiplying it by the am + * resource percent. + * + * This is to allow a queue to grow its (proportional) application + * master resource use up to its max capacity when other queues are + * idle but to scale back down to its guaranteed capacity as they + * become busy. + * + */ + Resource queueMaxCap; + synchronized (queueHeadroomInfo) { + queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + } + Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, + absoluteCapacityResource, queueMaxCap); + return Resources.multiplyAndNormalizeUp( + resourceCalculator, + queueCap, + maxAMResourcePerQueuePercent, minimumAllocation); + } + + public synchronized Resource getUserAMResourceLimit() { + /* + * The user amresource limit is based on the same approach as the + * user limit (as it should represent a subset of that). 
This means that + * it uses the absolute queue capacity instead of the max and is modified + * by the userlimit and the userlimit factor as is the userlimit + * + */ + float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f / + Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + + return Resources.multiplyAndNormalizeUp( + resourceCalculator, + absoluteCapacityResource, + maxAMResourcePerQueuePercent * effectiveUserLimit * + userLimitFactor, minimumAllocation); + } private synchronized void activateApplications() { + //limit of allowed resource usage for application masters + Resource amLimit = getAMResourceLimit(); + Resource userAMLimit = getUserAMResourceLimit(); + for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); i.hasNext(); ) { FiCaSchedulerApp application = i.next(); - // Check queue limit - if (getNumActiveApplications() >= getMaximumActiveApplications()) { - break; + // Check am resource limit + Resource amIfStarted = + Resources.add(application.getAMResource(), usedAMResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + application.getAMResource() + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + + " lastClusterResource " + lastClusterResource + + " amIfStarted " + amIfStarted); + } + + if (!Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." 
+ + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds amLimit"); + continue; + } } - // Check user limit + // Check user am resource limit + User user = getUser(application.getUser()); - if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { - user.activateApplication(); - activeApplications.add(application); - i.remove(); - LOG.info("Application " + application.getApplicationId() + - " from user: " + application.getUser() + - " activated in queue: " + getQueueName()); + + Resource userAmIfStarted = + Resources.add(application.getAMResource(), + user.getConsumedAMResources()); + + if (!Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, userAmIfStarted, + userAMLimit)) { + if (getNumActiveApplications() < 1) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too low." + + " skipping enforcement to allow at least one application to start"); + } else { + LOG.info("not starting application as amIfStarted exceeds " + + "userAmLimit"); + continue; + } } + user.activateApplication(); + activeApplications.add(application); + Resources.addTo(usedAMResources, application.getAMResource()); + Resources.addTo(user.getConsumedAMResources(), + application.getAMResource()); + i.remove(); + LOG.info("Application " + application.getApplicationId() + + " from user: " + application.getUser() + + " activated in queue: " + getQueueName()); } } @@ -678,6 +737,10 @@ public class LeafQueue extends AbstractCSQueue { boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); + } else { + Resources.subtractFrom(usedAMResources, application.getAMResource()); + Resources.subtractFrom(user.getConsumedAMResources(), + application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -1015,6 
+1078,25 @@ public class LeafQueue extends AbstractCSQueue { return canAssign; } + + private Resource updateHeadroomInfo(Resource clusterResource, + float absoluteMaxAvailCapacity) { + + Resource queueMaxCap = + Resources.multiplyAndNormalizeDown( + resourceCalculator, + clusterResource, + absoluteMaxAvailCapacity, + minimumAllocation); + + synchronized (queueHeadroomInfo) { + queueHeadroomInfo.setQueueMaxCap(queueMaxCap); + queueHeadroomInfo.setClusterResource(clusterResource); + } + + return queueMaxCap; + + } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, @@ -1027,18 +1109,9 @@ public class LeafQueue extends AbstractCSQueue { Resource userLimit = computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - - Resource queueMaxCap = // Queue Max-Capacity - Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, - absoluteMaxAvailCapacity, - minimumAllocation); - - synchronized (queueHeadroomInfo) { - queueHeadroomInfo.setQueueMaxCap(queueMaxCap); - queueHeadroomInfo.setClusterResource(clusterResource); - } + + Resource queueMaxCap = + updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); @@ -1734,25 +1807,25 @@ public class LeafQueue extends AbstractCSQueue { " used=" + usedResources + " numContainers=" + numContainers + " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); } + + private void updateAbsoluteCapacityResource(Resource clusterResource) { + + absoluteCapacityResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + clusterResource, + absoluteCapacity, minimumAllocation); + + } @Override public synchronized void updateClusterResource(Resource clusterResource) { lastClusterResource = clusterResource; + updateAbsoluteCapacityResource(clusterResource); - // Update queue properties - maxActiveApplications = - 
CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - clusterResource, minimumAllocation, - maxAMResourcePerQueuePercent, absoluteMaxCapacity); - maxActiveAppsUsingAbsCap = - CSQueueUtils.computeMaxActiveApplications( - resourceCalculator, - clusterResource, minimumAllocation, - maxAMResourcePerQueuePercent, absoluteCapacity); - maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser( - maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); + // Update headroom info based on new cluster resource value + // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity + // during allocation + updateHeadroomInfo(clusterResource, absoluteMaxCapacity); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -1775,6 +1848,7 @@ public class LeafQueue extends AbstractCSQueue { @VisibleForTesting public static class User { Resource consumed = Resources.createResource(0, 0); + Resource consumedAMResources = Resources.createResource(0, 0); Map<String, Resource> consumedByLabel = new HashMap<String, Resource>(); int pendingApplications = 0; int activeApplications = 0; @@ -1798,6 +1872,10 @@ public class LeafQueue extends AbstractCSQueue { public int getActiveApplications() { return activeApplications; } + + public Resource getConsumedAMResources() { + return consumedAMResources; + } public int getTotalApplications() { return getPendingApplications() + getActiveApplications(); @@ -1943,6 +2021,10 @@ public class LeafQueue extends AbstractCSQueue { @Override public float getAbsActualCapacity() { + //? Is this actually used by anything at present? 
+ // There is a findbugs warning -re lastClusterResource (now excluded), + // when this is used, verify that the access is mt correct and remove + // the findbugs exclusion if possible if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, lastClusterResource, Resources.none())) { return absoluteCapacity; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2f9569c..9f97b13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -72,6 +73,20 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt { String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); + + Resource amResource; + if (rmApp == null || rmApp.getAMResourceRequest() == null) { + //the rmApp may be undefined (the resource manager checks for this too) + //and unmanaged applications do not provide an amResource request + //in these cases, provide a default using the scheduler + amResource = rmContext.getScheduler().getMinimumResourceCapability(); + } else { + amResource = rmApp.getAMResourceRequest().getCapability(); + } + + setAMResource(amResource); } synchronized public boolean containerCompleted(RMContainer rmContainer, http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index ffead48..0be361a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -115,8 +115,8 @@ class CapacitySchedulerPage extends RmView { _("Num Containers:", 
Integer.toString(lqinfo.getNumContainers())). _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())). _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())). - _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())). - _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())). + _("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()). + _("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()). _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)). _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)). _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index d90e963..bb4c749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -32,11 +32,11 @@ public class 
CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { protected int numContainers; protected int maxApplications; protected int maxApplicationsPerUser; - protected int maxActiveApplications; - protected int maxActiveApplicationsPerUser; protected int userLimit; protected UsersInfo users; // To add another level in the XML protected float userLimitFactor; + protected ResourceInfo aMResourceLimit; + protected ResourceInfo userAMResourceLimit; CapacitySchedulerLeafQueueInfo() { }; @@ -48,11 +48,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { numContainers = q.getNumContainers(); maxApplications = q.getMaxApplications(); maxApplicationsPerUser = q.getMaxApplicationsPerUser(); - maxActiveApplications = q.getMaximumActiveApplications(); - maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser(); userLimit = q.getUserLimit(); users = new UsersInfo(q.getUsers()); userLimitFactor = q.getUserLimitFactor(); + aMResourceLimit = new ResourceInfo(q.getAMResourceLimit()); + userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit()); } public int getNumActiveApplications() { @@ -75,14 +75,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { return maxApplicationsPerUser; } - public int getMaxActiveApplications() { - return maxActiveApplications; - } - - public int getMaxActiveApplicationsPerUser() { - return maxActiveApplicationsPerUser; - } - public int getUserLimit() { return userLimit; } @@ -95,4 +87,12 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { public float getUserLimitFactor() { return userLimitFactor; } + + public ResourceInfo getAMResourceLimit() { + return aMResourceLimit; + } + + public ResourceInfo getUserAMResourceLimit() { + return userAMResourceLimit; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 62e3e5c..f8d92aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,6 +52,7 @@ import com.google.common.collect.Lists; public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { + ResourceRequest amReq; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -183,6 +185,11 @@ public abstract class MockAsm extends MockApps { public ReservationId getReservationId() { throw new 
UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } public static RMApp newApplication(int i) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index e93d351..f4cb3b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -23,6 +23,7 @@ import java.util.List; import org.junit.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +57,13 @@ public class TestAMRMRPCNodeUpdates { dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override + public void init(Configuration conf) { + conf.set( + CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + "1.0"); + super.init(conf); + } + @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler) { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index 4eedd42..f2840b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -95,6 +95,7 @@ public class TestCapacitySchedulerPlanFollower { .thenReturn(null); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); 
+ when(spyRMContext.getScheduler()).thenReturn(scheduler); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 787b5d7..ec990f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,6 +56,7 @@ public class MockRMApp implements RMApp { StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; + ResourceRequest amReq; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -264,4 +266,9 @@ public class MockRMApp implements RMApp { public 
ReservationId getReservationId() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ResourceRequest getAMResourceRequest() { + return this.amReq; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0cd74d0..81a5aad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -28,16 +28,21 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.mockito.Matchers; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; 
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -47,8 +52,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -56,6 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Ignore; public class TestApplicationLimits { @@ -119,8 +127,6 @@ public class TestApplicationLimits { // Some default values doReturn(100).when(queue).getMaxApplications(); doReturn(25).when(queue).getMaxApplicationsPerUser(); - doReturn(10).when(queue).getMaximumActiveApplications(); - doReturn(2).when(queue).getMaximumActiveApplicationsPerUser(); } private static final String A = "a"; @@ -136,10 +142,14 @@ public class TestApplicationLimits { final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; conf.setCapacity(Q_B, 90); + conf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50); + conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + "." 
+ A, 5.0f); + LOG.info("Setup top-level queues a and b"); } - private FiCaSchedulerApp getMockApplication(int appId, String user) { + private FiCaSchedulerApp getMockApplication(int appId, String user, + Resource amResource) { FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); @@ -147,10 +157,90 @@ public class TestApplicationLimits { when(application).getApplicationId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); return application; } @Test + public void testAMResourceLimit() throws Exception { + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // This uses the default 10% of cluster value for the max am resources + // which are allowed, at 80GB = 8GB for AM's at the queue level. The user + // am limit is 4G initially (based on the queue absolute capacity) + // when there is only 1 user, and drops to 2G (the userlimit) when there + // is a second user + queue.updateClusterResource(Resource.newInstance(80 * GB, 40)); + + ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class); + when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); + + assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit()); + assertEquals(Resource.newInstance(4 * GB, 1), + queue.getUserAMResourceLimit()); + + // Two apps for user_0, both start + int APPLICATION_ID = 0; + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_0, user_0); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + 
when(activeUsersManager.getNumActiveUsers()).thenReturn(1); + + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_1, user_0); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // AMLimits unchanged + assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit()); + assertEquals(Resource.newInstance(4 * GB, 1), + queue.getUserAMResourceLimit()); + + // One app for user_1, starts + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_2, user_1); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + + when(activeUsersManager.getNumActiveUsers()).thenReturn(2); + + // Now userAMResourceLimit drops to the queue configured 50% as there is + // another user active + assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit()); + assertEquals(Resource.newInstance(2 * GB, 1), + queue.getUserAMResourceLimit()); + + // Second user_1 app cannot start + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_1, + Resource.newInstance(2 * GB, 1)); + queue.submitApplicationAttempt(app_3, user_1); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(1, queue.getNumPendingApplications(user_1)); + + // Now finish app so another should be activated + queue.finishApplicationAttempt(app_2, A); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); 
+ assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + + } + + @Test public void testLimitsComputation() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); @@ -172,7 +262,8 @@ public class TestApplicationLimits { when(csContext.getRMContext()).thenReturn(rmContext); // Say cluster has 100 nodes of 16G each - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); + Resource clusterResource = + Resources.createResource(100 * 16 * GB, 100 * 16); when(csContext.getClusterResource()).thenReturn(clusterResource); Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); @@ -183,28 +274,14 @@ public class TestApplicationLimits { LeafQueue queue = (LeafQueue)queues.get(A); LOG.info("Queue 'A' -" + - " maxActiveApplications=" + queue.getMaximumActiveApplications() + - " maxActiveApplicationsPerUser=" + - queue.getMaximumActiveApplicationsPerUser()); - int expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. 
- getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); - int expectedMaxActiveAppsUsingAbsCap = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); - assertEquals( - (int)Math.ceil( - expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) * - queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); + " aMResourceLimit=" + queue.getAMResourceLimit() + + " UserAMResourceLimit=" + + queue.getUserAMResourceLimit()); + + assertEquals(queue.getAMResourceLimit(), Resource.newInstance(160*GB, 1)); + assertEquals(queue.getUserAMResourceLimit(), + Resource.newInstance(80*GB, 1)); + assertEquals( (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), queue.getMetrics().getAvailableMB() @@ -213,24 +290,11 @@ public class TestApplicationLimits { // Add some nodes to the cluster & test new limits clusterResource = Resources.createResource(120 * 16 * GB); root.updateClusterResource(clusterResource); - expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. 
- getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); - expectedMaxActiveAppsUsingAbsCap = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf.getMaximumApplicationMasterResourcePercent() * - queue.getAbsoluteCapacity())); - assertEquals( - (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap * - (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), - queue.getMaximumActiveApplicationsPerUser()); + + assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1)); + assertEquals(queue.getUserAMResourceLimit(), + Resource.newInstance(96*GB, 1)); + assertEquals( (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), queue.getMetrics().getAvailableMB() @@ -271,18 +335,15 @@ public class TestApplicationLimits { clusterResource = Resources.createResource(100 * 16 * GB); queue = (LeafQueue)queues.get(A); - expectedMaxActiveApps = - Math.max(1, - (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * - csConf. - getMaximumApplicationMasterResourcePerQueuePercent( - queue.getQueuePath()) * - queue.getAbsoluteMaximumCapacity())); assertEquals((long) 0.5, - (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath())); - assertEquals(expectedMaxActiveApps, - queue.getMaximumActiveApplications()); + (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent( + queue.getQueuePath()) + ); + + assertEquals(queue.getAMResourceLimit(), Resource.newInstance(800*GB, 1)); + assertEquals(queue.getUserAMResourceLimit(), + Resource.newInstance(400*GB, 1)); // Change the per-queue max applications. 
csConf.setInt( @@ -308,10 +369,16 @@ public class TestApplicationLimits { public void testActiveApplicationLimits() throws Exception { final String user_0 = "user_0"; final String user_1 = "user_1"; + final String user_2 = "user_2"; + + assertEquals(Resource.newInstance(16 * GB, 1), queue.getAMResourceLimit()); + assertEquals(Resource.newInstance(8 * GB, 1), + queue.getUserAMResourceLimit()); int APPLICATION_ID = 0; // Submit first application - FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -319,15 +386,17 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit second application - FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - // Submit third application, should remain pending - FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + // Submit third application, should remain pending due to user amlimit + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -342,18 +411,17 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit another one for user_0 - 
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - // Change queue limit to be smaller so 2 users can fill it up - doReturn(3).when(queue).getMaximumActiveApplications(); - // Submit first app for user_1 - FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1, + Resources.createResource(8 * GB, 0)); queue.submitApplicationAttempt(app_4, user_1); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -362,15 +430,17 @@ public class TestApplicationLimits { assertEquals(1, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); - // Submit second app for user_1, should block due to queue-limit - FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); - queue.submitApplicationAttempt(app_5, user_1); + // Submit first app for user_2, should block due to queue amlimit + FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2, + Resources.createResource(8 * GB, 0)); + queue.submitApplicationAttempt(app_5, user_2); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); assertEquals(1, queue.getNumActiveApplications(user_1)); - assertEquals(1, queue.getNumPendingApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + assertEquals(1, queue.getNumPendingApplications(user_2)); // Now finish 
one app of user_1 so app_5 should be activated queue.finishApplicationAttempt(app_4, A); @@ -378,21 +448,22 @@ public class TestApplicationLimits { assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumActiveApplications(user_1)); assertEquals(0, queue.getNumPendingApplications(user_1)); + assertEquals(1, queue.getNumActiveApplications(user_2)); + assertEquals(0, queue.getNumPendingApplications(user_2)); + } - + @Test public void testActiveLimitsWithKilledApps() throws Exception { final String user_0 = "user_0"; int APPLICATION_ID = 0; - // set max active to 2 - doReturn(2).when(queue).getMaximumActiveApplications(); - // Submit first application - FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -401,7 +472,8 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_0)); // Submit second application - FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -410,7 +482,8 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_1)); // Submit third application, should remain pending - FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, + 
Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -419,7 +492,8 @@ public class TestApplicationLimits { assertTrue(queue.pendingApplications.contains(app_2)); // Submit fourth application, should remain pending - FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, + Resources.createResource(4 * GB, 0)); queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -506,6 +580,18 @@ public class TestApplicationLimits { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + + ConcurrentMap<ApplicationId, RMApp> spyApps = + spy(new ConcurrentHashMap<ApplicationId, RMApp>()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + Priority priority_1 = TestUtils.createMockPriority(1); @@ -513,9 +599,9 @@ public class TestApplicationLimits { // and check headroom final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); - FiCaSchedulerApp app_0_0 = - spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp( + appAttemptId_0_0, user_0, queue, + queue.getActiveUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_0_0, user_0); 
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>(); @@ -532,9 +618,9 @@ public class TestApplicationLimits { // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); - FiCaSchedulerApp app_0_1 = - spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, - queue.getActiveUsersManager(), rmContext)); + FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp( + appAttemptId_0_1, user_0, queue, + queue.getActiveUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_0_1, user_0); List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>(); @@ -551,9 +637,9 @@ public class TestApplicationLimits { // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); - FiCaSchedulerApp app_1_0 = - spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, - queue.getActiveUsersManager(), rmContext)); + FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp( + appAttemptId_1_0, user_1, queue, + queue.getActiveUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_1_0, user_1); List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index fb7bb2c..ead5719 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -101,6 +101,7 @@ public class TestLeafQueue { RMContext rmContext; RMContext spyRMContext; + ResourceRequest amResourceRequest; CapacityScheduler cs; CapacitySchedulerConfiguration csConf; CapacitySchedulerContext csContext; @@ -124,6 +125,10 @@ public class TestLeafQueue { spy(new ConcurrentHashMap<ApplicationId, RMApp>()); RMApp rmApp = mock(RMApp.class); when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); + amResourceRequest = mock(ResourceRequest.class); + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(0, 0)); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); @@ -265,26 +270,37 @@ public class TestLeafQueue { @Test public void testInitializeQueue() throws Exception { - final float epsilon = 1e-5f; - //can add more sturdy test with 3-layer queues - //once MAPREDUCE:3410 is resolved - LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); - assertEquals(0.085, a.getCapacity(), epsilon); - assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); - assertEquals(0.2, a.getMaximumCapacity(), epsilon); - assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + final float epsilon = 1e-5f; + //can add more sturdy test with 3-layer queues + //once MAPREDUCE:3410 is resolved + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + assertEquals(0.085, a.getCapacity(), epsilon); + assertEquals(0.085, a.getAbsoluteCapacity(), epsilon); + assertEquals(0.2, a.getMaximumCapacity(), 
epsilon); + assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); + + LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); + assertEquals(0.80, b.getCapacity(), epsilon); + assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); + assertEquals(0.99, b.getMaximumCapacity(), epsilon); + assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); + + ParentQueue c = (ParentQueue)queues.get(C); + assertEquals(0.015, c.getCapacity(), epsilon); + assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); + assertEquals(0.1, c.getMaximumCapacity(), epsilon); + assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + + //Verify the value for getAMResourceLimit for queues with < .1 maxcap + Resource clusterResource = Resource.newInstance(50 * GB, 50); - LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); - assertEquals(0.80, b.getCapacity(), epsilon); - assertEquals(0.80, b.getAbsoluteCapacity(), epsilon); - assertEquals(0.99, b.getMaximumCapacity(), epsilon); - assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); - - ParentQueue c = (ParentQueue)queues.get(C); - assertEquals(0.015, c.getCapacity(), epsilon); - assertEquals(0.015, c.getAbsoluteCapacity(), epsilon); - assertEquals(0.1, c.getMaximumCapacity(), epsilon); - assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon); + a.updateClusterResource(clusterResource); + assertEquals(Resource.newInstance(1 * GB, 1), + a.getAMResourceLimit()); + + b.updateClusterResource(clusterResource); + assertEquals(Resource.newInstance(5 * GB, 1), + b.getAMResourceLimit()); } @Test @@ -679,7 +695,7 @@ public class TestLeafQueue { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_0, user_0); Priority u0Priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -702,7 +718,7 @@ public 
class TestLeafQueue { TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); Priority u1Priority = TestUtils.createMockPriority(2); app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, @@ -736,12 +752,12 @@ public class TestLeafQueue { TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, u0Priority, recordFactory))); @@ -764,7 +780,7 @@ public class TestLeafQueue { TestUtils.getMockApplicationAttemptId(4, 0); FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, - qb.getActiveUsersManager(), rmContext); + qb.getActiveUsersManager(), spyRMContext); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, @@ -980,7 +996,6 @@ public class TestLeafQueue { assertEquals(0*GB, app_1.getHeadroom().getMemory()); // Check headroom for app_2 - LOG.info("here"); app_1.updateResourceRequests(Collections.singletonList( // unset TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, priority, recordFactory))); @@ -1904,6 +1919,9 @@ public class TestLeafQueue { // Users final String user_e = "user_e"; + + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(1 * GB, 0)); // Submit 
applications final ApplicationAttemptId appAttemptId_0 = @@ -1942,7 +1960,7 @@ public class TestLeafQueue { newQueues, queues, TestUtils.spyHook); queues = newQueues; - root.reinitialize(newRoot, cs.getClusterResource()); + root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization assertEquals(3, e.activeApplications.size()); @@ -1982,6 +2000,9 @@ public class TestLeafQueue { // Users final String user_e = "user_e"; + + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(1 * GB, 0)); // Submit applications final ApplicationAttemptId appAttemptId_0 = @@ -2291,20 +2312,20 @@ public class TestLeafQueue { csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80); LeafQueue a = new LeafQueue(csContext, A, root, null); assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f); - assertEquals(160, a.getMaximumActiveApplications()); + assertEquals(a.getAMResourceLimit(), Resources.createResource(160 * GB, 1)); csConf.setFloat(CapacitySchedulerConfiguration. 
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f); LeafQueue newA = new LeafQueue(csContext, A, root, null); a.reinitialize(newA, clusterResource); assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f); - assertEquals(320, a.getMaximumActiveApplications()); + assertEquals(a.getAMResourceLimit(), Resources.createResource(320 * GB, 1)); Resource newClusterResource = Resources.createResource(100 * 20 * GB, 100 * 32); a.updateClusterResource(newClusterResource); // 100 * 20 * 0.2 = 400 - assertEquals(400, a.getMaximumActiveApplications()); + assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1)); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 2a49545..985609e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -77,6 +77,7 @@ public class TestReservations { .getRecordFactory(null); RMContext rmContext; + RMContext spyRMContext; CapacityScheduler cs; // CapacitySchedulerConfiguration csConf; CapacitySchedulerContext csContext; @@ -132,7 
+133,10 @@ public class TestReservations { root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); - cs.setRMContext(rmContext); + spyRMContext = spy(rmContext); + when(spyRMContext.getScheduler()).thenReturn(cs); + + cs.setRMContext(spyRMContext); cs.init(csConf); cs.start(); } @@ -212,14 +216,14 @@ public class TestReservations { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -361,14 +365,14 @@ public class TestReservations { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -506,14 +510,14 @@ public class TestReservations { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + 
mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -618,7 +622,7 @@ public class TestReservations { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_0 = "host_0"; FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, @@ -685,7 +689,7 @@ public class TestReservations { .getMockApplicationAttemptId(0, 0); LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); String host_1 = "host_1"; FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, @@ -742,14 +746,14 @@ public class TestReservations { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -916,14 +920,14 @@ public class TestReservations { final ApplicationAttemptId 
appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes @@ -1042,14 +1046,14 @@ public class TestReservations { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), rmContext); + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes