Author: sseth
Date: Fri Aug 31 22:41:45 2012
New Revision: 1379654

URL: http://svn.apache.org/viewvc?rev=1379654&view=rev
Log:
MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437 (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379654&r1=1379653&r2=1379654&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31 22:41:45 2012 @@ -8,3 +8,5 @@ Branch MR-3902 MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth) MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth) + + MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437. 
(sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379654&r1=1379653&r2=1379654&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Aug 31 22:41:45 2012 @@ -123,6 +123,9 @@ public class RMContainerAllocator extend Timer scheduleTimer; ScheduleTimerTask scheduleTimerTask; private long lastScheduleTime = 0l; + private int lastCompletedTasks = 0; + private int completedMaps = 0; + private int completedReduces = 0; /* Vocabulary Used: @@ -287,13 +290,9 @@ public class RMContainerAllocator extend @Override public void run() { - // TODO XXX XXX: Reduces are not being shceduled. Forcing them via this for now. Figure out when reduce schedule should be recomputed. - // TODO XXX. Does this need to be stopped before the service stop() if (clock.getTime() - lastScheduleTime > scheduleInterval && shouldRun) { handle(new AMSchedulerEventContainersAllocated( - Collections.<ContainerId> emptyList(), true)); - // Sending a false. Just try to flush available containers. - // The decision to schedule reduces may need to be based on available containers. 
+ Collections.<ContainerId> emptyList(), false)); } } @@ -330,13 +329,12 @@ public class RMContainerAllocator extend } protected synchronized void handleEvent(AMSchedulerEvent sEvent) { - + // Recalculating reduce schedule here since it's required for most events. + recalculateReduceSchedule = true; LOG.info("XXX: Processing the event " + sEvent.toString()); switch(sEvent.getType()) { - // TODO XXX: recalculateReduceSchedule may need to bet set on other events - not just containerAllocated. case S_TA_LAUNCH_REQUEST: handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent); - // Add to queue of pending tasks. break; case S_TA_STOP_REQUEST: //Effectively means a failure. handleTaStopRequest((AMSchedulerTAStopRequestEvent)sEvent); @@ -413,11 +411,6 @@ public class RMContainerAllocator extend } private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) { - // TODO XXX Part of re-use. - // TODO XXX Also may change after state machines are finalized. - // XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler can then - // query the task to figure out whether the taskAttempt is the successfulAttempt - and whether to count it towards the reduce ramp up. - // Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since the scheduler event will always be generated before the TaskCompleted event to the job. attemptToLaunchRequestMap.remove(event.getAttemptID()); ContainerId containerId = assignedRequests.remove(event.getAttemptID()); if (containerId != null) { // TODO Should not be null. Confirm. @@ -435,15 +428,20 @@ public class RMContainerAllocator extend // TODO XXX: Deal with node blacklisting. 
- private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) { - + private void handleContainersAllocated( + AMSchedulerEventContainersAllocated event) { availableContainerIds.addAll(event.getContainerIds()); - + + completedMaps = getJob().getCompletedMaps(); + completedReduces = getJob().getCompletedReduces(); + int completedTasks = completedMaps + completedReduces; + + if (lastCompletedTasks != completedTasks) { + recalculateReduceSchedule = true; + lastCompletedTasks = completedTasks; + } if (event.didHeadroomChange() || event.getContainerIds().size() > 0) { - // TODO XXX -> recaulculateReduceSchedule in case of released containers - // .... would imply CONTAINER_COMPLETED messages are required by the Scheduler. - // ContainerReleased == headroomChange ? recalculateReduceSchedule = true; } schedule(); @@ -697,8 +695,8 @@ public class RMContainerAllocator extend " ScheduledReduces:" + scheduledRequests.reduces.size() + " AssignedMaps:" + assignedRequests.maps.size() + " AssignedReduces:" + assignedRequests.reduces.size() + - " completedMaps:" + getJob().getCompletedMaps() + - " completedReduces:" + getJob().getCompletedReduces() + + " completedMaps:" + completedMaps + + " completedReduces:" + completedReduces + " containersAllocated:" + containersAllocated + //Not super useful. " newContainersAllocated: " + newContainerAllocations + " existingContainersAllocated: " + existingContainerAllocations +