Author: subru
Date: Fri Jul 25 20:33:09 2014
New Revision: 1613514

URL: http://svn.apache.org/r1613514
Log:
syncing YARN-1051 branch with trunk
Modified:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1609878-1613507

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt Fri Jul 25 20:33:09 2014
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     MAPREDUCE-5232. Add a configuration to be able to log classpath and other
     system properties on mapreduce JVMs startup. (Sangjin Lee via vinodkv)
 
+    MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+    RM-restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -153,6 +156,12 @@ Release 2.6.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5971. Move the default options for distcp -p to
+    DistCpOptionSwitch. (clamb via wang)
+
+    MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
+    compatible/incompatible changes (Junping Du via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -163,6 +172,12 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
     attempt is the last retry. (Wangda Tan via zjshen)
 
+    MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
+    enabled if custom output format/committer is used (Sangjin Lee via jlowe)
+
+    MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
+    in its results (Jason Dere via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -237,6 +252,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
     (Maysam Yabandeh via kasha)
 
+    MAPREDUCE-5790. Made it easier to enable hprof profile options by default.
+    (Gera Shegalov via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -304,6 +322,9 @@ Release 2.5.0 - UNRELEASED
     resource configuration for deciding uber-mode on map-only jobs. (Siqi Li
     via vinodkv)
 
+    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
+    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
 Release 2.4.1 - 2014-06-23
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1609878-1613507

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Fri Jul 25 20:33:09 2014
@@ -133,6 +133,7 @@ case $startStop in
       else
         echo no $command to stop
       fi
+      rm -f $pid
     else
       echo no $command to stop
     fi

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Fri Jul 25 20:33:09 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
   }
 
   /**
-   * Within the _local_ filesystem (not HDFS), all activity takes place within
-   * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-   * and all sub-MapTasks create the same filename ("file.out"). Rename that
-   * to something unique (e.g., "map_0.out") to avoid collisions.
-   *
-   * Longer-term, we'll modify [something] to use TaskAttemptID-based
-   * filenames instead of "file.out". (All of this is entirely internal,
-   * so there are no particular compatibility issues.)
-   */
-  private MapOutputFile renameMapOutputForReduce(JobConf conf,
-      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    // move map output to reduce input
-    Path mapOut = subMapOutputFile.getOutputFile();
-    FileStatus mStatus = localFs.getFileStatus(mapOut);
-    Path reduceIn = subMapOutputFile.getInputFileForWrite(
-        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-    Path mapOutIndex = new Path(mapOut.toString() + ".index");
-    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming map output file for task attempt "
-          + mapId.toString() + " from original location " + mapOut.toString()
-          + " to destination " + reduceIn.toString());
-    }
-    if (!localFs.mkdirs(reduceIn.getParent())) {
-      throw new IOException("Mkdirs failed to create "
-          + reduceIn.getParent().toString());
-    }
-    if (!localFs.rename(mapOut, reduceIn))
-      throw new IOException("Couldn't rename " + mapOut);
-    if (!localFs.rename(mapOutIndex, reduceInIndex))
-      throw new IOException("Couldn't rename " + mapOutIndex);
-
-    return new RenamedMapOutputFile(reduceIn);
-  }
-
-  /**
    * Also within the local filesystem, we need to restore the initial state
    * of the directory as much as possible. Compare current contents against
    * the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
       }
     }
   } // end EventHandler
-
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out"). Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
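
The essence of the MAPREDUCE-5952 fix above is that the index path must be asked of the MapOutputFile rather than derived by string concatenation, because with several local dirs configured the output file and its index may be allocated in different dirs. The following is a minimal standalone sketch of that failure mode (not part of this commit; the /tmp/l0 and /tmp/l1 paths are hypothetical, and whether the final equality check prints false depends on how LocalDirAllocator places the files):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MROutputFiles;
    import org.apache.hadoop.mapreduce.MRConfig;

    public class IndexDirSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        MROutputFiles files = new MROutputFiles();
        files.setConf(conf);

        // Force the output file and its index into different local dirs,
        // as LocalDirAllocator may do when several dirs are configured.
        conf.set(MRConfig.LOCAL_DIR, "/tmp/l0");
        Path mapOut = files.getOutputFileForWrite(1);          // .../l0/.../file.out
        conf.set(MRConfig.LOCAL_DIR, "/tmp/l1");
        Path mapOutIdx = files.getOutputIndexFileForWrite(1);  // .../l1/.../file.out.index

        // The old code guessed the index path by appending ".index", which
        // only matches when both files landed in the same local dir:
        Path guessed = new Path(mapOut.toString() + ".index");
        System.out.println(guessed.equals(mapOutIdx));  // false here -> rename failed
      }
    }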

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jul 25 20:33:09 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
@@ -200,6 +199,7 @@ public class MRAppMaster extends Composi
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
+  private ClassLoader jobClassLoader;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
@@ -250,6 +250,9 @@ public class MRAppMaster extends Composi
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
+    // create the job classloader if enabled
+    createJobClassLoader(conf);
+
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     initJobCredentialsAndUGI(conf);
@@ -387,8 +390,13 @@ public class MRAppMaster extends Composi
     addIfService(committerEventHandler);
 
     //policy handling preemption requests from RM
-    preemptionPolicy = createPreemptionPolicy(conf);
-    preemptionPolicy.init(context);
+    callWithJobClassLoader(conf, new Action<Void>() {
+      public Void call(Configuration conf) {
+        preemptionPolicy = createPreemptionPolicy(conf);
+        preemptionPolicy.init(context);
+        return null;
+      }
+    });
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -453,33 +461,37 @@ public class MRAppMaster extends Composi
   }
 
   private OutputCommitter createOutputCommitter(Configuration conf) {
-    OutputCommitter committer = null;
-
-    LOG.info("OutputCommitter set in config "
-        + conf.get("mapred.output.committer.class"));
-
-    if (newApiCommitter) {
-      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
-          .newTaskId(jobId, 0, TaskType.MAP);
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
-          .newTaskAttemptId(taskID, 0);
-      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
-          TypeConverter.fromYarn(attemptID));
-      OutputFormat outputFormat;
-      try {
-        outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), conf);
-        committer = outputFormat.getOutputCommitter(taskContext);
-      } catch (Exception e) {
-        throw new YarnRuntimeException(e);
+    return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+      public OutputCommitter call(Configuration conf) {
+        OutputCommitter committer = null;
+
+        LOG.info("OutputCommitter set in config "
+            + conf.get("mapred.output.committer.class"));
+
+        if (newApiCommitter) {
+          org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+              MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+              MRBuilderUtils.newTaskAttemptId(taskID, 0);
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+              TypeConverter.fromYarn(attemptID));
+          OutputFormat outputFormat;
+          try {
+            outputFormat = ReflectionUtils.newInstance(taskContext
+                .getOutputFormatClass(), conf);
+            committer = outputFormat.getOutputCommitter(taskContext);
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        } else {
+          committer = ReflectionUtils.newInstance(conf.getClass(
+              "mapred.output.committer.class", FileOutputCommitter.class,
+              org.apache.hadoop.mapred.OutputCommitter.class), conf);
+        }
+        LOG.info("OutputCommitter is " + committer.getClass().getName());
+        return committer;
       }
-    } else {
-      committer = ReflectionUtils.newInstance(conf.getClass(
-          "mapred.output.committer.class", FileOutputCommitter.class,
-          org.apache.hadoop.mapred.OutputCommitter.class), conf);
-    }
-    LOG.info("OutputCommitter is " + committer.getClass().getName());
-    return committer;
+    });
   }
 
   protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -667,38 +679,42 @@ public class MRAppMaster extends Composi
     return new StagingDirCleaningService();
   }
 
-  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-    Class<? extends Speculator> speculatorClass;
-
-    try {
-      speculatorClass
-          // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
-                          DefaultSpeculator.class,
-                          Speculator.class);
-      Constructor<? extends Speculator> speculatorConstructor
-          = speculatorClass.getConstructor
-               (Configuration.class, AppContext.class);
-      Speculator result = speculatorConstructor.newInstance(conf, context);
-
-      return result;
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    }
+  protected Speculator createSpeculator(Configuration conf,
+      final AppContext context) {
+    return callWithJobClassLoader(conf, new Action<Speculator>() {
+      public Speculator call(Configuration conf) {
+        Class<? extends Speculator> speculatorClass;
+        try {
+          speculatorClass
+              // "yarn.mapreduce.job.speculator.class"
+              = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+                              DefaultSpeculator.class,
+                              Speculator.class);
+          Constructor<? extends Speculator> speculatorConstructor
+              = speculatorClass.getConstructor
+                   (Configuration.class, AppContext.class);
+          Speculator result = speculatorConstructor.newInstance(conf, context);
+
+          return result;
+        } catch (InstantiationException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (IllegalAccessException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (InvocationTargetException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (NoSuchMethodException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        }
      }
+    });
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -712,7 +728,7 @@
   protected EventHandler<CommitterEvent> createCommitterEventHandler(
       AppContext context, OutputCommitter committer) {
     return new CommitterEventHandler(context, committer,
-        getRMHeartbeatHandler());
+        getRMHeartbeatHandler(), jobClassLoader);
   }
 
   protected ContainerAllocator createContainerAllocator(
@@ -1083,8 +1099,8 @@ public class MRAppMaster extends Composi
     //start all the components
     super.serviceStart();
 
-    // set job classloader if configured
-    MRApps.setJobClassLoader(getConfig());
+    // finally set the job classloader
+    MRApps.setClassLoader(jobClassLoader, getConfig());
 
     if (initFailed) {
       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@@ -1101,19 +1117,24 @@ public class MRAppMaster extends Composi
     TaskLog.syncLogsShutdown(logSyncer);
   }
 
-  private boolean isRecoverySupported(OutputCommitter committer2)
-      throws IOException {
+  private boolean isRecoverySupported() throws IOException {
     boolean isSupported = false;
-    JobContext _jobContext;
+    Configuration conf = getConfig();
     if (committer != null) {
+      final JobContext _jobContext;
       if (newApiCommitter) {
         _jobContext = new JobContextImpl(
-            getConfig(), TypeConverter.fromYarn(getJobId()));
+            conf, TypeConverter.fromYarn(getJobId()));
       } else {
        _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-            new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+            new JobConf(conf), TypeConverter.fromYarn(getJobId()));
       }
-      isSupported = committer.isRecoverySupported(_jobContext);
+      isSupported = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isRecoverySupported(_jobContext);
+            }
+          });
     }
     return isSupported;
   }
@@ -1127,7 +1148,7 @@ public class MRAppMaster extends Composi
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
 
-    boolean recoverySupportedByCommitter = isRecoverySupported(committer);
+    boolean recoverySupportedByCommitter = isRecoverySupported();
 
     // If a shuffle secret was not provided by the job client then this app
     // attempt will generate one. However that disables recovery if there
@@ -1312,7 +1333,7 @@ public class MRAppMaster extends Composi
       this.conf = config;
     }
     @Override
-    public void handle(SpeculatorEvent event) {
+    public void handle(final SpeculatorEvent event) {
       if (disabled) {
         return;
       }
@@ -1339,7 +1360,12 @@ public class MRAppMaster extends Composi
       if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
         || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
-        speculator.handle(event);
+        callWithJobClassLoader(conf, new Action<Void>() {
+          public Void call(Configuration conf) {
+            speculator.handle(event);
+            return null;
+          }
+        });
       }
     }
@@ -1499,6 +1525,102 @@ public class MRAppMaster extends Composi
     });
   }
 
+  /**
+   * Creates a job classloader based on the configuration if the job
+   * classloader is enabled. It is a no-op if the job classloader is not
+   * enabled.
+   */
+  private void createJobClassLoader(Configuration conf) throws IOException {
+    jobClassLoader = MRApps.createJobClassLoader(conf);
+  }
+
+  /**
+   * Executes the given action with the job classloader set as the
+   * configuration classloader as well as the thread context classloader if
+   * the job classloader is enabled. After the call, the original classloader
+   * is restored.
+   *
+   * If the job classloader is enabled and the code needs to load
+   * user-supplied classes via configuration or thread context classloader,
+   * this method should be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   */
+  <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Executes the given action that can throw a checked exception with the job
+   * classloader set as the configuration classloader as well as the thread
+   * context classloader if the job classloader is enabled. After the call,
+   * the original classloader is restored.
+   *
+   * If the job classloader is enabled and the code needs to load
+   * user-supplied classes via configuration or thread context classloader,
+   * this method should be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   * @throws IOException if the underlying action throws an IOException
+   * @throws YarnRuntimeException if the underlying action throws an exception
+   * other than an IOException
+   */
+  <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+      throws IOException {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } catch (IOException e) {
+      throw e;
+    } catch (YarnRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // wrap it with a YarnRuntimeException
+      throw new YarnRuntimeException(e);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Action to be wrapped with setting and unsetting the job classloader
+   */
+  private static interface Action<T> {
+    T call(Configuration conf);
+  }
+
+  private static interface ExceptionAction<T> {
+    T call(Configuration conf) throws Exception;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
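
The heart of both callWithJobClassLoader variants is the save/swap/restore idiom, which is easy to get wrong without a finally block. The following self-contained sketch (not part of the patch; ClassLoaderContext is a hypothetical helper) isolates that idiom:

    import java.util.concurrent.Callable;

    public final class ClassLoaderContext {
      private ClassLoaderContext() {}

      public static <T> T callWith(ClassLoader cl, Callable<T> action)
          throws Exception {
        Thread t = Thread.currentThread();
        ClassLoader previous = t.getContextClassLoader();
        t.setContextClassLoader(cl);           // make user classes visible
        try {
          return action.call();
        } finally {
          t.setContextClassLoader(previous);   // always restore, even on failure
        }
      }
    }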

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jul 25 20:33:09 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
   private BlockingQueue<CommitterEvent> eventQueue =
       new LinkedBlockingQueue<CommitterEvent>();
   private final AtomicBoolean stopped;
+  private final ClassLoader jobClassLoader;
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
+    this(context, committer, rmHeartbeatHandler, null);
+  }
+
+  public CommitterEventHandler(AppContext context, OutputCommitter committer,
+      RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
     super("CommitterEventHandler");
     this.context = context;
     this.committer = committer;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.stopped = new AtomicBoolean(false);
+    this.jobClassLoader = jobClassLoader;
   }
 
   @Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
 
   @Override
   protected void serviceStart() throws Exception {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("CommitterEvent Processor #%d")
-      .build();
+    ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+        .setNameFormat("CommitterEvent Processor #%d");
+    if (jobClassLoader != null) {
+      // if the job classloader is enabled, we need to use the job classloader
+      // as the thread context classloader (TCCL) of these threads in case the
+      // committer needs to load another class via TCCL
+      ThreadFactory backingTf = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread thread = new Thread(r);
+          thread.setContextClassLoader(jobClassLoader);
+          return thread;
+        }
+      };
+      tfBuilder.setThreadFactory(backingTf);
+    }
+    ThreadFactory tf = tfBuilder.build();
     launcherPool = new ThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {
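
A quick way to convince yourself that the backing-factory trick survives Guava's ThreadFactoryBuilder decoration is the following standalone check (illustrative only; URLClassLoader stands in for the real job classloader):

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.concurrent.ThreadFactory;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class TcclFactoryCheck {
      public static void main(String[] args) throws Exception {
        final ClassLoader jobCl = new URLClassLoader(new URL[0]);
        ThreadFactory backing = new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setContextClassLoader(jobCl);
            return t;
          }
        };
        ThreadFactory tf = new ThreadFactoryBuilder()
            .setNameFormat("CommitterEvent Processor #%d")
            .setThreadFactory(backing)
            .build();
        Thread t = tf.newThread(new Runnable() {
          @Override
          public void run() {
            // prints "true": pool threads keep the TCCL set by the backing factory
            System.out.println(
                Thread.currentThread().getContextClassLoader() == jobCl);
          }
        });
        t.start();
        t.join();
      }
    }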

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Jul 25 20:33:09 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,
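
The resync contract both allocators now follow is: on AM_RESYNC, zero the response id, re-register with the (possibly restarted) RM, then resume heartbeats and replay state the new RM never saw. A toy model of that handshake, with no YARN types and purely illustrative names:

    import java.util.ArrayList;
    import java.util.List;

    public class ResyncSketch {
      enum Cmd { NONE, RESYNC }

      static class Rm {
        boolean knowsAm = false;          // lost on restart
        Cmd allocate(int responseId) {
          return knowsAm ? Cmd.NONE : Cmd.RESYNC;
        }
        void register() { knowsAm = true; }
        void restart()  { knowsAm = false; }
      }

      public static void main(String[] args) {
        Rm rm = new Rm();
        rm.register();
        int lastResponseId = 7;
        List<String> pendingRelease = new ArrayList<String>();
        pendingRelease.add("container_01");

        rm.restart();                               // work-preserving RM restart
        if (rm.allocate(lastResponseId) == Cmd.RESYNC) {
          lastResponseId = 0;                       // reset the heartbeat sequence
          rm.register();                            // re-register with the new RM
          // replay state the new RM never saw: asks, releases, blacklist
          System.out.println("re-sending releases: " + pendingRelease);
        }
      }
    }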

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jul 25 20:33:09 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // When excepting ClientService, other services are already stopped,
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to know the
-        // final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // When excepting ClientService, other services are already stopped,
+          // it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to know the
+          // final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // RM might have restarted or failed over and so lost the fact that AM
+      // had registered before.
+      register();
+      doUnregistration();
     }
   }
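
The unregistration path uses the same recover-and-retry idea: if the restarted RM rejects finishApplicationMaster() because it no longer knows this AM, register first and then retry. The recursion terminates because the second pass talks to an RM that now knows the AM. A toy model with hypothetical names:

    public class UnregisterSketch {
      static class NotRegisteredException extends Exception {}

      static boolean rmKnowsAm = false;   // RM restarted: registration forgotten

      static void finishApplicationMaster() throws NotRegisteredException {
        if (!rmKnowsAm) {
          throw new NotRegisteredException();
        }
        System.out.println("unregistered cleanly");
      }

      static void register() { rmKnowsAm = true; }

      static void doUnregistration() {
        try {
          finishApplicationMaster();
        } catch (NotRegisteredException e) {
          register();          // restore the RM's view of this AM...
          doUnregistration();  // ...then retry; succeeds on the second pass
        }
      }

      public static void main(String[] args) {
        doUnregistration();
      }
    }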

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Jul 25 20:33:09 2014
@@ -389,6 +389,7 @@ public class RMContainerAllocator extend
         removed = true;
         assignedRequests.remove(aId);
         containersReleased++;
+        pendingRelease.add(containerId);
         release(containerId);
       }
     }
@@ -641,6 +642,15 @@ public class RMContainerAllocator extend
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
      case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        lastResponseID = 0;
+
+        // Registering to allow RM to discover an active AM for this
+        // application
+        register();
+        addOutstandingRequestOnResync();
+        break;
      case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extend
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
 
         // send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extend
 
   private void containerNotAssigned(Container allocated) {
     containersReleased++;
+    pendingRelease.add(allocated.getId());
     release(allocated.getId());
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Jul 25 20:33:09 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
 
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
-  private final Set<ContainerId> release = new TreeSet<ContainerId>();
-
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds the history of release requests; a request is
+  // removed only when the RM reports the container as completed. How does it
+  // differ from release? release only carries the releases for a single
+  // allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+        .values()) {
+      for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+        for (ResourceRequest request : capabalities.values()) {
+          addResourceRequestToAsk(request);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    if (!pendingRelease.isEmpty()) {
+      release.addAll(pendingRelease);
+    }
+  }
+
   // May be incorrect if there's multiple NodeManagers running on a single host.
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.
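
The two-set bookkeeping above is the subtle part of the resync support: release carries the releases for the next allocate() call only, while pendingRelease remembers every release not yet acknowledged by a container-completed report so it can be replayed wholesale after an RM restart. A toy sketch (illustrative names only, container ids as plain strings):

    import java.util.TreeSet;

    public class ReleaseBookkeepingSketch {
      static TreeSet<String> release = new TreeSet<String>();         // per-heartbeat
      static TreeSet<String> pendingRelease = new TreeSet<String>();  // until acked

      static void requestRelease(String containerId) {
        pendingRelease.add(containerId);
        release.add(containerId);
      }

      static void heartbeatSent() {
        release.clear();               // RM has seen this batch...
      }                                // ...but has not necessarily acted on it

      static void containerCompleted(String containerId) {
        pendingRelease.remove(containerId);   // RM acknowledged via completion
      }

      static void onResync() {
        release.addAll(pendingRelease);  // replay everything the new RM missed
      }

      public static void main(String[] args) {
        requestRelease("c1");
        requestRelease("c2");
        heartbeatSent();
        containerCompleted("c1");
        onResync();
        System.out.println(release);   // [c2] -> only the unacked release is resent
      }
    }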

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Fri Jul 25 20:33:09 2014
@@ -18,17 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
 
 public class TestLocalContainerLauncher {
   private static final Log LOG =
       LogFactory.getLog(TestLocalContainerLauncher.class);
+  private static File testWorkDir;
+  private static final String[] localDirs = new String[2];
+
+  private static void delete(File dir) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+    fs.delete(p, true);
+  }
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName());
+    testWorkDir.delete();
+    testWorkDir.mkdirs();
+    testWorkDir = testWorkDir.getAbsoluteFile();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      dir.mkdirs();
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir != null) {
+      delete(testWorkDir);
+    }
+  }
 
   @SuppressWarnings("rawtypes")
   @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher
     when(container.getNodeId()).thenReturn(nodeId);
     return container;
   }
+
+
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // make sure both dirs are distinct
+    //
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Fri Jul 25 20:33:09 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
       super(conf);
     }
 
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
     @Override
     public void serviceStart() throws Exception {
       super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
         rm.getMyFifoScheduler().lastBlacklistRemovals.size());
   }
 
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
     }
 
     List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
    List<String> lastBlacklistAdditions;
    List<String> lastBlacklistRemovals;
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
@@ -1505,6 +1524,20 @@ public class TestRMContainerAllocator {
     return new ContainerFailedEvent(attemptId, host);
   }
 
+  private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+      int taskAttemptId, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId =
+        MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+    return new ContainerAllocatorEvent(attemptId,
+        ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+  }
+
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
         = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
+    private AllocateResponse allocateResponse;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
       super.handleEvent(f);
     }
 
+    public void sendDeallocate(ContainerAllocatorEvent f) {
+      super.handleEvent(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule()
         throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
      return isUnregistered;
    }
+
+    public void updateSchedulerProxy(MyResourceManager rm) {
+      scheduler = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException {
+      allocateResponse = super.makeRemoteRequest();
+      return allocateResponse;
+    }
+
+    public boolean isResyncCommand() {
+      return super.isResyncCommand(allocateResponse);
+    }
   }
 
   @Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(allocator.isUnregistered());
   }
 
+  // Step-1 : AM sends allocate request for 2 ContainerRequests and 1
+  // blacklisted node
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM sends 1 containerRequest (event3) and 1 releaseRequest to RM
+  // Step-4 : On RM restart, AM (which does not know RM restarted) sends an
+  // additional containerRequest (event4) and blacklisted nodes.
+  // In turn RM sends the resync command
+  // Step-5 : On resync, AM sends all outstanding asks, releases, blacklist
+  // additions and another containerRequest (event5)
+  // Step-6 : RM allocates containers, i.e. event3, event4 and cRequest5
+  @Test
+  public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+    // Step-1 : AM sends allocate request for 2 ContainerRequests and 1
+    // blacklisted node
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+    allocator.sendRequest(event2);
+
+    // Send events to blacklist h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f1);
+
+    // send allocate request and 1 blacklisted node
+    List<TaskAttemptContainerAssignedEvent> assignedContainers =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    // Why is ask 3, not 4? --> the ask for blacklisted node h2 is removed
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    // Step-2 : 2 containers are allocated by RM.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 2", 2,
+        assignedContainers.size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : AM sends 1 containerRequest (event3) and 1 releaseRequest to RM
+    // send container request
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1000, new String[] { "h1" });
+    allocator.sendRequest(event3);
+
+    // send deallocate request
+    ContainerAllocatorEvent deallocate1 =
+        createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 1, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Phase-2: start 2nd RM
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    allocator.updateSchedulerProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-4 : On RM restart, AM (which does not know RM restarted) sends an
+    // additional containerRequest (event4) and blacklisted nodes.
+    // In turn RM sends the resync command
+
+    // send deallocate request, release=1
+    ContainerAllocatorEvent deallocate2 =
+        createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+
+    // Send events to blacklist node h3
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+    allocator.sendFailure(f2);
+
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+    allocator.sendRequest(event4);
+
+    // send allocate request to 2nd RM and get resync command
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertTrue("Last allocate response is not RESYNC",
+        allocator.isResyncCommand());
+
+    // Step-5 : On resync, AM sends all outstanding asks, releases, blacklist
+    // additions and another containerRequest (event5)
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+    allocator.sendRequest(event5);
+
+    // send all outstanding requests again.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    assertAsksAndReleases(3, 2, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-6 : RM allocates containers, i.e. event3, event4 and cRequest5
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertEquals("Number of containers should be 3", 3,
+        assignedContainers.size());
+
+    for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+      Assert.assertTrue("Assigned host not correct",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+
+    rm1.stop();
+    rm2.stop();
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jul 25 20:33:09 2014
@@ -579,7 +579,7 @@ public abstract class CombineFileInputFo
       blocks = new OneBlockInfo[0];
     } else {
 
-      if(locations.length == 0) {
+      if(locations.length == 0 && !stat.isDirectory()) {
         locations = new BlockLocation[] { new BlockLocation() };
       }
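Context for the one-line change above (MAPREDUCE-5756): a directory's FileStatus reports no block locations, so the old fallback manufactured a dummy BlockLocation for it and the directory could then surface as a path in getSplits() output. With the added !stat.isDirectory() guard the fallback still applies to genuinely empty files, while directories keep an empty locations array and contribute no blocks. A condensed sketch of the guarded fallback, assuming a FileStatus stat and its BlockLocation[] locations; the helper class is illustrative.

    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;

    class DirectoryGuardSketch {
      /**
       * Mirrors the patched fallback: a zero-length regular file still gets
       * one empty BlockLocation (so it produces a split), while a directory
       * is left without locations and therefore yields no blocks or splits.
       */
      static BlockLocation[] locationsFor(FileStatus stat,
          BlockLocation[] locations) {
        if (locations.length == 0 && !stat.isDirectory()) {
          return new BlockLocation[] { new BlockLocation() };
        }
        return locations;
      }
    }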
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jul 25 20:33:09 2014
@@ -671,7 +671,7 @@
 
 <property>
   <name>mapreduce.task.profile.params</name>
-  <value></value>
+  <value>-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s</value>
   <description>JVM profiler parameters used to profile map and reduce task
   attempts. This string may contain a single format specifier %s that will
   be replaced by the path to profile.out in the task attempt log directory.

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1605892-1613507

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java Fri Jul 25 20:33:09 2014
@@ -29,11 +29,7 @@ public class TestJobConf {
   @Test
   public void testProfileParamsDefaults() {
     JobConf configuration = new JobConf();
-
-    Assert.assertNull(configuration.get(MRJobConfig.TASK_PROFILE_PARAMS));
-
     String result = configuration.getProfileParams();
-    Assert.assertNotNull(result);
     Assert.assertTrue(result.contains("file=%s"));
     Assert.assertTrue(result.startsWith("-agentlib:hprof"));
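With mapreduce.task.profile.params now defaulting to the hprof agent string above (MAPREDUCE-5790), enabling task profiling no longer requires spelling out profiler arguments; the updated testProfileParamsDefaults verifies exactly that through getProfileParams(). A minimal sketch of relying on the new default; the wrapper class and the chosen task ranges are illustrative.

    import org.apache.hadoop.mapred.JobConf;

    class ProfilingDefaultsSketch {
      static JobConf enableDefaultProfiling() {
        JobConf conf = new JobConf();
        conf.setProfileEnabled(true);
        // Profile map 1 and reduce 1 only; the ranges are illustrative.
        conf.setProfileTaskRange(true, "1");   // map tasks
        conf.setProfileTaskRange(false, "1");  // reduce tasks
        // No profiler params set: getProfileParams() now falls back to
        // -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s
        // where %s is replaced by the profile.out path in the attempt log dir.
        return conf;
      }
    }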
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Jul 25 20:33:09 2014
@@ -1275,6 +1275,61 @@ public class TestCombineFileInputFormat
   }
 
   /**
+   * Test that directories do not get included as part of getSplits()
+   */
+  @Test
+  public void testGetSplitsWithDirectory() throws Exception {
+    MiniDFSCluster dfs = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+          .build();
+      dfs.waitActive();
+
+      FileSystem fileSys = dfs.getFileSystem();
+
+      // Set up the following directory structure:
+      // /dir1/: directory
+      // /dir1/file1: regular file
+      // /dir1/dir2/: directory
+      Path dir1 = new Path("/dir1");
+      Path file = new Path("/dir1/file1");
+      Path dir2 = new Path("/dir1/dir2");
+      if (!fileSys.mkdirs(dir1)) {
+        throw new IOException("Mkdirs failed to create " + dir1.toString());
+      }
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(new byte[0]);
+      out.close();
+      if (!fileSys.mkdirs(dir2)) {
+        throw new IOException("Mkdirs failed to create " + dir2.toString());
+      }
+
+      // split it using a CombineFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "/dir1");
+      List<InputSplit> splits = inFormat.getSplits(job);
+
+      // directories should be omitted from getSplits() - we should see only
+      // file1 and not dir2
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(file.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(0, fileSplit.getLength(0));
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  /**
    * Test when input files are from non-default file systems
    */
   @Test
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Fri Jul 25 20:33:09 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
   @Test(timeout = 300000)
   public void testJobClassloader() throws IOException, InterruptedException,
       ClassNotFoundException {
-    LOG.info("\n\n\nStarting testJobClassloader().");
+    testJobClassloader(false);
+  }
+
+  @Test(timeout = 300000)
+  public void testJobClassloaderWithCustomClasses() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    testJobClassloader(true);
+  }
+
+  private void testJobClassloader(boolean useCustomClasses) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader()"
+        + " useCustomClasses=" + useCustomClasses);
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
     sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    if (useCustomClasses) {
+      // to test AM loading user classes such as output format class, we want
+      // to blacklist them from the system classes (they need to be prepended
+      // as the first match wins)
+      String systemClasses =
+          sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+      // exclude the custom classes from system classes
+      systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+          CustomSpeculator.class.getName() + "," +
+          systemClasses;
+      sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+          systemClasses);
+    }
     sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
     sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(SleepJob.class);
     job.setMaxMapAttempts(1); // speed up failures
+    if (useCustomClasses) {
+      // set custom output format class and speculator class
+      job.setOutputFormatClass(CustomOutputFormat.class);
+      final Configuration jobConf = job.getConfiguration();
+      jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+          Speculator.class);
+      // speculation needs to be enabled for the speculator to be loaded
+      jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+    }
     job.submit();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
 
+  public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+    public CustomOutputFormat() {
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in
+     * the context of the MRAppMaster, and if not throws an exception to fail
+     * the job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster,
+      // we inspect the stack trace and determine whether a caller is
+      // MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
+  public static class CustomSpeculator extends DefaultSpeculator {
+    public CustomSpeculator(Configuration conf, AppContext context) {
+      super(conf, context);
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in
+     * the context of the MRAppMaster, and if not throws an exception to fail
+     * the job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster,
+      // we inspect the stack trace and determine whether a caller is
+      // MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();
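The "-" prefix used when building systemClasses above is the negation syntax of the job classloader's system-classes list: patterns are evaluated first-match-wins, so an exclusion must be prepended to win over a broader pattern later in the list. An excluded class is then loaded by the job classloader (an ApplicationClassLoader) rather than from the system classpath, which is what verifyClassLoader() checks from inside the AM. A small helper sketch of the same prepend logic; the class and method names are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class SystemClassesSketch {
      /** Prepends "-name" entries so the named classes are excluded. */
      static void excludeFromSystemClasses(Configuration conf,
          String... classNames) {
        StringBuilder sb = new StringBuilder();
        for (String name : classNames) {
          sb.append('-').append(name).append(',');
        }
        String existing =
            conf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
        if (existing != null) {
          sb.append(existing);
        } else if (sb.length() > 0) {
          sb.setLength(sb.length() - 1); // drop the trailing comma
        }
        conf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
            sb.toString());
      }
    }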
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Fri Jul 25 20:33:09 2014
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -39,8 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRJobsWithProfiler {
@@ -51,6 +51,8 @@ public class TestMRJobsWithProfiler {
   private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
     EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
 
+  private static final int PROFILED_TASK_ID = 1;
+
   private static MiniMRYarnCluster mrCluster;
 
   private static final Configuration CONF = new Configuration();
@@ -69,8 +71,8 @@ public class TestMRJobsWithProfiler {
   private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
 
-  @Before
-  public void setup() throws InterruptedException, IOException {
+  @BeforeClass
+  public static void setup() throws InterruptedException, IOException {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -79,7 +81,7 @@
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobsWithProfiler.class.getName());
       mrCluster.init(CONF);
       mrCluster.start();
     }
@@ -90,8 +92,8 @@
     localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
           + " not found. Not running test.");
@@ -103,10 +105,19 @@
     }
   }
 
+  @Test (timeout = 150000)
+  public void testDefaultProfiler() throws Exception {
+    LOG.info("Starting testDefaultProfiler");
+    testProfilerInternal(true);
+  }
 
   @Test (timeout = 150000)
-  public void testProfiler() throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public void testDifferentProfilers() throws Exception {
+    LOG.info("Starting testDifferentProfilers");
+    testProfilerInternal(false);
+  }
+
+  private void testProfilerInternal(boolean useDefault) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
           + " not found. Not running test.");
@@ -117,18 +128,19 @@
     final JobConf sleepConf = new JobConf(mrCluster.getConfig());
     sleepConf.setProfileEnabled(true);
-    // profile map split 1
-    sleepConf.setProfileTaskRange(true, "1");
-    // profile reduce of map output partitions 1
-    sleepConf.setProfileTaskRange(false, "1");
-
-    // use hprof for map to profile.out
-    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
-        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
-        + "file=%s");
+    sleepConf.setProfileTaskRange(true, String.valueOf(PROFILED_TASK_ID));
+    sleepConf.setProfileTaskRange(false, String.valueOf(PROFILED_TASK_ID));
+
+    if (!useDefault) {
+      // use hprof for map to profile.out
+      sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+          "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+          + "file=%s");
+
+      // use Xprof for reduce to stdout
+      sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    }
 
-    // use Xprof for reduce to stdout
-    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
     sleepJob.setConf(sleepConf);
 
     // 2-map-2-reduce SleepJob
@@ -205,8 +217,8 @@
         TaskLog.LogName.PROFILE.toString());
     final Path stdoutPath = new Path(dirEntry.getValue(),
         TaskLog.LogName.STDOUT.toString());
-    if (tid.getTaskType() == TaskType.MAP) {
-      if (tid.getTaskID().getId() == 1) {
+    if (useDefault || tid.getTaskType() == TaskType.MAP) {
+      if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
         // verify profile.out
         final BufferedReader br = new BufferedReader(new InputStreamReader(
             localFs.open(profilePath)));
@@ -222,7 +234,8 @@
     } else {
       Assert.assertFalse("hprof file should not exist",
          localFs.exists(profilePath));
-      if (tid.getTaskID().getId() == 1) {
+      if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
+        // reducer is profiled with Xprof
        final BufferedReader br = new BufferedReader(new InputStreamReader(
            localFs.open(stdoutPath)));
        boolean flatProfFound = false;
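The non-default branch of the test above drives the per-task-type overrides: MRJobConfig.TASK_MAP_PROFILE_PARAMS and MRJobConfig.TASK_REDUCE_PROFILE_PARAMS take precedence over the generic mapreduce.task.profile.params, so hprof output for the profiled map goes to profile.out while the reducer's -Xprof flat profile goes to stdout, matching what the verification loop reads back. A minimal configuration sketch mirroring that setup; the wrapper class is illustrative and the parameter strings are copied from the test.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class PerTaskTypeProfilingSketch {
      static JobConf configureProfilers() {
        JobConf conf = new JobConf();
        conf.setProfileEnabled(true);
        conf.setProfileTaskRange(true, "1");
        conf.setProfileTaskRange(false, "1");
        // hprof for maps: %s becomes the profile.out path in the attempt
        // log directory.
        conf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
            "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
                + "file=%s");
        // -Xprof for reduces: the flat profile is written to task stdout.
        conf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
        return conf;
      }
    }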