Author: todd Date: Tue Nov 22 19:35:39 2011 New Revision: 1205141 URL: http://svn.apache.org/viewvc?rev=1205141&view=rev Log: MAPREDUCE-2905. Fix fair scheduler to prevent clumping of tasks when assignmultiple is enabled. Contributed by Todd Lipcon.
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Nov 22 19:35:39 2011 @@ -46,6 +46,9 @@ Release 0.20.206.0 - unreleased MAPREDUCE-2377. task-controller fails to parse configuration if it doesn't end in \n. (todd via eli) + MAPREDUCE-2905. Fix fair scheduler to prevent clumping of tasks when + assignmultiple is enabled. (todd) + IMPROVEMENTS MAPREDUCE-3008. [Gridmix] Improve cumulative CPU usage emulation for Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original) +++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Tue Nov 22 19:35:39 2011 @@ -49,16 +49,17 @@ public class CapBasedLoadManager extends @Override public boolean canAssignMap(TaskTrackerStatus tracker, - int totalRunnableMaps, int totalMapSlots) { - return tracker.countMapTasks() < getCap(totalRunnableMaps, - tracker.getMaxMapSlots(), totalMapSlots); + int totalRunnableMaps, int totalMapSlots, int alreadyAssigned) { + int cap = getCap(totalRunnableMaps, tracker.getMaxMapSlots(), totalMapSlots); + return tracker.countMapTasks() + alreadyAssigned < cap; } @Override public boolean canAssignReduce(TaskTrackerStatus tracker, - int totalRunnableReduces, int totalReduceSlots) { - return tracker.countReduceTasks() < getCap(totalRunnableReduces, - tracker.getMaxReduceSlots(), totalReduceSlots); + int totalRunnableReduces, int totalReduceSlots, int alreadyAssigned) { + int cap = getCap(totalRunnableReduces, tracker.getMaxReduceSlots(), + totalReduceSlots); + return tracker.countReduceTasks() + alreadyAssigned < cap; } @Override Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Nov 22 19:35:39 2011 @@ -439,7 +439,8 @@ public class FairScheduler extends TaskS if (!mapRejected) { if (mapsAssigned == mapCapacity || runningMaps == runnableMaps || - !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) { + !loadMgr.canAssignMap(tts, runnableMaps, + totalMapSlots, mapsAssigned)) { eventLog.log("INFO", "Can't assign another MAP to " + trackerName); mapRejected = true; } @@ -447,7 +448,8 @@ public class FairScheduler extends TaskS if (!reduceRejected) { if (reducesAssigned == reduceCapacity || runningReduces == runnableReduces || - !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) { + !loadMgr.canAssignReduce(tts, runnableReduces, + totalReduceSlots, reducesAssigned)) { eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName); reduceRejected = true; } @@ -470,7 +472,8 @@ public class FairScheduler extends TaskS } else { // If both types are available, choose the task type with fewer running // tasks on the task tracker to prevent that task type from starving - if (tts.countMapTasks() <= tts.countReduceTasks()) { + if (tts.countMapTasks() + mapsAssigned <= + tts.countReduceTasks() + reducesAssigned) { taskType = TaskType.MAP; } else { taskType = TaskType.REDUCE; Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original) +++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Tue Nov 22 19:35:39 2011 @@ -72,10 +72,12 @@ public abstract class LoadManager implem * @param tracker The machine we wish to run a new map on * @param totalRunnableMaps Set of running jobs in the cluster * @param totalMapSlots The total number of map slots in the cluster + * @param alreadyAssigned the number of maps already assigned to + * this tracker during this heartbeat * @return true if another map can be launched on <code>tracker</code> */ public abstract boolean canAssignMap(TaskTrackerStatus tracker, - int totalRunnableMaps, int totalMapSlots); + int totalRunnableMaps, int totalMapSlots, int alreadyAssigned); /** * Can a given {@link TaskTracker} run another reduce task? @@ -84,10 +86,12 @@ public abstract class LoadManager implem * @param tracker The machine we wish to run a new map on * @param totalRunnableReduces Set of running jobs in the cluster * @param totalReduceSlots The total number of reduce slots in the cluster + * @param alreadyAssigned the number of reduces already assigned to + * this tracker during this heartbeat * @return true if another reduce can be launched on <code>tracker</code> */ public abstract boolean canAssignReduce(TaskTrackerStatus tracker, - int totalRunnableReduces, int totalReduceSlots); + int totalRunnableReduces, int totalReduceSlots, int alreadyAssigned); /** * Can a given {@link TaskTracker} run another new task from a given job? Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java (original) +++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java Tue Nov 22 19:35:39 2011 @@ -75,7 +75,7 @@ public class TestCapBasedLoadManager ext * A single test of canAssignMap. */ private void oneTestCanAssignMap(float maxDiff, int mapCap, int runningMap, - int totalMapSlots, int totalRunnableMap, boolean expected) { + int totalMapSlots, int totalRunnableMap, int expectedAssigned) { CapBasedLoadManager manager = new CapBasedLoadManager(); Configuration conf = new Configuration(); @@ -84,14 +84,16 @@ public class TestCapBasedLoadManager ext TaskTrackerStatus ts = getTaskTrackerStatus(mapCap, 1, runningMap, 1); + int numAssigned = 0; + while (manager.canAssignMap(ts, totalRunnableMap, totalMapSlots, numAssigned)) { + numAssigned++; + } + assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableMap=" + totalRunnableMap + " and totalMapSlots=" + totalMapSlots + ", a tracker with runningMap=" + runningMap + " and mapCap=" - + mapCap + " should " + (expected ? "" : "not ") - + "be able to take more Maps.", - expected, - manager.canAssignMap(ts, totalRunnableMap, totalMapSlots) - ); + + mapCap + " should be able to assign " + expectedAssigned + " maps", + expectedAssigned, numAssigned); } @@ -99,52 +101,60 @@ public class TestCapBasedLoadManager ext * Test canAssignMap method. */ public void testCanAssignMap() { - oneTestCanAssignMap(0.0f, 5, 0, 50, 1, true); - oneTestCanAssignMap(0.0f, 5, 1, 50, 10, false); - oneTestCanAssignMap(0.2f, 5, 1, 50, 10, true); - oneTestCanAssignMap(0.0f, 5, 1, 50, 11, true); - oneTestCanAssignMap(0.0f, 5, 2, 50, 11, false); - oneTestCanAssignMap(0.3f, 5, 2, 50, 6, true); - oneTestCanAssignMap(1.0f, 5, 5, 50, 50, false); + oneTestCanAssignMap(0.0f, 5, 0, 50, 1, 1); + oneTestCanAssignMap(0.0f, 5, 1, 50, 10, 0); + // 20% load + 20% diff = 40% of available slots, but rounds + // up with floating point error: so we get 3/5 slots on TT. + // 1 already taken, so assigns 2 more + oneTestCanAssignMap(0.2f, 5, 1, 50, 10, 2); + oneTestCanAssignMap(0.0f, 5, 1, 50, 11, 1); + oneTestCanAssignMap(0.0f, 5, 2, 50, 11, 0); + oneTestCanAssignMap(0.3f, 5, 2, 50, 6, 1); + oneTestCanAssignMap(1.0f, 5, 5, 50, 50, 0); } /** * A single test of canAssignReduce. */ - private void oneTestCanAssignReduce(float maxDiff, int ReduceCap, + private void oneTestCanAssignReduce(float maxDiff, int reduceCap, int runningReduce, int totalReduceSlots, int totalRunnableReduce, - boolean expected) { + int expectedAssigned) { CapBasedLoadManager manager = new CapBasedLoadManager(); Configuration conf = new Configuration(); conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff); manager.setConf(conf); - TaskTrackerStatus ts = getTaskTrackerStatus(1, ReduceCap, 1, + TaskTrackerStatus ts = getTaskTrackerStatus(1, reduceCap, 1, runningReduce); + int numAssigned = 0; + while (manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots, numAssigned)) { + numAssigned++; + } + assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableReduce=" + totalRunnableReduce + " and totalReduceSlots=" + totalReduceSlots - + ", a tracker with runningReduce=" + runningReduce - + " and ReduceCap=" + ReduceCap + " should " - + (expected ? "" : "not ") + "be able to take more Reduces.", - expected, - manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots) - ); + + ", a tracker with runningReduce=" + runningReduce + " and reduceCap=" + + reduceCap + " should be able to assign " + expectedAssigned + " reduces", + expectedAssigned, numAssigned); } /** * Test canAssignReduce method. */ public void testCanAssignReduce() { - oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, true); - oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, false); - oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, true); - oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, true); - oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, false); - oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, true); - oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, false); + oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, 1); + oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, 0); + // 20% load + 20% diff = 40% of available slots, but rounds + // up with floating point error: so we get 3/5 slots on TT. + // 1 already taken, so assigns 2 more + oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, 2); + oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, 1); + oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, 0); + oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, 1); + oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, 0); } } Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1205141&r1=1205140&r2=1205141&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Nov 22 19:35:39 2011 @@ -479,7 +479,12 @@ public class TestFairScheduler extends T statuses.put(attemptId, status); trackerForTip.put(attemptId, trackerStatus); status.setRunState(TaskStatus.State.RUNNING); - trackerStatus.getTaskReports().add(status); + } + + public void reportTaskOnTracker(String trackerName, Task t) { + FakeTaskInProgress tip = tips.get(t.getTaskID().toString()); + TaskTrackerStatus trackerStatus = trackers.get(trackerName).getStatus(); + trackerStatus.getTaskReports().add(tip.getTaskStatus(t.getTaskID())); } public void finishTask(String taskTrackerName, String attemptId) { @@ -2852,6 +2857,9 @@ public class TestFairScheduler extends T protected void checkAssignment(String taskTrackerName, String... expectedTasks) throws IOException { List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName)); + for (Task t : tasks) { + taskTrackerManager.reportTaskOnTracker(taskTrackerName, t); + } assertNotNull(tasks); System.out.println("Assigned tasks:"); for (int i = 0; i < tasks.size(); i++)