Author: sseth Date: Wed Aug 29 01:46:47 2012 New Revision: 1378420 URL: http://svn.apache.org/viewvc?rev=1378420&view=rev Log: MAPREDUCE-4602. [MR-3902] Re-create ask list correctly in case of a temporary error in the AM-RM allocate call (sseth)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1378420&r1=1378419&r2=1378420&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Wed Aug 29 01:46:47 2012 @@ -1,2 +1,4 @@ Branch MR-3902 MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA via sseth) + + MAPREDUCE-4602. 
Re-create ask list correctly in case of a temporary error in the AM-RM allocate call (sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1378420&r1=1378419&r2=1378420&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Wed Aug 29 01:46:47 2012 @@ -132,7 +132,7 @@ public abstract class RMCommunicator ext return this.job.getProgress(); } - // TODO XXX: Get rid of the dependencies on the ClientService. + // TODO (After 3902): Get rid of the dependencies on the ClientService. 
protected void register() { //Register InetSocketAddress serviceAddr = clientService.getBindAddress(); Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1378420&r1=1378419&r2=1378420&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Wed Aug 29 01:46:47 2012 @@ -30,6 +30,7 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.app2.AppContext; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.util.Recor /** * Keeps the data structures to send container requests to RM. */ +// TODO XXX: Eventually rename to RMCommunicator public class RMContainerRequestor extends RMCommunicator implements EventHandler<RMCommunicatorEvent> { private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); @@ -74,10 +76,7 @@ public class RMContainerRequestor extend private int numContainersAllocated; private int numFinishedContainers; // Not very useful. - // TODO XXX: Lots of cleanup. 
- - // TODO XXX: Maintain some statistics on containers allocated, released etc. - + //Key -> Priority //Value -> Map // Key->ResourceName (e.g., hostname, rackname, *) @@ -89,20 +88,19 @@ public class RMContainerRequestor extend new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>(); private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(); - private Set<ContainerId> release = new TreeSet<ContainerId>(); + private final Set<ContainerId> release = new TreeSet<ContainerId>(); private Lock releaseLock = new ReentrantLock(); private Lock askLock = new ReentrantLock(); private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0); private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>(); - // TODO XXX: May need to pass this to the NodeManager. + // TODO XXX: May need to pass this to the AMNodeMap private int clusterNmCount = 0; // TODO XXX Consider allowing sync comm between the requestor and allocator... - // TODO XXX: Why does the RMRequestor require the ClientService ?? - // TODO XXX: Get rid of the clock. Available from the context. + // TODO (after 3902): Why does the RMRequestor require the ClientService ?? (for the RPC address. get rid of this.) 
public RMContainerRequestor(ClientService clientService, AppContext context) { super(clientService, context); this.clock = context.getClock(); @@ -138,12 +136,12 @@ public class RMContainerRequestor extend @Override public void init(Configuration conf) { super.init(conf); - retrystartTime = clock.getTime(); - retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, + retryInterval = getConfig().getLong( + MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); } - + public void stop(Configuration conf) { LOG.info("NumAllocatedContainers: " + numContainersAllocated + "NumFinihsedContainers: " + numFinishedContainers @@ -188,6 +186,11 @@ public class RMContainerRequestor extend private void addResourceRequest(Priority priority, String resourceName, Resource capability) { + addResourceRequest(priority, resourceName, capability, 1); + } + + private void addResourceRequest(Priority priority, String resourceName, + Resource capability, int increment) { Map<String, Map<Resource, ResourceRequest>> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -211,7 +214,8 @@ public class RMContainerRequestor extend remoteRequest.setNumContainers(0); reqMap.put(capability, remoteRequest); } - remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); + remoteRequest.setNumContainers(remoteRequest.getNumContainers() + increment); + // 0 is a special case to re-add the request to the ask table. // Note this down for next interaction with ResourceManager int askSize = 0; @@ -300,15 +304,18 @@ public class RMContainerRequestor extend @SuppressWarnings("unchecked") @Override protected void heartbeat() throws Exception { - int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null + int headRoom = getAvailableResources() != null ? 
getAvailableResources() + .getMemory() : 0;// first time it would be null AMResponse response = errorCheckedMakeRemoteRequest(); - int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; + int newHeadRoom = getAvailableResources() != null ? getAvailableResources() + .getMemory() : 0; List<Container> newContainers = response.getAllocatedContainers(); logNewContainers(newContainers); numContainersAllocated += newContainers.size(); - List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses(); + List<ContainerStatus> finishedContainers = response + .getCompletedContainersStatuses(); logFinishedContainers(finishedContainers); numFinishedContainers += finishedContainers.size(); @@ -325,9 +332,9 @@ public class RMContainerRequestor extend if (newContainers.size() > 0) { newContainerIds = new ArrayList<ContainerId>(newContainers.size()); for (Container container : newContainers) { + // TODO XXX Re-factor AMNodes and AMContainers. context.getAllContainers().addNewContainer(container); - newContainerIds.add(container.getId()); - // TODO XXX: Maybe send this out as an Asynchrounous event ? | Scheduler events will also have to be async in that case, and the order is critical. + newContainerIds.add(container.getId()); context.getAllNodes().nodeSeen(container.getNodeId()); } eventHandler.handle(new AMSchedulerEventContainersAllocated( @@ -359,7 +366,6 @@ public class RMContainerRequestor extend JobEventType.INTERNAL_ERROR)); throw new YarnException("Could not contact RM after " + retryInterval + " milliseconds."); - // TODO XXX: Some changes to the exception handling -> e is ignored, YarnException causes an exit. } // Throw this up to the caller, which may decide to ignore it and // continue to attempt to contact the RM. 
@@ -411,7 +417,7 @@ public class RMContainerRequestor extend public void handle(RMCommunicatorEvent rawEvent) { switch(rawEvent.getType()) { case CONTAINER_DEALLOCATE: - RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent)rawEvent; + RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent; releaseLock.lock(); try { numContainerReleaseRequests++; @@ -472,15 +478,22 @@ public class RMContainerRequestor extend } askLock.lock(); try { - ask.addAll(clonedAskList); - // TODO XXX: Asks cannot be populated like this. Would be better to - // iterate over the asks and get corresponding requests from the main - // table. + // Asks for a particular resource could have changed (increased or + // decreased) during the failure. Re-pull the list from the + // remoteRequestsTable. Since ask is a TreeSet and the same request + // objects are re-used, duplicates are avoided. + rePopulateAskList(clonedAskList); } finally { askLock.unlock(); } } + private void rePopulateAskList(List<ResourceRequest> clonedAskList) { + for (ResourceRequest rr : clonedAskList) { + addResourceRequest(rr.getPriority(), rr.getHostName(), + rr.getCapability(), 0); + } + } private void logNewContainers(List<Container> newContainers) { if (newContainers.size() > 0) { @@ -508,4 +521,19 @@ public class RMContainerRequestor extend } } } + + @Private + Map<Priority, Map<String, Map<Resource, ResourceRequest>>> getRemoteRequestTable() { + return remoteRequestsTable; + } + + @Private + Set<ResourceRequest> getAskSet() { + return ask; + } + + @Private + Set<ContainerId> getReleaseSet() { + return release; + } }