Author: acmurthy Date: Mon May 20 21:08:36 2013 New Revision: 1484597 URL: http://svn.apache.org/r1484597 Log: Merge -c 1484596 from branch-1 to branch-1.2 to fix MAPREDUCE-3859. Fix CapacityScheduler to correctly compute runtime queue limits for high-ram jobs. Contributed by Sergey Tryuber.
Modified: hadoop/common/branches/branch-1.2/CHANGES.txt hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Modified: hadoop/common/branches/branch-1.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1484597&r1=1484596&r2=1484597&view=diff ============================================================================== --- hadoop/common/branches/branch-1.2/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.2/CHANGES.txt Mon May 20 21:08:36 2013 @@ -17,6 +17,9 @@ Release 1.2.1 - Unreleased available for previous installs by putting it in hadoop-core.jar. (acmurthy) + MAPREDUCE-3859. Fix CapacityScheduler to correctly compute runtime queue + limits for high-ram jobs. (Sergey Tryuber via acmurthy) + Release 1.2.0 - 2013.05.05 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1484597&r1=1484596&r2=1484597&view=diff ============================================================================== --- hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original) +++ hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Mon May 20 21:08:36 2013 @@ -1157,7 +1157,7 @@ class CapacitySchedulerQueue { int queueSlotsOccupied = getNumSlotsOccupied(taskType); int currentCapacity; - if (queueSlotsOccupied < queueCapacity) { + if (queueSlotsOccupied + numSlotsRequested <= queueCapacity) { currentCapacity = queueCapacity; 
} else { Modified: hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1484597&r1=1484596&r2=1484597&view=diff ============================================================================== --- hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/common/branches/branch-1.2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon May 20 21:08:36 2013 @@ -702,12 +702,18 @@ public class TestCapacityScheduler exten float capacity; boolean supportsPrio; int ulMin; + Float ulFactor; public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) { + this(queueName, capacity, supportsPrio, ulMin, null); + } + + public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin, Float ulFactor) { this.queueName = queueName; this.capacity = capacity; this.supportsPrio = supportsPrio; this.ulMin = ulMin; + this.ulFactor = ulFactor; } } @@ -733,18 +739,29 @@ public class TestCapacityScheduler exten /*public synchronized String getFirstQueue() { return firstQueue; }*/ - + + @Override public float getCapacity(String queue) { if(queueMap.get(queue).capacity == -1) { return super.getCapacity(queue); } return queueMap.get(queue).capacity; } - + + @Override public int getMinimumUserLimitPercent(String queue) { return queueMap.get(queue).ulMin; } - + + @Override + public float getUserLimitFactor(String queue) { + if(queueMap.get(queue).ulFactor != null) { + return queueMap.get(queue).ulFactor; + } + return super.getUserLimitFactor(queue); + } + + @Override public boolean isPrioritySupported(String queue) { return queueMap.get(queue).supportsPrio; } @@ -1333,6 +1350,88 @@ 
public class TestCapacityScheduler exten } /** + * Test checks that high memory job is able to consume more slots than + * queue's configured capacity, but not more than max capacity. + * (of course, if user-limit-factor was set up properly) + */ + public void testHighMemoryCanConsumeMaxCapacity() throws IOException { + //cluster with 20 map and 20 reduce slots + final int NUM_MAP_SLOTS = 4; + final int NUM_REDUCE_SLOTS = 4; + final int NUM_TASK_TRACKERS = 5; + + taskTrackerManager = + new FakeTaskTrackerManager(NUM_TASK_TRACKERS, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS); + + //Q1 capacity is 4*5*0.5=10 map and 4*5*0.5=10 reduce slots + final String Q1 = "q1"; + final float Q1_CAP = 50.f; + final int Q1_ULMIN = 50; + final float Q1_ULFACTOR = 2; + + //Q2 just to fill sum capacity up to 100% + final String Q2 = "q2"; + final float Q2_CAP = 50.f; + final int Q2_ULMIN = 50; + + taskTrackerManager.addQueues(new String[] { Q1, Q2 }); + ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>(); + + + queues.add(new FakeQueueInfo(Q1, Q1_CAP, true, Q1_ULMIN, Q1_ULFACTOR)); + queues.add(new FakeQueueInfo(Q2, Q2_CAP, true, Q2_ULMIN)); + resConf.setFakeQueues(queues); + + //q1 can go up to 4*5*0.8=16 map and 4*5*0.8=16 reduce slots + resConf.setMaxCapacity(Q1, 80.0f); + + //configure and start scheduler + scheduler.setTaskTrackerManager(taskTrackerManager); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 4 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 4 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + //submit high mem job with 5 mappers and 1 reducer with 4 slots each + JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(4 * 1024); 
jConf.setMemoryForReduceTask(4 * 1024); + jConf.setNumMapTasks(5); + jConf.setNumReduceTasks(1); + jConf.setQueueName(Q1); + jConf.setUser("u1"); + FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); + + //tt1-tt4 are full (max capacity of q1 is 16 slots) + List<Task> tasks = checkAssignments("tt1", + new String[] { + "attempt_test_0001_m_000001_0 on tt1", + "attempt_test_0001_r_000001_0 on tt1"}); + List<Task> tasks2 = checkAssignments("tt2", + new String[] {"attempt_test_0001_m_000002_0 on tt2"}); + List<Task> tasks3 = checkAssignments("tt3", + new String[] {"attempt_test_0001_m_000003_0 on tt3"}); + List<Task> tasks4 = checkAssignments("tt4", + new String[] {"attempt_test_0001_m_000004_0 on tt4"}); + + assertTrue("Shouldn't assign more slots (reached max capacity)", + scheduler.assignTasks(tracker("tt5")).isEmpty()); + + checkOccupiedSlots(Q1, TaskType.MAP, 1, 16, 160.0f, 1, 0); + checkOccupiedSlots(Q1, TaskType.REDUCE, 1, 4, 40.0f, 0, 2); + + //don't check 5th map task completeness. That's not this test case. + } + + /** * Creates a queue with max capacity of 50% * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are * given to high ram job and are reserved , no other tasks are accepted .