Author: jlowe
Date: Fri Feb 8 23:09:04 2013
New Revision: 1444276

URL: http://svn.apache.org/r1444276
Log:
MAPREDUCE-4893. MR AppMaster can do sub-optimal assignment of containers to
map tasks leading to poor node locality. Contributed by Bikas Saha
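The core of the change is that assign() no longer hands each allocated container to a map task one at a time; the whole batch from the RM is matched in passes instead, node-local requests first, then rack-local, and only then any remaining map. The sketch below is a minimal, self-contained illustration of that matching order, not the actual Hadoop code: SimpleAllocator, ContainerStub and the string-keyed indexes are hypothetical stand-ins for RMContainerAllocator, Container/ContainerRequest and the maps/mapsHostMapping/mapsRackMapping fields.

// Minimal sketch of the multi-pass matching idea in this patch (hypothetical
// types, NOT the real YARN/MapReduce API).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class SimpleAllocator {

  static class ContainerStub {
    final String id, host, rack;
    ContainerStub(String id, String host, String rack) {
      this.id = id; this.host = host; this.rack = rack;
    }
  }

  // pending map task ids plus host -> tasks and rack -> tasks indexes
  private final Set<String> maps = new LinkedHashSet<>();
  private final Map<String, LinkedList<String>> hostToTasks = new HashMap<>();
  private final Map<String, LinkedList<String>> rackToTasks = new HashMap<>();
  final Map<String, String> assignments = new LinkedHashMap<>(); // taskId -> containerId

  void addMapRequest(String taskId, String host, String rack) {
    maps.add(taskId);
    hostToTasks.computeIfAbsent(host, h -> new LinkedList<>()).add(taskId);
    rackToTasks.computeIfAbsent(rack, r -> new LinkedList<>()).add(taskId);
  }

  // Assign the whole batch in three passes (node-local, rack-local, any) so a
  // rack-local container that appears early in the RM's list cannot steal a
  // task that has a node-local container later in the same list.
  void assign(List<ContainerStub> containers) {
    matchPass(containers, c -> hostToTasks.get(c.host));   // pass 1: node-local
    matchPass(containers, c -> rackToTasks.get(c.rack));   // pass 2: rack-local
    Iterator<ContainerStub> it = containers.iterator();    // pass 3: any remaining map
    while (it.hasNext() && !maps.isEmpty()) {
      ContainerStub c = it.next();
      record(maps.iterator().next(), c);
      it.remove();
    }
  }

  private void matchPass(List<ContainerStub> containers,
      Function<ContainerStub, LinkedList<String>> candidatesFor) {
    Iterator<ContainerStub> it = containers.iterator();
    while (it.hasNext() && !maps.isEmpty()) {
      ContainerStub c = it.next();
      LinkedList<String> candidates = candidatesFor.apply(c);
      while (candidates != null && !candidates.isEmpty()) {
        String taskId = candidates.removeFirst();
        if (maps.contains(taskId)) {   // skip tasks already assigned elsewhere
          record(taskId, c);
          it.remove();
          break;
        }
      }
    }
  }

  private void record(String taskId, ContainerStub c) {
    maps.remove(taskId);
    assignments.put(taskId, c.id);
  }

  public static void main(String[] args) {
    SimpleAllocator a = new SimpleAllocator();
    a.addMapRequest("task1", "h1", "/rack1");
    a.addMapRequest("task2", "h1", "/rack1");
    a.addMapRequest("task3", "h2", "/rack1");
    // as in the new regression test: the rack-local-only container (h3) is first
    List<ContainerStub> fromRM = new ArrayList<>(Arrays.asList(
        new ContainerStub("c_h3", "h3", "/rack1"),
        new ContainerStub("c_h1_a", "h1", "/rack1"),
        new ContainerStub("c_h1_b", "h1", "/rack1")));
    a.assign(fromRM);
    System.out.println(a.assignments);  // {task1=c_h1_a, task2=c_h1_b, task3=c_h3}
  }
}

With the input ordered as in the new regression test (the rack-local-only h3 container first), the three passes still give task1 and task2 the h1 containers and task3 the h3 container, which is the property the test asserts.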
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1444276&r1=1444275&r2=1444276&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Feb 8 23:09:04 2013
@@ -56,6 +56,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4671. AM does not tell the RM about container requests which are
     no longer needed. (Bikas Saha via tgraves)
 
+    MAPREDUCE-4893. MR AppMaster can do sub-optimal assignment of containers
+    to map tasks leading to poor node locality (Bikas Saha via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1444276&r1=1444275&r2=1444276&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Feb 8 23:09:04 2013
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
@@ -703,7 +702,7 @@ public class RMContainerAllocator extend
       addContainerReq(req);
     }
 
-    @SuppressWarnings("unchecked")
+    // this method will change the list of allocatedContainers.
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -744,85 +743,97 @@ public class RMContainerAllocator extend
               + reduces.isEmpty());
           isAssignable = false;
         }
-      }
+      } else {
+        LOG.warn("Container allocated at unwanted priority: " + priority +
+            ". Returning to RM...");
+        isAssignable = false;
+      }
 
-      boolean blackListed = false;
-      ContainerRequest assigned = null;
+      if(!isAssignable) {
+        // release container if we could not assign it
+        containerNotAssigned(allocated);
+        it.remove();
+        continue;
+      }
 
-      ContainerId allocatedContainerId = allocated.getId();
-      if (isAssignable) {
-        // do not assign if allocated container is on a
-        // blacklisted host
-        String allocatedHost = allocated.getNodeId().getHost();
-        blackListed = isNodeBlacklisted(allocatedHost);
-        if (blackListed) {
-          // we need to request for a new container
-          // and release the current one
-          LOG.info("Got allocated container on a blacklisted "
-              + " host "+allocatedHost
-              +". Releasing container " + allocated);
-
-          // find the request matching this allocated container
-          // and replace it with a new one
-          ContainerRequest toBeReplacedReq =
-              getContainerReqToReplace(allocated);
-          if (toBeReplacedReq != null) {
-            LOG.info("Placing a new container request for task attempt "
-                + toBeReplacedReq.attemptID);
-            ContainerRequest newReq =
-                getFilteredContainerRequest(toBeReplacedReq);
-            decContainerReq(toBeReplacedReq);
-            if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
-                TaskType.MAP) {
-              maps.put(newReq.attemptID, newReq);
-            }
-            else {
-              reduces.put(newReq.attemptID, newReq);
-            }
-            addContainerReq(newReq);
+      // do not assign if allocated container is on a
+      // blacklisted host
+      String allocatedHost = allocated.getNodeId().getHost();
+      if (isNodeBlacklisted(allocatedHost)) {
+        // we need to request for a new container
+        // and release the current one
+        LOG.info("Got allocated container on a blacklisted "
+            + " host "+allocatedHost
+            +". Releasing container " + allocated);
+
+        // find the request matching this allocated container
+        // and replace it with a new one
+        ContainerRequest toBeReplacedReq =
+            getContainerReqToReplace(allocated);
+        if (toBeReplacedReq != null) {
+          LOG.info("Placing a new container request for task attempt "
+              + toBeReplacedReq.attemptID);
+          ContainerRequest newReq =
+              getFilteredContainerRequest(toBeReplacedReq);
+          decContainerReq(toBeReplacedReq);
+          if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+              TaskType.MAP) {
+            maps.put(newReq.attemptID, newReq);
           }
           else {
-            LOG.info("Could not map allocated container to a valid request."
-                + " Releasing allocated container " + allocated);
+            reduces.put(newReq.attemptID, newReq);
           }
+          addContainerReq(newReq);
         }
         else {
-          assigned = assign(allocated);
-          if (assigned != null) {
-            // Update resource requests
-            decContainerReq(assigned);
-
-            // send the container-assigned event to task attempt
-            eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-                assigned.attemptID, allocated, applicationACLs));
-
-            assignedRequests.add(allocatedContainerId, assigned.attemptID);
-
-            if (LOG.isDebugEnabled()) {
-              LOG.info("Assigned container (" + allocated + ") "
-                  + " to task " + assigned.attemptID + " on node "
-                  + allocated.getNodeId().toString());
-            }
-          }
-          else {
-            //not assigned to any request, release the container
-            LOG.info("Releasing unassigned and invalid container "
-                + allocated + ". RM has gone crazy, someone go look!"
-                + " Hey RM, if you are so rich, go donate to non-profits!");
-          }
+          LOG.info("Could not map allocated container to a valid request."
+ + " Releasing allocated container " + allocated); } + + // release container if we could not assign it + containerNotAssigned(allocated); + it.remove(); + continue; } - - // release container if it was blacklisted - // or if we could not assign it - if (blackListed || assigned == null) { - containersReleased++; - release(allocatedContainerId); - } + } + + assignContainers(allocatedContainers); + + // release container if we could not assign it + it = allocatedContainers.iterator(); + while (it.hasNext()) { + Container allocated = it.next(); + LOG.info("Releasing unassigned and invalid container " + + allocated + ". RM may have assignment issues"); + containerNotAssigned(allocated); } } - private ContainerRequest assign(Container allocated) { + @SuppressWarnings("unchecked") + private void containerAssigned(Container allocated, + ContainerRequest assigned) { + // Update resource requests + decContainerReq(assigned); + + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + assigned.attemptID, allocated, applicationACLs)); + + assignedRequests.add(allocated.getId(), assigned.attemptID); + + if (LOG.isDebugEnabled()) { + LOG.info("Assigned container (" + allocated + ") " + + " to task " + assigned.attemptID + " on node " + + allocated.getNodeId().toString()); + } + } + + private void containerNotAssigned(Container allocated) { + containersReleased++; + release(allocated.getId()); + } + + private ContainerRequest assignWithoutLocality(Container allocated) { ContainerRequest assigned = null; Priority priority = allocated.getPriority(); @@ -834,18 +845,24 @@ public class RMContainerAllocator extend LOG.debug("Assigning container " + allocated + " to reduce"); } assigned = assignToReduce(allocated); - } else if (PRIORITY_MAP.equals(priority)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigning container " + allocated + " to map"); - } - assigned = assignToMap(allocated); - } else { - LOG.warn("Container allocated at unwanted priority: " + priority + - ". Returning to RM..."); } return assigned; } + + private void assignContainers(List<Container> allocatedContainers) { + Iterator<Container> it = allocatedContainers.iterator(); + while (it.hasNext()) { + Container allocated = it.next(); + ContainerRequest assigned = assignWithoutLocality(allocated); + if (assigned != null) { + containerAssigned(allocated, assigned); + it.remove(); + } + } + + assignMapsWithLocality(allocatedContainers); + } private ContainerRequest getContainerReqToReplace(Container allocated) { LOG.info("Finding containerReq for allocated container: " + allocated); @@ -916,11 +933,15 @@ public class RMContainerAllocator extend } @SuppressWarnings("unchecked") - private ContainerRequest assignToMap(Container allocated) { - //try to assign to maps if present - //first by host, then by rack, followed by * - ContainerRequest assigned = null; - while (assigned == null && maps.size() > 0) { + private void assignMapsWithLocality(List<Container> allocatedContainers) { + // try to assign to all nodes first to match node local + Iterator<Container> it = allocatedContainers.iterator(); + while(it.hasNext() && maps.size() > 0){ + Container allocated = it.next(); + Priority priority = allocated.getPriority(); + assert PRIORITY_MAP.equals(priority); + // "if (maps.containsKey(tId))" below should be almost always true. 
+        // hence this while loop would almost always have O(1) complexity
         String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
@@ -929,7 +950,9 @@ public class RMContainerAllocator extend
           }
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
-            assigned = maps.remove(tId);
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
@@ -941,39 +964,56 @@ public class RMContainerAllocator extend
             break;
           }
         }
-        if (assigned == null) {
-          String rack = RackResolver.resolve(host).getNetworkLocation();
-          list = mapsRackMapping.get(rack);
-          while (list != null && list.size() > 0) {
-            TaskAttemptId tId = list.removeFirst();
-            if (maps.containsKey(tId)) {
-              assigned = maps.remove(tId);
-              JobCounterUpdateEvent jce =
-                new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
-              eventHandler.handle(jce);
-              rackLocalAssigned++;
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Assigned based on rack match " + rack);
-              }
-              break;
-            }
-          }
-          if (assigned == null && maps.size() > 0) {
-            TaskAttemptId tId = maps.keySet().iterator().next();
-            assigned = maps.remove(tId);
+      }
+
+      // try to match all rack local
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        // "if (maps.containsKey(tId))" below should be almost always true.
+        // hence this while loop would almost always have O(1) complexity
+        String host = allocated.getNodeId().getHost();
+        String rack = RackResolver.resolve(host).getNetworkLocation();
+        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+        while (list != null && list.size() > 0) {
+          TaskAttemptId tId = list.removeFirst();
+          if (maps.containsKey(tId)) {
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
+            rackLocalAssigned++;
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Assigned based on * match");
+              LOG.debug("Assigned based on rack match " + rack);
            }
            break;
          }
        }
      }
-      return assigned;
+
+      // assign remaining
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        TaskAttemptId tId = maps.keySet().iterator().next();
+        ContainerRequest assigned = maps.remove(tId);
+        containerAssigned(allocated, assigned);
+        it.remove();
+        JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+        eventHandler.handle(jce);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigned based on * match");
+        }
+      }
     }
   }
 
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1444276&r1=1444275&r2=1444276&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Feb 8 23:09:04 2013
@@ -262,6 +262,92 @@ public class TestRMContainerAllocator {
     checkAssignments(new ContainerRequestEvent[] { event1, event2 },
         assigned, false);
   }
+
+  @Test
+  public void testMapNodeLocality() throws Exception {
+    // test checks that ordering of allocated containers list from the RM does
+    // not affect the map->container assignment done by the AM. If there is a
+    // node local container available for a map then it should be assigned to
+    // that container and not a rack-local container that happened to be seen
+    // earlier in the allocated containers list from the RM.
+    // Regression test for MAPREDUCE-4893
+    LOG.info("Running testMapNodeLocality");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
+    rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
+    dispatcher.await();
+
+    // create the container requests for maps
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    // Node heartbeat from rack-local first. This makes node h3 the first in the
+    // list of allocated containers but it should not be assigned to task1.
+    nodeManager3.nodeHeartbeat(true);
+    // Node heartbeat from node-local next. This allocates 2 node local
+    // containers for task1 and task2. These should be matched with those tasks.
+    nodeManager1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+        assigned, false);
+    // remove the rack-local assignment that should have happened for task3
+    for(TaskAttemptContainerAssignedEvent event : assigned) {
+      if(event.getTaskAttemptID().equals(event3.getAttemptID())) {
+        assigned.remove(event);
+        Assert.assertTrue(
+            event.getContainer().getNodeId().getHost().equals("h3"));
+        break;
+      }
+    }
+    checkAssignments(new ContainerRequestEvent[] { event1, event2},
+        assigned, true);
+  }
 
   @Test
   public void testMapReduceScheduling() throws Exception {
@@ -1133,7 +1219,7 @@ public class TestRMContainerAllocator {
     if (checkHostMatch) {
       Assert.assertTrue("Not assigned to requested host", Arrays.asList(
           request.getHosts()).contains(
-          assigned.getContainer().getNodeId().toString()));
+          assigned.getContainer().getNodeId().getHost()));
     }
   }
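The last hunk also tightens checkAssignments(): the assertion now compares the requested host list against NodeId.getHost() instead of NodeId.toString(). A NodeId prints as "host:port" (for example "h1:1234") while the request carries bare host names such as "h1", so the old contains() check could not match when a host check was asked for. A tiny stand-alone illustration of the difference (HostMatchCheck and its values are hypothetical, mirroring the node names used in the test):

import java.util.Arrays;
import java.util.List;

public class HostMatchCheck {
  public static void main(String[] args) {
    List<String> requestedHosts = Arrays.asList("h1"); // hosts named in the request
    String nodeIdAsString = "h1:1234";                 // NodeId.toString() style value
    String nodeIdHost = "h1";                          // NodeId.getHost() style value
    System.out.println(requestedHosts.contains(nodeIdAsString)); // false - old check
    System.out.println(requestedHosts.contains(nodeIdHost));     // true  - fixed check
  }
}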