Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Aug 19 23:49:39 2014 @@ -24,8 +24,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSError; @@ -51,11 +57,14 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Runs the container task locally in a thread. * Since all (sub)tasks share the same local directory, they must be executed @@ -71,7 +80,8 @@ public class LocalContainerLauncher exte private final HashSet<File> localizedFiles; private final AppContext context; private final TaskUmbilicalProtocol umbilical; - private Thread eventHandlingThread; + private ExecutorService taskRunner; + private Thread eventHandler; private BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>(); @@ -115,14 +125,24 @@ public class LocalContainerLauncher exte } public void serviceStart() throws Exception { - eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner"); - eventHandlingThread.start(); + // create a single thread for serial execution of tasks + // make it a daemon thread so that the process can exit even if the task is + // not interruptible + taskRunner = + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
+ setDaemon(true).setNameFormat("uber-SubtaskRunner").build()); + // create and start an event handling thread + eventHandler = new Thread(new EventHandler(), "uber-EventHandler"); + eventHandler.start(); super.serviceStart(); } public void serviceStop() throws Exception { - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); + if (eventHandler != null) { + eventHandler.interrupt(); + } + if (taskRunner != null) { + taskRunner.shutdownNow(); } super.serviceStop(); } @@ -158,12 +178,17 @@ public class LocalContainerLauncher exte * - runs Task (runSubMap() or runSubReduce()) * - TA can safely send TA_UPDATE since in RUNNING state */ - private class SubtaskRunner implements Runnable { + private class EventHandler implements Runnable { + // doneWithMaps and finishedSubMaps are accessed from only + // one thread. Therefore, no need to make them volatile. private boolean doneWithMaps = false; private int finishedSubMaps = 0; - SubtaskRunner() { + private final Map<TaskAttemptId,Future<?>> futures = + new ConcurrentHashMap<TaskAttemptId,Future<?>>(); + + EventHandler() { } @SuppressWarnings("unchecked") @@ -172,7 +197,7 @@ public class LocalContainerLauncher exte ContainerLauncherEvent event = null; // Collect locations of map outputs to give to reduces - Map<TaskAttemptID, MapOutputFile> localMapFiles = + final Map<TaskAttemptID, MapOutputFile> localMapFiles = new HashMap<TaskAttemptID, MapOutputFile>(); // _must_ either run subtasks sequentially or accept expense of new JVMs @@ -183,81 +208,41 @@ public class LocalContainerLauncher exte event = eventQueue.take(); } catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL? LOG.error("Returning, interrupted : " + e); - return; + break; } LOG.info("Processing the event " + event.toString()); if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) { - ContainerRemoteLaunchEvent launchEv = + final ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event; - TaskAttemptId attemptID = launchEv.getTaskAttemptID(); - - Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId()); - int numMapTasks = job.getTotalMaps(); - int numReduceTasks = job.getTotalReduces(); - - // YARN (tracking) Task: - org.apache.hadoop.mapreduce.v2.app.job.Task ytask = - job.getTask(attemptID.getTaskId()); - // classic mapred Task: - org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask(); - - // after "launching," send launched event to task attempt to move - // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must - // do getRemoteTask() call first) - //There is no port number because we are not really talking to a task - // tracker. The shuffle is just done through local files. So the - // port number is set to -1 in this case. 
- context.getEventHandler().handle( - new TaskAttemptContainerLaunchedEvent(attemptID, -1)); - - if (numMapTasks == 0) { - doneWithMaps = true; - } - - try { - if (remoteTask.isMapOrReduce()) { - JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1); - if (remoteTask.isMapTask()) { - jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1); - } else { - jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1); - } - context.getEventHandler().handle(jce); + // execute the task on a separate thread + Future<?> future = taskRunner.submit(new Runnable() { + public void run() { + runTask(launchEv, localMapFiles); } - runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, - (numReduceTasks > 0), localMapFiles); - - } catch (RuntimeException re) { - JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); - context.getEventHandler().handle(jce); - // this is our signal that the subtask failed in some way, so - // simulate a failed JVM/container and send a container-completed - // event to task attempt (i.e., move state machine from RUNNING - // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED]) - context.getEventHandler().handle(new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); - } catch (IOException ioe) { - // if umbilical itself barfs (in error-handler of runSubMap()), - // we're pretty much hosed, so do what YarnChild main() does - // (i.e., exit clumsily--but can never happen, so no worries!) - LOG.fatal("oopsie... this can never happen: " - + StringUtils.stringifyException(ioe)); - System.exit(-1); - } + }); + // remember the current attempt + futures.put(event.getTaskAttemptID(), future); } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) { - // no container to kill, so just send "cleaned" event to task attempt - // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state - // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP) + // cancel (and interrupt) the current running task associated with the + // event + TaskAttemptId taId = event.getTaskAttemptID(); + Future<?> future = futures.remove(taId); + if (future != null) { + LOG.info("canceling the task attempt " + taId); + future.cancel(true); + } + + // send "cleaned" event to task attempt to move us from + // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or + // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP) context.getEventHandler().handle( - new TaskAttemptEvent(event.getTaskAttemptID(), + new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED)); } else { @@ -267,7 +252,75 @@ public class LocalContainerLauncher exte } } - @SuppressWarnings("deprecation") + @SuppressWarnings("unchecked") + private void runTask(ContainerRemoteLaunchEvent launchEv, + Map<TaskAttemptID, MapOutputFile> localMapFiles) { + TaskAttemptId attemptID = launchEv.getTaskAttemptID(); + + Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId()); + int numMapTasks = job.getTotalMaps(); + int numReduceTasks = job.getTotalReduces(); + + // YARN (tracking) Task: + org.apache.hadoop.mapreduce.v2.app.job.Task ytask = + job.getTask(attemptID.getTaskId()); + // classic mapred Task: + org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask(); + + // after "launching," send launched event to task attempt to move + // state from ASSIGNED to RUNNING (also nukes "remoteTask", 
so must + // do getRemoteTask() call first) + + //There is no port number because we are not really talking to a task + // tracker. The shuffle is just done through local files. So the + // port number is set to -1 in this case. + context.getEventHandler().handle( + new TaskAttemptContainerLaunchedEvent(attemptID, -1)); + + if (numMapTasks == 0) { + doneWithMaps = true; + } + + try { + if (remoteTask.isMapOrReduce()) { + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1); + if (remoteTask.isMapTask()) { + jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1); + } else { + jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1); + } + context.getEventHandler().handle(jce); + } + runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, + (numReduceTasks > 0), localMapFiles); + + } catch (RuntimeException re) { + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); + context.getEventHandler().handle(jce); + // this is our signal that the subtask failed in some way, so + // simulate a failed JVM/container and send a container-completed + // event to task attempt (i.e., move state machine from RUNNING + // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED]) + context.getEventHandler().handle(new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } catch (IOException ioe) { + // if umbilical itself barfs (in error-handler of runSubMap()), + // we're pretty much hosed, so do what YarnChild main() does + // (i.e., exit clumsily--but can never happen, so no worries!) + LOG.fatal("oopsie... this can never happen: " + + StringUtils.stringifyException(ioe)); + ExitUtil.terminate(-1); + } finally { + // remove my future + if (futures.remove(attemptID) != null) { + LOG.info("removed attempt " + attemptID + + " from the futures to keep track of"); + } + } + } + private void runSubtask(org.apache.hadoop.mapred.Task task, final TaskType taskType, TaskAttemptId attemptID, @@ -355,7 +408,9 @@ public class LocalContainerLauncher exte } catch (FSError e) { LOG.fatal("FSError from child", e); // umbilical: MRAppMaster creates (taskAttemptListener), passes to us - umbilical.fsError(classicAttemptID, e.getMessage()); + if (!ShutdownHookManager.get().isShutdownInProgress()) { + umbilical.fsError(classicAttemptID, e.getMessage()); + } throw new RuntimeException(); } catch (Exception exception) { @@ -378,54 +433,18 @@ public class LocalContainerLauncher exte } catch (Throwable throwable) { LOG.fatal("Error running local (uberized) 'child' : " + StringUtils.stringifyException(throwable)); - Throwable tCause = throwable.getCause(); - String cause = (tCause == null) - ? throwable.getMessage() - : StringUtils.stringifyException(tCause); - umbilical.fatalError(classicAttemptID, cause); + if (!ShutdownHookManager.get().isShutdownInProgress()) { + Throwable tCause = throwable.getCause(); + String cause = + (tCause == null) ? throwable.getMessage() : StringUtils + .stringifyException(tCause); + umbilical.fatalError(classicAttemptID, cause); + } throw new RuntimeException(); } } /** - * Within the _local_ filesystem (not HDFS), all activity takes place within - * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/), - * and all sub-MapTasks create the same filename ("file.out"). Rename that - * to something unique (e.g., "map_0.out") to avoid collisions. 
- * - * Longer-term, we'll modify [something] to use TaskAttemptID-based - * filenames instead of "file.out". (All of this is entirely internal, - * so there are no particular compatibility issues.) - */ - @SuppressWarnings("deprecation") - private MapOutputFile renameMapOutputForReduce(JobConf conf, - TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException { - FileSystem localFs = FileSystem.getLocal(conf); - // move map output to reduce input - Path mapOut = subMapOutputFile.getOutputFile(); - FileStatus mStatus = localFs.getFileStatus(mapOut); - Path reduceIn = subMapOutputFile.getInputFileForWrite( - TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen()); - Path mapOutIndex = new Path(mapOut.toString() + ".index"); - Path reduceInIndex = new Path(reduceIn.toString() + ".index"); - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming map output file for task attempt " - + mapId.toString() + " from original location " + mapOut.toString() - + " to destination " + reduceIn.toString()); - } - if (!localFs.mkdirs(reduceIn.getParent())) { - throw new IOException("Mkdirs failed to create " - + reduceIn.getParent().toString()); - } - if (!localFs.rename(mapOut, reduceIn)) - throw new IOException("Couldn't rename " + mapOut); - if (!localFs.rename(mapOutIndex, reduceInIndex)) - throw new IOException("Couldn't rename " + mapOutIndex); - - return new RenamedMapOutputFile(reduceIn); - } - - /** * Also within the local filesystem, we need to restore the initial state * of the directory as much as possible. Compare current contents against * the saved original state and nuke everything that doesn't belong, with @@ -456,8 +475,47 @@ public class LocalContainerLauncher exte } } - } // end SubtaskRunner - + } // end EventHandler + + /** + * Within the _local_ filesystem (not HDFS), all activity takes place within + * a subdir inside one of the LOCAL_DIRS + * (${local.dir}/usercache/$user/appcache/$appId/$contId/), + * and all sub-MapTasks create the same filename ("file.out"). Rename that + * to something unique (e.g., "map_0.out") to avoid possible collisions. + * + * Longer-term, we'll modify [something] to use TaskAttemptID-based + * filenames instead of "file.out". (All of this is entirely internal, + * so there are no particular compatibility issues.) 
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
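
The LocalContainerLauncher changes above replace the single uber-SubtaskRunner thread with two cooperating pieces: an event-handling thread that drains the queue, and a single-threaded daemon ExecutorService that still runs subtasks serially, with each launch's Future retained in a ConcurrentHashMap so that a later CONTAINER_REMOTE_CLEANUP event can cancel (and interrupt) the running attempt. The sketch below is a minimal, self-contained illustration of that submit/track/cancel pattern; the class and method names (CancellableSerialRunner, launch, cleanup) are hypothetical stand-ins, not Hadoop APIs.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    // Illustrative stand-in for the pattern in the diff above, not Hadoop code.
    public class CancellableSerialRunner {
      // single worker thread => subtasks run serially, as the serviceStart()
      // comment requires; daemon => JVM exit is not blocked by a subtask that
      // ignores interrupts
      private final ExecutorService taskRunner =
          Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "uber-SubtaskRunner");
            t.setDaemon(true);
            return t;
          });
      // attempt id -> running (or queued) subtask, so cleanup can find it
      private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();

      public void launch(final String attemptId, final Runnable subtask) {
        // wrap first, register, then execute: the map entry is in place
        // before the subtask can possibly finish and try to remove itself
        FutureTask<Void> task = new FutureTask<>(() -> {
          try {
            subtask.run();
          } finally {
            futures.remove(attemptId);  // mirrors runTask()'s finally block
          }
        }, null);
        futures.put(attemptId, task);
        taskRunner.execute(task);
      }

      public void cleanup(String attemptId) {
        // CONTAINER_REMOTE_CLEANUP analogue: interrupt the attempt if running
        Future<?> f = futures.remove(attemptId);
        if (f != null) {
          f.cancel(true);
        }
      }

      public void stop() {
        taskRunner.shutdownNow();  // serviceStop() analogue
      }
    }

Note that cancel(true) only delivers an interrupt; a subtask that swallows InterruptedException keeps running, which is why the commit both makes the worker a daemon thread and guards umbilical RPC calls with ShutdownHookManager.get().isShutdownInProgress().
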
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Aug 19 23:49:39 2014 @@ -141,7 +141,9 @@ public class TaskAttemptListenerImpl ext } server.start(); - this.address = NetUtils.getConnectAddress(server); + this.address = NetUtils.createSocketAddrForHost( + context.getNMHostname(), + server.getListenerAddress().getPort()); } catch (IOException e) { throw new YarnRuntimeException(e); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue Aug 19 23:49:39 2014 @@ -56,6 +56,7 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -176,7 +177,9 @@ class YarnChild { }); } catch (FSError e) { LOG.fatal("FSError from child", e); - umbilical.fsError(taskid, e.getMessage()); + if (!ShutdownHookManager.get().isShutdownInProgress()) { + umbilical.fsError(taskid, e.getMessage()); + } } catch (Exception exception) { LOG.warn("Exception running child : " + StringUtils.stringifyException(exception)); @@ -201,17 +204,22 @@ class YarnChild { } // Report back any failures, for diagnostic purposes if (taskid != null) { - umbilical.fatalError(taskid, StringUtils.stringifyException(exception)); + if (!ShutdownHookManager.get().isShutdownInProgress()) { + umbilical.fatalError(taskid, + StringUtils.stringifyException(exception)); + } } } catch (Throwable throwable) { LOG.fatal("Error running child : " + StringUtils.stringifyException(throwable)); if (taskid != null) { - Throwable tCause = throwable.getCause(); - String cause = tCause == null - ? 
throwable.getMessage() - : StringUtils.stringifyException(tCause); - umbilical.fatalError(taskid, cause); + if (!ShutdownHookManager.get().isShutdownInProgress()) { + Throwable tCause = throwable.getCause(); + String cause = + tCause == null ? throwable.getMessage() : StringUtils + .stringifyException(tCause); + umbilical.fatalError(taskid, cause); + } } } finally { RPC.stopProxy(umbilical); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Aug 19 23:49:39 2014 @@ -28,13 +28,13 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -74,7 +74,9 @@ public class JobHistoryEventHandler exte private int eventCounter; - //TODO Does the FS object need to be different ? + // Those file systems may differ from the job configuration + // See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils + // #ensurePathInDefaultFileSystem private FileSystem stagingDirFS; // log Dir FileSystem private FileSystem doneDirFS; // done Dir FileSystem @@ -141,7 +143,7 @@ public class JobHistoryEventHandler exte //Check for the existence of the history staging dir. Maybe create it. try { stagingDirPath = - FileSystem.get(conf).makeQualified(new Path(stagingDirStr)); + FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr)); stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf); mkdir(stagingDirFS, stagingDirPath, new FsPermission( JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS)); @@ -154,7 +156,7 @@ public class JobHistoryEventHandler exte //Check for the existence of intermediate done dir. Path doneDirPath = null; try { - doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr)); + doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr)); doneDirFS = FileSystem.get(doneDirPath.toUri(), conf); // This directory will be in a common location, or this may be a cluster // meant for a single user. Creating based on the conf. Should ideally be @@ -194,7 +196,7 @@ public class JobHistoryEventHandler exte //Check/create user directory under intermediate done dir. 
try { doneDirPrefixPath = - FileSystem.get(conf).makeQualified(new Path(userDoneDirStr)); + FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr)); mkdir(doneDirFS, doneDirPrefixPath, new FsPermission( JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS)); } catch (IOException e) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Tue Aug 19 23:49:39 2014 @@ -66,4 +66,5 @@ public interface AppContext { boolean hasSuccessfullyUnregistered(); + String getNMHostname(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Aug 19 23:49:39 2014 @@ -41,13 +41,13 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.TaskAttemptListenerImpl; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -67,6 +67,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -184,7 +185,6 @@ public class MRAppMaster extends Composi 
private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; - private final int maxAppAttempts; private Map<TaskId, TaskInfo> completedTasksFromPreviousRun; private List<AMInfo> amInfos; private AppContext context; @@ -199,6 +199,7 @@ public class MRAppMaster extends Composi new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; + private ClassLoader jobClassLoader; private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; @@ -225,14 +226,14 @@ public class MRAppMaster extends Composi public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - long appSubmitTime, int maxAppAttempts) { + long appSubmitTime) { this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, - new SystemClock(), appSubmitTime, maxAppAttempts); + new SystemClock(), appSubmitTime); } public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - Clock clock, long appSubmitTime, int maxAppAttempts) { + Clock clock, long appSubmitTime) { super(MRAppMaster.class.getName()); this.clock = clock; this.startTime = clock.getTime(); @@ -243,25 +244,21 @@ public class MRAppMaster extends Composi this.nmPort = nmPort; this.nmHttpPort = nmHttpPort; this.metrics = MRAppMetrics.create(); - this.maxAppAttempts = maxAppAttempts; logSyncer = TaskLog.createLogSyncer(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } @Override protected void serviceInit(final Configuration conf) throws Exception { + // create the job classloader if enabled + createJobClassLoader(conf); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); initJobCredentialsAndUGI(conf); context = new RunningAppContext(conf); - ((RunningAppContext)context).computeIsLastAMRetry(); - LOG.info("The specific max attempts: " + maxAppAttempts + - " for application: " + appAttemptID.getApplicationId().getId() + - ". 
Attempt num: " + appAttemptID.getAttemptId() + - " is last retry: " + isLastAMRetry); - // Job name is the same as the app name util we support DAG of jobs // for an app later appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>"); @@ -393,8 +390,13 @@ public class MRAppMaster extends Composi addIfService(committerEventHandler); //policy handling preemption requests from RM - preemptionPolicy = createPreemptionPolicy(conf); - preemptionPolicy.init(context); + callWithJobClassLoader(conf, new Action<Void>() { + public Void call(Configuration conf) { + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + return null; + } + }); //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); @@ -459,33 +461,37 @@ public class MRAppMaster extends Composi } private OutputCommitter createOutputCommitter(Configuration conf) { - OutputCommitter committer = null; - - LOG.info("OutputCommitter set in config " - + conf.get("mapred.output.committer.class")); - - if (newApiCommitter) { - org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils - .newTaskId(jobId, 0, TaskType.MAP); - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils - .newTaskAttemptId(taskID, 0); - TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, - TypeConverter.fromYarn(attemptID)); - OutputFormat outputFormat; - try { - outputFormat = ReflectionUtils.newInstance(taskContext - .getOutputFormatClass(), conf); - committer = outputFormat.getOutputCommitter(taskContext); - } catch (Exception e) { - throw new YarnRuntimeException(e); + return callWithJobClassLoader(conf, new Action<OutputCommitter>() { + public OutputCommitter call(Configuration conf) { + OutputCommitter committer = null; + + LOG.info("OutputCommitter set in config " + + conf.get("mapred.output.committer.class")); + + if (newApiCommitter) { + org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = + MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + MRBuilderUtils.newTaskAttemptId(taskID, 0); + TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptID)); + OutputFormat outputFormat; + try { + outputFormat = ReflectionUtils.newInstance(taskContext + .getOutputFormatClass(), conf); + committer = outputFormat.getOutputCommitter(taskContext); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + committer = ReflectionUtils.newInstance(conf.getClass( + "mapred.output.committer.class", FileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class), conf); + } + LOG.info("OutputCommitter is " + committer.getClass().getName()); + return committer; } - } else { - committer = ReflectionUtils.newInstance(conf.getClass( - "mapred.output.committer.class", FileOutputCommitter.class, - org.apache.hadoop.mapred.OutputCommitter.class), conf); - } - LOG.info("OutputCommitter is " + committer.getClass().getName()); - return committer; + }); } protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { @@ -600,10 +606,6 @@ public class MRAppMaster extends Composi LOG.warn("Graceful stop failed ", t); } - //Bring the process down by force. 
- //Not needed after HADOOP-7140 - LOG.info("Exiting MR AppMaster..GoodBye!"); - sysexit(); } private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { @@ -677,38 +679,42 @@ public class MRAppMaster extends Composi return new StagingDirCleaningService(); } - protected Speculator createSpeculator(Configuration conf, AppContext context) { - Class<? extends Speculator> speculatorClass; - - try { - speculatorClass - // "yarn.mapreduce.job.speculator.class" - = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, - DefaultSpeculator.class, - Speculator.class); - Constructor<? extends Speculator> speculatorConstructor - = speculatorClass.getConstructor - (Configuration.class, AppContext.class); - Speculator result = speculatorConstructor.newInstance(conf, context); - - return result; - } catch (InstantiationException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (IllegalAccessException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (InvocationTargetException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (NoSuchMethodException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } + protected Speculator createSpeculator(Configuration conf, + final AppContext context) { + return callWithJobClassLoader(conf, new Action<Speculator>() { + public Speculator call(Configuration conf) { + Class<? extends Speculator> speculatorClass; + try { + speculatorClass + // "yarn.mapreduce.job.speculator.class" + = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, + DefaultSpeculator.class, + Speculator.class); + Constructor<? 
extends Speculator> speculatorConstructor + = speculatorClass.getConstructor + (Configuration.class, AppContext.class); + Speculator result = speculatorConstructor.newInstance(conf, context); + + return result; + } catch (InstantiationException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (IllegalAccessException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (InvocationTargetException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (NoSuchMethodException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } + } + }); } protected TaskAttemptListener createTaskAttemptListener(AppContext context, @@ -722,7 +728,7 @@ public class MRAppMaster extends Composi protected EventHandler<CommitterEvent> createCommitterEventHandler( AppContext context, OutputCommitter committer) { return new CommitterEventHandler(context, committer, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), jobClassLoader); } protected ContainerAllocator createContainerAllocator( @@ -1009,8 +1015,13 @@ public class MRAppMaster extends Composi successfullyUnregistered.set(true); } - public void computeIsLastAMRetry() { - isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; + public void resetIsLastAMRetry() { + isLastAMRetry = false; + } + + @Override + public String getNMHostname() { + return nmHost; } } @@ -1054,6 +1065,7 @@ public class MRAppMaster extends Composi // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); + boolean initFailed = false; if (!errorHappenedShutDown) { // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); @@ -1062,6 +1074,10 @@ public class MRAppMaster extends Composi // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); + // If job is still not initialized, an error happened during + // initialization. Must complete starting all of the services so failure + // events can be processed. + initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to @@ -1088,10 +1104,16 @@ public class MRAppMaster extends Composi //start all the components super.serviceStart(); - // set job classloader if configured - MRApps.setJobClassLoader(getConfig()); - // All components have started, start the job. - startJobs(); + // finally set the job classloader + MRApps.setClassLoader(jobClassLoader, getConfig()); + + if (initFailed) { + JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); + jobEventDispatcher.handle(initFailedEvent); + } else { + // All components have started, start the job. 
+ startJobs(); + } } @Override @@ -1100,7 +1122,29 @@ public class MRAppMaster extends Composi TaskLog.syncLogsShutdown(logSyncer); } - private void processRecovery() { + private boolean isRecoverySupported() throws IOException { + boolean isSupported = false; + Configuration conf = getConfig(); + if (committer != null) { + final JobContext _jobContext; + if (newApiCommitter) { + _jobContext = new JobContextImpl( + conf, TypeConverter.fromYarn(getJobId())); + } else { + _jobContext = new org.apache.hadoop.mapred.JobContextImpl( + new JobConf(conf), TypeConverter.fromYarn(getJobId())); + } + isSupported = callWithJobClassLoader(conf, + new ExceptionAction<Boolean>() { + public Boolean call(Configuration conf) throws IOException { + return committer.isRecoverySupported(_jobContext); + } + }); + } + return isSupported; + } + + private void processRecovery() throws IOException{ if (appAttemptID.getAttemptId() == 1) { return; // no need to recover on the first attempt } @@ -1108,8 +1152,8 @@ public class MRAppMaster extends Composi boolean recoveryEnabled = getConfig().getBoolean( MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); - boolean recoverySupportedByCommitter = - committer != null && committer.isRecoverySupported(); + + boolean recoverySupportedByCommitter = isRecoverySupported(); // If a shuffle secret was not provided by the job client then this app // attempt will generate one. However that disables recovery if there @@ -1294,7 +1338,7 @@ public class MRAppMaster extends Composi this.conf = config; } @Override - public void handle(SpeculatorEvent event) { + public void handle(final SpeculatorEvent event) { if (disabled) { return; } @@ -1321,7 +1365,12 @@ public class MRAppMaster extends Composi if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. - speculator.handle(event); + callWithJobClassLoader(conf, new Action<Void>() { + public Void call(Configuration conf) { + speculator.handle(event); + return null; + } + }); } } @@ -1362,8 +1411,6 @@ public class MRAppMaster extends Composi System.getenv(Environment.NM_HTTP_PORT.name()); String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); - String maxAppAttempts = - System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV); validateInputParam(containerIdStr, Environment.CONTAINER_ID.name()); @@ -1373,8 +1420,6 @@ public class MRAppMaster extends Composi Environment.NM_HTTP_PORT.name()); validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV); - validateInputParam(maxAppAttempts, - ApplicationConstants.MAX_APP_ATTEMPTS_ENV); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId applicationAttemptId = @@ -1385,8 +1430,7 @@ public class MRAppMaster extends Composi MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), - Integer.parseInt(nodeHttpPortString), appSubmitTime, - Integer.parseInt(maxAppAttempts)); + Integer.parseInt(nodeHttpPortString), appSubmitTime); ShutdownHookManager.get().addShutdownHook( new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); JobConf conf = new JobConf(new YarnConfiguration()); @@ -1486,6 +1530,102 @@ public class MRAppMaster extends Composi }); } + /** + * Creates a job classloader based on the configuration if the job classloader + * is enabled. 
It is a no-op if the job classloader is not enabled. + */ + private void createJobClassLoader(Configuration conf) throws IOException { + jobClassLoader = MRApps.createJobClassLoader(conf); + } + + /** + * Executes the given action with the job classloader set as the configuration + * classloader as well as the thread context class loader if the job + * classloader is enabled. After the call, the original classloader is + * restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. + * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + */ + <T> T callWithJobClassLoader(Configuration conf, Action<T> action) { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Executes the given action that can throw a checked exception with the job + * classloader set as the configuration classloader as well as the thread + * context class loader if the job classloader is enabled. After the call, the + * original classloader is restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. 
+ * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + * @throws IOException if the underlying action throws an IOException + * @throws YarnRuntimeException if the underlying action throws an exception + * other than an IOException + */ + <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action) + throws IOException { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } catch (IOException e) { + throw e; + } catch (YarnRuntimeException e) { + throw e; + } catch (Exception e) { + // wrap it with a YarnRuntimeException + throw new YarnRuntimeException(e); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Action to be wrapped with setting and unsetting the job classloader + */ + private static interface Action<T> { + T call(Configuration conf); + } + + private static interface ExceptionAction<T> { + T call(Configuration conf) throws Exception; + } + @Override protected void serviceStop() throws Exception { super.serviceStop(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Tue Aug 19 23:49:39 2014 @@ -131,7 +131,8 @@ public class MRClientService extends Abs } server.start(); - this.bindAddress = NetUtils.getConnectAddress(server); + this.bindAddress = NetUtils.createSocketAddrForHost(appContext.getNMHostname(), + server.getListenerAddress().getPort()); LOG.info("Instantiated MRClientService at " + this.bindAddress); try { // Explicitly disabling SSL for map reduce task as we can't allow MR users Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Tue Aug 19 23:49:39 2014 @@ -68,6 +68,7 @@ public class CommitterEventHandler exten private BlockingQueue<CommitterEvent> eventQueue = new LinkedBlockingQueue<CommitterEvent>(); private final AtomicBoolean stopped; + private final ClassLoader jobClassLoader; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; private long commitWindowMs; @@ -79,11 +80,17 @@ public class CommitterEventHandler exten public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) { + this(context, committer, rmHeartbeatHandler, null); + } + + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) { super("CommitterEventHandler"); this.context = context; this.committer = committer; this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); + this.jobClassLoader = jobClassLoader; } @Override @@ -109,9 +116,23 @@ public class CommitterEventHandler exten @Override protected void serviceStart() throws Exception { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("CommitterEvent Processor #%d") - .build(); + ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder() + .setNameFormat("CommitterEvent Processor #%d"); + if (jobClassLoader != null) { + // if the job classloader is enabled, we need to use the job classloader + // as the thread context classloader (TCCL) of these threads in case the + // committer needs to load another class via TCCL + ThreadFactory backingTf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setContextClassLoader(jobClassLoader); + return thread; + } + }; + tfBuilder.setThreadFactory(backingTf); + } + ThreadFactory tf = tfBuilder.build(); launcherPool = new ThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); eventHandlingThread = new Thread(new Runnable() { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Aug 19 23:49:39 2014 @@ -28,6 +28,7 @@ public enum JobEventType { //Producer:MRAppMaster JOB_INIT, + JOB_INIT_FAILED, JOB_START, //Producer:Task Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Aug 19 23:49:39 2014 @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. */ @@ -145,10 +148,10 @@ public class JobImpl implements org.apac private static final Log LOG = LogFactory.getLog(JobImpl.class); //The maximum fraction of fetch failures allowed for a map - private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5; - - // Maximum no. of fetch-failure notifications after which map task is failed - private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; + private float maxAllowedFetchFailuresFraction; + + //Maximum no. of fetch-failure notifications after which map task is failed + private int maxFetchFailuresNotifications; public static final String JOB_KILLED_DIAG = "Job received Kill while in RUNNING state."; @@ -250,9 +253,12 @@ public class JobImpl implements org.apac JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition (JobStateInternal.NEW, - EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), + EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW), JobEventType.JOB_INIT, new InitTransition()) + .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT, + JobEventType.JOB_INIT_FAILED, + new InitFailedTransition()) .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED, JobEventType.JOB_KILL, new KillNewJobTransition()) @@ -265,7 +271,7 @@ public class JobImpl implements org.apac // Ignore-able events .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_UPDATED_NODES) - + // Transitions from INITED state .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -641,8 +647,8 @@ public class JobImpl implements org.apac private JobStateInternal forcedState = null; - //Executor used for running future tasks. Setting thread pool size to 1 - private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + //Executor used for running future tasks. 
+ private ScheduledThreadPoolExecutor executor; private ScheduledFuture failWaitTriggerScheduledFuture; private JobState lastNonFinalState = JobState.NEW; @@ -684,6 +690,13 @@ public class JobImpl implements org.apac this.aclsManager = new JobACLsManager(conf); this.username = System.getProperty("user.name"); this.jobACLs = aclsManager.constructJobACLs(conf); + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Job Fail Wait Timeout Monitor #%d") + .setDaemon(true) + .build(); + this.executor = new ScheduledThreadPoolExecutor(1, threadFactory); + // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); @@ -691,6 +704,13 @@ public class JobImpl implements org.apac if(forcedDiagnostic != null) { this.diagnostics.add(forcedDiagnostic); } + + this.maxAllowedFetchFailuresFraction = conf.getFloat( + MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION, + MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION); + this.maxFetchFailuresNotifications = conf.getInt( + MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, + MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); } protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { @@ -717,7 +737,7 @@ public class JobImpl implements org.apac if (jobACL == null) { return true; } - return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL); + return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL); } @Override @@ -1205,22 +1225,25 @@ public class JobImpl implements org.apac boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces); boolean smallInput = (dataInputLength <= sysMaxBytes); // ignoring overhead due to UberAM and statics as negligible here: + long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0); + long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0); + long requiredMB = Math.max(requiredMapMB, requiredReduceMB); + int requiredMapCores = conf.getInt( + MRJobConfig.MAP_CPU_VCORES, + MRJobConfig.DEFAULT_MAP_CPU_VCORES); + int requiredReduceCores = conf.getInt( + MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + int requiredCores = Math.max(requiredMapCores, requiredReduceCores); + if (numReduceTasks == 0) { + requiredMB = requiredMapMB; + requiredCores = requiredMapCores; + } boolean smallMemory = - ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), - conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0)) - <= sysMemSizeForUberSlot) - || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)); - boolean smallCpu = - ( - Math.max( - conf.getInt( - MRJobConfig.MAP_CPU_VCORES, - MRJobConfig.DEFAULT_MAP_CPU_VCORES), - conf.getInt( - MRJobConfig.REDUCE_CPU_VCORES, - MRJobConfig.DEFAULT_REDUCE_CPU_VCORES)) - <= sysCPUSizeForUberSlot - ); + (requiredMB <= sysMemSizeForUberSlot) + || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT); + + boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot; boolean notChainJob = !isChainJob(conf); // User has overall veto power over uberization, or user can modify @@ -1285,6 +1308,7 @@ public class JobImpl implements org.apac } } catch (ClassNotFoundException cnfe) { // don't care; assume it's not derived from ChainMapper + } catch (NoClassDefFoundError ignored) { } try { String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR); @@ -1295,6 +1319,7 @@ public class JobImpl implements org.apac } } catch (ClassNotFoundException cnfe) { // don't care; assume it's not derived from ChainReducer + 
@@ -1285,6 +1308,7 @@ public class JobImpl implements org.apac
       }
     } catch (ClassNotFoundException cnfe) {
       // don't care; assume it's not derived from ChainMapper
+    } catch (NoClassDefFoundError ignored) {
     }
     try {
       String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
@@ -1295,6 +1319,7 @@ public class JobImpl implements org.apac
       }
     } catch (ClassNotFoundException cnfe) {
       // don't care; assume it's not derived from ChainReducer
+    } catch (NoClassDefFoundError ignored) {
     }
     return isChainJob;
   }
@@ -1374,6 +1399,15 @@ public class JobImpl implements org.apac
     public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
+
+      if (job.newApiCommitter) {
+        job.jobContext = new JobContextImpl(job.conf,
+            job.oldJobId);
+      } else {
+        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            job.conf, job.oldJobId);
+      }
+
       try {
         setup(job);
         job.fs = job.getFileSystem(job.conf);
@@ -1409,14 +1443,6 @@ public class JobImpl implements org.apac

         checkTaskLimits();

-        if (job.newApiCommitter) {
-          job.jobContext = new JobContextImpl(job.conf,
-              job.oldJobId);
-        } else {
-          job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-              job.conf, job.oldJobId);
-        }
-
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
@@ -1443,15 +1469,14 @@ public class JobImpl implements org.apac

         job.metrics.endPreparingJob(job);
         return JobStateInternal.INITED;
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.metrics.endPreparingJob(job);
         job.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
-        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-            job.jobContext,
-            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-        return JobStateInternal.FAILED;
+        // Leave job in the NEW state. The MR AM will detect that the state is
+        // not INITED and send a JOB_INIT_FAILED event.
+        return JobStateInternal.NEW;
       }
     }
@@ -1552,6 +1577,16 @@ public class JobImpl implements org.apac
     }
   } // end of InitTransition

+  private static class InitFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   private static class SetupCompletedTransition
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
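The net effect of the three hunks above: jobContext is now created before anything in init can fail, InitTransition returns NEW instead of FAILED on any exception, and the committer abort moves into the new InitFailedTransition. A schematic driver, assuming the JOB_INIT/JOB_INIT_FAILED event names referenced in the comment; this is a sketch of the flow, not the actual MRAppMaster code:

    import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
    import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
    import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
    import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;

    // Hypothetical AM-side driver illustrating the new init-failure path.
    class InitFlowSketch {
      void initJob(JobImpl job) {
        job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
        if (job.getInternalState() != JobStateInternal.INITED) {
          // The job stayed in NEW, so init failed; JOB_INIT_FAILED routes
          // through InitFailedTransition, which aborts the committer using
          // the jobContext that InitTransition created up front.
          job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED));
        }
      }
    }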
@@ -1872,9 +1907,8 @@ public class JobImpl implements org.apac
     float failureRate = shufflingReduceTasks == 0 ? 1.0f :
         (float) fetchFailures / shufflingReduceTasks;
     // declare faulty if fetch-failures >= max-allowed-failures
-    boolean isMapFaulty =
-        (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
-    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+    if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+        && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
       LOG.info("Too many fetch-failures for output of task attempt: "
           + mapId + " ... raising fetch failure to map");
       job.eventHandler.handle(new TaskAttemptEvent(mapId,
@@ -2157,4 +2191,12 @@ public class JobImpl implements org.apac
     jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
+
+  public float getMaxAllowedFetchFailuresFraction() {
+    return maxAllowedFetchFailuresFraction;
+  }
+
+  public int getMaxFetchFailuresNotifications() {
+    return maxFetchFailuresNotifications;
+  }
 }
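With the hard-coded constants replaced by the two per-job values read in the constructor, a map is declared faulty only when both thresholds are crossed: the absolute number of fetch-failure notifications and the fraction of shuffling reducers reporting failures. A self-contained restatement (the method and parameter names are illustrative):

    // Illustrative restatement of the patched fetch-failure check; the real
    // thresholds come from MRJobConfig via the two new JobImpl getters.
    static boolean isMapFaulty(int fetchFailures, int shufflingReduceTasks,
        int maxNotifications, float maxAllowedFraction) {
      float failureRate = shufflingReduceTasks == 0
          ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
      return fetchFailures >= maxNotifications
          && failureRate >= maxAllowedFraction;
    }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Aug 19 23:49:39 2014
@@ -335,6 +335,15 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+     // The AM is likely to receive duplicate TA_COMMIT_PENDINGs because the
+     // task attempt re-sends the commit message until it stops hitting
+     // IOExceptions and succeeds in delivering it. Ignoring the duplicate
+     // commit message is a short-term fix; in the long term we need a retry
+     // cache to help this and other MR protocol APIs that can be considered
+     // @AtMostOnce.
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptEventType.TA_COMMIT_PENDING)

      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Aug 19 23:49:39 2014
@@ -249,8 +249,16 @@ public abstract class TaskImpl implement
         TaskEventType.T_ATTEMPT_SUCCEEDED))

     // Transitions from KILLED state
+    // There could be a race condition where TaskImpl might receive
+    // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt:
+    // a. The task is in KILL_WAIT.
+    // b.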
Before the TA transitions to SUCCEEDED, the Task sends a TA_KILL event.
+    // c. The TA transitions to SUCCEEDED and thus sends T_ATTEMPT_SUCCEEDED
+    //    to the task; the task transitions to KILLED.
+    // d. The TA processes the TA_KILL event and sends T_ATTEMPT_KILLED to the task.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
+            TaskEventType.T_ATTEMPT_KILLED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))

     // create the topology tables

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Aug 19 23:49:39 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;

   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,
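The AM_RESYNC case above no longer falls through to the shutdown path: the allocator clears its allocate-sequence number and re-registers with the ResourceManager. A condensed sketch of that pattern (the class and the register()/shutdown() stubs are hypothetical scaffolding, not the real allocator code):

    import org.apache.hadoop.yarn.api.records.AMCommand;

    // Sketch of the resync pattern; register()/shutdown() are stand-ins.
    class ResyncSketch {
      private int lastResponseID;

      void onAMCommand(AMCommand cmd) {
        switch (cmd) {
        case AM_RESYNC:
          // The RM restarted or failed over and no longer knows this AM:
          // restart the allocate numbering and register again.
          lastResponseID = 0;
          register();
          break;
        case AM_SHUTDOWN:
          shutdown();
          break;
        default:
          break;
        }
      }

      void register() { /* re-register with the ResourceManager */ }
      void shutdown() { /* tear the AM down */ }
    }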
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Aug 19 23:49:39 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -185,7 +186,7 @@ public abstract class RMCommunicator ext
       // if unregistration failed, isLastAMRetry needs to be recalculated
       // to see whether the AM really has a chance to retry
       RunningAppContext raContext = (RunningAppContext) context;
-      raContext.computeIsLastAMRetry();
+      raContext.resetIsLastAMRetry();
     }
   }
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
             sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // Excepting ClientService, the other services are already stopped;
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to learn
-        // the final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // Excepting ClientService, the other services are already stopped;
+          // it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to learn
+          // the final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // The RM might have restarted or failed over and so lost the fact that
+      // this AM had registered before.
+      register();
+      doUnregistration();
     }
   }
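The reworked loop above turns a previously fatal condition into a retry: if the RM restarted or failed over and forgot this AM, finishApplicationMaster throws ApplicationMasterNotRegisteredException, and the AM re-registers and runs the whole unregistration again. A condensed restatement (the fields and the register() stub are stand-ins for the real RMCommunicator members):

    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
    import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;

    // Condensed restatement of the patched unregistration loop; the fields
    // and register() are stand-ins for the real RMCommunicator members.
    abstract class UnregisterSketch {
      ApplicationMasterProtocol scheduler;
      FinishApplicationMasterRequest request;
      long rmPollInterval;

      abstract void register() throws Exception;

      void doUnregistration() throws Exception {
        try {
          while (true) {
            if (scheduler.finishApplicationMaster(request).getIsUnregistered()) {
              break;                       // RM acknowledged the unregister
            }
            Thread.sleep(rmPollInterval);  // not yet done; poll and retry
          }
        } catch (ApplicationMasterNotRegisteredException e) {
          // The RM restarted or failed over and forgot this AM: register
          // afresh, then retry the unregistration from scratch.
          register();
          doUnregistration();
        }
      }
    }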