[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhihai xu updated MAPREDUCE-6238:
---------------------------------
    Attachment: MAPREDUCE-6238.000.patch

> MR2 can't run local jobs with -libjars command options which is a regression 
> from MR1
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6238
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6238
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv2
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>            Priority: Critical
>         Attachments: MAPREDUCE-6238.000.patch
>
>
> MR2 can't run local jobs with the -libjars command option, which is a regression 
> from MR1. 
> When an MR2 job is run with -jt local and -libjars, the job fails with 
> java.io.FileNotFoundException: File does not exist: 
> hdfs://XXXXXXXXXXXXXXX.jar.
> The same command works in MR1.
> I found two problems:
> 1.
> When MR2 runs a local job using LocalJobRunner
> from JobSubmitter, JobSubmitter#jtFs is the local filesystem,
> so copyRemoteFiles returns from [the middle of the 
> function|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L138]
> because the source and destination filesystems are the same.
> {code}
>     if (compareFs(remoteFs, jtFs)) {
>       return originalPath;
>     }
> {code}
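
To make the early return concrete, here is a minimal sketch using only java.net.URI. The class SameFsDemo and the method sameFs are hypothetical stand-ins, not Hadoop code; the real compareFs may do more (for example around host names), so treat this as an illustration of the idea only: two URIs on the same scheme and authority count as the same filesystem, so nothing is copied.

```java
import java.net.URI;
import java.util.Objects;

public class SameFsDemo {
    // Hypothetical, simplified stand-in for JobSubmitter#compareFs:
    // two URIs are on the "same filesystem" when scheme and authority match.
    static boolean sameFs(URI a, URI b) {
        if (a.getScheme() == null || b.getScheme() == null) {
            return false;
        }
        if (!a.getScheme().equalsIgnoreCase(b.getScheme())) {
            return false;
        }
        return Objects.equals(a.getAuthority(), b.getAuthority());
    }

    public static void main(String[] args) {
        URI jtFs = URI.create("file:///"); // local job: jtFs is the local filesystem
        URI localJar = URI.create("file:///tmp/my.jar");
        URI remoteJar = URI.create("hdfs://namenode:8020/tmp/my.jar"); // assumed address

        // Same filesystem: the copy is skipped and the original path is returned.
        System.out.println("local jar copied?  " + !sameFs(localJar, jtFs));
        // Different filesystem: the file would be copied to the staging directory.
        System.out.println("remote jar copied? " + !sameFs(remoteJar, jtFs));
    }
}
```

With -jt local, a jar given as a local path falls into the first case, so the path handed to DistributedCache is the original local path.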
> The following code at 
> [JobSubmitter.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L219]
> tries to add the destination file to DistributedCache, which introduces a bug for 
> local jobs.
> {code}
>         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
>         DistributedCache.addFileToClassPath(
>             new Path(newPath.toUri().getPath()), conf);
> {code}
> Because new Path(newPath.toUri().getPath()) drops the filesystem 
> information (scheme and authority) from newPath, the file added to 
> DistributedCache is qualified against the default filesystem (hdfs), as the 
> following code shows. This causes the 
> FileNotFoundException when the file is accessed later in 
> [determineTimestampsAndCacheVisibilities|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L270]
> {code}
>   public static void addFileToClassPath(Path file, Configuration conf)
>       throws IOException {
>     addFileToClassPath(file, conf, file.getFileSystem(conf));
>   }
>
>   public static void addFileToClassPath(Path file, Configuration conf,
>       FileSystem fs) throws IOException {
>     String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
>     conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
>         : classpath + "," + file.toString());
>     URI uri = fs.makeQualified(file).toUri();
>     addCacheFile(uri, conf);
>   }
> {code}
> Compare this with the following [MR1 
> code|https://github.com/apache/hadoop/blob/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java#L811]:
> {code}
>         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
>         DistributedCache.addFileToClassPath(
>           new Path(newPath.toUri().getPath()), job, fs);
> {code}
> MR1 doesn't have this issue
> because it passes the local filesystem into 
> DistributedCache#addFileToClassPath instead of using the default 
> filesystem (hdfs).
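
The scheme loss described in item 1 can be reproduced with plain java.net.URI, without Hadoop on the classpath. This is a rough sketch under assumptions: SchemeLossDemo and qualify are hypothetical names, qualify only approximates what FileSystem#makeQualified does, and hdfs://namenode:8020/ is an assumed default filesystem address.

```java
import java.net.URI;

public class SchemeLossDemo {
    // Hypothetical approximation of FileSystem#makeQualified: a path without
    // a scheme inherits scheme and authority from the given filesystem URI.
    static URI qualify(URI path, URI fsUri) {
        if (path.getScheme() != null) {
            return path; // already fully qualified
        }
        return fsUri.resolve(path.getPath());
    }

    public static void main(String[] args) {
        URI defaultFs = URI.create("hdfs://namenode:8020/"); // assumed default filesystem
        URI localFs = URI.create("file:///");
        URI newPath = URI.create("file:///tmp/staging/libjars/my.jar");

        // Analogue of new Path(newPath.toUri().getPath()): only the path survives.
        URI stripped = URI.create(newPath.getPath());
        System.out.println("scheme after stripping: " + stripped.getScheme());

        // MR2 analogue: no filesystem is passed, so the default one (hdfs) is used.
        System.out.println("MR2-style: " + qualify(stripped, defaultFs));

        // MR1 analogue: the local filesystem is passed in explicitly.
        System.out.println("MR1-style: " + qualify(stripped, localFs));
    }
}
```

The MR2-style result points at hdfs even though the jar only exists locally, which matches the FileNotFoundException in the report. Passing the filesystem explicitly, as MR1 does, keeps the file scheme.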
> 2.
> Another incompatible change in MR2 is in 
> [LocalDistributedCacheManager#setup|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java#L113]
> {code}
>     // Find which resources are to be put on the local classpath
>     Map<String, Path> classpaths = new HashMap<String, Path>();
>     Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
>     if (archiveClassPaths != null) {
>       for (Path p : archiveClassPaths) {
>         FileSystem remoteFS = p.getFileSystem(conf);
>         p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
>             remoteFS.getWorkingDirectory()));
>         classpaths.put(p.toUri().getPath().toString(), p);
>       }
>     }
>     Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
>     if (fileClassPaths != null) {
>       for (Path p : fileClassPaths) {
>         FileSystem remoteFS = p.getFileSystem(conf);
>         p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
>             remoteFS.getWorkingDirectory()));
>         classpaths.put(p.toUri().getPath().toString(), p);
>       }
>     }
> {code}
> Similar code from MR1 is at 
> [TaskDistributedCacheManager#makeCacheFiles|https://github.com/apache/hadoop/blob/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java#L119]
> {code}
>         Map<String, Path> classPaths = new HashMap<String, Path>();
>         if (paths != null) {
>           for (Path p : paths) {
>             classPaths.put(p.toUri().getPath().toString(), p);
>           }
>         }
> {code}
> I think we don't need to call remoteFS.resolvePath to get the class path;
> we can use the class path from DistributedCache.getFileClassPaths directly.
> Also, p.toUri().getPath().toString() removes the filesystem 
> information (scheme), and only the keySet of classpaths is used (the values of 
> classpaths are not used).
> It is better to do the same in MR2 to maintain backward compatibility with MR1.
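
The point about the keys in item 2 can be sketched with java.net.URI standing in for Path. ClasspathKeyDemo and classpathKeys are hypothetical names and the host name is an assumption. The sketch mirrors the MR1 loop and shows that the scheme never reaches the key, so qualifying or resolving the path first does not change which scheme-less path is recorded (it does not model any symlink resolution that resolvePath may perform).

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

public class ClasspathKeyDemo {
    // Mirrors the MR1 loop: each entry is keyed by its path component only,
    // so scheme and authority are discarded from the key.
    static Map<String, URI> classpathKeys(URI... entries) {
        Map<String, URI> classpaths = new LinkedHashMap<>();
        for (URI p : entries) {
            classpaths.put(p.getPath(), p);
        }
        return classpaths;
    }

    public static void main(String[] args) {
        Map<String, URI> classpaths = classpathKeys(
            URI.create("/tmp/libjars/a.jar"),                      // unqualified
            URI.create("file:///tmp/libjars/a.jar"),               // qualified, same path
            URI.create("hdfs://namenode:8020/tmp/libjars/b.jar")); // assumed host

        // Only the keys are consulted later; both a.jar entries collapse to one key.
        System.out.println(classpaths.keySet());
    }
}
```

Running this prints `[/tmp/libjars/a.jar, /tmp/libjars/b.jar]`: the qualified and unqualified forms of a.jar produce the same key.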



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
