[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhihai xu updated MAPREDUCE-6238:
---------------------------------
    Description: 
MR2 can't run local jobs with the -libjars command option, which is a regression 
from MR1. 
When an MR2 job is run with -jt local and -libjars, it fails with 
java.io.FileNotFoundException: File does not exist: hdfs://XXXXXXXXXXXXXXX.jar.
The same command works in MR1.
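For example, the failure can be reproduced along the following lines (jar, class, 
and path names are hypothetical; the job's main class is assumed to go through 
ToolRunner/GenericOptionsParser so that the generic -jt and -libjars options are 
parsed):
{code}
hadoop jar my-app.jar com.example.MyJob -jt local -libjars /home/user/dep.jar /input /output
{code}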
I found two problems:
1.
When MR2 runs a local job via LocalJobRunner, JobSubmitter#jtFs is the local 
filesystem, so copyRemoteFiles returns from [the middle of the 
function|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L138]
because the source and destination filesystems are the same:
{code}
    if (compareFs(remoteFs, jtFs)) {
      return originalPath;
    }
{code}
The following code in 
[JobSubmitter.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L219]
then tries to add the destination file to the DistributedCache, which introduces 
a bug for local jobs:
{code}
        Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
        DistributedCache.addFileToClassPath(
            new Path(newPath.toUri().getPath()), conf);
{code}
Because new Path(newPath.toUri().getPath()) drops the filesystem information 
from newPath, the file added to the DistributedCache ends up qualified against 
the default URI filesystem, hdfs, per the code below. This causes the 
FileNotFoundException when the file is accessed later in 
[determineTimestampsAndCacheVisibilities|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java#L270]:
{code}
  public static void addFileToClassPath(Path file, Configuration conf)
      throws IOException {
    addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  public static void addFileToClassPath(Path file, Configuration conf,
      FileSystem fs) throws IOException {
    String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
    conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
        : classpath + "," + file.toString());
    URI uri = fs.makeQualified(file).toUri();
    addCacheFile(uri, conf);
  }
{code}
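For illustration, here is a minimal standalone sketch (hypothetical paths and a 
hypothetical fs.defaultFS; it assumes the HDFS client classes are on the 
classpath) of how stripping the Path down to its URI path loses the file scheme, 
so makeQualified re-qualifies it against the default filesystem:
{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LostSchemeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical cluster default filesystem.
    conf.set("fs.defaultFS", "hdfs://namenode:8020");

    // Hypothetical local libjar, as copyRemoteFiles would return it for a local job.
    Path onLocalFs = new Path("file:/tmp/libjars/job.jar");
    // Stripping the Path down to its URI path loses the "file" scheme.
    Path stripped = new Path(onLocalFs.toUri().getPath());

    // A scheme-less path resolves against fs.defaultFS, i.e. HDFS here.
    FileSystem fs = stripped.getFileSystem(conf);
    // Prints hdfs://namenode:8020/tmp/libjars/job.jar, which does not exist.
    System.out.println(fs.makeQualified(stripped));
  }
}
{code}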

Compare to the following [MR1 
code|https://github.com/apache/hadoop/blob/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java#L811]:
{code}
        Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
        DistributedCache.addFileToClassPath(
          new Path(newPath.toUri().getPath()), job, fs);
{code}
This shows why MR1 doesn't have this issue: it passes the local filesystem into 
DistributedCache#addFileToClassPath instead of falling back to the default URI 
filesystem, hdfs.
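A minimal sketch of the same idea for MR2 at the JobSubmitter call site, where 
jtFs is in scope (a sketch only, not necessarily the attached patch):
{code}
        // Sketch: qualify the classpath entry against the actual submit
        // filesystem (jtFs, the local filesystem for a -jt local job) instead
        // of letting addFileToClassPath fall back to the default filesystem.
        Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
        DistributedCache.addFileToClassPath(
            new Path(newPath.toUri().getPath()), conf, jtFs);
{code}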
2.
Another incompatible change from MR1: in MR2, 
[LocalDistributedCacheManager#setup|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java#L113]
resolves every classpath entry against its remote filesystem:
{code}
    // Find which resources are to be put on the local classpath
    Map<String, Path> classpaths = new HashMap<String, Path>();
    Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
    if (archiveClassPaths != null) {
      for (Path p : archiveClassPaths) {
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        classpaths.put(p.toUri().getPath().toString(), p);
      }
    }
    Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
    if (fileClassPaths != null) {
      for (Path p : fileClassPaths) {
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        classpaths.put(p.toUri().getPath().toString(), p);
      }
    }
{code}
The corresponding MR1 code, in 
[TaskDistributedCacheManager#makeCacheFiles|https://github.com/apache/hadoop/blob/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java#L119],
simply uses the path as-is:
{code}
        Map<String, Path> classPaths = new HashMap<String, Path>();
        if (paths != null) {
          for (Path p : paths) {
            classPaths.put(p.toUri().getPath().toString(), p);
          }
        }
{code}

It would be better to do the same in MR2 to maintain backward compatibility with MR1.
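A minimal sketch of what that could look like in LocalDistributedCacheManager#setup 
(a sketch only, not necessarily the attached patch): key the classpath map by the 
raw URI path, as MR1 does, rather than resolving each entry first.
{code}
    // Sketch: build the map from the raw URI path, as MR1's
    // TaskDistributedCacheManager#makeCacheFiles does, instead of calling
    // remoteFS.resolvePath() on every entry.
    Map<String, Path> classpaths = new HashMap<String, Path>();
    Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
    if (fileClassPaths != null) {
      for (Path p : fileClassPaths) {
        classpaths.put(p.toUri().getPath().toString(), p);
      }
    }
{code}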





> MR2 can't run local jobs with -libjars command options which is a regression 
> from MR1
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6238
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6238
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv2
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>            Priority: Critical
>         Attachments: MAPREDUCE-6238.000.patch
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
