Author: jlowe
Date: Wed Apr 23 21:57:55 2014
New Revision: 1589526

URL: http://svn.apache.org/r1589526
Log:
svn merge -c 1589524 FIXES: MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill. Contributed by Sangjin Lee
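
For readers skimming the diff below: the fix moves uber-subtask execution off
the event-handling thread onto a single-threaded daemon executor and tracks a
Future per task attempt, so a kill/cleanup event can cancel (and interrupt)
the in-flight subtask. A minimal sketch of that pattern follows; the class and
method names are illustrative only, not the patched Hadoop code. It assumes
Guava's ThreadFactoryBuilder on the classpath, as the patch itself does.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Illustrative sketch: one daemon worker thread runs tasks serially; a
// concurrent map of Futures lets a kill event cancel the running task.
public class CancellableTaskRunner {
  private final ExecutorService taskRunner =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
          .setDaemon(true)  // daemon: the JVM can still exit even if the
                            // task ignores the interrupt
          .setNameFormat("uber-SubtaskRunner")
          .build());
  private final Map<String, Future<?>> futures =
      new ConcurrentHashMap<String, Future<?>>();

  public void launch(String attemptId, Runnable task) {
    // remember the attempt so a later kill can find its Future
    futures.put(attemptId, taskRunner.submit(task));
  }

  public void kill(String attemptId) {
    Future<?> future = futures.remove(attemptId);
    if (future != null) {
      future.cancel(true);  // interrupts the worker thread if still running
    }
  }

  public void stop() {
    taskRunner.shutdownNow();  // interrupt whatever is still running
  }
}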

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
      - copied unchanged from r1589524, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1589526&r1=1589525&r2=1589526&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 23 21:57:55 2014
@@ -85,6 +85,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5832. Fixed TestJobClient to not fail on JDK7 or on Windows. (Jian
     He and Vinod Kumar Vavilapalli via vinodkv)
 
+    MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill
+    (Sangjin Lee via jlowe)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1589526&r1=1589525&r2=1589526&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Wed Apr 23 21:57:55 2014
@@ -24,6 +24,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -51,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Runs the container task locally in a thread.
  * Since all (sub)tasks share the same local directory, they must be executed
@@ -71,7 +77,8 @@ public class LocalContainerLauncher exte
   private final HashSet<File> localizedFiles;
   private final AppContext context;
   private final TaskUmbilicalProtocol umbilical;
-  private Thread eventHandlingThread;
+  private ExecutorService taskRunner;
+  private Thread eventHandler;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -115,14 +122,24 @@ public class LocalContainerLauncher exte
   }
 
   public void serviceStart() throws Exception {
-    eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
-    eventHandlingThread.start();
+    // create a single thread for serial execution of tasks
+    // make it a daemon thread so that the process can exit even if the task is
+    // not interruptible
+    taskRunner =
+        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+            setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
+    // create and start an event handling thread
+    eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
+    eventHandler.start();
     super.serviceStart();
   }
 
   public void serviceStop() throws Exception {
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
+    if (eventHandler != null) {
+      eventHandler.interrupt();
+    }
+    if (taskRunner != null) {
+      taskRunner.shutdownNow();
     }
     super.serviceStop();
   }
@@ -158,12 +175,15 @@ public class LocalContainerLauncher exte
    *   - runs Task (runSubMap() or runSubReduce())
    *     - TA can safely send TA_UPDATE since in RUNNING state
    */
-  private class SubtaskRunner implements Runnable {
+  private class EventHandler implements Runnable {
 
-    private boolean doneWithMaps = false;
-    private int finishedSubMaps = 0;
+    private volatile boolean doneWithMaps = false;
+    private volatile int finishedSubMaps = 0;
 
-    SubtaskRunner() {
+    private final Map<TaskAttemptId,Future<?>> futures =
+        new ConcurrentHashMap<TaskAttemptId,Future<?>>();
+
+    EventHandler() {
     }
 
     @SuppressWarnings("unchecked")
@@ -172,7 +192,7 @@ public class LocalContainerLauncher exte
       ContainerLauncherEvent event = null;
 
       // Collect locations of map outputs to give to reduces
-      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+      final Map<TaskAttemptID, MapOutputFile> localMapFiles =
           new HashMap<TaskAttemptID, MapOutputFile>();
       
       // _must_ either run subtasks sequentially or accept expense of new JVMs
@@ -183,81 +203,41 @@ public class LocalContainerLauncher exte
           event = eventQueue.take();
         } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
           LOG.error("Returning, interrupted : " + e);
-          return;
+          break;
         }
 
         LOG.info("Processing the event " + event.toString());
 
         if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
 
-          ContainerRemoteLaunchEvent launchEv =
+          final ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
-
-          Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
-          int numMapTasks = job.getTotalMaps();
-          int numReduceTasks = job.getTotalReduces();
-
-          // YARN (tracking) Task:
-          org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
-              job.getTask(attemptID.getTaskId());
-          // classic mapred Task:
-          org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
-
-          // after "launching," send launched event to task attempt to move
-          // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
-          // do getRemoteTask() call first)
           
-          //There is no port number because we are not really talking to a task
-          // tracker.  The shuffle is just done through local files.  So the
-          // port number is set to -1 in this case.
-          context.getEventHandler().handle(
-              new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-
-          if (numMapTasks == 0) {
-            doneWithMaps = true;
-          }
-
-          try {
-            if (remoteTask.isMapOrReduce()) {
-              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
-              if (remoteTask.isMapTask()) {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
-              } else {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
-              }
-              context.getEventHandler().handle(jce);
+          // execute the task on a separate thread
+          Future<?> future = taskRunner.submit(new Runnable() {
+            public void run() {
+              runTask(launchEv, localMapFiles);
             }
-            runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0), localMapFiles);
-            
-          } catch (RuntimeException re) {
-            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
-            context.getEventHandler().handle(jce);
-            // this is our signal that the subtask failed in some way, so
-            // simulate a failed JVM/container and send a container-completed
-            // event to task attempt (i.e., move state machine from RUNNING
-            // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
-            context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
-                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-          } catch (IOException ioe) {
-            // if umbilical itself barfs (in error-handler of runSubMap()),
-            // we're pretty much hosed, so do what YarnChild main() does
-            // (i.e., exit clumsily--but can never happen, so no worries!)
-            LOG.fatal("oopsie...  this can never happen: "
-                + StringUtils.stringifyException(ioe));
-            System.exit(-1);
-          }
+          });
+          // remember the current attempt
+          futures.put(event.getTaskAttemptID(), future);
 
         } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
 
-          // no container to kill, so just send "cleaned" event to task attempt
-          // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
-          // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+          // cancel (and interrupt) the current running task associated with the
+          // event
+          TaskAttemptId taId = event.getTaskAttemptID();
+          Future<?> future = futures.remove(taId);
+          if (future != null) {
+            LOG.info("canceling the task attempt " + taId);
+            future.cancel(true);
+          }
+
+          // send "cleaned" event to task attempt to move us from
+          // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or 
+          // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
           context.getEventHandler().handle(
-              new TaskAttemptEvent(event.getTaskAttemptID(),
+              new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
 
         } else {
@@ -267,7 +247,75 @@ public class LocalContainerLauncher exte
       }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("unchecked")
+    private void runTask(ContainerRemoteLaunchEvent launchEv,
+        Map<TaskAttemptID, MapOutputFile> localMapFiles) {
+      TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
+
+      Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+      int numMapTasks = job.getTotalMaps();
+      int numReduceTasks = job.getTotalReduces();
+
+      // YARN (tracking) Task:
+      org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+          job.getTask(attemptID.getTaskId());
+      // classic mapred Task:
+      org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+      // after "launching," send launched event to task attempt to move
+      // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+      // do getRemoteTask() call first)
+      
+      //There is no port number because we are not really talking to a task
+      // tracker.  The shuffle is just done through local files.  So the
+      // port number is set to -1 in this case.
+      context.getEventHandler().handle(
+          new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+
+      if (numMapTasks == 0) {
+        doneWithMaps = true;
+      }
+
+      try {
+        if (remoteTask.isMapOrReduce()) {
+          JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+          if (remoteTask.isMapTask()) {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+          } else {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+          }
+          context.getEventHandler().handle(jce);
+        }
+        runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+                   (numReduceTasks > 0), localMapFiles);
+        
+      } catch (RuntimeException re) {
+        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+        context.getEventHandler().handle(jce);
+        // this is our signal that the subtask failed in some way, so
+        // simulate a failed JVM/container and send a container-completed
+        // event to task attempt (i.e., move state machine from RUNNING
+        // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      } catch (IOException ioe) {
+        // if umbilical itself barfs (in error-handler of runSubMap()),
+        // we're pretty much hosed, so do what YarnChild main() does
+        // (i.e., exit clumsily--but can never happen, so no worries!)
+        LOG.fatal("oopsie...  this can never happen: "
+            + StringUtils.stringifyException(ioe));
+        ExitUtil.terminate(-1);
+      } finally {
+        // remove my future
+        if (futures.remove(attemptID) != null) {
+          LOG.info("removed attempt " + attemptID +
+              " from the futures to keep track of");
+        }
+      }
+    }
+
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -397,7 +445,6 @@ public class LocalContainerLauncher exte
      * filenames instead of "file.out". (All of this is entirely internal,
      * so there are no particular compatibility issues.)
      */
-    @SuppressWarnings("deprecation")
     private MapOutputFile renameMapOutputForReduce(JobConf conf,
        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
@@ -456,7 +503,7 @@ public class LocalContainerLauncher exte
       }
     }
 
-  } // end SubtaskRunner
+  } // end EventHandler
   
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;

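
One more note on the patch above: the fatal IOException path now calls
ExitUtil.terminate(-1) instead of System.exit(-1). ExitUtil is Hadoop's
testable wrapper around System.exit: a test can disable the real exit so that
terminate() throws an ExitException carrying the status instead of killing
the JVM. A hedged sketch of that test-side usage, assuming the
org.apache.hadoop.util.ExitUtil API as of this branch:

import org.apache.hadoop.util.ExitUtil;

public class ExitUtilDemo {
  public static void main(String[] args) {
    // Tell ExitUtil to throw instead of calling System.exit(), as tests do.
    ExitUtil.disableSystemExit();
    try {
      ExitUtil.terminate(-1);  // would exit the JVM in production
    } catch (ExitUtil.ExitException e) {
      // the would-be exit code is preserved on the exception
      System.out.println("caught exit, status=" + e.status);
    }
  }
}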
