Author: jlowe
Date: Thu Apr 10 22:09:16 2014
New Revision: 1586482

URL: http://svn.apache.org/r1586482
Log:
svn merge -c 1586479 FIXES: MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down. Contributed by Gera Shegalov
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1586482&r1=1586481&r2=1586482&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Apr 10 22:09:16 2014 @@ -24,6 +24,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5804. TestMRJobsWithProfiler#testProfiler timesout (Mit Desai via kihwal) + MAPREDUCE-5825. 
Provide diagnostics for reducers killed during ramp down + (Gera Shegalov via jlowe) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1586482&r1=1586481&r2=1586482&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Apr 10 22:09:16 2014 @@ -90,6 +90,10 @@ public class RMContainerAllocator extend private static final Priority PRIORITY_REDUCE; private static final Priority PRIORITY_MAP; + @VisibleForTesting + public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted " + + "to make room for pending map attempts"; + private Thread eventHandlingThread; private final AtomicBoolean stopped; @@ -1133,7 +1137,7 @@ public class RMContainerAllocator extend TaskAttemptId id = reduceList.remove(0);//remove the one on top LOG.info("Preempting " + id); preemptionWaitingReduces.add(id); - eventHandler.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_KILL)); + eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC)); } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1586482&r1=1586481&r2=1586482&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Apr 10 22:09:16 2014 @@ -360,6 +360,59 @@ public class TestRMContainerAllocator { assigned, false); } + @Test(timeout = 30000) + public void testReducerRampdownDiagnostics() throws Exception { + LOG.info("Running tesReducerRampdownDiagnostics"); + + final Configuration conf = new Configuration(); + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + final RMApp app = rm.submitApp(1024); + dispatcher.await(); + + final String host = "host1"; + final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048); + nm.nodeHeartbeat(true); + dispatcher.await(); + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + final JobId jobId = MRBuilderUtils + .newJobId(appAttemptId.getApplicationId(), 0); + final Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + final MyContainerAllocator allocator = new 
MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + // add resources to scheduler + dispatcher.await(); + + // create the container request + final String[] locations = new String[] { host }; + allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + for (int i = 0; i < 1;) { + dispatcher.await(); + i += allocator.schedule().size(); + nm.nodeHeartbeat(true); + } + + allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); + while (allocator.getTaskAttemptKillEvents().size() == 0) { + dispatcher.await(); + allocator.schedule().size(); + nm.nodeHeartbeat(true); + } + final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0) + .getMessage(); + Assert.assertTrue("No reducer rampDown preemption message", + killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); + } + @Test public void testMapReduceScheduling() throws Exception {