Author: sseth
Date: Fri Aug 31 22:41:45 2012
New Revision: 1379654

URL: http://svn.apache.org/viewvc?rev=1379654&view=rev
Log:
MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437 (sseth)

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379654&r1=1379653&r2=1379654&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31 22:41:45 2012
@@ -8,3 +8,5 @@ Branch MR-3902
   MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth)
 
   MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth)
+
+  MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437. (sseth)

Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379654&r1=1379653&r2=1379654&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Aug 31 22:41:45 2012
@@ -123,6 +123,9 @@ public class RMContainerAllocator extend
   Timer scheduleTimer;
   ScheduleTimerTask scheduleTimerTask;
   private long lastScheduleTime = 0l;
+  private int lastCompletedTasks = 0;
+  private int completedMaps = 0;
+  private int completedReduces = 0;
   
   /*
   Vocabulary Used: 
@@ -287,13 +290,9 @@ public class RMContainerAllocator extend
 
     @Override
     public void run() {
-      // TODO XXX XXX: Reduces are not being shceduled. Forcing them via this for now. Figure out when reduce schedule should be recomputed.
-      // TODO XXX. Does this need to be stopped before the service stop()
       if (clock.getTime() - lastScheduleTime > scheduleInterval && shouldRun) {
         handle(new AMSchedulerEventContainersAllocated(
-            Collections.<ContainerId> emptyList(), true));
-        // Sending a false. Just try to flush available containers.
-        // The decision to schedule reduces may need to be based on available containers.
+            Collections.<ContainerId> emptyList(), false));
       }
     }
 
@@ -330,13 +329,12 @@ public class RMContainerAllocator extend
   }
 
   protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
-
+    // Recalculating reduce schedule here since it's required for most events.
+    recalculateReduceSchedule = true;
     LOG.info("XXX: Processing the event " + sEvent.toString());
     switch(sEvent.getType()) {
-    // TODO XXX: recalculateReduceSchedule may need to bet set on other events - not just containerAllocated.
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
-      // Add to queue of pending tasks.
       break;
     case S_TA_STOP_REQUEST: //Effectively means a failure.
       handleTaStopRequest((AMSchedulerTAStopRequestEvent)sEvent);
@@ -413,11 +411,6 @@ public class RMContainerAllocator extend
   }
   
   private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) {
-    // TODO XXX Part of re-use.
-    // TODO XXX Also may change after state machines are finalized.
-    // XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler can then
-    //  query the task to figure out whether the taskAttempt is the successfulAttempt - and whether to count it towards the reduce ramp up.
-    //  Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since the scheduler event will always be generated before the TaskCompleted event to the job.
     attemptToLaunchRequestMap.remove(event.getAttemptID());
     ContainerId containerId = assignedRequests.remove(event.getAttemptID());
     if (containerId != null) { // TODO Should not be null. Confirm.
@@ -435,15 +428,20 @@ public class RMContainerAllocator extend
 
   // TODO XXX: Deal with node blacklisting.
   
-  private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) {
-
+  private void handleContainersAllocated(
+      AMSchedulerEventContainersAllocated event) {
     availableContainerIds.addAll(event.getContainerIds());
-  
+
+    completedMaps = getJob().getCompletedMaps();
+    completedReduces = getJob().getCompletedReduces();
+    int completedTasks = completedMaps + completedReduces;
+
+    if (lastCompletedTasks != completedTasks) {
+      recalculateReduceSchedule = true;
+      lastCompletedTasks = completedTasks;
+    }
 
     if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
-      // TODO XXX -> recaulculateReduceSchedule in case of released containers
-      // .... would imply CONTAINER_COMPLETED messages are required by the Scheduler.
-      // ContainerReleased == headroomChange ?
       recalculateReduceSchedule = true;
     }
     schedule();
@@ -697,8 +695,8 @@ public class RMContainerAllocator extend
         " ScheduledReduces:" + scheduledRequests.reduces.size() +
         " AssignedMaps:" + assignedRequests.maps.size() + 
         " AssignedReduces:" + assignedRequests.reduces.size() +
-        " completedMaps:" + getJob().getCompletedMaps() + 
-        " completedReduces:" + getJob().getCompletedReduces() +
+        " completedMaps:" + completedMaps + 
+        " completedReduces:" + completedReduces +
         " containersAllocated:" + containersAllocated + //Not super useful.
         " newContainersAllocated: " + newContainerAllocations +
         " existingContainersAllocated: " + existingContainerAllocations +


Reply via email to