Author: szetszwo Date: Mon Jul 21 23:37:54 2014 New Revision: 1612432 URL: http://svn.apache.org/r1612432 Log: Merge r1609845 through r1612431 from trunk.
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1611529-1612431 Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt Mon Jul 21 23:37:54 2014 @@ -169,6 +169,12 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. (Wangda Tan via zjshen) + MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader + enabled if custom output format/committer is used (Sangjin Lee via jlowe) + + MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories + in its results (Jason Dere via jlowe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1611529-1612431 Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Jul 21 23:37:54 2014 @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; @@ -200,6 +199,7 @@ public class MRAppMaster extends Composi new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; + private ClassLoader jobClassLoader; private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; @@ -250,6 +250,9 @@ public class MRAppMaster extends Composi @Override protected void serviceInit(final Configuration conf) throws Exception { + // create the job classloader if enabled + createJobClassLoader(conf); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); initJobCredentialsAndUGI(conf); @@ -387,8 +390,13 @@ public class MRAppMaster extends Composi addIfService(committerEventHandler); //policy handling preemption requests from RM - preemptionPolicy = createPreemptionPolicy(conf); - preemptionPolicy.init(context); + callWithJobClassLoader(conf, new Action<Void>() { + public Void call(Configuration conf) { + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + return null; + } + }); //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); @@ -453,33 +461,37 @@ public class MRAppMaster extends Composi } private OutputCommitter createOutputCommitter(Configuration conf) { - OutputCommitter committer = null; - - LOG.info("OutputCommitter set in config " - + conf.get("mapred.output.committer.class")); - - if (newApiCommitter) { - org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils - .newTaskId(jobId, 0, TaskType.MAP); - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils - .newTaskAttemptId(taskID, 0); - TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, - TypeConverter.fromYarn(attemptID)); - OutputFormat outputFormat; - try { - outputFormat = ReflectionUtils.newInstance(taskContext - .getOutputFormatClass(), conf); - committer = outputFormat.getOutputCommitter(taskContext); - } catch (Exception e) { - throw new YarnRuntimeException(e); + return callWithJobClassLoader(conf, new Action<OutputCommitter>() { + public OutputCommitter call(Configuration conf) { + OutputCommitter committer = null; + + LOG.info("OutputCommitter set in config " + + conf.get("mapred.output.committer.class")); + + if (newApiCommitter) { + org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = + MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + MRBuilderUtils.newTaskAttemptId(taskID, 0); + TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptID)); + OutputFormat outputFormat; + try { + outputFormat = ReflectionUtils.newInstance(taskContext + .getOutputFormatClass(), conf); + committer = outputFormat.getOutputCommitter(taskContext); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + committer = ReflectionUtils.newInstance(conf.getClass( + "mapred.output.committer.class", FileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class), conf); + } + LOG.info("OutputCommitter is " + committer.getClass().getName()); + return committer; } - } else { - committer = ReflectionUtils.newInstance(conf.getClass( - "mapred.output.committer.class", FileOutputCommitter.class, - org.apache.hadoop.mapred.OutputCommitter.class), conf); - } - LOG.info("OutputCommitter is " + committer.getClass().getName()); - return committer; + }); } protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { @@ -667,38 +679,42 @@ public class MRAppMaster extends Composi return new StagingDirCleaningService(); } - protected Speculator createSpeculator(Configuration conf, AppContext context) { - Class<? extends Speculator> speculatorClass; - - try { - speculatorClass - // "yarn.mapreduce.job.speculator.class" - = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, - DefaultSpeculator.class, - Speculator.class); - Constructor<? extends Speculator> speculatorConstructor - = speculatorClass.getConstructor - (Configuration.class, AppContext.class); - Speculator result = speculatorConstructor.newInstance(conf, context); - - return result; - } catch (InstantiationException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (IllegalAccessException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (InvocationTargetException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (NoSuchMethodException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } + protected Speculator createSpeculator(Configuration conf, + final AppContext context) { + return callWithJobClassLoader(conf, new Action<Speculator>() { + public Speculator call(Configuration conf) { + Class<? extends Speculator> speculatorClass; + try { + speculatorClass + // "yarn.mapreduce.job.speculator.class" + = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, + DefaultSpeculator.class, + Speculator.class); + Constructor<? extends Speculator> speculatorConstructor + = speculatorClass.getConstructor + (Configuration.class, AppContext.class); + Speculator result = speculatorConstructor.newInstance(conf, context); + + return result; + } catch (InstantiationException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (IllegalAccessException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (InvocationTargetException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (NoSuchMethodException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } + } + }); } protected TaskAttemptListener createTaskAttemptListener(AppContext context, @@ -712,7 +728,7 @@ public class MRAppMaster extends Composi protected EventHandler<CommitterEvent> createCommitterEventHandler( AppContext context, OutputCommitter committer) { return new CommitterEventHandler(context, committer, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), jobClassLoader); } protected ContainerAllocator createContainerAllocator( @@ -1083,8 +1099,8 @@ public class MRAppMaster extends Composi //start all the components super.serviceStart(); - // set job classloader if configured - MRApps.setJobClassLoader(getConfig()); + // finally set the job classloader + MRApps.setClassLoader(jobClassLoader, getConfig()); if (initFailed) { JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); @@ -1101,19 +1117,24 @@ public class MRAppMaster extends Composi TaskLog.syncLogsShutdown(logSyncer); } - private boolean isRecoverySupported(OutputCommitter committer2) - throws IOException { + private boolean isRecoverySupported() throws IOException { boolean isSupported = false; - JobContext _jobContext; + Configuration conf = getConfig(); if (committer != null) { + final JobContext _jobContext; if (newApiCommitter) { _jobContext = new JobContextImpl( - getConfig(), TypeConverter.fromYarn(getJobId())); + conf, TypeConverter.fromYarn(getJobId())); } else { _jobContext = new org.apache.hadoop.mapred.JobContextImpl( - new JobConf(getConfig()), TypeConverter.fromYarn(getJobId())); + new JobConf(conf), TypeConverter.fromYarn(getJobId())); } - isSupported = committer.isRecoverySupported(_jobContext); + isSupported = callWithJobClassLoader(conf, + new ExceptionAction<Boolean>() { + public Boolean call(Configuration conf) throws IOException { + return committer.isRecoverySupported(_jobContext); + } + }); } return isSupported; } @@ -1127,7 +1148,7 @@ public class MRAppMaster extends Composi MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); - boolean recoverySupportedByCommitter = isRecoverySupported(committer); + boolean recoverySupportedByCommitter = isRecoverySupported(); // If a shuffle secret was not provided by the job client then this app // attempt will generate one. However that disables recovery if there @@ -1312,7 +1333,7 @@ public class MRAppMaster extends Composi this.conf = config; } @Override - public void handle(SpeculatorEvent event) { + public void handle(final SpeculatorEvent event) { if (disabled) { return; } @@ -1339,7 +1360,12 @@ public class MRAppMaster extends Composi if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. - speculator.handle(event); + callWithJobClassLoader(conf, new Action<Void>() { + public Void call(Configuration conf) { + speculator.handle(event); + return null; + } + }); } } @@ -1499,6 +1525,102 @@ public class MRAppMaster extends Composi }); } + /** + * Creates a job classloader based on the configuration if the job classloader + * is enabled. It is a no-op if the job classloader is not enabled. + */ + private void createJobClassLoader(Configuration conf) throws IOException { + jobClassLoader = MRApps.createJobClassLoader(conf); + } + + /** + * Executes the given action with the job classloader set as the configuration + * classloader as well as the thread context class loader if the job + * classloader is enabled. After the call, the original classloader is + * restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. + * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + */ + <T> T callWithJobClassLoader(Configuration conf, Action<T> action) { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Executes the given action that can throw a checked exception with the job + * classloader set as the configuration classloader as well as the thread + * context class loader if the job classloader is enabled. After the call, the + * original classloader is restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. + * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + * @throws IOException if the underlying action throws an IOException + * @throws YarnRuntimeException if the underlying action throws an exception + * other than an IOException + */ + <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action) + throws IOException { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } catch (IOException e) { + throw e; + } catch (YarnRuntimeException e) { + throw e; + } catch (Exception e) { + // wrap it with a YarnRuntimeException + throw new YarnRuntimeException(e); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Action to be wrapped with setting and unsetting the job classloader + */ + private static interface Action<T> { + T call(Configuration conf); + } + + private static interface ExceptionAction<T> { + T call(Configuration conf) throws Exception; + } + @Override protected void serviceStop() throws Exception { super.serviceStop(); Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Mon Jul 21 23:37:54 2014 @@ -68,6 +68,7 @@ public class CommitterEventHandler exten private BlockingQueue<CommitterEvent> eventQueue = new LinkedBlockingQueue<CommitterEvent>(); private final AtomicBoolean stopped; + private final ClassLoader jobClassLoader; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; private long commitWindowMs; @@ -79,11 +80,17 @@ public class CommitterEventHandler exten public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) { + this(context, committer, rmHeartbeatHandler, null); + } + + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) { super("CommitterEventHandler"); this.context = context; this.committer = committer; this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); + this.jobClassLoader = jobClassLoader; } @Override @@ -109,9 +116,23 @@ public class CommitterEventHandler exten @Override protected void serviceStart() throws Exception { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("CommitterEvent Processor #%d") - .build(); + ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder() + .setNameFormat("CommitterEvent Processor #%d"); + if (jobClassLoader != null) { + // if the job classloader is enabled, we need to use the job classloader + // as the thread context classloader (TCCL) of these threads in case the + // committer needs to load another class via TCCL + ThreadFactory backingTf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setContextClassLoader(jobClassLoader); + return thread; + } + }; + tfBuilder.setThreadFactory(backingTf); + } + ThreadFactory tf = tfBuilder.build(); launcherPool = new ThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); eventHandlingThread = new Thread(new Runnable() { Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Mon Jul 21 23:37:54 2014 @@ -327,8 +327,8 @@ public class MRApps extends Apps { } /** - * Sets a {@link ApplicationClassLoader} on the given configuration and as - * the context classloader, if + * Creates and sets a {@link ApplicationClassLoader} on the given + * configuration and as the thread context classloader, if * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and * the APP_CLASSPATH environment variable is set. * @param conf @@ -336,24 +336,52 @@ public class MRApps extends Apps { */ public static void setJobClassLoader(Configuration conf) throws IOException { + setClassLoader(createJobClassLoader(conf), conf); + } + + /** + * Creates a {@link ApplicationClassLoader} if + * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and + * the APP_CLASSPATH environment variable is set. + * @param conf + * @returns the created job classloader, or null if the job classloader is not + * enabled or the APP_CLASSPATH environment variable is not set + * @throws IOException + */ + public static ClassLoader createJobClassLoader(Configuration conf) + throws IOException { + ClassLoader jobClassLoader = null; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { String appClasspath = System.getenv(Environment.APP_CLASSPATH.key()); if (appClasspath == null) { - LOG.warn("Not using job classloader since APP_CLASSPATH is not set."); + LOG.warn("Not creating job classloader since APP_CLASSPATH is not set."); } else { - LOG.info("Using job classloader"); + LOG.info("Creating job classloader"); if (LOG.isDebugEnabled()) { LOG.debug("APP_CLASSPATH=" + appClasspath); } String[] systemClasses = getSystemClasses(conf); - ClassLoader jobClassLoader = createJobClassLoader(appClasspath, + jobClassLoader = createJobClassLoader(appClasspath, systemClasses); - if (jobClassLoader != null) { - conf.setClassLoader(jobClassLoader); - Thread.currentThread().setContextClassLoader(jobClassLoader); - } } } + return jobClassLoader; + } + + /** + * Sets the provided classloader on the given configuration and as the thread + * context classloader if the classloader is not null. + * @param classLoader + * @param conf + */ + public static void setClassLoader(ClassLoader classLoader, + Configuration conf) { + if (classLoader != null) { + LOG.info("Setting classloader " + classLoader.getClass().getName() + + " on the configuration and as the thread context classloader"); + conf.setClassLoader(classLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } } @VisibleForTesting Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Mon Jul 21 23:37:54 2014 @@ -579,7 +579,7 @@ public abstract class CombineFileInputFo blocks = new OneBlockInfo[0]; } else { - if(locations.length == 0) { + if(locations.length == 0 && !stat.isDirectory()) { locations = new BlockLocation[] { new BlockLocation() }; } Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Jul 21 23:37:54 2014 @@ -1275,6 +1275,61 @@ public class TestCombineFileInputFormat } /** + * Test that directories do not get included as part of getSplits() + */ + @Test + public void testGetSplitsWithDirectory() throws Exception { + MiniDFSCluster dfs = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1) + .build(); + dfs.waitActive(); + + dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1) + .build(); + dfs.waitActive(); + + FileSystem fileSys = dfs.getFileSystem(); + + // Set up the following directory structure: + // /dir1/: directory + // /dir1/file: regular file + // /dir1/dir2/: directory + Path dir1 = new Path("/dir1"); + Path file = new Path("/dir1/file1"); + Path dir2 = new Path("/dir1/dir2"); + if (!fileSys.mkdirs(dir1)) { + throw new IOException("Mkdirs failed to create " + dir1.toString()); + } + FSDataOutputStream out = fileSys.create(file); + out.write(new byte[0]); + out.close(); + if (!fileSys.mkdirs(dir2)) { + throw new IOException("Mkdirs failed to create " + dir2.toString()); + } + + // split it using a CombinedFile input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = Job.getInstance(conf); + FileInputFormat.setInputPaths(job, "/dir1"); + List<InputSplit> splits = inFormat.getSplits(job); + + // directories should be omitted from getSplits() - we should only see file1 and not dir2 + assertEquals(1, splits.size()); + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(0, fileSplit.getLength(0)); + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + + /** * Test when input files are from non-default file systems */ @Test Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1612432&r1=1612431&r2=1612432&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Mon Jul 21 23:37:54 2014 @@ -33,8 +33,8 @@ import java.util.HashMap; import java.util.Map; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.FailingMapper; @@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; +import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; +import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.junit.AfterClass; @@ -210,7 +215,19 @@ public class TestMRJobs { @Test(timeout = 300000) public void testJobClassloader() throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testJobClassloader()."); + testJobClassloader(false); + } + + @Test(timeout = 300000) + public void testJobClassloaderWithCustomClasses() throws IOException, + InterruptedException, ClassNotFoundException { + testJobClassloader(true); + } + + private void testJobClassloader(boolean useCustomClasses) throws IOException, + InterruptedException, ClassNotFoundException { + LOG.info("\n\n\nStarting testJobClassloader()" + + " useCustomClasses=" + useCustomClasses); if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR @@ -221,6 +238,19 @@ public class TestMRJobs { // set master address to local to test that local mode applied iff framework == local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); + if (useCustomClasses) { + // to test AM loading user classes such as output format class, we want + // to blacklist them from the system classes (they need to be prepended + // as the first match wins) + String systemClasses = + sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); + // exclude the custom classes from system classes + systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" + + CustomSpeculator.class.getName() + "," + + systemClasses; + sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES, + systemClasses); + } sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB); sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); @@ -233,12 +263,66 @@ public class TestMRJobs { job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.setJarByClass(SleepJob.class); job.setMaxMapAttempts(1); // speed up failures + if (useCustomClasses) { + // set custom output format class and speculator class + job.setOutputFormatClass(CustomOutputFormat.class); + final Configuration jobConf = job.getConfiguration(); + jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class, + Speculator.class); + // speculation needs to be enabled for the speculator to be loaded + jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); + } job.submit(); boolean succeeded = job.waitForCompletion(true); Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(), succeeded); } + public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> { + public CustomOutputFormat() { + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. + */ + private void verifyClassLoader(Class<?> cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + + public static class CustomSpeculator extends DefaultSpeculator { + public CustomSpeculator(Configuration conf, AppContext context) { + super(conf, context); + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. + */ + private void verifyClassLoader(Class<?> cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters();