Author: cnauroth
Date: Tue Jul 9 18:55:51 2013
New Revision: 1501462

URL: http://svn.apache.org/r1501462
Log:
MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the default FS. Contributed by Xi Fang.
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1501462&r1=1501461&r2=1501462&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original) +++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Tue Jul 9 18:55:51 2013 @@ -298,6 +298,9 @@ Branch-hadoop-1-win (branched from branc MAPREDUCE-5371. TestProxyUserFromEnv#testProxyUserFromEnvironment failed caused by domains of windows users. (Xi Fang via cnauroth) + MAPREDUCE-5278. Distributed cache is broken when JT staging dir is not on the + default FS. (Xi Fang via cnauroth) + Merged from branch-1 HDFS-385. Backport: Add support for an experimental API that allows a Modified: hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml?rev=1501462&r1=1501461&r2=1501462&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml Tue Jul 9 18:55:51 2013 @@ -415,6 +415,15 @@ </property> <property> + <name>mapreduce.client.accessible.remote.schemes</name> + <value></value> + <description>The schemes of the file systems that are accessible from + all the nodes in the cluster. Used by the job client to avoid copying + distributed cache entries to the job staging dir if path is accessible. 
+ </description> +</property> + +<property> <name>mapred.tasktracker.reduce.tasks.maximum</name> <value>2</value> <description>The maximum number of reduce tasks that will be run Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1501462&r1=1501461&r2=1501462&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Jul 9 18:55:51 2013 @@ -47,6 +47,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; @@ -450,6 +451,9 @@ public class JobClient extends Configure private static final String TASKLOG_PULL_TIMEOUT_KEY = "mapreduce.client.tasklog.timeout"; private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; + @Private + public static final String CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY = + "mapreduce.client.accessible.remote.schemes"; static int tasklogtimeout; public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY = @@ -688,18 +692,33 @@ public class JobClient extends Configure private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, final Path originalPath, final JobConf job, short replication) throws IOException, InterruptedException { - //check if we do not need to copy the files - // is jt using the same file system. - // just checking for uri strings... doing no dns lookups - // to see if the filesystems are the same. 
This is not optimal. - // but avoids name resolution. FileSystem remoteFs = null; remoteFs = originalPath.getFileSystem(job); + + // Check if we do not need to copy the files + // Check whether the given path is accessible from all the + // nodes in the cluster in which case we skip the copy operation + // altogether and pass the original path. + String remoteFsScheme = remoteFs.getUri().getScheme(); + String [] accessibleSchemes = job.getStrings( + CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, null); + if (accessibleSchemes != null) { + for (String s : accessibleSchemes) { + if (remoteFsScheme.equalsIgnoreCase(s)) { + return originalPath; + } + } + } + // Check whether jt is using the same file system. + // just checking for uri strings... doing no dns lookups + // to see if the filesystems are the same. This is not optimal. + // but avoids name resolution. if (compareFs(remoteFs, jtFs)) { return originalPath; } + // this might have name collisions. copy will throw an exception //parse the original path to create new path Path newPath = new Path(parentDir, originalPath.getName()); @@ -808,7 +827,7 @@ public class JobClient extends Configure Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication); DistributedCache.addArchiveToClassPath - (new Path(newPath.toUri().getPath()), job, fs); + (new Path(newPath.toUri().getPath()), job, newPath.getFileSystem(job)); } } Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java?rev=1501462&r1=1501461&r2=1501462&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java (original) +++ 
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java Tue Jul 9 18:55:51 2013 @@ -36,10 +36,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -179,6 +181,96 @@ public class TestMRWithDistributedCache } + /** + * Tests setting job tracker's staging dir to a nondefault filesystem. + * It validates that distributed cache entries are not copied to the staging + * dir for schemes defined in "mapreduce.client.accessible.remote.schemes" + * (denoted by JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY). + */ + public void testJTStagingOnNondefaultFS() throws Exception { + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + try { + dfs = new MiniDFSCluster(conf, 1, true, null); + FileSystem fileSys = dfs.getFileSystem(); + mr = new MiniMRCluster(1, fileSys.getUri().toString(), 1); + runWithConfJTStagingOnNondefaultFS(mr.createJobConf()); + } finally { + if (mr != null) { + mr.shutdown(); + } + if (dfs != null) { + dfs.shutdown(); + } + } + } + + private void runWithConfJTStagingOnNondefaultFS(JobConf conf) + throws IOException, InterruptedException, + ClassNotFoundException, URISyntaxException { + // Create a temporary file of length 1. + Path first = createTempFile("distributed.first", "x") + .makeQualified(localFs); + // Create two jars with a single file inside them. 
+ Path second = + makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2) + .makeQualified(localFs); + Path third = + makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3) + .makeQualified(localFs); + // Set configuration properties for this job + conf.set("fs.default.name", "file:///"); + conf.set("tmpfiles", first.toString()); + conf.set("tmpjars", second.toString()); + conf.set("tmparchives", third.toString()); + conf.set(JobClient.CLIENT_ACCESSIBLE_REMOTE_SCHEMES_KEY, + localFs.getUri().getScheme()); + conf.setMaxMapAttempts(1); // speed up failures + // Submit job + Job job = new Job(conf); + job.setMapperClass(DistributedCacheCheckerJTStagingOnNondefaultFS.class); + job.setOutputFormatClass(NullOutputFormat.class); + FileInputFormat.setInputPaths(job, first); + job.submit(); + // Check if the job is successful + assertTrue(job.waitForCompletion(false)); + } + + public static class DistributedCacheCheckerJTStagingOnNondefaultFS extends + Mapper<LongWritable, Text, NullWritable, NullWritable> { + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + Path[] files = DistributedCache.getLocalCacheFiles(conf); + Path[] archives = DistributedCache.getLocalCacheArchives(conf); + // Check that 1 file and 2 archives are present + assertEquals(1, files.length); + assertEquals(2, archives.length); + // Check lengths of the file + assertEquals(1, localFs.getFileStatus(files[0]).getLen()); + // Check the existence of the archives + assertTrue(localFs + .exists(new Path(archives[0], "distributed.jar.inside2"))); + assertTrue(localFs + .exists(new Path(archives[1], "distributed.jar.inside3"))); + // Check the schemes of the files/archives specified in + // "mapred.cache.archives" and "mapred.cache.files". 
The schemes + // must be "file" if we do not copy the files/archives to jt's staging + // dir + String[] arxSources = conf.getStrings("mapred.cache.archives"); + String[] fileSources = conf.getStrings("mapred.cache.files"); + assertEquals("file", (new Path(fileSources[0])).toUri().getScheme()); + assertEquals("file", (new Path(arxSources[0])).toUri().getScheme()); + assertEquals("file", (new Path(arxSources[1])).toUri().getScheme()); + // Check the class loaders + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + // The archives added by "tmpjars" should be reachable via + // the class loader. + TestCase.assertNotNull(cl.getResource("distributed.jar.inside2")); + TestCase.assertNull(cl.getResource("distributed.jar.inside3")); + } + } + private Path createTempFile(String filename, String contents) throws IOException { Path path = new Path(TEST_ROOT_DIR, filename);