Author: bobby
Date: Wed Sep 26 18:16:03 2012
New Revision: 1390629

URL: http://svn.apache.org/viewvc?rev=1390629&view=rev
Log:
svn merge -c 1377149 FIXES: MAPREDUCE-4408. allow jobs to set a JAR that is in 
the distributed cache (rkanter via tucu)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1390629&r1=1390628&r2=1390629&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Sep 26 18:16:03 2012
@@ -16,6 +16,9 @@ Release 0.23.4 - UNRELEASED
 
     MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv)
 
+    MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached 
+    (rkanter via tucu)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1390629&r1=1390628&r2=1390629&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Wed Sep 26 18:16:03 2012
@@ -232,9 +232,17 @@ class JobSubmitter {
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(jobJar).getName());
       }
-      copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir), 
-          replication);
-      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      Path jobJarPath = new Path(jobJar);
+      URI jobJarURI = jobJarPath.toUri();
+      // If the job jar is already in fs, we don't need to copy it from local fs
+      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+              || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme()) 
+                  && jobJarURI.getAuthority().equals(
+                                            jtFs.getUri().getAuthority()))) {
+        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), 
+            replication);
+        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      }
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "+
       "See Job or Job#setJar(String).");

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1390629&r1=1390628&r2=1390629&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Sep 26 18:16:03 2012
@@ -345,7 +345,7 @@ public class YARNRunner implements Clien
         createApplicationResource(defaultFileContext,
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
-      Path jobJarPath = new Path(jobSubmitDir, MRJobConfig.JOB_JAR);
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
       LocalResource rc = createApplicationResource(defaultFileContext,
           jobJarPath, 
           LocalResourceType.PATTERN);

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1390629&r1=1390628&r2=1390629&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Wed Sep 26 18:16:03 2012
@@ -70,8 +70,10 @@ public class MiniMRYarnCluster extends M
   @Override
   public void init(Configuration conf) {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-        "apps_staging_dir/").getAbsolutePath());
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir/").getAbsolutePath());
+    }
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
 
     try {

Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1390629&r1=1390628&r2=1390629&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Wed Sep 26 18:16:03 2012
@@ -41,10 +41,10 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -80,15 +80,24 @@ public class TestMRJobs {
   private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
 
   protected static MiniMRYarnCluster mrCluster;
+  protected static MiniDFSCluster dfsCluster;
 
   private static Configuration conf = new Configuration();
   private static FileSystem localFs;
+  private static FileSystem remoteFs;
   static {
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException io) {
       throw new RuntimeException("problem getting local fs", io);
     }
+    try {
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
   }
 
   private static Path TEST_ROOT_DIR = new Path("target",
@@ -107,6 +116,8 @@ public class TestMRJobs {
     if (mrCluster == null) {
       mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
       mrCluster.init(conf);
       mrCluster.start();
     }
@@ -123,6 +134,10 @@ public class TestMRJobs {
       mrCluster.stop();
       mrCluster = null;
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
   }
 
   @Test
@@ -403,7 +418,6 @@ public class TestMRJobs {
       Configuration conf = context.getConfiguration();
       Path[] files = context.getLocalCacheFiles();
       Path[] archives = context.getLocalCacheArchives();
-      FileSystem fs = LocalFileSystem.get(conf);
 
       // Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files 
       // and 2 archives are present
@@ -411,13 +425,13 @@ public class TestMRJobs {
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
-      Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
+      Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
+      Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
 
       // Check extraction of the archive
-      Assert.assertTrue(fs.exists(new Path(archives[0],
+      Assert.assertTrue(localFs.exists(new Path(archives[0],
           "distributed.jar.inside3")));
-      Assert.assertTrue(fs.exists(new Path(archives[1],
+      Assert.assertTrue(localFs.exists(new Path(archives[1],
           "distributed.jar.inside4")));
 
       // Check the class loaders
@@ -448,8 +462,7 @@ public class TestMRJobs {
     }
   }
 
-  @Test
-  public void testDistributedCache() throws Exception {
+  public void _testDistributedCache(String jobJarPath) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
            + " not found. Not running test.");
@@ -470,11 +483,13 @@ public class TestMRJobs {
     
     // Set the job jar to a new "dummy" jar so we can check that its extracted 
     // properly
-    job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
+    job.setJar(jobJarPath);
     // Because the job jar is a "dummy" jar, we need to include the jar with
     // DistributedCacheChecker or it won't be able to find it
-    job.addFileToClassPath(new Path(
-            JarFinder.getJar(DistributedCacheChecker.class)));
+    Path distributedCacheCheckerJar = new Path(
+            JarFinder.getJar(DistributedCacheChecker.class));
+    job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
+            localFs.getUri(), distributedCacheCheckerJar.getParent()));
     
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
@@ -484,7 +499,9 @@ public class TestMRJobs {
     job.addCacheFile(
         new URI(first.toUri().toString() + "#distributed.first.symlink"));
     job.addFileToClassPath(second);
-    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    // The AppMaster jar itself
+    job.addFileToClassPath(
+            APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); 
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
     job.setMaxMapAttempts(1); // speed up failures
@@ -497,6 +514,23 @@ public class TestMRJobs {
                       " but didn't Match Job ID " + jobId ,
           trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
   }
+  
+  @Test
+  public void testDistributedCache() throws Exception {
+    // Test with a local (file:///) Job Jar
+    Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
+    _testDistributedCache(localJobJarPath.toUri().toString());
+    
+    // Test with a remote (hdfs://) Job Jar
+    Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
+            localJobJarPath.getName());
+    remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath);
+    File localJobJarFile = new File(localJobJarPath.toUri().toString());
+    if (localJobJarFile.exists()) {     // just to make sure
+        localJobJarFile.delete();
+    }
+    _testDistributedCache(remoteJobJarPath.toUri().toString());
+  }
 
   private Path createTempFile(String filename, String contents)
       throws IOException {
@@ -522,7 +556,7 @@ public class TestMRJobs {
     return p;
   }
   
-  private String makeJobJarWithLib(String testDir) throws FileNotFoundException, 
+  private Path makeJobJarWithLib(String testDir) throws FileNotFoundException, 
       IOException{
     Path jobJarPath = new Path(testDir, "thejob.jar");
     FileOutputStream fos =
@@ -535,7 +569,7 @@ public class TestMRJobs {
             new Path(testDir, "lib2.jar").toUri().getPath()));
     jos.close();
     localFs.setPermission(jobJarPath, new FsPermission("700"));
-    return jobJarPath.toUri().toString();
+    return jobJarPath;
   }
   
   private void createAndAddJarToJar(JarOutputStream jos, File jarFile) 


Reply via email to