[
https://issues.apache.org/jira/browse/CRUNCH-685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Micah Whitacre resolved CRUNCH-685.
-----------------------------------
Resolution: Fixed
The PR was approved and has been merged.
> Limit Target#fileSystem(FileSystem) to only apply filesystem specific
> configurations to the FormatBundle
> --------------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-685
> URL: https://issues.apache.org/jira/browse/CRUNCH-685
> Project: Crunch
> Issue Type: Improvement
> Components: Core
> Reporter: Nathan Schile
> Assignee: Josh Wills
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> I have an application that runs multiple Crunch pipelines. The first pipeline
> (P1) reads data from HDFS and completes successfully. The second pipeline
> (P2) writes data to the same HDFS used by P1. The Target for the P2 pipeline
> is configured via the Target#fileSystem(FileSystem) method. The P2 pipeline
> fails when committing the job [1]: it attempts to read a temporary directory
> from the P1 pipeline, which was already deleted when the P1 pipeline
> completed.
> The failure occurs because Hadoop's FileSystem class keeps an internal cache
> [2] of FileSystem instances. The first pipeline creates a FileSystem object
> whose configuration contains
> "mapreduce.output.fileoutputformat.outputdir":"hdfs://my-cluster/tmp/crunch-897836570/p2/output".
> When the P2 pipeline runs, it invokes Target#fileSystem(FileSystem) with the
> cached FileSystem from the P1 pipeline. The Target#fileSystem(FileSystem)
> method copies the configuration from the filesystem to the FormatBundle,
> which causes the stale "mapreduce.output.fileoutputformat.outputdir" value
> to be set.
> [1]
> {noformat}
> java.io.FileNotFoundException: File hdfs://my-cluster/tmp/crunch-897836570/p2/output/_temporary/1 does not exist.
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:747)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:113)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:808)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:804)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:804)
> 	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1566)
> 	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1609)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:322)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:365)
> 	at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
> 	at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
> 	at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:285)
> 	at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:237)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
> [2]
> http://johnjianfang.blogspot.com/2015/03/hadoop-filesystem-internal-cache.html
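The caching behavior described above can be sketched with a simplified model. This is not Hadoop's actual implementation (the real cache key also includes the user/UGI, and the class/field names here, such as FsCacheSketch, are illustrative only); it just shows the key point that a lookup for the same scheme and authority returns the same cached instance, stale configuration and all:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Hadoop's FileSystem cache (illustrative only).
// The real FileSystem.get(uri, conf) caches instances keyed by
// (scheme, authority, user); a later caller receives the same object,
// including any configuration an earlier pipeline attached to it.
public class FsCacheSketch {

    // Minimal stand-in for org.apache.hadoop.conf.Configuration.
    static class Configuration {
        final Map<String, String> props = new HashMap<>();
        void set(String k, String v) { props.put(k, v); }
        String get(String k) { return props.get(k); }
    }

    // Minimal stand-in for org.apache.hadoop.fs.FileSystem.
    static class FileSystem {
        final Configuration conf;
        FileSystem(Configuration conf) { this.conf = conf; }

        // Cache keyed by scheme+authority, mimicking FileSystem.CACHE.
        static final Map<String, FileSystem> CACHE = new HashMap<>();

        static FileSystem get(String schemeAuthority, Configuration conf) {
            // Only the first call's Configuration is retained; later
            // callers get the cached instance regardless of the conf
            // they pass in.
            return CACHE.computeIfAbsent(schemeAuthority,
                                         k -> new FileSystem(conf));
        }
    }

    public static void main(String[] args) {
        // P1 obtains a FileSystem and a job-specific property ends up on
        // the cached instance's configuration.
        FileSystem p1 = FileSystem.get("hdfs://my-cluster", new Configuration());
        p1.conf.set("mapreduce.output.fileoutputformat.outputdir",
                    "hdfs://my-cluster/tmp/crunch-897836570/p2/output");

        // P2 asks for a FileSystem with a fresh Configuration, but the
        // cache hands back P1's instance, stale property included.
        FileSystem p2 = FileSystem.get("hdfs://my-cluster", new Configuration());
        System.out.println(p2 == p1); // true: same cached instance
        System.out.println(
            p2.conf.get("mapreduce.output.fileoutputformat.outputdir"));
    }
}
```

This is why copying the cached FileSystem's entire configuration into the FormatBundle (as Target#fileSystem(FileSystem) did) drags job-specific keys from an earlier pipeline into a later one; the fix limits the copy to filesystem-specific settings.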
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)