http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index d0d7a34..d347da5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -220,7 +220,7 @@ public class TestJobResourceUploader {
       destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
 
   private String jobjarSubmitDir = "/jobjar-submit-dir";
-  private String expectedJobJar = jobjarSubmitDir + "/job.jar";
+  private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar";
 
   @Test
   public void testPathsWithNoFragNoSchemeRelative() throws IOException {
@@ -236,7 +236,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -254,7 +254,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -272,7 +272,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -290,7 +290,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -308,7 +308,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -326,7 +326,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -344,7 +344,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
 
     runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
-        expectedArchivesNoFrags, expectedJobJar);
+        expectedArchivesNoFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -362,7 +362,7 @@ public class TestJobResourceUploader {
     JobResourceUploader uploader = new StubedUploader(jConf, true);
     runTmpResourcePathTest(uploader, rConf, jConf,
         expectedFilesWithFrags,
-        expectedArchivesWithFrags, expectedJobJar);
+        expectedArchivesWithFrags, basicExpectedJobJar);
   }
 
   @Test
@@ -402,44 +402,39 @@ public class TestJobResourceUploader {
   private void runTmpResourcePathTest(JobResourceUploader uploader,
       ResourceConf rConf, JobConf jConf, String[] expectedFiles,
       String[] expectedArchives, String expectedJobJar) throws IOException {
-    rConf.setupJobConf(jConf);
-    // We use a pre and post job object here because we need the post job object
-    // to get the new values set during uploadResources, but we need the pre job
-    // to set the job jar because JobResourceUploader#uploadJobJar uses the Job
-    // interface not the JobConf. The post job is automatically created in
-    // validateResourcePaths.
-    Job jobPre = Job.getInstance(jConf);
-    uploadResources(uploader, jConf, jobPre);
-
-    validateResourcePaths(jConf, expectedFiles, expectedArchives,
-        expectedJobJar, jobPre);
+    Job job = rConf.setupJobConf(jConf);
+    uploadResources(uploader, job);
+    validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar);
   }
 
-  private void uploadResources(JobResourceUploader uploader, JobConf jConf,
-      Job job) throws IOException {
-    Collection<String> files = jConf.getStringCollection("tmpfiles");
-    Collection<String> libjars = jConf.getStringCollection("tmpjars");
-    Collection<String> archives = jConf.getStringCollection("tmparchives");
-    String jobJar = jConf.getJar();
-    uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
-        (short) 3);
-    uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
-        null, (short) 3);
-    uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
-        null, (short) 3);
-    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
+  private void uploadResources(JobResourceUploader uploader, Job job)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
+    Collection<String> files = conf.getStringCollection("tmpfiles");
+    Collection<String> libjars = conf.getStringCollection("tmpjars");
+    Collection<String> archives = conf.getStringCollection("tmparchives");
+    Map<URI, FileStatus> statCache = new HashMap<>();
+    Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
+    String jobJar = job.getJar();
+    uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"),
+        null, (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null,
+        (short) 3, fileSCUploadPolicies, statCache);
+    uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3,
+        statCache);
   }
 
-  private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
-      String[] expectedArchives, String expectedJobJar, Job preJob)
+  private void validateResourcePaths(Job job, String[] expectedFiles,
+      String[] expectedArchives, String expectedJobJar)
       throws IOException {
-    Job j = Job.getInstance(jConf);
-    validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
-    validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
+    validateResourcePathsSub(job.getCacheFiles(), expectedFiles);
+    validateResourcePathsSub(job.getCacheArchives(), expectedArchives);
     // We use a different job object here because the jobjar was set on a
     // different job object
     Assert.assertEquals("Job jar path is different than expected!",
-        expectedJobJar, preJob.getJar());
+        expectedJobJar, job.getJar());
   }
 
   private void validateResourcePathsSub(URI[] actualURIs,
@@ -645,7 +640,7 @@ public class TestJobResourceUploader {
     }
   }
 
-    private void setupJobConf(JobConf conf) {
+    private Job setupJobConf(JobConf conf) throws IOException {
      conf.set("tmpfiles",
          buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
      conf.set("tmpjars",
@@ -675,6 +670,7 @@ public class TestJobResourceUploader {
      conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
      conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
          this.maxSingleResourceMB);
+     return new Job(conf);
     }
 
     // We always want absolute paths with a scheme in the DistributedCache, so
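
The refactor above removes the pre/post-job dance: a single Job object now carries the configuration, the job jar, and the values read back by validateResourcePaths, while the new upload* overloads thread a shared-cache upload-policy map and a FileStatus cache through every call. A minimal sketch of the new calling convention follows; it is not part of the patch (JobResourceUploader and its upload* methods are package-local, so this assumes a helper living in org.apache.hadoop.mapreduce, and the submit directories are illustrative):

package org.apache.hadoop.mapreduce;

import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

// Sketch of the refactored calling convention: one Job carries the
// configuration, the jar, and the values that are validated afterwards.
class UploadFlowSketch {
  static void run(JobResourceUploader uploader, JobConf jConf)
      throws Exception {
    Job job = Job.getInstance(jConf);
    Configuration conf = job.getConfiguration();
    Collection<String> files = conf.getStringCollection("tmpfiles");
    // Shared state threaded through every call: per-resource shared cache
    // upload policies, plus a FileStatus cache so each path is stat'd once.
    Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
    Map<URI, FileStatus> statCache = new HashMap<>();
    uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
        (short) 3, fileSCUploadPolicies, statCache);
    uploader.uploadJobJar(job, job.getJar(), new Path("/jobjar-submit-dir"),
        (short) 3, statCache);
  }
}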
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
new file mode 100644
index 0000000..7598141
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.client.api.SharedCacheClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the JobResourceUploader class with the shared cache.
+ */
+public class TestJobResourceUploaderWithSharedCache {
+  protected static final Log LOG = LogFactory
+      .getLog(TestJobResourceUploaderWithSharedCache.class);
+  private static MiniDFSCluster dfs;
+  private static FileSystem localFs;
+  private static FileSystem remoteFs;
+  private static Configuration conf = new Configuration();
+  private static Path testRootDir;
+  private static Path remoteStagingDir =
+      new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
+  private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+
+  @Before
+  public void cleanup() throws Exception {
+    remoteFs.delete(remoteStagingDir, true);
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // create configuration, dfs, file system
+    localFs = FileSystem.getLocal(conf);
+    testRootDir =
+        new Path("target",
+            TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
+            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    remoteFs = dfs.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    try {
+      if (localFs != null) {
+        localFs.close();
+      }
+      if (remoteFs != null) {
+        remoteFs.close();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    } catch (IOException ioe) {
+      LOG.info("IO exception in closing file system");
+      ioe.printStackTrace();
+    }
+  }
+
+  private class MyFileUploader extends JobResourceUploader {
+    // The mocked SharedCacheClient that will be fed into the FileUploader
+    private SharedCacheClient mockscClient = mock(SharedCacheClient.class);
+    // A real client for checksum calculation
+    private SharedCacheClient scClient = SharedCacheClient
+        .createSharedCacheClient();
+
+    MyFileUploader(FileSystem submitFs, Configuration conf)
+        throws IOException {
+      super(submitFs, false);
+      // Initialize the real client, but don't start it. We don't need or want
+      // to create an actual proxy because we only use this for mocking out the
+      // getFileChecksum method.
+      scClient.init(conf);
+      when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
+          new Answer<String>() {
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable {
+              Path file = (Path) invocation.getArguments()[0];
+              // Use the real scClient to generate the checksum. We use an
+              // answer/mock combination to avoid having to spy on a real
+              // SharedCacheClient object.
+              return scClient.getFileChecksum(file);
+            }
+          });
+    }
+
+    // This method is to prime the mock client with the correct checksum, so it
+    // looks like a given resource is present in the shared cache.
+    public void mockFileInSharedCache(Path localFile, URL remoteFile)
+        throws YarnException, IOException {
+      // when the resource is referenced, simply return the remote path to the
+      // caller
+      when(mockscClient.use(any(ApplicationId.class),
+          eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
+    }
+
+    @Override
+    protected SharedCacheClient createSharedCacheClient(Configuration c) {
+      // Feed the mocked SharedCacheClient into the FileUploader logic
+      return mockscClient;
+    }
+  }
+
+  @Test
+  public void testSharedCacheDisabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is disabled by default
+    uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
+
+  }
+
+  @Test
+  public void testSharedCacheEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
+  }
+
+  @Test
+  public void testSharedCacheEnabledWithJobJarInSharedCache()
+      throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for every file type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of files/libjars/archive/jobjar
+    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
+  }
+
+  @Test
+  public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
+    JobConf jobConf = createJobConf();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
+    Job job = new Job(jobConf);
+    job.setJobID(new JobID("567789", 1));
+
+    // shared cache is enabled for archives and libjars type
+    // the # of times SharedCacheClient.use is called should ==
+    // total # of libjars and archives
+    uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
+  }
+
+  private JobConf createJobConf() {
+    JobConf jobConf = new JobConf();
+    jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+
+    jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
+        .toString());
+    return jobConf;
+  }
+
+  private Path copyToRemote(Path jar) throws IOException {
+    Path remoteFile = new Path("/tmp", jar.getName());
+    remoteFs.copyFromLocalFile(jar, remoteFile);
+    return remoteFile;
+  }
+
+  private void makeJarAvailableInSharedCache(Path jar,
+      MyFileUploader fileUploader) throws YarnException, IOException {
+    // copy file to remote file system
+    Path remoteFile = copyToRemote(jar);
+    // prime mocking so that it looks like this file is in the shared cache
+    fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
+  }
+
+  private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
+      int useCallCountExpected,
+      int numOfFilesShouldBeUploadedToSharedCacheExpected,
+      int numOfArchivesShouldBeUploadedToSharedCacheExpected,
+      boolean jobJarInSharedCacheBeforeUpload) throws Exception {
+    MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
+    SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
+    sharedCacheConfig.init(jobConf);
+
+    Path firstFile = createTempFile("first-input-file", "x");
+    Path secondFile = createTempFile("second-input-file", "xx");
+
+    // Add files to job conf via distributed cache API as well as command line
+    boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
+    if (!fileAdded) {
+      Path remoteFile = copyToRemote(firstFile);
+      job.addCacheFile(remoteFile.toUri());
+    }
+    jobConf.set("tmpfiles", secondFile.toString());
+
+    // Create jars with a single file inside them.
+    Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
+    Path secondJar =
+        makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
+
+    // Verify duplicated contents can be handled properly.
+    Path thirdJar = new Path(testRootDir, "distributed.third.jar");
+    localFs.copyFromLocalFile(secondJar, thirdJar);
+
+    // make secondJar cache available
+    makeJarAvailableInSharedCache(secondJar, fileUploader);
+
+    // Add libjars to job conf via distributed cache API as well as command
+    // line
+    boolean libjarAdded =
+        Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
+    if (!libjarAdded) {
+      Path remoteJar = copyToRemote(firstJar);
+      job.addFileToClassPath(remoteJar);
+    }
+
+    jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());
+
+    Path firstArchive = makeArchive("first-archive.zip", "first-file");
+    Path secondArchive = makeArchive("second-archive.zip", "second-file");
+
+    // Add archives to job conf via distributed cache API as well as command
+    // line
+    boolean archiveAdded =
+        Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
+    assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
+        archiveAdded);
+    if (!archiveAdded) {
+      Path remoteArchive = copyToRemote(firstArchive);
+      job.addCacheArchive(remoteArchive.toUri());
+    }
+
+    jobConf.set("tmparchives", secondArchive.toString());
+
+    // Add job jar to job conf
+    Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
+    if (jobJarInSharedCacheBeforeUpload) {
+      makeJarAvailableInSharedCache(jobJar, fileUploader);
+    }
+    jobConf.setJar(jobJar.toString());
+
+    fileUploader.uploadResources(job, remoteStagingDir);
+
+    verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
+        any(ApplicationId.class), anyString());
+
+    int numOfFilesShouldBeUploadedToSharedCache = 0;
+    Map<String, Boolean> filesSharedCacheUploadPolicies =
+        Job.getFileSharedCacheUploadPolicies(jobConf);
+    for (Boolean policy : filesSharedCacheUploadPolicies.values()) {
+      if (policy) {
+        numOfFilesShouldBeUploadedToSharedCache++;
+      }
+    }
+    assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
+        numOfFilesShouldBeUploadedToSharedCache);
+
+    int numOfArchivesShouldBeUploadedToSharedCache = 0;
+    Map<String, Boolean> archivesSharedCacheUploadPolicies =
+        Job.getArchiveSharedCacheUploadPolicies(jobConf);
+    for (Boolean policy : archivesSharedCacheUploadPolicies.values()) {
+      if (policy) {
+        numOfArchivesShouldBeUploadedToSharedCache++;
+      }
+    }
+    assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
+        numOfArchivesShouldBeUploadedToSharedCache);
+  }
+
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(testRootDir, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    FileOutputStream fos =
+        new FileOutputStream(new File(p.toUri().getPath()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+    jos.putNextEntry(ze);
+    jos.write(("inside the jar!" + index).getBytes());
+    jos.closeEntry();
+    jos.close();
+    localFs.setPermission(p, new FsPermission("700"));
+    return p;
+  }
+
+  private Path makeArchive(String archiveFile, String filename)
+      throws Exception {
+    Path archive = new Path(testRootDir, archiveFile);
+    Path file = new Path(testRootDir, filename);
+    DataOutputStream out = localFs.create(archive);
+    ZipOutputStream zos = new ZipOutputStream(out);
+    ZipEntry ze = new ZipEntry(file.toString());
+    zos.putNextEntry(ze);
+    zos.write(input.getBytes("UTF-8"));
+    zos.closeEntry();
+    zos.close();
+    return archive;
+  }
+}
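
The test above doubles as a reference for the client-side pattern this patch enables: pick which resource types may use the shared cache via MRJobConfig.SHARED_CACHE_MODE, then let the Job.add*ToSharedCache helpers report whether the resource was accepted. A hedged sketch, not part of the patch, follows; the helper names and SHARED_CACHE_MODE constant are from the diff, while the jar paths and the fallback target are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

// Sketch of the submission-side pattern exercised by the test above.
public class SharedCacheSubmitSketch {
  public static Job configure(Configuration conf) throws Exception {
    // Opt in for archives and libjars only, as in
    // testSharedCacheArchivesAndLibjarsEnabled.
    conf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
    Job job = Job.getInstance(conf);
    Configuration jobConf = job.getConfiguration();
    // The add* helpers return false when the resource type is not covered
    // by the configured mode, so the caller falls back to the regular
    // distributed cache route (mirroring the test's if (!added) branches).
    if (!Job.addFileToSharedCacheAndClasspath(
        new Path("/local/lib/dep.jar").toUri(), jobConf)) {
      job.addFileToClassPath(new Path("/on-cluster/lib/dep.jar"));
    }
    return job;
  }
}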
+ + " Path was not able to be converted to a URI: " + qualifiedPath, + e); + } + rsrc.setResource(URL.fromURI(uriWithFragment)); rsrc.setSize(rsrcStat.getLen()); rsrc.setTimestamp(rsrcStat.getModificationTime()); rsrc.setType(type); - rsrc.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc.setVisibility(viz); + rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache); return rsrc; } @@ -368,10 +395,21 @@ public class YARNRunner implements ClientProtocol { jobConfPath, LocalResourceType.FILE)); if (jobConf.get(MRJobConfig.JAR) != null) { Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); + // We hard code the job.jar symlink because mapreduce code expects the + // job.jar to be named that way. + FileContext fccc = + FileContext.getFileContext(jobJarPath.toUri(), jobConf); + LocalResourceVisibility jobJarViz = + jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY, + MRJobConfig.JOBJAR_VISIBILITY_DEFAULT) + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.APPLICATION; LocalResource rc = createApplicationResource( - FileContext.getFileContext(jobJarPath.toUri(), jobConf), - jobJarPath, - LocalResourceType.PATTERN); + FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath, + MRJobConfig.JOB_JAR, LocalResourceType.PATTERN, jobJarViz, + jobConf.getBoolean( + MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, + MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT)); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); rc.setPattern(pattern); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java index 4a2b857..a3ea26e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java @@ -132,6 +132,58 @@ public class TestLocalJobSubmission { } } + /** + * Test local job submission with a file option. + * + * @throws IOException + */ + @Test + public void testLocalJobFilesOption() throws IOException { + Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); + + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + final String[] args = + {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1", + "-mt", "1", "-rt", "1"}; + int res = -1; + try { + res = ToolRunner.run(conf, new SleepJob(), args); + } catch (Exception e) { + System.out.println("Job failed with " + e.getLocalizedMessage()); + e.printStackTrace(System.out); + fail("Job failed"); + } + assertEquals("dist job res is not 0:", 0, res); + } + + /** + * Test local job submission with an archive option. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index 4a2b857..a3ea26e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -132,6 +132,58 @@ public class TestLocalJobSubmission {
     }
   }
 
+  /**
+   * Test local job submission with a file option.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobFilesOption() throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    final String[] args =
+        {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
+            "-mt", "1", "-rt", "1"};
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
+  /**
+   * Test local job submission with an archive option.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobArchivesOption() throws IOException {
+    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    final String[] args =
+        {"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
+            "1", "-mt", "1", "-rt", "1"};
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
   private Path makeJar(Path p) throws IOException {
     FileOutputStream fos = new FileOutputStream(new File(p.toString()));
     JarOutputStream jos = new JarOutputStream(fos);
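
For context on the two tests above: the -files and -archives generic options are handled by GenericOptionsParser, which stores them under the same "tmpfiles"/"tmparchives" configuration keys that JobResourceUploader reads back earlier in this patch. A small sketch of that round trip, not part of the patch (the temp files exist only so the parser's local-existence check passes):

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

// Sketch: -files/-archives land in the config keys the uploader reads.
public class GenericOptionsSketch {
  public static void main(String[] args) throws Exception {
    // GenericOptionsParser validates that the listed resources exist
    // locally, so create two empty placeholder files first.
    File jar = File.createTempFile("test", ".jar");
    File zip = File.createTempFile("test-archive", ".zip");
    Configuration conf = new Configuration();
    new GenericOptionsParser(conf,
        new String[] {"-files", jar.toURI().toString(),
            "-archives", zip.toURI().toString()});
    System.out.println("tmpfiles    = " + conf.get("tmpfiles"));
    System.out.println("tmparchives = " + conf.get("tmparchives"));
  }
}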
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 22cb530..6e280ad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -1298,6 +1298,65 @@ public class TestMRJobs {
     jarFile.delete();
   }
 
+  @Test
+  public void testSharedCache() throws Exception {
+    Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    Job job = Job.getInstance(mrCluster.getConfig());
+
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
+
+    Path inputFile = createTempFile("input-file", "x");
+
+    // Create jars with a single file inside them.
+    Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    // Add libjars to job conf
+    jobConf.set("tmpjars", second.toString() + "," + third.toString() + ","
+        + fourth.toString());
+
+    // Because the job jar is a "dummy" jar, we need to include the jar with
+    // DistributedCacheChecker or it won't be able to find it
+    Path distributedCacheCheckerJar =
+        new Path(JarFinder.getJar(SharedCacheChecker.class));
+    job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
+        localFs.getUri(), distributedCacheCheckerJar.getParent()));
+
+    job.setMapperClass(SharedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    FileInputFormat.setInputPaths(job, inputFile);
+
+    job.setMaxMapAttempts(1); // speed up failures
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertTrue("Tracking URL was " + trackingUrl
+        + " but didn't Match Job ID " + jobId,
+        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+  }
+
+  /**
+   * An identity mapper for testing the shared cache.
+   */
+  public static class SharedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+    }
+  }
+
   public static class ConfVerificationMapper extends SleepMapper {
     @Override
     protected void setup(Context context)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e46d5bb9/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 44551fe..4512947 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -112,6 +112,7 @@
       <item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
       <item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
       <item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
+      <item name="Support for YARN Shared Cache" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html"/>
     </menu>
 
     <menu name="MapReduce REST APIs" inherit="top">
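
Returning to the YARNRunner hunk earlier in this patch: the two new job-jar knobs it consults can be set per job. A sketch of the non-default choices, not part of the patch (the constant names come from the diff; whether PUBLIC visibility is appropriate depends on the cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

// Sketch of the two job-jar knobs YARNRunner consults in the hunk above.
public class JobJarKnobsSketch {
  public static void apply(Configuration conf) {
    // true => the job jar is localized with PUBLIC visibility instead of
    // the APPLICATION default (per the ternary in YARNRunner above).
    conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
    // true => the job jar LocalResource is flagged for upload to the YARN
    // shared cache via setShouldBeUploadedToSharedCache.
    conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, true);
  }
}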