Author: bobby
Date: Tue Jun 26 19:24:43 2012
New Revision: 1354185

URL: http://svn.apache.org/viewvc?rev=1354185&view=rev
Log:
svn merge -c 1354181 FIXES: MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working properly (Jason Lowe via bobby)
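
For context, the property named in the log controls the fraction of maps that
must complete before the AM starts scheduling reduce containers. A minimal
sketch of setting it on a job (the 0.80 value and job name are illustrative;
the shipped default in mapred-default.xml is 0.05):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SlowStartExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Do not begin scheduling reduces until 80% of the maps have completed.
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.80f);
        Job job = Job.getInstance(conf, "slow-start-demo");
        // ... set mapper/reducer classes and input/output paths, then submit ...
      }
    }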

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1354185&r1=1354184&r2=1354185&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jun 26 19:24:43 2012
@@ -252,6 +252,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4361. Fix detailed metrics for protobuf-based RPC on 0.23 
     (Jason Lowe via tgraves)
 
+    MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working
+    properly (Jason Lowe via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1354185&r1=1354184&r2=1354185&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jun 26 19:24:43 2012
@@ -408,15 +408,6 @@ public class RMContainerAllocator extend
     
     LOG.info("Recalculating schedule...");
     
-    //if all maps are assigned, then ramp up all reduces irrespective of the 
-    //headroom
-    if (scheduledMaps == 0 && numPendingReduces > 0) {
-      LOG.info("All maps assigned. " +
-               "Ramping up all remaining reduces:" + numPendingReduces);
-      scheduleAllReduces();
-      return;
-    }
-    
     //check for slow start
     if (!getIsReduceStarted()) {//not set yet
       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 
@@ -432,6 +423,15 @@ public class RMContainerAllocator extend
       }
     }
     
+    //if all maps are assigned, then ramp up all reduces irrespective of the
+    //headroom
+    if (scheduledMaps == 0 && numPendingReduces > 0) {
+      LOG.info("All maps assigned. " +
+          "Ramping up all remaining reduces:" + numPendingReduces);
+      scheduleAllReduces();
+      return;
+    }
+
     float completedMapPercent = 0f;
     if (totalMaps != 0) {//support for 0 maps
       completedMapPercent = (float)completedMaps/totalMaps;
@@ -489,7 +489,8 @@ public class RMContainerAllocator extend
     }
   }
 
-  private void scheduleAllReduces() {
+  @Private
+  public void scheduleAllReduces() {
     for (ContainerRequest req : pendingReduces) {
       scheduledRequests.addReduce(req);
     }
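
To summarize the effect of the reordering above: the slow-start check now runs
before the "all maps assigned" fast path, so reduces can no longer be ramped up
before the configured fraction of maps has completed. A simplified,
self-contained sketch of the resulting control flow (names mirror the hunks
above; the headroom-based ramp-up/ramp-down that follows is elided):

    public class ReduceScheduleSketch {
      private boolean reduceStarted = false;

      void scheduleReduces(int totalMaps, int completedMaps, int scheduledMaps,
          int numPendingReduces, float reduceSlowStart) {
        // 1. Slow-start gate first: ignore reduces entirely until enough maps
        //    have completed.
        if (!reduceStarted) {
          int threshold = (int) Math.ceil(reduceSlowStart * totalMaps);
          if (completedMaps < threshold) {
            return;
          }
          reduceStarted = true;
        }
        // 2. Only once slow-start is satisfied: if no maps remain to be
        //    assigned, ramp up all remaining reduces irrespective of headroom.
        if (scheduledMaps == 0 && numPendingReduces > 0) {
          scheduleAllReduces();
          return;
        }
        // ... headroom-based scheduling of reduces continues here ...
      }

      void scheduleAllReduces() {
        // Placeholder; the real method adds each pending reduce request to
        // scheduledRequests, as shown in the diff above.
      }
    }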

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1354185&r1=1354184&r2=1354185&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jun 26 19:24:43 2012
@@ -18,15 +18,24 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import junit.framework.Assert;
@@ -63,9 +72,10 @@ import org.apache.hadoop.yarn.api.AMRMPr
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -74,13 +84,11 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -426,29 +434,21 @@ public class TestRMContainerAllocator {
 
     // Finish off 1 map.
     Iterator<Task> it = job.getTasks().values().iterator();
-    finishNextNTasks(mrApp, it, 1);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
 
     // Finish off 7 more so that map-progress is 80%
-    finishNextNTasks(mrApp, it, 7);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
 
     // Finish off the 2 remaining maps
-    finishNextNTasks(mrApp, it, 2);
-
-    // Wait till all reduce-attempts request for containers
-    for (Task t : job.getTasks().values()) {
-      if (t.getType() == TaskType.REDUCE) {
-        mrApp.waitForState(t.getAttempts().values().iterator().next(),
-          TaskAttemptState.UNASSIGNED);
-      }
-    }
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
     rmDispatcher.await();
@@ -465,7 +465,7 @@ public class TestRMContainerAllocator {
     }
 
     // Finish off 2 reduces
-    finishNextNTasks(mrApp, it, 2);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
     rmDispatcher.await();
@@ -473,7 +473,7 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off the remaining 8 reduces.
-    finishNextNTasks(mrApp, it, 8);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
     allocator.schedule();
     rmDispatcher.await();
     // Remaining is JobCleanup
@@ -481,19 +481,28 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
 
-  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
-      throws Exception {
+  private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
+      MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
     Task task;
     for (int i=0; i<nextN; i++) {
       task = it.next();
-      finishTask(mrApp, task);
+      finishTask(rmDispatcher, node, mrApp, task);
     }
   }
 
-  private void finishTask(MRApp mrApp, Task task) throws Exception {
+  private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
+      MRApp mrApp, Task task) throws Exception {
     TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
+    contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
+        ContainerState.COMPLETE, "", 0));
+    Map<ApplicationId,List<ContainerStatus>> statusUpdate =
+        new HashMap<ApplicationId,List<ContainerStatus>>(1);
+    statusUpdate.put(mrApp.getAppID(), contStatus);
+    node.nodeHeartbeat(statusUpdate, true);
+    rmDispatcher.await();
     mrApp.getContext().getEventHandler().handle(
-        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+          new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
     mrApp.waitForState(task, TaskState.SUCCEEDED);
   }
 
@@ -574,21 +583,21 @@ public class TestRMContainerAllocator {
     Iterator<Task> it = job.getTasks().values().iterator();
 
     // Finish off 1 map so that map-progress is 10%
-    finishNextNTasks(mrApp, it, 1);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
 
     // Finish off 5 more map so that map-progress is 60%
-    finishNextNTasks(mrApp, it, 5);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off remaining map so that map-progress is 100%
-    finishNextNTasks(mrApp, it, 4);
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
     allocator.schedule();
     rmDispatcher.await();
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
@@ -1239,6 +1248,18 @@ public class TestRMContainerAllocator {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, never()).setIsReduceStarted(true);
     
+    // verify slow-start still in effect when no more maps need to
+    // be scheduled but some have yet to complete
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        0, scheduledReduces,
+        totalMaps - succeededMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    verify(allocator, never()).scheduleAllReduces();
+
     succeededMaps = 3;
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 

