Nathan Schile created CRUNCH-685:
------------------------------------

             Summary: Limit Target#fileSystem(FileSystem) to only apply 
filesystem specific configurations to the FormatBundle
                 Key: CRUNCH-685
                 URL: https://issues.apache.org/jira/browse/CRUNCH-685
             Project: Crunch
          Issue Type: Improvement
          Components: Core
            Reporter: Nathan Schile
            Assignee: Josh Wills


I have an application that runs multiple Crunch pipelines. The first pipeline 
(P1) reads data from HDFS and completes successfully. The second pipeline (P2) 
writes data to the same HDFS cluster that P1 used. The Target for P2 is 
configured with the Target#fileSystem(FileSystem) method. P2 fails when 
committing the job [1]: it attempts to read a temporary directory from P1, 
which was already deleted when P1 completed.

The failure occurs because Hadoop's FileSystem class uses an internal cache 
[2] of FileSystem instances. The first pipeline creates a FileSystem object 
whose configuration contains 
"mapreduce.output.fileoutputformat.outputdir":"hdfs://my-cluster/tmp/crunch-897836570/p2/output".
When the P2 pipeline runs, it invokes Target#fileSystem(FileSystem), which uses 
the cached FileSystem from the P1 pipeline. The Target#fileSystem(FileSystem) 
method copies the configuration from the FileSystem to the FormatBundle, which 
causes the erroneous "mapreduce.output.fileoutputformat.outputdir" to be set.
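
To illustrate the mechanism, here is a minimal plain-Java sketch. It is a 
simplified model, not the Hadoop or Crunch API: FsCacheSketch, FakeFileSystem, 
copyAll and copyFileSystemKeys are hypothetical names, the cache is keyed only 
by URI, and the "fs."/"dfs." prefix filter is just an assumption about what 
"filesystem specific" could mean.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the behaviour described above. NOT the Hadoop/Crunch API.
class FsCacheSketch {

    static final String OUTPUT_DIR = "mapreduce.output.fileoutputformat.outputdir";

    /** Hypothetical stand-in for a FileSystem: a URI plus the configuration it was created with. */
    static final class FakeFileSystem {
        final String uri;
        final Map<String, String> conf;

        FakeFileSystem(String uri, Map<String, String> conf) {
            this.uri = uri;
            this.conf = new HashMap<>(conf);
        }
    }

    // Assumption: cache keyed only by URI, so the first caller's configuration wins.
    static final Map<String, FakeFileSystem> CACHE = new HashMap<>();

    /** Returns the cached instance for the URI, creating it only on first use. */
    static FakeFileSystem get(String uri, Map<String, String> conf) {
        return CACHE.computeIfAbsent(uri, u -> new FakeFileSystem(u, conf));
    }

    /** Models the reported behaviour: every key of the filesystem's configuration is copied. */
    static Map<String, String> copyAll(FakeFileSystem fs) {
        return new HashMap<>(fs.conf);
    }

    /** Models the requested behaviour: copy only keys with an (assumed) filesystem prefix. */
    static Map<String, String> copyFileSystemKeys(FakeFileSystem fs) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : fs.conf.entrySet()) {
            String key = e.getKey();
            if (key.startsWith("fs.") || key.startsWith("dfs.")) {
                out.put(key, e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // P1 creates the filesystem with a configuration that carries an output directory.
        Map<String, String> p1Conf = new HashMap<>();
        p1Conf.put("fs.defaultFS", "hdfs://my-cluster");
        p1Conf.put(OUTPUT_DIR, "hdfs://my-cluster/tmp/crunch-897836570/p2/output");
        FakeFileSystem fromP1 = get("hdfs://my-cluster", p1Conf);

        // P2 asks for the same filesystem with a clean configuration and gets P1's instance.
        Map<String, String> p2Conf = new HashMap<>();
        p2Conf.put("fs.defaultFS", "hdfs://my-cluster");
        FakeFileSystem fromP2 = get("hdfs://my-cluster", p2Conf);

        System.out.println("same instance: " + (fromP1 == fromP2));
        System.out.println("copyAll leaks outputdir: " + copyAll(fromP2).containsKey(OUTPUT_DIR));
        System.out.println("filtered copy leaks outputdir: "
                + copyFileSystemKeys(fromP2).containsKey(OUTPUT_DIR));
    }
}
```

In this model, copying everything carries P1's output directory into P2's 
settings, while the filtered copy does not. Which keys should actually count as 
filesystem specific is left open here.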

[1]

{noformat}
java.io.FileNotFoundException: File hdfs://my-cluster/tmp/crunch-897836570/p2/output/_temporary/1 does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:747)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:113)
        at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:808)
        at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:804)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:804)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1566)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1609)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:322)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:365)
        at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
        at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
        at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:285)
        at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:237)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
[2] 
http://johnjianfang.blogspot.com/2015/03/hadoop-filesystem-internal-cache.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
