Author: tgraves
Date: Tue Nov 13 23:50:23 2012
New Revision: 1409035

URL: http://svn.apache.org/viewvc?rev=1409035&view=rev

Log:
merge -r 1409031:1409032 from trunk. FIXES: MAPREDUCE-4517
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1409035&r1=1409034&r2=1409035&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Nov 13 23:50:23 2012 @@ -96,6 +96,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by default (Ravi Prakash via bobby) + + MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat + (Jason Lowe via tgraves) Release 0.23.4 - UNRELEASED Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1409035&r1=1409034&r2=1409035&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Nov 13 23:50:23 2012 @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -140,6 +141,8 @@ public class RMContainerAllocator extend BlockingQueue<ContainerAllocatorEvent> eventQueue = new LinkedBlockingQueue<ContainerAllocatorEvent>(); + private ScheduleStats scheduleStats = new ScheduleStats(); + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); this.stopped = new AtomicBoolean(false); @@ -203,13 +206,10 @@ public class RMContainerAllocator extend @Override protected synchronized void heartbeat() throws Exception { - LOG.info("Before Scheduling: " + getStat()); + scheduleStats.updateAndLogIfChanged("Before Scheduling: "); List<Container> allocatedContainers = getResources(); - LOG.info("After Scheduling: " + getStat()); if (allocatedContainers.size() > 0) { - LOG.info("Before Assign: " + getStat()); scheduledRequests.assign(allocatedContainers); - LOG.info("After Assign: " + getStat()); } int completedMaps = getJob().getCompletedMaps(); @@ -230,6 +230,8 @@ public class RMContainerAllocator extend maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; } + + scheduleStats.updateAndLogIfChanged("After Scheduling: "); } @Override @@ -240,7 +242,7 @@ public class RMContainerAllocator extend } eventHandlingThread.interrupt(); super.stop(); - LOG.info("Final Stats: " + getStat()); + scheduleStats.log("Final Stats: "); } public boolean getIsReduceStarted() { @@ 
-422,7 +424,9 @@ public class RMContainerAllocator extend return; } - LOG.info("Recalculating schedule..."); + int headRoom = getAvailableResources() != null ? + getAvailableResources().getMemory() : 0; + LOG.info("Recalculating schedule, headroom=" + headRoom); //check for slow start if (!getIsReduceStarted()) {//not set yet @@ -531,24 +535,6 @@ public class RMContainerAllocator extend } } - /** - * Synchronized to avoid findbugs warnings - */ - private synchronized String getStat() { - return "PendingReduces:" + pendingReduces.size() + - " ScheduledMaps:" + scheduledRequests.maps.size() + - " ScheduledReduces:" + scheduledRequests.reduces.size() + - " AssignedMaps:" + assignedRequests.maps.size() + - " AssignedReduces:" + assignedRequests.reduces.size() + - " completedMaps:" + getJob().getCompletedMaps() + - " completedReduces:" + getJob().getCompletedReduces() + - " containersAllocated:" + containersAllocated + - " containersReleased:" + containersReleased + - " hostLocalAssigned:" + hostLocalAssigned + - " rackLocalAssigned:" + rackLocalAssigned + - " availableResources(headroom):" + getAvailableResources(); - } - @SuppressWarnings("unchecked") private List<Container> getResources() throws Exception { int headRoom = getAvailableResources() != null ? 
getAvailableResources().getMemory() : 0;//first time it would be null @@ -590,6 +576,9 @@ public class RMContainerAllocator extend if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; + if (LOG.isDebugEnabled() && headRoom != newHeadRoom) { + LOG.debug("headroom=" + newHeadRoom); + } } if (LOG.isDebugEnabled()) { @@ -1064,4 +1053,60 @@ public class RMContainerAllocator extend } } } + + private class ScheduleStats { + int numPendingReduces; + int numScheduledMaps; + int numScheduledReduces; + int numAssignedMaps; + int numAssignedReduces; + int numCompletedMaps; + int numCompletedReduces; + int numContainersAllocated; + int numContainersReleased; + + public void updateAndLogIfChanged(String msgPrefix) { + boolean changed = false; + + // synchronized to fix findbug warnings + synchronized (RMContainerAllocator.this) { + changed |= (numPendingReduces != pendingReduces.size()); + numPendingReduces = pendingReduces.size(); + changed |= (numScheduledMaps != scheduledRequests.maps.size()); + numScheduledMaps = scheduledRequests.maps.size(); + changed |= (numScheduledReduces != scheduledRequests.reduces.size()); + numScheduledReduces = scheduledRequests.reduces.size(); + changed |= (numAssignedMaps != assignedRequests.maps.size()); + numAssignedMaps = assignedRequests.maps.size(); + changed |= (numAssignedReduces != assignedRequests.reduces.size()); + numAssignedReduces = assignedRequests.reduces.size(); + changed |= (numCompletedMaps != getJob().getCompletedMaps()); + numCompletedMaps = getJob().getCompletedMaps(); + changed |= (numCompletedReduces != getJob().getCompletedReduces()); + numCompletedReduces = getJob().getCompletedReduces(); + changed |= (numContainersAllocated != containersAllocated); + numContainersAllocated = containersAllocated; + changed |= (numContainersReleased != containersReleased); + numContainersReleased = containersReleased; + } + + if (changed) { + 
log(msgPrefix); + } + } + + public void log(String msgPrefix) { + LOG.info(msgPrefix + "PendingReds:" + numPendingReduces + + " ScheduledMaps:" + numScheduledMaps + + " ScheduledReds:" + numScheduledReduces + + " AssignedMaps:" + numAssignedMaps + + " AssignedReds:" + numAssignedReduces + + " CompletedMaps:" + numCompletedMaps + + " CompletedReds:" + numCompletedReduces + + " ContAlloc:" + numContainersAllocated + + " ContRel:" + numContainersReleased + + " HostLocal:" + hostLocalAssigned + + " RackLocal:" + rackLocalAssigned); + } + } }