Author: jlowe
Date: Wed Apr 23 21:57:55 2014
New Revision: 1589526

URL: http://svn.apache.org/r1589526
Log:
svn merge -c 1589524 FIXES: MAPREDUCE-5841. uber job doesn't terminate on getting mapred job kill. Contributed by Sangjin Lee
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java - copied unchanged from r1589524, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1589526&r1=1589525&r2=1589526&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 23 21:57:55 2014 @@ -85,6 +85,9 @@ Release 2.4.1 - UNRELEASED MAPREDUCE-5832. Fixed TestJobClient to not fail on JDK7 or on Windows. (Jian He and Vinod Kumar Vavilapalli via vinodkv) + MAPREDUCE-5841. 
uber job doesn't terminate on getting mapred job kill + (Sangjin Lee via jlowe) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1589526&r1=1589525&r2=1589526&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Wed Apr 23 21:57:55 2014 @@ -24,6 +24,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -51,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import 
com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Runs the container task locally in a thread. * Since all (sub)tasks share the same local directory, they must be executed @@ -71,7 +77,8 @@ public class LocalContainerLauncher exte private final HashSet<File> localizedFiles; private final AppContext context; private final TaskUmbilicalProtocol umbilical; - private Thread eventHandlingThread; + private ExecutorService taskRunner; + private Thread eventHandler; private BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>(); @@ -115,14 +122,24 @@ public class LocalContainerLauncher exte } public void serviceStart() throws Exception { - eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner"); - eventHandlingThread.start(); + // create a single thread for serial execution of tasks + // make it a daemon thread so that the process can exit even if the task is + // not interruptible + taskRunner = + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
+ setDaemon(true).setNameFormat("uber-SubtaskRunner").build()); + // create and start an event handling thread + eventHandler = new Thread(new EventHandler(), "uber-EventHandler"); + eventHandler.start(); super.serviceStart(); } public void serviceStop() throws Exception { - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); + if (eventHandler != null) { + eventHandler.interrupt(); + } + if (taskRunner != null) { + taskRunner.shutdownNow(); } super.serviceStop(); } @@ -158,12 +175,15 @@ public class LocalContainerLauncher exte * - runs Task (runSubMap() or runSubReduce()) * - TA can safely send TA_UPDATE since in RUNNING state */ - private class SubtaskRunner implements Runnable { + private class EventHandler implements Runnable { - private boolean doneWithMaps = false; - private int finishedSubMaps = 0; + private volatile boolean doneWithMaps = false; + private volatile int finishedSubMaps = 0; - SubtaskRunner() { + private final Map<TaskAttemptId,Future<?>> futures = + new ConcurrentHashMap<TaskAttemptId,Future<?>>(); + + EventHandler() { } @SuppressWarnings("unchecked") @@ -172,7 +192,7 @@ public class LocalContainerLauncher exte ContainerLauncherEvent event = null; // Collect locations of map outputs to give to reduces - Map<TaskAttemptID, MapOutputFile> localMapFiles = + final Map<TaskAttemptID, MapOutputFile> localMapFiles = new HashMap<TaskAttemptID, MapOutputFile>(); // _must_ either run subtasks sequentially or accept expense of new JVMs @@ -183,81 +203,41 @@ public class LocalContainerLauncher exte event = eventQueue.take(); } catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL? 
LOG.error("Returning, interrupted : " + e); - return; + break; } LOG.info("Processing the event " + event.toString()); if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) { - ContainerRemoteLaunchEvent launchEv = + final ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event; - TaskAttemptId attemptID = launchEv.getTaskAttemptID(); - - Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId()); - int numMapTasks = job.getTotalMaps(); - int numReduceTasks = job.getTotalReduces(); - - // YARN (tracking) Task: - org.apache.hadoop.mapreduce.v2.app.job.Task ytask = - job.getTask(attemptID.getTaskId()); - // classic mapred Task: - org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask(); - - // after "launching," send launched event to task attempt to move - // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must - // do getRemoteTask() call first) - //There is no port number because we are not really talking to a task - // tracker. The shuffle is just done through local files. So the - // port number is set to -1 in this case. 
- context.getEventHandler().handle( - new TaskAttemptContainerLaunchedEvent(attemptID, -1)); - - if (numMapTasks == 0) { - doneWithMaps = true; - } - - try { - if (remoteTask.isMapOrReduce()) { - JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1); - if (remoteTask.isMapTask()) { - jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1); - } else { - jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1); - } - context.getEventHandler().handle(jce); + // execute the task on a separate thread + Future<?> future = taskRunner.submit(new Runnable() { + public void run() { + runTask(launchEv, localMapFiles); } - runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, - (numReduceTasks > 0), localMapFiles); - - } catch (RuntimeException re) { - JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); - context.getEventHandler().handle(jce); - // this is our signal that the subtask failed in some way, so - // simulate a failed JVM/container and send a container-completed - // event to task attempt (i.e., move state machine from RUNNING - // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED]) - context.getEventHandler().handle(new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); - } catch (IOException ioe) { - // if umbilical itself barfs (in error-handler of runSubMap()), - // we're pretty much hosed, so do what YarnChild main() does - // (i.e., exit clumsily--but can never happen, so no worries!) - LOG.fatal("oopsie... 
this can never happen: " - + StringUtils.stringifyException(ioe)); - System.exit(-1); - } + }); + // remember the current attempt + futures.put(event.getTaskAttemptID(), future); } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) { - // no container to kill, so just send "cleaned" event to task attempt - // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state - // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP) + // cancel (and interrupt) the current running task associated with the + // event + TaskAttemptId taId = event.getTaskAttemptID(); + Future<?> future = futures.remove(taId); + if (future != null) { + LOG.info("canceling the task attempt " + taId); + future.cancel(true); + } + + // send "cleaned" event to task attempt to move us from + // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or + // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP) context.getEventHandler().handle( - new TaskAttemptEvent(event.getTaskAttemptID(), + new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED)); } else { @@ -267,7 +247,75 @@ public class LocalContainerLauncher exte } } - @SuppressWarnings("deprecation") + @SuppressWarnings("unchecked") + private void runTask(ContainerRemoteLaunchEvent launchEv, + Map<TaskAttemptID, MapOutputFile> localMapFiles) { + TaskAttemptId attemptID = launchEv.getTaskAttemptID(); + + Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId()); + int numMapTasks = job.getTotalMaps(); + int numReduceTasks = job.getTotalReduces(); + + // YARN (tracking) Task: + org.apache.hadoop.mapreduce.v2.app.job.Task ytask = + job.getTask(attemptID.getTaskId()); + // classic mapred Task: + org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask(); + + // after "launching," send launched event to task attempt to move + // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must + // do getRemoteTask() call first) + + //There is no port number because we are not really talking to 
a task + // tracker. The shuffle is just done through local files. So the + // port number is set to -1 in this case. + context.getEventHandler().handle( + new TaskAttemptContainerLaunchedEvent(attemptID, -1)); + + if (numMapTasks == 0) { + doneWithMaps = true; + } + + try { + if (remoteTask.isMapOrReduce()) { + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1); + if (remoteTask.isMapTask()) { + jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1); + } else { + jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1); + } + context.getEventHandler().handle(jce); + } + runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, + (numReduceTasks > 0), localMapFiles); + + } catch (RuntimeException re) { + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); + context.getEventHandler().handle(jce); + // this is our signal that the subtask failed in some way, so + // simulate a failed JVM/container and send a container-completed + // event to task attempt (i.e., move state machine from RUNNING + // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED]) + context.getEventHandler().handle(new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } catch (IOException ioe) { + // if umbilical itself barfs (in error-handler of runSubMap()), + // we're pretty much hosed, so do what YarnChild main() does + // (i.e., exit clumsily--but can never happen, so no worries!) + LOG.fatal("oopsie... 
this can never happen: " + + StringUtils.stringifyException(ioe)); + ExitUtil.terminate(-1); + } finally { + // remove my future + if (futures.remove(attemptID) != null) { + LOG.info("removed attempt " + attemptID + + " from the futures to keep track of"); + } + } + } + private void runSubtask(org.apache.hadoop.mapred.Task task, final TaskType taskType, TaskAttemptId attemptID, @@ -397,7 +445,6 @@ public class LocalContainerLauncher exte * filenames instead of "file.out". (All of this is entirely internal, * so there are no particular compatibility issues.) */ - @SuppressWarnings("deprecation") private MapOutputFile renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException { FileSystem localFs = FileSystem.getLocal(conf); @@ -456,7 +503,7 @@ public class LocalContainerLauncher exte } } - } // end SubtaskRunner + } // end EventHandler private static class RenamedMapOutputFile extends MapOutputFile { private Path path;