What version of Hadoop are you using? It looks like the flag used to make this work, fs.automatic.close, is not in the 0.20/1.0 versions. It is in CDH3, but the call to closeAll() in the finalizer sets a boolean flag which disables it.
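To make the ordering problem concrete, here is a toy model of what I think is happening. It is only a sketch, not Hadoop or Flume code: ToyFileSystem, onShutdown() and closeBucketAfterShutdown() are hypothetical names invented for illustration, and the only things taken from this thread are the fs.automatic.close property name, the "Filesystem closed" message, and the .tmp file name. It assumes the shutdown-time close runs before the sink gets to rename its .tmp file.

```java
import java.io.IOException;

public class ShutdownCloseSketch {

    /** Toy stand-in for a cached filesystem client; NOT Hadoop's FileSystem. */
    static class ToyFileSystem {
        private final boolean automaticClose; // models the fs.automatic.close setting
        private boolean open = true;

        ToyFileSystem(boolean automaticClose) {
            this.automaticClose = automaticClose;
        }

        /** Models the shutdown-time close: only closes if automatic close is enabled. */
        void onShutdown() {
            if (automaticClose) {
                open = false;
            }
        }

        /** Mirrors the failure in the trace: a rename on a closed client throws. */
        String rename(String src) throws IOException {
            if (!open) {
                throw new IOException("Filesystem closed");
            }
            // Strip the ".tmp" suffix, as the sink does when it finalizes a bucket file.
            return src.substring(0, src.length() - ".tmp".length());
        }
    }

    /** Runs the shutdown step first, then the sink's rename, and reports what happened. */
    static String closeBucketAfterShutdown(boolean automaticClose) {
        ToyFileSystem fs = new ToyFileSystem(automaticClose);
        fs.onShutdown();
        try {
            return "renamed to " + fs.rename("08_02_16.events.1343938150590.tmp");
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(closeBucketAfterShutdown(true));
        System.out.println(closeBucketAfterShutdown(false));
    }
}
```

With automatic close enabled, the rename fails with "Filesystem closed"; with it disabled, the .tmp suffix is removed. Whether a real Hadoop build behaves like the second case depends on whether that version actually honors the flag, which is the question above.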
The flag does appear to be in Hadoop trunk/CDH4.

On Fri, Aug 3, 2012 at 9:16 AM, Yongcheng Li <[email protected]> wrote:

> FLUME-1163 (https://issues.apache.org/jira/browse/FLUME-1163) states
> that this problem has been fixed in Flume 1.2.0. However, my tests indicate
> that the fix works sometimes, but does not work most of the time (i.e. it
> didn’t rename the .tmp file). The exceptions that occurred when it did not
> work are shown below. It seems that the Filesystem has been closed when it
> tries to close and rename the .tmp file.
>
> 2012-08-02 16:09:16,844 WARN hdfs.BucketWriter: failed to close() HDFSWriter for file (hdfs://xxx.yyy.com/user/yongli/flume/2012/08_02_16.events.1343938150590.tmp). Exception follows.
> java.io.IOException: DFSOutputStream is closed
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3666)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:796)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:793)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2012-08-02 16:09:16,853 INFO hdfs.BucketWriter: Renaming hdfs://xxx.yyy.com/user/yongli/flume/2012/08_02_16.events.1343938150590.tmp to hdfs://xxx.yyy.com/user/yongli/flume/2012/08_02_16.events.1343938150590
> 2012-08-02 16:09:16,853 WARN hdfs.HDFSEventSink: Exception while closing hdfs://xxx.yyy.com/user/yongli/flume/2012/08_02_16.events. Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:800)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:216)
>     at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:377)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:796)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:793)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2012-08-02 16:09:16,856 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink stopped

--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
