Author: tomwhite Date: Fri Nov 9 12:38:10 2012 New Revision: 1407433 URL: http://svn.apache.org/viewvc?rev=1407433&view=rev Log: YARN-183. Clean up fair scheduler code. Contributed by Sandy Ryza.
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java - copied, changed from r1407432, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java Removed: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Nov 9 12:38:10 2012 @@ -69,6 +69,8 @@ Release 2.0.3-alpha - Unreleased YARN-136. Make ClientToAMTokenSecretManager part of RMContext (Vinod Kumar Vavilapalli via sseth) + YARN-183. Clean up fair scheduler code. (Sandy Ryza via tomwhite) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Fri Nov 9 12:38:10 2012 @@ -101,7 +101,7 @@ public class AppSchedulable extends Sche @Override public Resource getResourceUsage() { - return this.app.getCurrentConsumption(); + return app.getCurrentConsumption(); } @@ -114,7 +114,7 @@ public class AppSchedulable extends Sche * Get metrics reference from containing queue. */ public QueueMetrics getMetrics() { - return this.queue.getQueueSchedulable().getMetrics(); + return queue.getQueueSchedulable().getMetrics(); } @Override @@ -190,9 +190,9 @@ public class AppSchedulable extends Sche RMContainer rmContainer = application.reserve(node, priority, null, container); node.reserveResource(application, priority, rmContainer); - getMetrics().reserveResource(this.app.getUser(), + getMetrics().reserveResource(app.getUser(), container.getResource()); - scheduler.getRootQueueMetrics().reserveResource(this.app.getUser(), + scheduler.getRootQueueMetrics().reserveResource(app.getUser(), container.getResource()); } @@ -257,13 +257,13 @@ public class AppSchedulable extends Sche // TODO this should subtract resource just assigned // TEMPROARY getMetrics().setAvailableResourcesToQueue( - this.scheduler.getClusterCapacity()); + scheduler.getClusterCapacity()); } // If we had previously made a reservation, delete it if (reserved) { - this.unreserve(application, priority, node); + unreserve(application, priority, node); } // Inform the node @@ -290,7 +290,7 @@ public class AppSchedulable extends Sche // Make sure the application still needs requests at this priority if (app.getTotalRequiredResources(priority) == 0) { - this.unreserve(app, priority, node); + unreserve(app, priority, node); return Resources.none(); } } else { Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Nov 9 12:38:10 2012 @@ -61,7 +61,7 @@ public class FSQueue { queueSchedulable.addApp(appSchedulable); } - public void removeJob(FSSchedulerApp app) { + public void removeApp(FSSchedulerApp app) { applications.remove(app); queueSchedulable.removeApp(app); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java Fri Nov 9 12:38:10 2012 @@ -80,7 +80,7 @@ public class FSQueueSchedulable extends this.scheduler = scheduler; this.queue = queue; this.queueMgr = scheduler.getQueueManager(); - this.metrics = QueueMetrics.forQueue(this.getName(), null, true, scheduler.getConf()); + this.metrics = QueueMetrics.forQueue(getName(), null, true, scheduler.getConf()); this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); } @@ -113,7 +113,7 @@ public class FSQueueSchedulable extends Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { LOG.debug("Counting resource from " + sched.getName() + " " + toAdd - + "; Total resource consumption for " + this.getName() + " now " + + "; Total resource consumption for " + getName() + " now " + demand); } demand = Resources.add(demand, toAdd); @@ -123,7 +123,7 @@ public class FSQueueSchedulable extends } } if (LOG.isDebugEnabled()) { - LOG.debug("The updated demand for " + this.getName() + " is " + demand + LOG.debug("The updated demand for " + getName() + " is " + demand + "; the max is " + maxRes); } } @@ -164,9 +164,9 @@ public class FSQueueSchedulable extends @Override public Resource assignContainer(FSSchedulerNode node, boolean reserved) { - LOG.debug("Node offered to queue: " + this.getName() + " reserved: " + reserved); + LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved); // If this queue is over its limit, reject - if (Resources.greaterThan(this.getResourceUsage(), + if (Resources.greaterThan(getResourceUsage(), queueMgr.getMaxResources(queue.getName()))) { return Resources.none(); } @@ -258,7 +258,7 @@ public class FSQueueSchedulable extends @Override public Map<QueueACL, AccessControlList> getQueueAcls() { - Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName()); + Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName()); return new HashMap<QueueACL, AccessControlList>(acls); } @@ -284,7 +284,7 @@ public class FSQueueSchedulable extends recordFactory.newRecordInstance(QueueUserACLInfo.class); List<QueueACL> operations = new ArrayList<QueueACL>(); for (QueueACL operation : QueueACL.values()) { - Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName()); + Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName()); if (acls.get(operation).isUserAllowed(user)) { operations.add(operation); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Nov 9 12:38:10 2012 @@ -112,12 +112,12 @@ public class FSSchedulerApp extends Sche } public ApplicationId getApplicationId() { - return this.appSchedulingInfo.getApplicationId(); + return appSchedulingInfo.getApplicationId(); } @Override public ApplicationAttemptId getApplicationAttemptId() { - return this.appSchedulingInfo.getApplicationAttemptId(); + return appSchedulingInfo.getApplicationAttemptId(); } public void setAppSchedulable(AppSchedulable appSchedulable) { @@ -129,7 +129,7 @@ public class FSSchedulerApp extends Sche } public String getUser() { - return this.appSchedulingInfo.getUser(); + return appSchedulingInfo.getUser(); } public synchronized void updateResourceRequests( @@ -138,19 +138,19 @@ public class FSSchedulerApp extends Sche } public Map<String, ResourceRequest> getResourceRequests(Priority priority) { - return this.appSchedulingInfo.getResourceRequests(priority); + return appSchedulingInfo.getResourceRequests(priority); } public int getNewContainerId() { - return this.appSchedulingInfo.getNewContainerId(); + return appSchedulingInfo.getNewContainerId(); } public Collection<Priority> getPriorities() { - return this.appSchedulingInfo.getPriorities(); + return appSchedulingInfo.getPriorities(); } public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { - return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress); + return appSchedulingInfo.getResourceRequest(priority, nodeAddress); } public synchronized int getTotalRequiredResources(Priority priority) { @@ -158,7 +158,7 @@ public class FSSchedulerApp extends Sche } public Resource getResource(Priority priority) { - return this.appSchedulingInfo.getResource(priority); + return appSchedulingInfo.getResource(priority); } /** @@ -167,11 +167,11 @@ public class FSSchedulerApp extends Sche */ @Override public boolean isPending() { - return this.appSchedulingInfo.isPending(); + return appSchedulingInfo.isPending(); } public String getQueueName() { - return this.appSchedulingInfo.getQueueName(); + return appSchedulingInfo.getQueueName(); } /** @@ -185,7 +185,7 @@ public class FSSchedulerApp extends Sche public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { // Cleanup all scheduling information - this.appSchedulingInfo.stop(rmAppAttemptFinalState); + appSchedulingInfo.stop(rmAppAttemptFinalState); } @SuppressWarnings("unchecked") @@ -196,7 +196,7 @@ public class FSSchedulerApp extends Sche getRMContainer(containerId); if (rmContainer == null) { // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() + rmContext.getDispatcher().getEventHandler() .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); return; } @@ -272,7 +272,7 @@ public class FSSchedulerApp extends Sche } synchronized public void addSchedulingOpportunity(Priority priority) { - this.schedulingOpportunities.setCount(priority, + schedulingOpportunities.setCount(priority, schedulingOpportunities.count(priority) + 1); } @@ -282,19 +282,19 @@ public class FSSchedulerApp extends Sche * successfully did so. */ synchronized public int getSchedulingOpportunities(Priority priority) { - return this.schedulingOpportunities.count(priority); + return schedulingOpportunities.count(priority); } synchronized void resetReReservations(Priority priority) { - this.reReservations.setCount(priority, 0); + reReservations.setCount(priority, 0); } synchronized void addReReservation(Priority priority) { - this.reReservations.add(priority); + reReservations.add(priority); } synchronized public int getReReservations(Priority priority) { - return this.reReservations.count(priority); + return reReservations.count(priority); } public synchronized int getNumReservedContainers(Priority priority) { @@ -458,8 +458,8 @@ public class FSSchedulerApp extends Sche * @param priority The priority of the container scheduled. */ synchronized public void resetSchedulingOpportunities(Priority priority) { - this.lastScheduledContainer.put(priority, System.currentTimeMillis()); - this.schedulingOpportunities.setCount(priority, 0); + lastScheduledContainer.put(priority, System.currentTimeMillis()); + schedulingOpportunities.setCount(priority, 0); } /** @@ -494,14 +494,14 @@ public class FSSchedulerApp extends Sche rackLocalityThreshold; // Relax locality constraints once we've surpassed threshold. - if (this.getSchedulingOpportunities(priority) > (numNodes * threshold)) { + if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { if (allowed.equals(NodeType.NODE_LOCAL)) { allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); - this.resetSchedulingOpportunities(priority); + resetSchedulingOpportunities(priority); } else if (allowed.equals(NodeType.RACK_LOCAL)) { allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); - this.resetSchedulingOpportunities(priority); + resetSchedulingOpportunities(priority); } } return allowedLocalityLevel.get(priority); @@ -512,7 +512,7 @@ public class FSSchedulerApp extends Sche Priority priority, ResourceRequest request, Container container) { // Update allowed locality level - NodeType allowed = this.allowedLocalityLevel.get(priority); + NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(NodeType.NODE_LOCAL) || @@ -532,9 +532,9 @@ public class FSSchedulerApp extends Sche } // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, this - .getApplicationAttemptId(), node.getNodeID(), this.rmContext - .getDispatcher().getEventHandler(), this.rmContext + RMContainer rmContainer = new RMContainerImpl(container, + getApplicationAttemptId(), node.getNodeID(), rmContext + .getDispatcher().getEventHandler(), rmContext .getContainerAllocationExpirer()); // Add it to allContainers list. Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Fri Nov 9 12:38:10 2012 @@ -67,25 +67,25 @@ public class FSSchedulerNode extends Sch } public RMNode getRMNode() { - return this.rmNode; + return rmNode; } public NodeId getNodeID() { - return this.rmNode.getNodeID(); + return rmNode.getNodeID(); } public String getHttpAddress() { - return this.rmNode.getHttpAddress(); + return rmNode.getHttpAddress(); } @Override public String getHostName() { - return this.rmNode.getHostName(); + return rmNode.getHostName(); } @Override public String getRackName() { - return this.rmNode.getRackName(); + return rmNode.getRackName(); } /** @@ -112,17 +112,18 @@ public class FSSchedulerNode extends Sch @Override public synchronized Resource getAvailableResource() { - return this.availableResource; + return availableResource; } @Override public synchronized Resource getUsedResource() { - return this.usedResource; + return usedResource; } private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) + if (launchedContainers.containsKey(c.getId())) { return true; + } return false; } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 9 12:38:10 2012 @@ -139,11 +139,11 @@ public class FairScheduler implements Re public FairSchedulerConfiguration getConf() { - return this.conf; + return conf; } public QueueManager getQueueManager() { - return this.queueMgr; + return queueMgr; } public List<FSQueueSchedulable> getQueueSchedulables() { @@ -183,36 +183,34 @@ public class FairScheduler implements Re * fair shares, deficits, minimum slot allocations, and amount of used and * required resources per job. */ - protected void update() { - synchronized (this) { - queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file - updateRunnability(); // Set job runnability based on user/queue limits - updatePreemptionVariables(); // Determine if any queues merit preemption + protected synchronized void update() { + queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file + updateRunnability(); // Set job runnability based on user/queue limits + updatePreemptionVariables(); // Determine if any queues merit preemption - // Update demands of apps and queues - for (FSQueue queue: queueMgr.getQueues()) { - queue.getQueueSchedulable().updateDemand(); - } - - // Compute fair shares based on updated demands - List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables(); - SchedulingAlgorithms.computeFairShares( - queueScheds, clusterCapacity); + // Update demands of apps and queues + for (FSQueue queue: queueMgr.getQueues()) { + queue.getQueueSchedulable().updateDemand(); + } - // Update queue metrics for this queue - for (FSQueueSchedulable sched : queueScheds) { - sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare()); - } + // Compute fair shares based on updated demands + List<FSQueueSchedulable> queueScheds = getQueueSchedulables(); + SchedulingAlgorithms.computeFairShares( + queueScheds, clusterCapacity); - // Use the computed shares to assign shares within each queue - for (FSQueue queue: queueMgr.getQueues()) { - queue.getQueueSchedulable().redistributeShare(); - } + // Update queue metrics for this queue + for (FSQueueSchedulable sched : queueScheds) { + sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare()); + } - // Update recorded capacity of root queue (child queues are updated - // when fair share is calculated). - rootMetrics.setAvailableResourcesToQueue(clusterCapacity); + // Use the computed shares to assign shares within each queue + for (FSQueue queue: queueMgr.getQueues()) { + queue.getQueueSchedulable().redistributeShare(); } + + // Update recorded capacity of root queue (child queues are updated + // when fair share is calculated). + rootMetrics.setAvailableResourcesToQueue(clusterCapacity); } /** @@ -257,17 +255,16 @@ public class FairScheduler implements Re * have been below half their fair share for the fairSharePreemptionTimeout. * If such queues exist, compute how many tasks of each type need to be * preempted and then select the right ones using preemptTasks. - * - * This method computes and logs the number of tasks we want to preempt even - * if preemption is disabled, for debugging purposes. */ protected void preemptTasksIfNecessary() { - if (!preemptionEnabled) + if (!preemptionEnabled) { return; + } long curTime = clock.getTime(); - if (curTime - lastPreemptCheckTime < preemptionInterval) + if (curTime - lastPreemptCheckTime < preemptionInterval) { return; + } lastPreemptCheckTime = curTime; Resource resToPreempt = Resources.none(); @@ -288,8 +285,9 @@ public class FairScheduler implements Re * lowest priority to preempt. */ protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) { - if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) + if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { return; + } Map<RMContainer, FSSchedulerApp> apps = new HashMap<RMContainer, FSSchedulerApp>(); @@ -330,7 +328,7 @@ public class FairScheduler implements Re // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). - this.completedContainer(container, status, RMContainerEventType.KILL); + completedContainer(container, status, RMContainerEventType.KILL); toPreempt = Resources.subtract(toPreempt, container.getContainer().getResource()); @@ -413,7 +411,7 @@ public class FairScheduler implements Re } public RMContainerTokenSecretManager getContainerTokenSecretManager() { - return this.rmContext.getContainerTokenSecretManager(); + return rmContext.getContainerTokenSecretManager(); } public double getAppWeight(AppSchedulable app) { @@ -437,28 +435,28 @@ public class FairScheduler implements Re @Override public Resource getMinimumResourceCapability() { - return this.minimumAllocation; + return minimumAllocation; } @Override public Resource getMaximumResourceCapability() { - return this.maximumAllocation; + return maximumAllocation; } public double getNodeLocalityThreshold() { - return this.nodeLocalityThreshold; + return nodeLocalityThreshold; } public double getRackLocalityThreshold() { - return this.rackLocalityThreshold; + return rackLocalityThreshold; } public Resource getClusterCapacity() { - return this.clusterCapacity; + return clusterCapacity; } public Clock getClock() { - return this.clock; + return clock; } protected void setClock(Clock clock) { @@ -478,11 +476,11 @@ public class FairScheduler implements Re addApplication(ApplicationAttemptId applicationAttemptId, String queueName, String user) { - FSQueue queue = this.queueMgr.getQueue(queueName); + FSQueue queue = queueMgr.getQueue(queueName); FSSchedulerApp schedulerApp = new FSSchedulerApp(applicationAttemptId, user, - queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()), + queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()), rmContext, null); // Inforce ACLs @@ -553,8 +551,8 @@ public class FairScheduler implements Re application.stop(rmAppAttemptFinalState); // Inform the queue - FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName()); - queue.removeJob(application); + FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName()); + queue.removeApp(application); // Remove from our data-structure applications.remove(applicationAttemptId); @@ -600,7 +598,7 @@ public class FairScheduler implements Re } private synchronized void addNode(RMNode node) { - this.nodes.put(node.getNodeID(), new FSSchedulerNode(node)); + nodes.put(node.getNodeID(), new FSSchedulerNode(node)); Resources.addTo(clusterCapacity, node.getTotalCapability()); LOG.info("Added node " + node.getNodeAddress() + @@ -608,7 +606,7 @@ public class FairScheduler implements Re } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = this.nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = nodes.get(rmNode.getNodeID()); Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); // Remove running containers @@ -631,7 +629,7 @@ public class FairScheduler implements Re RMContainerEventType.KILL); } - this.nodes.remove(rmNode.getNodeID()); + nodes.remove(rmNode.getNodeID()); LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterCapacity); } @@ -669,10 +667,8 @@ public class FairScheduler implements Re } synchronized (application) { - if (!ask.isEmpty()) { - - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("allocate: pre-update" + " applicationAttemptId=" + appAttemptId + " application=" + application.getApplicationId()); @@ -686,7 +682,7 @@ public class FairScheduler implements Re application.showRequests(); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("allocate:" + " applicationAttemptId=" + appAttemptId + " #ask=" + ask.size()); @@ -764,7 +760,7 @@ public class FairScheduler implements Re int assignedContainers = 0; while (true) { // At most one task is scheduled each iteration of this loop - List<FSQueueSchedulable> scheds = this.getQueueSchedulables(); + List<FSQueueSchedulable> scheds = getQueueSchedulables(); Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); boolean assignedContainer = false; for (FSQueueSchedulable sched : scheds) { @@ -796,11 +792,11 @@ public class FairScheduler implements Re @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { - if (!this.applications.containsKey(appAttemptId)) { + if (!applications.containsKey(appAttemptId)) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; } - return new SchedulerAppReport(this.applications.get(appAttemptId)); + return new SchedulerAppReport(applications.get(appAttemptId)); } @Override @@ -812,37 +808,30 @@ public class FairScheduler implements Re public void handle(SchedulerEvent event) { switch(event.getType()) { case NODE_ADDED: - { if (!(event instanceof NodeAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); - } - break; + break; case NODE_REMOVED: - { if (!(event instanceof NodeRemovedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; removeNode(nodeRemovedEvent.getRemovedRMNode()); - } - break; + break; case NODE_UPDATE: - { if (!(event instanceof NodeUpdateSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - this.nodeUpdate(nodeUpdatedEvent.getRMNode(), + nodeUpdate(nodeUpdatedEvent.getRMNode(), nodeUpdatedEvent.getNewlyLaunchedContainers(), nodeUpdatedEvent.getCompletedContainers()); - } - break; + break; case APP_ADDED: - { if (!(event instanceof AppAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } @@ -857,20 +846,16 @@ public class FairScheduler implements Re addApplication(appAddedEvent.getApplicationAttemptId(), queue, appAddedEvent.getUser()); - } - break; + break; case APP_REMOVED: - { if (!(event instanceof AppRemovedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; - this.removeApplication(appRemovedEvent.getApplicationAttemptID(), + removeApplication(appRemovedEvent.getApplicationAttemptID(), appRemovedEvent.getFinalAttemptState()); - } - break; + break; case CONTAINER_EXPIRED: - { if (!(event instanceof ContainerExpiredSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } @@ -882,8 +867,7 @@ public class FairScheduler implements Re containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); - } - break; + break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } @@ -897,9 +881,9 @@ public class FairScheduler implements Re @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { - if (!this.initialized) { + if (!initialized) { this.conf = new FairSchedulerConfiguration(conf); - this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf); + rootMetrics = QueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; this.clock = new SystemClock(); this.eventLog = new FairSchedulerEventLog(); @@ -973,7 +957,7 @@ public class FairScheduler implements Re @Override public int getNumClusterNodes() { - return this.nodes.size(); + return nodes.size(); } } Copied: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (from r1407432, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java) URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?p2=hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java&p1=hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java&r1=1407432&r2=1407433&rev=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Fri Nov 9 12:38:10 2012 @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configured */ @Private @Unstable -public class NewJobWeightBooster extends Configured implements WeightAdjuster { +public class NewAppWeightBooster extends Configured implements WeightAdjuster { private static final float DEFAULT_FACTOR = 3; private static final long DEFAULT_DURATION = 5 * 60 * 1000; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 9 12:38:10 2012 @@ -202,7 +202,7 @@ public class QueueManager { * Get the queue for a given AppSchedulable. */ public FSQueue getQueueForApp(AppSchedulable app) { - return this.getQueue(app.getApp().getQueueName()); + return getQueue(app.getApp().getQueueName()); } /** @@ -388,7 +388,7 @@ public class QueueManager { // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. - synchronized(this) { + synchronized (this) { setMinResources(minQueueResources); setMaxResources(maxQueueResources); setQueueMaxApps(queueMaxApps); @@ -431,14 +431,14 @@ public class QueueManager { synchronized(minQueueResourcesMO) { if (minQueueResources.containsKey(queue)) { return minQueueResources.get(queue); - } else{ + } else { return Resources.createResource(0); } } } private void setMinResources(Map<String, Resource> resources) { - synchronized(minQueueResourcesMO) { + synchronized (minQueueResourcesMO) { minQueueResources = resources; } } @@ -457,7 +457,7 @@ public class QueueManager { } private void setMaxResources(Map<String, Resource> resources) { - synchronized(maxQueueResourcesMO) { + synchronized (maxQueueResourcesMO) { maxQueueResources = resources; } } @@ -472,8 +472,8 @@ public class QueueManager { /** * Remove an app */ - public synchronized void removeJob(FSSchedulerApp app) { - getQueue(app.getQueueName()).removeJob(app); + public synchronized void removeApp(FSSchedulerApp app) { + getQueue(app.getQueueName()).removeApp(app); } /** @@ -543,7 +543,7 @@ public class QueueManager { } private int getQueueMaxAppsDefault(){ - synchronized(queueMaxAppsDefaultMO) { + synchronized (queueMaxAppsDefaultMO) { return queueMaxAppsDefault; } } @@ -575,11 +575,12 @@ public class QueueManager { queueWeights = weights; } } + /** - * Get a queue's min share preemption timeout, in milliseconds. This is the - * time after which jobs in the queue may kill other queues' tasks if they - * are below their min share. - */ + * Get a queue's min share preemption timeout, in milliseconds. This is the + * time after which jobs in the queue may kill other queues' tasks if they + * are below their min share. + */ public long getMinSharePreemptionTimeout(String queueName) { synchronized (minSharePreemptionTimeoutsMO) { if (minSharePreemptionTimeouts.containsKey(queueName)) { Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1407433&r1=1407432&r2=1407433&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Fri Nov 9 12:38:10 2012 @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurab /** * A pluggable object for altering the weights of apps in the fair scheduler, - * which is used for example by {@link NewJobWeightBooster} to give higher + * which is used for example by {@link NewAppWeightBooster} to give higher * weight to new jobs so that short jobs finish faster. * * May implement {@link Configurable} to access configuration parameters.