Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 43854764a -> 7be74ea09
MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect with no authority in job jar path. Contributed by Gera Shegalov (cherry picked from commit 10f9f5101c44be7c675a44ded4aad212627ecdee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7be74ea0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7be74ea0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7be74ea0 Branch: refs/heads/branch-2.6 Commit: 7be74ea09a4721b39f78df47c11b9ceca1b34d1b Parents: 4385476 Author: Jason Lowe <jl...@apache.org> Authored: Thu Nov 6 15:10:40 2014 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Thu Nov 6 15:12:37 2014 +0000 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/FileContext.java | 3 ++ hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../v2/app/job/impl/TaskAttemptImpl.java | 8 ++-- .../apache/hadoop/mapreduce/JobSubmitter.java | 9 ++--- .../org/apache/hadoop/mapred/YARNRunner.java | 5 ++- .../apache/hadoop/mapreduce/v2/TestMRJobs.java | 41 ++++++++++++++++---- .../apache/hadoop/mapreduce/v2/TestUberAM.java | 40 +++---------------- 7 files changed, 56 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 57bd3b5..40d271d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -440,6 +440,9 @@ public class FileContext { 
final Configuration aConf) throws UnsupportedFileSystemException { UserGroupInformation currentUser = null; AbstractFileSystem defaultAfs = null; + if (defaultFsUri.getScheme() == null) { + return getFileContext(aConf); + } try { currentUser = UserGroupInformation.getCurrentUser(); defaultAfs = getAbstractFileSystem(currentUser, defaultFsUri, aConf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bc70c89..bf758c4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -225,6 +225,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-6048. Fixed TestJavaSerialization failure. (Varun Vasudev via jianhe) + MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect + with no authority in job jar path. (Gera Shegalov via jlowe) + Release 2.5.2 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 58a8537..5a8a658 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -653,9 +653,11 @@ public abstract class TaskAttemptImpl implements // //////////// Set up JobJar to be localized properly on the remote NM. String jobJar = conf.get(MRJobConfig.JAR); if (jobJar != null) { - Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS - .getUri(), remoteFS.getWorkingDirectory()); - LocalResource rc = createLocalResource(remoteFS, remoteJobJar, + final Path jobJarPath = new Path(jobJar); + final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); + Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), + jobJarFs.getWorkingDirectory()); + LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index d8b3a26..4db616a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -249,11 +249,10 @@ class JobSubmitter { } Path jobJarPath = new Path(jobJar); URI jobJarURI = jobJarPath.toUri(); - // 
If the job jar is already in fs, we don't need to copy it from local fs - if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null - || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme()) - && jobJarURI.getAuthority().equals( - jtFs.getUri().getAuthority()))) { + // If the job jar is already in a global fs, + // we don't need to copy it from local fs + if ( jobJarURI.getScheme() == null + || jobJarURI.getScheme().equals("file")) { copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), replication); job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index a1c4c32..7b2cf53 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -357,8 +357,9 @@ public class YARNRunner implements ClientProtocol { jobConfPath, LocalResourceType.FILE)); if (jobConf.get(MRJobConfig.JAR) != null) { Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); - LocalResource rc = createApplicationResource(defaultFileContext, - jobJarPath, + LocalResource rc = createApplicationResource( + FileContext.getFileContext(jobJarPath.toUri(), jobConf), + jobJarPath, LocalResourceType.PATTERN); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 6e2b628..d53a257 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.FailingMapper; import org.apache.hadoop.RandomTextWriterJob; import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat; +import org.apache.hadoop.fs.viewfs.ConfigUtil; import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.mapreduce.SleepJob.SleepMapper; import org.apache.hadoop.conf.Configuration; @@ -85,6 +86,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ApplicationClassLoader; +import org.apache.hadoop.util.ClassUtil; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -93,6 +95,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; 
+import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -106,6 +109,9 @@ public class TestMRJobs { private static final int NUM_NODE_MGRS = 3; private static final String TEST_IO_SORT_MB = "11"; + private static final int DEFAULT_REDUCES = 2; + protected int numSleepReducers = DEFAULT_REDUCES; + protected static MiniMRYarnCluster mrCluster; protected static MiniDFSCluster dfsCluster; @@ -170,10 +176,23 @@ public class TestMRJobs { } } + @After + public void resetInit() { + numSleepReducers = DEFAULT_REDUCES; + } + + @Test (timeout = 300000) + public void testSleepJob() throws Exception { + testSleepJobInternal(false); + } + @Test (timeout = 300000) - public void testSleepJob() throws IOException, InterruptedException, - ClassNotFoundException { - LOG.info("\n\n\nStarting testSleepJob()."); + public void testSleepJobWithRemoteJar() throws Exception { + testSleepJobInternal(true); + } + + private void testSleepJobInternal(boolean useRemoteJar) throws Exception { + LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar); if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR @@ -187,14 +206,20 @@ public class TestMRJobs { SleepJob sleepJob = new SleepJob(); sleepJob.setConf(sleepConf); - - int numReduces = sleepConf.getInt("TestMRJobs.testSleepJob.reduces", 2); // or sleepConf.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2); // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each: - Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1); + Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1); job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. 
- job.setJarByClass(SleepJob.class); + if (useRemoteJar) { + final Path localJar = new Path( + ClassUtil.findContainingJar(SleepJob.class)); + ConfigUtil.addLink(job.getConfiguration(), "/jobjars", + localFs.makeQualified(localJar.getParent()).toUri()); + job.setJar("viewfs:///jobjars/" + localJar.getName()); + } else { + job.setJarByClass(SleepJob.class); + } job.setMaxMapAttempts(1); // speed up failures job.submit(); String trackingUrl = job.getTrackingURL(); @@ -329,7 +354,7 @@ public class TestMRJobs { .getValue()); Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) .getValue()); - Assert.assertEquals(2, + Assert.assertEquals(numSleepReducers, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); Assert .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null http://git-wip-us.apache.org/repos/asf/hadoop/blob/7be74ea0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java index e89a919..e198f99 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java @@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2; import java.io.File; import java.io.IOException; -import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,8 +39,7 @@ import org.junit.Test; 
public class TestUberAM extends TestMRJobs { private static final Log LOG = LogFactory.getLog(TestUberAM.class); - private int numSleepReducers; - + @BeforeClass public static void setup() throws IOException { TestMRJobs.setup(); @@ -54,21 +52,15 @@ public class TestUberAM extends TestMRJobs { @Override @Test public void testSleepJob() - throws IOException, InterruptedException, ClassNotFoundException { + throws Exception { numSleepReducers = 1; - if (mrCluster != null) { - mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers); - } super.testSleepJob(); } @Test public void testSleepJobWithMultipleReducers() - throws IOException, InterruptedException, ClassNotFoundException { + throws Exception { numSleepReducers = 3; - if (mrCluster != null) { - mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers); - } super.testSleepJob(); } @@ -76,20 +68,7 @@ public class TestUberAM extends TestMRJobs { protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); - - Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) - .getValue()); - Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) - .getValue()); - Assert.assertEquals(numSleepReducers, - counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); - Assert - .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null - && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); - Assert - .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null - && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); - + super.verifySleepJobCounters(job); Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue()); Assert.assertEquals(numSleepReducers, @@ -168,16 +147,7 @@ public class TestUberAM extends TestMRJobs { protected void verifyFailingMapperCounters(Job job) throws 
InterruptedException, IOException { Counters counters = job.getCounters(); - Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) - .getValue()); - Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) - .getValue()); - Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS) - .getValue()); - Assert - .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null - && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); - + super.verifyFailingMapperCounters(job); Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue()); Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)