Author: sseth Date: Fri Aug 31 22:39:10 2012 New Revision: 1379650 URL: http://svn.apache.org/viewvc?rev=1379650&view=rev Log: MAPREDUCE-4625. Statistics logging in the AM scheduler.
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31 22:39:10 2012 @@ -6,3 +6,5 @@ Branch MR-3902 MAPREDUCE-4599. 
Prevent contianer launches on blacklisted hosts. (Tsuyoshi OZAWA via sseth) MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth) + + MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Aug 31 22:39:10 2012 @@ -444,6 +444,7 @@ public class MRAppMaster extends Composi // second timeout before the exit. // TODO XXX: Modify TaskAttemptCleaner to empty it's queue while stopping. public void handle(JobFinishEvent event) { + LOG.info("Handling JobFinished Event"); AMShutdownRunnable r = new AMShutdownRunnable(); Thread t = new Thread(r, "AMShutdownThread"); t.start(); @@ -506,6 +507,8 @@ public class MRAppMaster extends Composi @Override public void run() { maybeSendJobEndNotification(); + // TODO XXX Add a timeout. + LOG.info("Waiting for all containers and TaskAttempts to complete"); while (!allContainersComplete() || !allTaskAttemptsComplete()) { try { synchronized(this) { @@ -516,6 +519,7 @@ public class MRAppMaster extends Composi break; } } + LOG.info("All Containers and TaskAttempts Complete. 
Stopping services"); stopAM(); LOG.info("AM Shutdown Thread Completing"); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java Fri Aug 31 22:39:10 2012 @@ -25,8 +25,7 @@ public enum TaskAttemptEventType { //Producer:Task, Speculator TA_SCHEDULE, - TA_RESCHEDULE, - + //Producer: TaskAttemptListener TA_STARTED_REMOTELY, TA_STATUS_UPDATE, Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java (original) +++ 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java Fri Aug 31 22:39:10 2012 @@ -591,7 +591,8 @@ public abstract class TaskImpl implement ++numberUncompletedAttempts; //schedule the nextAttemptNumber - eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(), TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0)); + eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(), + TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0)); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java Fri Aug 31 22:39:10 2012 @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.hadoop.mapreduce.v2.app2.rm; import java.util.List; @@ -9,6 +27,10 @@ public class AMSchedulerEventContainersA private final List<ContainerId> containerIds; private final boolean headRoomChanged; + // TODO XXX: Maybe distinguish between newly allocated containers and + // existing containers being re-used. + // headRoomChanged is a strange API - making an assumption about how the + // scheduler will use this info. 
public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds, boolean headRoomChanged) { super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED); Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java Fri Aug 31 22:39:10 2012 @@ -6,21 +6,17 @@ public enum AMSchedulerEventType { S_TA_STOP_REQUEST, // Maybe renamed to S_TA_END / S_TA_ABNORMAL_END S_TA_SUCCEEDED, S_TA_ENDED, - + //Producer: RMCommunicator S_CONTAINERS_ALLOCATED, - + //Producer: Container. (Maybe RMCommunicator) S_CONTAINER_COMPLETED, - - // Add events for nodes being blacklisted. - - // TODO XXX - //Producer: RMCommunicator. May not be needed. -// S_CONTAINER_COMPLETED, - - //Producer: RMComm -// S_NODE_UNHEALTHY, -// S_NODE_HEALTHY, - + + //Producer: Node + S_NODE_BLACKLISTED, + S_NODE_UNHEALTHY, + S_NODE_HEALTHY + // The scheduler should have a way of knowing about unusable nodes. Acting on + // this information to change requests etc is scheduler specific. 
} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Aug 31 22:39:10 2012 @@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest; import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest; +import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer; import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent; import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent; import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType; @@ -158,6 +159,8 @@ public class RMContainerAllocator extend attemptToLaunchRequestMap = new HashMap<TaskAttemptId, AMSchedulerTALaunchRequestEvent>(); private int containersAllocated = 0; + private int newContainerAllocations = 0; + private int existingContainerAllocations = 0; private int containersReleased = 0; private int hostLocalAssigned = 0; private int rackLocalAssigned = 0; @@ -177,7 +180,6 @@ public class 
RMContainerAllocator extend BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>(); - @SuppressWarnings("rawtypes") public RMContainerAllocator(RMContainerRequestor requestor, AppContext appContext) { super("RMContainerAllocator"); @@ -211,7 +213,10 @@ public class RMContainerAllocator extend MRJobConfig.MR_AM_SCHEDULER_INTERVAL, MRJobConfig.DEFAULT_MR_AM_SCHEDULER_INTERVAL); shouldReUse = conf.getBoolean("am.scheduler.shouldReuse", false); - LOG.info("XXX: ShouldReUse: " + shouldReUse); + LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse + + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: " + + maxReduceRampupLimit + ", maxReducePreemptionLimit: " + + maxReducePreemptionLimit); RackResolver.init(conf); } @@ -249,7 +254,8 @@ public class RMContainerAllocator extend scheduleTimer = new Timer("AMSchedulerTimer", true); scheduleTimerTask = new ScheduleTimerTask(); - scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval, scheduleInterval); + scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval, + scheduleInterval); this.job = appContext.getJob(jobId); super.start(); @@ -260,10 +266,10 @@ public class RMContainerAllocator extend this.stopEventHandling = true; if (eventHandlingThread != null) eventHandlingThread.interrupt(); - super.stop(); if (scheduleTimerTask != null) { scheduleTimerTask.stop(); } + super.stop(); LOG.info("Final Scheduler Stats: " + getStat()); } @@ -322,11 +328,10 @@ public class RMContainerAllocator extend throw new YarnException(e); } } - - // TODO XXX: Before and after makeRemoteRequest statistics. -protected synchronized void handleEvent(AMSchedulerEvent sEvent) { - + protected synchronized void handleEvent(AMSchedulerEvent sEvent) { + + LOG.info("XXX: Processing the event " + sEvent.toString()); switch(sEvent.getType()) { // TODO XXX: recalculateReduceSchedule may need to bet set on other events - not just containerAllocated. 
case S_TA_LAUNCH_REQUEST: @@ -345,21 +350,22 @@ protected synchronized void handleEvent( case S_CONTAINERS_ALLOCATED: handleContainersAllocated((AMSchedulerEventContainersAllocated) sEvent); break; - // No HEALTH_CHANGE events. Not modifying the table based on these. - case S_CONTAINER_COMPLETED: // Maybe use this to reschedule reduces ? + case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler. break; - // Node State Change Event. May want to withdraw requests related to the node, and put - // in fresh requests. - - // Similarly for the case where a node gets blacklisted. - default: + case S_NODE_BLACKLISTED: + // TODO XXX Withdraw requests related to this node and place new ones. + break; + case S_NODE_UNHEALTHY: + // Ignore. RM will not allocate containers on this node. + break; + case S_NODE_HEALTHY: + // Ignore. RM will start allocating containers if there are pending requests. break; } } private void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) { // Add to queue of pending tasks. - LOG.info("Processing the event " + event.toString()); attemptToLaunchRequestMap.put(event.getAttemptID(), event); if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) { mapResourceReqt = maybeComputeNormalizedRequestForType(event, @@ -381,10 +387,8 @@ protected synchronized void handleEvent( } private void handleTaStopRequest(AMSchedulerTAStopRequestEvent event) { - LOG.info("Processing the event " + event.toString()); TaskAttemptId aId = event.getAttemptID(); attemptToLaunchRequestMap.remove(aId); - // XXX Not very efficient. List / check type. boolean removed = pendingReduces.remove(aId); if (!removed) { removed = scheduledRequests.remove(aId); @@ -393,11 +397,16 @@ protected synchronized void handleEvent( ContainerId containerId = assignedRequests.getContainerId(aId); if (containerId != null) { // Ask the container to stop.
- sendEvent(new AMContainerEvent(containerId, AMContainerEventType.C_STOP_REQUEST)); - // Inform the Node - the task has asked to be STOPPED / has already stopped. - sendEvent(new AMNodeEventTaskAttemptEnded(containerMap.get(containerId).getContainer().getNodeId(), containerId, event.getAttemptID(), event.failed())); + sendEvent(new AMContainerEvent(containerId, + AMContainerEventType.C_STOP_REQUEST)); + // Inform the Node - the task has asked to be STOPPED / has already + // stopped. + sendEvent(new AMNodeEventTaskAttemptEnded(containerMap + .get(containerId).getContainer().getNodeId(), containerId, + event.getAttemptID(), event.failed())); } else { - LOG.warn("Received a STOP request for absent taskAttempt: " + event.getAttemptID()); + LOG.warn("Received a STOP request for absent taskAttempt: " + + event.getAttemptID()); } } } @@ -409,33 +418,28 @@ protected synchronized void handleEvent( // XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler can then // query the task to figure out whether the taskAttempt is the successfulAttempt - and whether to count it towards the reduce ramp up. // Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since the scheduler event will always be generated before the TaskCompleted event to the job. - - LOG.info("Processing the event " + event.toString()); attemptToLaunchRequestMap.remove(event.getAttemptID()); ContainerId containerId = assignedRequests.remove(event.getAttemptID()); if (containerId != null) { // TODO Should not be null. Confirm. 
- sendEvent(new AMContainerTASucceededEvent(containerId, event.getAttemptID())); - sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap.get(containerId).getContainer().getNodeId(), containerId, event.getAttemptID())); + sendEvent(new AMContainerTASucceededEvent(containerId, + event.getAttemptID())); + sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap + .get(containerId).getContainer().getNodeId(), containerId, + event.getAttemptID())); containerAvailable(containerId); } else { LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: " + event.getAttemptID() + ". Full event: " + event); } } - + // TODO XXX: Deal with node blacklisting. private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) { - // TODO XXX: Maybe have an event from the Requestor -> saying AllocationChanged -> listOfNewContainers, listOfFinishedContainers (finished containers goes to Containers directly, but should always come in from the RM) - // TODO XXX - /* - * Start allocating containers. Match requests to capabilities. - * Send out Container_START / Container_TA_ASSIGNED events. - */ - // TODO XXX: Logging of the assigned containerIds. - LOG.info("Processing the event " + event.toString()); availableContainerIds.addAll(event.getContainerIds()); + + if (event.didHeadroomChange() || event.getContainerIds().size() > 0) { // TODO XXX -> recaulculateReduceSchedule in case of released containers // .... would imply CONTAINER_COMPLETED messages are required by the Scheduler. @@ -445,6 +449,7 @@ protected synchronized void handleEvent( schedule(); } + // TODO XXX: Deal with node blacklisting. // TODO Override for re-use. 
@@ -480,6 +485,7 @@ protected synchronized void handleEvent( private synchronized void schedule() { assignContainers(); requestContainers(); + lastScheduleTime = clock.getTime(); } protected void containerAvailable(ContainerId containerId) { @@ -493,17 +499,6 @@ protected synchronized void handleEvent( } } - // TODO Override for container re-use. -// protected void containerAvailable(ContainerId containerId) { -// // For now releasing the container. -// // allocatedContainerIds.add(containerId); -// sendEvent(new AMContainerEvent(containerId, -// AMContainerEventType.C_STOP_REQUEST)); -// // XXX A release should not be required. Only required when a container -// // cannot be assigned, or if there's an explicit request to stop the container, -// // in which case the release request will go out from the container itself. -// } - @SuppressWarnings("unchecked") private int maybeComputeNormalizedRequestForType( AMSchedulerTALaunchRequestEvent event, TaskType taskType, @@ -704,11 +699,14 @@ protected synchronized void handleEvent( " AssignedReduces:" + assignedRequests.reduces.size() + " completedMaps:" + getJob().getCompletedMaps() + " completedReduces:" + getJob().getCompletedReduces() + - " containersAllocated:" + containersAllocated + + " containersAllocated:" + containersAllocated + //Not super useful. + " newContainersAllocated: " + newContainerAllocations + + " existingContainersAllocated: " + existingContainerAllocations + " containersReleased:" + containersReleased + " hostLocalAssigned:" + hostLocalAssigned + " rackLocalAssigned:" + rackLocalAssigned + " availableResources(headroom):" + requestor.getAvailableResources(); + // TODO (Post 3902): Can hostLocal/rackLocal be handled elsewhere. } @@ -719,11 +717,17 @@ protected synchronized void handleEvent( @Private public int getMemLimit() { - int headRoom = requestor.getAvailableResources() != null ? 
requestor.getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceReqt + - assignedRequests.reduces.size() * reduceResourceReqt; + int headRoom = requestor.getAvailableResources() != null ? requestor + .getAvailableResources().getMemory() : 0; + return headRoom + assignedRequests.maps.size() * mapResourceReqt + + assignedRequests.reduces.size() * reduceResourceReqt; } + + /** + * Tracks attempts for which a Container ask has been sent to the + * RMCommunicator. + */ private class ScheduledRequests { private final LinkedList<TaskAttemptId> earlierFailedMaps = @@ -820,7 +824,13 @@ protected synchronized void handleEvent( containersAllocated += allocatedContainerIds.size(); while (it.hasNext()) { ContainerId containerId = it.next(); - Container allocated = containerMap.get(containerId).getContainer(); + AMContainer amContainer = containerMap.get(containerId); + Container allocated = amContainer.getContainer(); + if (amContainer.getState() == AMContainerState.ALLOCATED) { + newContainerAllocations++; + } else { + existingContainerAllocations++; + } if (LOG.isDebugEnabled()) { LOG.debug("Assigning container " + allocated.getId() + " with priority " + allocated.getPriority() + " to NM " @@ -1108,6 +1118,9 @@ protected synchronized void handleEvent( } } + /** + * Tracks TaskAttempts which have been assigned a Container. + */ private class AssignedRequests { private final LinkedHashMap<TaskAttemptId, Container> maps = new LinkedHashMap<TaskAttemptId, Container>(); @@ -1159,6 +1172,7 @@ protected synchronized void handleEvent( } // TODO XXX Check where all this is being used. + // XXX: Likely needed in case of TA failed / killed / terminated as well. // Old code was removing when CONTAINER_COMPLETED was received fromthe RM. 
ContainerId remove(TaskAttemptId tId) { ContainerId containerId = null; Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1379650&r1=1379649&r2=1379650&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Fri Aug 31 22:39:10 2012 @@ -301,9 +301,19 @@ public class RMContainerRequestor extend } } + private String getStat() { + StringBuilder sb = new StringBuilder(); + sb.append("ContainersAllocated: ").append(numContainersAllocated) + .append(", ContainersFinished: ").append(numFinishedContainers) + .append(", NumContainerReleaseRequests: ") + .append(numContainerReleaseRequests); + return sb.toString(); + } + @SuppressWarnings("unchecked") @Override protected void heartbeat() throws Exception { + LOG.info("BeforeHeartbeat: " + getStat()); int headRoom = getAvailableResources() != null ? getAvailableResources() .getMemory() : 0;// first time it would be null AMResponse response = errorCheckedMakeRemoteRequest(); @@ -322,6 +332,8 @@ public class RMContainerRequestor extend List<NodeReport> updatedNodeReports = response.getUpdatedNodes(); logUpdatedNodes(updatedNodeReports); + LOG.info("AfterHeartbeat: " + getStat()); + // Inform the Containers about completion.. 
for (ContainerStatus c : finishedContainers) { eventHandler.handle(new AMContainerEventReleased(c)); @@ -420,6 +432,7 @@ public class RMContainerRequestor extend RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent; releaseLock.lock(); try { + // TODO XXX: Currently the RM does not handle release requests for RUNNING containers. numContainerReleaseRequests++; release.add(event.getContainerId()); } finally { @@ -506,7 +519,7 @@ public class RMContainerRequestor extend private void logFinishedContainers(List<ContainerStatus> finishedContainers) { if (finishedContainers.size() > 0) { - LOG.info(finishedContainers.size() + " finished"); + LOG.info(finishedContainers.size() + " containers finished"); for (ContainerStatus cs : finishedContainers) { LOG.info("FinihsedContainer: " + cs); }