Author: szetszwo Date: Thu Jan 31 21:39:42 2013 New Revision: 1441206 URL: http://svn.apache.org/viewvc?rev=1441206&view=rev Log: Merge r1440222 through r1441205 from trunk.
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1440222-1441205 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1441206&r1=1441205&r2=1441206&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu Jan 31 21:39:42 2013 @@ -217,6 +217,9 @@ Release 2.0.3-alpha - Unreleased OPTIMIZATIONS + MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of + containers to get maximum locality. (Bikas Saha via vinodkv) + BUG FIXES MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task @@ -278,6 +281,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-2264. Job status exceeds 100% in some cases. (devaraj.k and sandyr via tucu) + MAPREDUCE-4969. TestKeyValueTextInputFormat test fails with Open JDK 7. + (Arpit Agarwal via suresh) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1440222-1441205 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1440222-1441205 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1441206&r1=1441205&r2=1441206&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jan 31 21:39:42 2013 @@ -747,7 +747,7 @@ public class RMContainerAllocator extend addContainerReq(req); } - @SuppressWarnings("unchecked") + // this method will change the list of allocatedContainers. private void assign(List<Container> allocatedContainers) { Iterator<Container> it = allocatedContainers.iterator(); LOG.info("Got allocated containers " + allocatedContainers.size()); @@ -788,84 +788,97 @@ public class RMContainerAllocator extend + reduces.isEmpty()); isAssignable = false; } - } + } else { + LOG.warn("Container allocated at unwanted priority: " + priority + + ". Returning to RM..."); + isAssignable = false; + } - boolean blackListed = false; - ContainerRequest assigned = null; + if(!isAssignable) { + // release container if we could not assign it + containerNotAssigned(allocated); + it.remove(); + continue; + } - if (isAssignable) { - // do not assign if allocated container is on a - // blacklisted host - String allocatedHost = allocated.getNodeId().getHost(); - blackListed = isNodeBlacklisted(allocatedHost); - if (blackListed) { - // we need to request for a new container - // and release the current one - LOG.info("Got allocated container on a blacklisted " - + " host "+allocatedHost - +". Releasing container " + allocated); - - // find the request matching this allocated container - // and replace it with a new one - ContainerRequest toBeReplacedReq = - getContainerReqToReplace(allocated); - if (toBeReplacedReq != null) { - LOG.info("Placing a new container request for task attempt " - + toBeReplacedReq.attemptID); - ContainerRequest newReq = - getFilteredContainerRequest(toBeReplacedReq); - decContainerReq(toBeReplacedReq); - if (toBeReplacedReq.attemptID.getTaskId().getTaskType() == - TaskType.MAP) { - maps.put(newReq.attemptID, newReq); - } - else { - reduces.put(newReq.attemptID, newReq); - } - addContainerReq(newReq); + // do not assign if allocated container is on a + // blacklisted host + String allocatedHost = allocated.getNodeId().getHost(); + if (isNodeBlacklisted(allocatedHost)) { + // we need to request for a new container + // and release the current one + LOG.info("Got allocated container on a blacklisted " + + " host "+allocatedHost + +". Releasing container " + allocated); + + // find the request matching this allocated container + // and replace it with a new one + ContainerRequest toBeReplacedReq = + getContainerReqToReplace(allocated); + if (toBeReplacedReq != null) { + LOG.info("Placing a new container request for task attempt " + + toBeReplacedReq.attemptID); + ContainerRequest newReq = + getFilteredContainerRequest(toBeReplacedReq); + decContainerReq(toBeReplacedReq); + if (toBeReplacedReq.attemptID.getTaskId().getTaskType() == + TaskType.MAP) { + maps.put(newReq.attemptID, newReq); } else { - LOG.info("Could not map allocated container to a valid request." - + " Releasing allocated container " + allocated); + reduces.put(newReq.attemptID, newReq); } + addContainerReq(newReq); } else { - assigned = assign(allocated); - if (assigned != null) { - // Update resource requests - decContainerReq(assigned); - - // send the container-assigned event to task attempt - eventHandler.handle(new TaskAttemptContainerAssignedEvent( - assigned.attemptID, allocated, applicationACLs)); - - assignedRequests.add(allocated, assigned.attemptID); - - if (LOG.isDebugEnabled()) { - LOG.info("Assigned container (" + allocated + ") " - + " to task " + assigned.attemptID + " on node " - + allocated.getNodeId().toString()); - } - } - else { - //not assigned to any request, release the container - LOG.info("Releasing unassigned and invalid container " - + allocated + ". RM has gone crazy, someone go look!" - + " Hey RM, if you are so rich, go donate to non-profits!"); - } + LOG.info("Could not map allocated container to a valid request." + + " Releasing allocated container " + allocated); } + + // release container if we could not assign it + containerNotAssigned(allocated); + it.remove(); + continue; } - - // release container if it was blacklisted - // or if we could not assign it - if (blackListed || assigned == null) { - containersReleased++; - release(allocated.getId()); - } + } + + assignContainers(allocatedContainers); + + // release container if we could not assign it + it = allocatedContainers.iterator(); + while (it.hasNext()) { + Container allocated = it.next(); + LOG.info("Releasing unassigned and invalid container " + + allocated + ". RM may have assignment issues"); + containerNotAssigned(allocated); + } + } + + @SuppressWarnings("unchecked") + private void containerAssigned(Container allocated, + ContainerRequest assigned) { + // Update resource requests + decContainerReq(assigned); + + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + assigned.attemptID, allocated, applicationACLs)); + + assignedRequests.add(allocated, assigned.attemptID); + + if (LOG.isDebugEnabled()) { + LOG.info("Assigned container (" + allocated + ") " + + " to task " + assigned.attemptID + " on node " + + allocated.getNodeId().toString()); } } - private ContainerRequest assign(Container allocated) { + private void containerNotAssigned(Container allocated) { + containersReleased++; + release(allocated.getId()); + } + + private ContainerRequest assignWithoutLocality(Container allocated) { ContainerRequest assigned = null; Priority priority = allocated.getPriority(); @@ -877,18 +890,24 @@ public class RMContainerAllocator extend LOG.debug("Assigning container " + allocated + " to reduce"); } assigned = assignToReduce(allocated); - } else if (PRIORITY_MAP.equals(priority)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigning container " + allocated + " to map"); - } - assigned = assignToMap(allocated); - } else { - LOG.warn("Container allocated at unwanted priority: " + priority + - ". Returning to RM..."); } return assigned; } + + private void assignContainers(List<Container> allocatedContainers) { + Iterator<Container> it = allocatedContainers.iterator(); + while (it.hasNext()) { + Container allocated = it.next(); + ContainerRequest assigned = assignWithoutLocality(allocated); + if (assigned != null) { + containerAssigned(allocated, assigned); + it.remove(); + } + } + + assignMapsWithLocality(allocatedContainers); + } private ContainerRequest getContainerReqToReplace(Container allocated) { LOG.info("Finding containerReq for allocated container: " + allocated); @@ -959,11 +978,15 @@ public class RMContainerAllocator extend } @SuppressWarnings("unchecked") - private ContainerRequest assignToMap(Container allocated) { - //try to assign to maps if present - //first by host, then by rack, followed by * - ContainerRequest assigned = null; - while (assigned == null && maps.size() > 0) { + private void assignMapsWithLocality(List<Container> allocatedContainers) { + // try to assign to all nodes first to match node local + Iterator<Container> it = allocatedContainers.iterator(); + while(it.hasNext() && maps.size() > 0){ + Container allocated = it.next(); + Priority priority = allocated.getPriority(); + assert PRIORITY_MAP.equals(priority); + // "if (maps.containsKey(tId))" below should be almost always true. + // hence this while loop would almost always have O(1) complexity String host = allocated.getNodeId().getHost(); LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); while (list != null && list.size() > 0) { @@ -972,7 +995,9 @@ public class RMContainerAllocator extend } TaskAttemptId tId = list.removeFirst(); if (maps.containsKey(tId)) { - assigned = maps.remove(tId); + ContainerRequest assigned = maps.remove(tId); + containerAssigned(allocated, assigned); + it.remove(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1); @@ -984,39 +1009,56 @@ public class RMContainerAllocator extend break; } } - if (assigned == null) { - String rack = RackResolver.resolve(host).getNetworkLocation(); - list = mapsRackMapping.get(rack); - while (list != null && list.size() > 0) { - TaskAttemptId tId = list.removeFirst(); - if (maps.containsKey(tId)) { - assigned = maps.remove(tId); - JobCounterUpdateEvent jce = - new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1); - eventHandler.handle(jce); - rackLocalAssigned++; - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned based on rack match " + rack); - } - break; - } - } - if (assigned == null && maps.size() > 0) { - TaskAttemptId tId = maps.keySet().iterator().next(); - assigned = maps.remove(tId); + } + + // try to match all rack local + it = allocatedContainers.iterator(); + while(it.hasNext() && maps.size() > 0){ + Container allocated = it.next(); + Priority priority = allocated.getPriority(); + assert PRIORITY_MAP.equals(priority); + // "if (maps.containsKey(tId))" below should be almost always true. + // hence this while loop would almost always have O(1) complexity + String host = allocated.getNodeId().getHost(); + String rack = RackResolver.resolve(host).getNetworkLocation(); + LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack); + while (list != null && list.size() > 0) { + TaskAttemptId tId = list.removeFirst(); + if (maps.containsKey(tId)) { + ContainerRequest assigned = maps.remove(tId); + containerAssigned(allocated, assigned); + it.remove(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); + jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1); eventHandler.handle(jce); + rackLocalAssigned++; if (LOG.isDebugEnabled()) { - LOG.debug("Assigned based on * match"); + LOG.debug("Assigned based on rack match " + rack); } break; } } } - return assigned; + + // assign remaining + it = allocatedContainers.iterator(); + while(it.hasNext() && maps.size() > 0){ + Container allocated = it.next(); + Priority priority = allocated.getPriority(); + assert PRIORITY_MAP.equals(priority); + TaskAttemptId tId = maps.keySet().iterator().next(); + ContainerRequest assigned = maps.remove(tId); + containerAssigned(allocated, assigned); + it.remove(); + JobCounterUpdateEvent jce = + new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); + eventHandler.handle(jce); + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned based on * match"); + } + } } } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1441206&r1=1441205&r2=1441206&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Jan 31 21:39:42 2013 @@ -190,6 +190,92 @@ public class TestRMContainerAllocator { checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); } + + @Test + public void testMapNodeLocality() throws Exception { + // test checks that ordering of allocated containers list from the RM does + // not affect the map->container assignment done by the AM. If there is a + // node local container available for a map then it should be assigned to + // that container and not a rack-local container that happened to be seen + // earlier in the allocated containers list from the RM. + // Regression test for MAPREDUCE-4893 + LOG.info("Running testMapNodeLocality"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps + rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node + MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map + dispatcher.await(); + + // create the container requests for maps + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event1); + ContainerRequestEvent event2 = createReq(jobId, 2, 1024, + new String[] { "h1" }); + allocator.sendRequest(event2); + ContainerRequestEvent event3 = createReq(jobId, 3, 1024, + new String[] { "h2" }); + allocator.sendRequest(event3); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + // Node heartbeat from rack-local first. This makes node h3 the first in the + // list of allocated containers but it should not be assigned to task1. + nodeManager3.nodeHeartbeat(true); + // Node heartbeat from node-local next. This allocates 2 node local + // containers for task1 and task2. These should be matched with those tasks. + nodeManager1.nodeHeartbeat(true); + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + assigned, false); + // remove the rack-local assignment that should have happened for task3 + for(TaskAttemptContainerAssignedEvent event : assigned) { + if(event.getTaskAttemptID().equals(event3.getAttemptID())) { + assigned.remove(event); + Assert.assertTrue( + event.getContainer().getNodeId().getHost().equals("h3")); + break; + } + } + checkAssignments(new ContainerRequestEvent[] { event1, event2}, + assigned, true); + } @Test public void testResource() throws Exception { @@ -1202,7 +1288,7 @@ public class TestRMContainerAllocator { if (checkHostMatch) { Assert.assertTrue("Not assigned to requested host", Arrays.asList( request.getHosts()).contains( - assigned.getContainer().getNodeId().toString())); + assigned.getContainer().getNodeId().getHost())); } } Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1440222-1441205 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=1441206&r1=1441205&r2=1441206&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Thu Jan 31 21:39:42 2013 @@ -136,32 +136,47 @@ public class TestKeyValueTextInputFormat } public void testUTF8() throws Exception { - LineReader in = makeStream("abcd\u20acbdcd\u20ac"); - Text line = new Text(); - in.readLine(line); - assertEquals("readLine changed utf8 characters", - "abcd\u20acbdcd\u20ac", line.toString()); - in = makeStream("abc\u200axyz"); - in.readLine(line); - assertEquals("split on fake newline", "abc\u200axyz", line.toString()); + LineReader in = null; + + try { + in = makeStream("abcd\u20acbdcd\u20ac"); + Text line = new Text(); + in.readLine(line); + assertEquals("readLine changed utf8 characters", + "abcd\u20acbdcd\u20ac", line.toString()); + in = makeStream("abc\u200axyz"); + in.readLine(line); + assertEquals("split on fake newline", "abc\u200axyz", line.toString()); + } finally { + if (in != null) { + in.close(); + } + } } public void testNewLines() throws Exception { - LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); - Text out = new Text(); - in.readLine(out); - assertEquals("line1 length", 1, out.getLength()); - in.readLine(out); - assertEquals("line2 length", 2, out.getLength()); - in.readLine(out); - assertEquals("line3 length", 0, out.getLength()); - in.readLine(out); - assertEquals("line4 length", 3, out.getLength()); - in.readLine(out); - assertEquals("line5 length", 4, out.getLength()); - in.readLine(out); - assertEquals("line5 length", 5, out.getLength()); - assertEquals("end of file", 0, in.readLine(out)); + LineReader in = null; + try { + in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + Text out = new Text(); + in.readLine(out); + assertEquals("line1 length", 1, out.getLength()); + in.readLine(out); + assertEquals("line2 length", 2, out.getLength()); + in.readLine(out); + assertEquals("line3 length", 0, out.getLength()); + in.readLine(out); + assertEquals("line4 length", 3, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 4, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 5, out.getLength()); + assertEquals("end of file", 0, in.readLine(out)); + } finally { + if (in != null) { + in.close(); + } + } } private static void writeFile(FileSystem fs, Path name, @@ -183,14 +198,21 @@ public class TestKeyValueTextInputFormat InputSplit split, JobConf job) throws IOException { List<Text> result = new ArrayList<Text>(); - RecordReader<Text, Text> reader = format.getRecordReader(split, job, - voidReporter); - Text key = reader.createKey(); - Text value = reader.createValue(); - while (reader.next(key, value)) { - result.add(value); - value = reader.createValue(); - } + RecordReader<Text, Text> reader = null; + + try { + reader = format.getRecordReader(split, job, voidReporter); + Text key = reader.createKey(); + Text value = reader.createValue(); + while (reader.next(key, value)) { + result.add(value); + value = (Text) reader.createValue(); + } + } finally { + if (reader != null) { + reader.close(); + } + } return result; }