Author: bobby Date: Tue Jun 26 19:18:49 2012 New Revision: 1354182 URL: http://svn.apache.org/viewvc?rev=1354182&view=rev Log: svn merge -c 1354181 FIXES: MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working properly (Jason Lowe via bobby)
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1354182&r1=1354181&r2=1354182&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Jun 26 19:18:49 2012 @@ -498,6 +498,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4295. RM crashes due to DNS issue (tgraves) + MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working + properly (Jason Lowe via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1354182&r1=1354181&r2=1354182&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jun 26 19:18:49 2012 @@ -417,15 +417,6 @@ public class RMContainerAllocator extend LOG.info("Recalculating schedule..."); - //if all maps are assigned, then ramp up all reduces irrespective of the - //headroom - if (scheduledMaps == 0 && numPendingReduces > 0) { - LOG.info("All maps assigned. " + - "Ramping up all remaining reduces:" + numPendingReduces); - scheduleAllReduces(); - return; - } - //check for slow start if (!getIsReduceStarted()) {//not set yet int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * @@ -441,6 +432,15 @@ public class RMContainerAllocator extend } } + //if all maps are assigned, then ramp up all reduces irrespective of the + //headroom + if (scheduledMaps == 0 && numPendingReduces > 0) { + LOG.info("All maps assigned. " + + "Ramping up all remaining reduces:" + numPendingReduces); + scheduleAllReduces(); + return; + } + float completedMapPercent = 0f; if (totalMaps != 0) {//support for 0 maps completedMapPercent = (float)completedMaps/totalMaps; @@ -498,7 +498,8 @@ public class RMContainerAllocator extend } } - private void scheduleAllReduces() { + @Private + public void scheduleAllReduces() { for (ContainerRequest req : pendingReduces) { scheduledRequests.addReduce(req); } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1354182&r1=1354181&r2=1354182&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jun 26 19:18:49 2012 @@ -18,15 +18,24 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Matchers.anyFloat; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import junit.framework.Assert; @@ -65,9 +74,10 @@ import org.apache.hadoop.yarn.api.AMRMPr import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -76,13 +86,11 @@ import org.apache.hadoop.yarn.factories. import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Test; @@ -428,29 +436,21 @@ public class TestRMContainerAllocator { // Finish off 1 map. Iterator<Task> it = job.getTasks().values().iterator(); - finishNextNTasks(mrApp, it, 1); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); rmDispatcher.await(); Assert.assertEquals(0.095f, job.getProgress(), 0.001f); Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); // Finish off 7 more so that map-progress is 80% - finishNextNTasks(mrApp, it, 7); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7); allocator.schedule(); rmDispatcher.await(); Assert.assertEquals(0.41f, job.getProgress(), 0.001f); Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); // Finish off the 2 remaining maps - finishNextNTasks(mrApp, it, 2); - - // Wait till all reduce-attempts request for containers - for (Task t : job.getTasks().values()) { - if (t.getType() == TaskType.REDUCE) { - mrApp.waitForState(t.getAttempts().values().iterator().next(), - TaskAttemptState.UNASSIGNED); - } - } + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); rmDispatcher.await(); @@ -467,7 +467,7 @@ public class TestRMContainerAllocator { } // Finish off 2 reduces - finishNextNTasks(mrApp, it, 2); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); rmDispatcher.await(); @@ -475,7 +475,7 @@ public class TestRMContainerAllocator { Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off the remaining 8 reduces. - finishNextNTasks(mrApp, it, 8); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8); allocator.schedule(); rmDispatcher.await(); // Remaining is JobCleanup @@ -483,19 +483,28 @@ public class TestRMContainerAllocator { Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } - private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN) - throws Exception { + private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node, + MRApp mrApp, Iterator<Task> it, int nextN) throws Exception { Task task; for (int i=0; i<nextN; i++) { task = it.next(); - finishTask(mrApp, task); + finishTask(rmDispatcher, node, mrApp, task); } } - private void finishTask(MRApp mrApp, Task task) throws Exception { + private void finishTask(DrainDispatcher rmDispatcher, MockNM node, + MRApp mrApp, Task task) throws Exception { TaskAttempt attempt = task.getAttempts().values().iterator().next(); + List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1); + contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(), + ContainerState.COMPLETE, "", 0)); + Map<ApplicationId,List<ContainerStatus>> statusUpdate = + new HashMap<ApplicationId,List<ContainerStatus>>(1); + statusUpdate.put(mrApp.getAppID(), contStatus); + node.nodeHeartbeat(statusUpdate, true); + rmDispatcher.await(); mrApp.getContext().getEventHandler().handle( - new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); + new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); mrApp.waitForState(task, TaskState.SUCCEEDED); } @@ -576,21 +585,21 @@ public class TestRMContainerAllocator { Iterator<Task> it = job.getTasks().values().iterator(); // Finish off 1 map so that map-progress is 10% - finishNextNTasks(mrApp, it, 1); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); rmDispatcher.await(); Assert.assertEquals(0.14f, job.getProgress(), 0.001f); Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); // Finish off 5 more map so that map-progress is 60% - finishNextNTasks(mrApp, it, 5); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5); allocator.schedule(); rmDispatcher.await(); Assert.assertEquals(0.59f, job.getProgress(), 0.001f); Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off remaining map so that map-progress is 100% - finishNextNTasks(mrApp, it, 4); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4); allocator.schedule(); rmDispatcher.await(); Assert.assertEquals(0.95f, job.getProgress(), 0.001f); @@ -1338,6 +1347,18 @@ public class TestRMContainerAllocator { maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); + // verify slow-start still in effect when no more maps need to + // be scheduled but some have yet to complete + allocator.scheduleReduces( + totalMaps, succeededMaps, + 0, scheduledReduces, + totalMaps - succeededMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, + maxReduceRampupLimit, reduceSlowStart); + verify(allocator, never()).setIsReduceStarted(true); + verify(allocator, never()).scheduleAllReduces(); + succeededMaps = 3; allocator.scheduleReduces( totalMaps, succeededMaps,