Author: shv Date: Fri Jun 29 19:05:45 2012 New Revision: 1355514 URL: http://svn.apache.org/viewvc?rev=1355514&view=rev Log: MAPREDUCE-4360. Capacity Scheduler: hierarchical leaf queue does not honor the max capacity of its container queue. Contributed by Mayank Bansal.
Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1355514&r1=1355513&r2=1355514&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Fri Jun 29 19:05:45 2012 @@ -83,6 +83,9 @@ Release 0.22.1 - Unreleased MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files get deleted from task tracker. (Mayank Bansal via shv) + MAPREDUCE-4360. Capacity Scheduler Hierarchical leaf queue does not honor + the max capacity of container queue. 
(Mayank Bansal via shv) + Release 0.22.0 - 2011-11-29 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1355514&r1=1355513&r2=1355514&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun 29 19:05:45 2012 @@ -540,11 +540,31 @@ class CapacityTaskScheduler extends Task } return true; } + } else { + QueueSchedulingContext qscParent = qsc.getParentQSC(); + while (qscParent != null) { + if (getTSC(qscParent).getMaxCapacity() < 0) { + qscParent = qscParent.getParentQSC(); + } else { + break; + } + } + if (qscParent != null) { + TaskSchedulingContext tsiParent = getTSC(qscParent); + if ((tsiParent.getNumSlotsOccupied() + noOfSlotsPerTask) > tsiParent + .getMaxCapacity()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + qscParent.getQueueName() + " " + + "has reached its max " + type + "Capacity"); + LOG.debug("Current running tasks " + tsiParent.getCapacity()); + } + return true; + } + } } return false; } - // for debugging. 
private void printQSCs() { if (LOG.isDebugEnabled()) { Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java?rev=1355514&r1=1355513&r2=1355514&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java Fri Jun 29 19:05:45 2012 @@ -102,9 +102,8 @@ class QueueHierarchyBuilder { if (childQueues != null && childQueues.size() > 0) { //generate a new ContainerQueue and recursively //create hierarchy. - AbstractQueue cq = - new ContainerQueue(parent, loadContext(qs.getProperties(), - qs.getQueueName(), schedConfig)); + AbstractQueue cq = new ContainerQueue(parent, loadContext( + qs.getProperties(), qs.getQueueName(), schedConfig, parent)); //update totalCapacity totalCapacity += cq.qsc.getCapacityPercent(); LOG.info("Created a ContainerQueue " + qs.getQueueName() @@ -115,9 +114,8 @@ class QueueHierarchyBuilder { //if not this is a JobQueue. //create a JobQueue. 
- AbstractQueue jq = - new JobQueue(parent, loadContext(qs.getProperties(), - qs.getQueueName(), schedConfig)); + AbstractQueue jq = new JobQueue(parent, loadContext( + qs.getProperties(), qs.getQueueName(), schedConfig, parent)); totalCapacity += jq.qsc.getCapacityPercent(); LOG.info("Created a jobQueue " + qs.getQueueName() + " and added it as a child to " + parent.getName()); @@ -151,7 +149,7 @@ class QueueHierarchyBuilder { * @return the generated {@link QueueSchedulingContext} object */ private QueueSchedulingContext loadContext(Properties props, - String queueName, CapacitySchedulerConf schedConf) { + String queueName, CapacitySchedulerConf schedConf, AbstractQueue parent) { schedConf.setProperties(queueName,props); float capacity = schedConf.getCapacity(queueName); float stretchCapacity = schedConf.getMaxCapacity(queueName); @@ -160,8 +158,8 @@ class QueueHierarchyBuilder { } int ulMin = schedConf.getMinimumUserLimitPercent(queueName); // create our QSC and add to our hashmap - QueueSchedulingContext qsi = new QueueSchedulingContext( - queueName, capacity, stretchCapacity, ulMin + QueueSchedulingContext qsi = new QueueSchedulingContext(queueName, + capacity, stretchCapacity, ulMin, parent.qsc ); qsi.setSupportsPriorities( schedConf.isPrioritySupported( @@ -178,7 +176,7 @@ class QueueHierarchyBuilder { */ static AbstractQueue createRootAbstractQueue() { QueueSchedulingContext rootContext = - new QueueSchedulingContext("", 100, -1, -1); + new QueueSchedulingContext("", 100, -1, -1, null); AbstractQueue root = new ContainerQueue(null, rootContext); return root; } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java?rev=1355514&r1=1355513&r2=1355514&view=diff 
============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java Fri Jun 29 19:05:45 2012 @@ -97,6 +97,12 @@ public class QueueSchedulingContext { */ private TaskSchedulingContext mapTSC; private TaskSchedulingContext reduceTSC; + + /** + * For Hierarchical queues we need to have the parent context for the child + * queue. + */ + private QueueSchedulingContext parentQSC; QueueSchedulingContext( String queueName, float capacityPercent, float maxCapacityPercent, @@ -108,7 +114,18 @@ public class QueueSchedulingContext { this.setMapTSC(new TaskSchedulingContext()); this.setReduceTSC(new TaskSchedulingContext()); } - + + QueueSchedulingContext(String queueName, float capacityPercent, + float maxCapacityPercent, int ulMin, QueueSchedulingContext parentQSC) { + this.setQueueName(queueName); + this.setCapacityPercent(capacityPercent); + this.setMaxCapacityPercent(maxCapacityPercent); + this.setUlMin(ulMin); + this.setMapTSC(new TaskSchedulingContext()); + this.setReduceTSC(new TaskSchedulingContext()); + this.setParentQSC(parentQSC); + } + /** * return information about the queue * @@ -282,4 +299,23 @@ public class QueueSchedulingContext { prevMapCapacity = getMapCapacity(); prevReduceCapacity = getReduceCapacity(); } + + /** + * This method returns the parent QSC object. + * + * @return parent QSC for this queue + */ + public QueueSchedulingContext getParentQSC() { + return parentQSC; + } + + /** + * Setting the parent QSC object for this queue. 
+ * + * @param parentQSC + * Setting the parent QSC object + */ + public void setParentQSC(QueueSchedulingContext parentQSC) { + this.parentQSC = parentQSC; + } } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=1355514&r1=1355513&r2=1355514&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Fri Jun 29 19:05:45 2012 @@ -23,6 +23,10 @@ import java.util.List; import java.util.HashMap; import java.util.Map; import java.io.IOException; + +import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; + import static org.apache.hadoop.mapred.CapacityTestUtils.*; public class TestContainerQueue extends TestCase { @@ -236,10 +240,10 @@ public class TestContainerQueue extends // Create 2 levels of hierarchy. //Firt level - QueueSchedulingContext sch = - new QueueSchedulingContext("rt.sch", a, -1, -1); - QueueSchedulingContext gta = - new QueueSchedulingContext("rt.gta", b, -1, -1); + QueueSchedulingContext sch = new QueueSchedulingContext("rt.sch", a, -1, + -1, rt.qsc); + QueueSchedulingContext gta = new QueueSchedulingContext("rt.gta", b, -1, + -1, rt.qsc); AbstractQueue schq = new ContainerQueue(rt, sch); @@ -249,11 +253,11 @@ public class TestContainerQueue extends map.put(gtaq.getName(), gtaq); scheduler.jobQueuesManager.addQueue((JobQueue) gtaq); - //Create further children. 
- QueueSchedulingContext prod = - new QueueSchedulingContext("rt.sch.prod", c, -1, -1); - QueueSchedulingContext misc = - new QueueSchedulingContext("rt.sch.misc", d, -1, -1); + // Create further children. + QueueSchedulingContext prod = new QueueSchedulingContext("rt.sch.prod", c, + -1, -1, sch); + QueueSchedulingContext misc = new QueueSchedulingContext("rt.sch.misc", d, + -1, -1, sch); AbstractQueue prodq = new JobQueue(schq, prod); AbstractQueue miscq = new JobQueue(schq, misc); @@ -265,6 +269,101 @@ public class TestContainerQueue extends return map; } + public void testMaxCapacityContainerQueuehonuredInchildqueue() + throws IOException { + this.setUp(8, 1, 1); + taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager); + // set up some queues + Map<String, AbstractQueue> map = setUpHierarchy(50, 50, 50, 50); + scheduler.updateContextInfoForTests(); + // verify initial capacity distribution + TaskSchedulingContext mapTsc = map.get("rt.gta") + .getQueueSchedulingContext().getMapTSC(); + assertEquals(mapTsc.getCapacity(), 4); + + mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC(); + assertEquals(mapTsc.getCapacity(), 4); + + mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC(); + assertEquals(mapTsc.getCapacity(), 2); + + mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC(); + assertEquals(mapTsc.getCapacity(), 2); + + assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod", + "rt.sch.misc" }, new int[] { 0, 0, 0, 0 }); + + map.get("rt.sch").getQueueSchedulingContext().setMaxCapacityPercent(50.0f); + map.get("rt.sch").getQueueSchedulingContext().getMapTSC().setMaxCapacity(4); + map.get("rt.sch").getQueueSchedulingContext().getReduceTSC() + .setMaxCapacity(4); + + // Only Allow job submission to leaf queue + FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, 5, + 5, "rt.sch.prod", "u1"); + taskTrackerManager.initJob(fjob1); + + Map<String, String> 
expectedStrings = new HashMap<String, String>(); + expectedStrings.clear(); + expectedStrings.put(CapacityTestUtils.MAP, + "attempt_test_0001_m_000001_0 on tt1"); + expectedStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0001_r_000001_0 on tt1"); + List<Task> task1 = checkMultipleTaskAssignment(taskTrackerManager, + scheduler, "tt1", expectedStrings); + assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod", + "rt.sch.misc" }, new int[] { 0, 1, 1, 0 }); + + expectedStrings.clear(); + expectedStrings.put(CapacityTestUtils.MAP, + "attempt_test_0001_m_000002_0 on tt2"); + expectedStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0001_r_000002_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt2", + expectedStrings); + assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod", + "rt.sch.misc" }, new int[] { 0, 2, 2, 0 }); + + expectedStrings.clear(); + expectedStrings.put(CapacityTestUtils.MAP, + "attempt_test_0001_m_000003_0 on tt3"); + expectedStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0001_r_000003_0 on tt3"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt3", + expectedStrings); + assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod", + "rt.sch.misc" }, new int[] { 0, 3, 3, 0 }); + + expectedStrings.clear(); + expectedStrings.put(CapacityTestUtils.MAP, + "attempt_test_0001_m_000004_0 on tt4"); + expectedStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0001_r_000004_0 on tt4"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt4", + expectedStrings); + assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod", + "rt.sch.misc" }, new int[] { 0, 4, 4, 0 }); + + // we have already reached the limit + // this call would return null + List<Task> task5 = scheduler.assignTasks(tracker("tt5")); + assertNull(task5); + + // Now complete the task 1 i.e map task. 
+ for (Task task : task1) { + taskTrackerManager.finishTask(task.getTaskID().toString(), fjob1); + } + expectedStrings.clear(); + expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt1"); + expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt1"); + task5 = checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", + expectedStrings); + } + + protected TaskTracker tracker(String taskTrackerName) { + return taskTrackerManager.getTaskTracker(taskTrackerName); + } + /** * Verifies that capacities are allocated properly in hierarchical queues. *