Author: jlowe
Date: Thu Apr 10 22:09:16 2014
New Revision: 1586482

URL: http://svn.apache.org/r1586482
Log:
svn merge -c 1586479 FIXES: MAPREDUCE-5825. Provide diagnostics for reducers 
killed during ramp down. Contributed by Gera Shegalov

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1586482&r1=1586481&r2=1586482&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu 
Apr 10 22:09:16 2014
@@ -24,6 +24,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5804. TestMRJobsWithProfiler#testProfiler timesout (Mit Desai
     via kihwal)
 
+    MAPREDUCE-5825. Provide diagnostics for reducers killed during ramp down
+    (Gera Shegalov via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1586482&r1=1586481&r2=1586482&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 Thu Apr 10 22:09:16 2014
@@ -90,6 +90,10 @@ public class RMContainerAllocator extend
   private static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
 
+  @VisibleForTesting
+  public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
+      + "to make room for pending map attempts";
+
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped;
 
@@ -1133,7 +1137,7 @@ public class RMContainerAllocator extend
         TaskAttemptId id = reduceList.remove(0);//remove the one on top
         LOG.info("Preempting " + id);
         preemptionWaitingReduces.add(id);
-        eventHandler.handle(new TaskAttemptEvent(id, 
TaskAttemptEventType.TA_KILL));
+        eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC));
       }
     }
     

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1586482&r1=1586481&r2=1586482&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
 Thu Apr 10 22:09:16 2014
@@ -360,6 +360,59 @@ public class TestRMContainerAllocator {
         assigned, false);
   }
 
+  @Test(timeout = 30000)
+  public void testReducerRampdownDiagnostics() throws Exception {
+    LOG.info("Running tesReducerRampdownDiagnostics");
+
+    final Configuration conf = new Configuration();
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+    final RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    final String host = "host1";
+    final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
+    nm.nodeHeartbeat(true);
+    dispatcher.await();
+    final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+    final JobId jobId = MRBuilderUtils
+        .newJobId(appAttemptId.getApplicationId(), 0);
+    final Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+    // add resources to scheduler
+    dispatcher.await();
+
+    // create the container request
+    final String[] locations = new String[] { host };
+    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    for (int i = 0; i < 1;) {
+      dispatcher.await();
+      i += allocator.schedule().size();
+      nm.nodeHeartbeat(true);
+    }
+
+    allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
+    while (allocator.getTaskAttemptKillEvents().size() == 0) {
+      dispatcher.await();
+      allocator.schedule().size();
+      nm.nodeHeartbeat(true);
+    }
+    final String killEventMessage = allocator.getTaskAttemptKillEvents().get(0)
+        .getMessage();
+    Assert.assertTrue("No reducer rampDown preemption message",
+        killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 


Reply via email to