Hi Ashish,

What version of Flume are you running?
flume-ng version

-Jeff

On Fri, May 24, 2013 at 3:38 AM, Ashish Tadose <[email protected]> wrote:

> Hi All,
>
> We are facing this issue in our production Flume setup.
>
> The issue starts when the HDFS sink's BucketWriter fails to append a batch to a file because of a Hadoop datanode problem. It then tries to close that file, but even close() throws an exception, and the HDFS sink does not remove that BucketWriter instance. [The cause may be that the number of BucketWriter instances is below *maxOpenFiles*.]
>
> What makes this worse is that the HDFS sink never lets go of that BucketWriter instance and keeps trying to append to and close that file indefinitely, which disrupts event processing towards HDFS.
>
> *Logs from flume agent*
>
> 2013-05-20 02:32:40,669 (ResponseProcessor for block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:3015)] DFSOutputStream ResponseProcessor exception for block blk_8857674139042547711_2143611 java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41129 remote=/<datanode2_ip>:60010]
>     at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
>     at java.io.DataInputStream.readFully(DataInputStream.java:178)
>     at java.io.DataInputStream.readLong(DataInputStream.java:399)
>     at org.apache.hadoop.hdfs.protocol.DataTransferProtocol$PipelineAck.readFields(DataTransferProtocol.java:124)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:2967)
>
> 2013-05-20 02:32:56,321 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3051)] *Error Recovery for block blk_8857674139042547711_2143611 bad datanode[0] <datanode2_ip>:60010*
> 2013-05-20 02:32:56,322 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3102)] Error Recovery for block blk_8857674139042547711_2143611 in pipeline <datanode2_ip>:60010, <datanode3_ip>:60010, <datanode4_ip>:60010: bad datanode <datanode2_ip>:60010
> 2013-05-20 02:32:56,538 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:109)] Start checkpoint for /home/flume/flume_channel/checkpointDir15/checkpoint, elements to sync = 42000
> 2013-05-20 02:32:56,634 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:117)] Updating checkpoint metadata: logWriteOrderID: 1370969225649, queueSize: 140156, queueHead: 72872626
> 2013-05-20 02:32:56,722 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)] Updating log-543.meta currentPosition = 743847065, logWriteOrderID = 1370969225649
> 2013-05-20 02:32:56,759 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:886)] Updated checkpoint
> for file: /home/flume/flume_channel/dataDir15/log-543 position: 743847065 logWriteOrderID: 1370969225649
>
> 2013-05-20 02:33:56,331 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode <datanode3_ip>:60010
> java.net.SocketTimeoutException: Call to /<datanode3_ip>:60021 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41526 remote=/<datanode3_ip>:60021]
> ..
> ..
> ..
> 2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode <datanode2_ip>:60010
> org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.ipc.RemoteException: java.io.IOException: blk_8857674139042547711_2143611 is already commited, storedBlock == null.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
>     at org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1070)
>     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
>     at $Proxy5.nextGenerationStamp(Unknown Source)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
> 2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode
> <datanode2_ip>:60010
> org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.ipc.RemoteException: java.io.IOException: blk_8857674139042547711_2143611 is already commited, storedBlock == null.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
>     at org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1070)
>     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
>     at $Proxy5.nextGenerationStamp(Unknown Source)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
>     at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1070)
>     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
>     at $Proxy7.recoverBlock(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3121)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2100(DFSClient.java:2589)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2793)
> 2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3162)] Error Recovery for block blk_8857674139042547711_2143611 failed because recovery from primary datanode <datanode2_ip>:60010 failed 1 times. Pipeline was <datanode2_ip>:60010. Will retry...
> ..
> ..
> ..
> 2013-05-20 02:36:16,933 (hdfs-hdfsSink1-call-runner-8) [WARN - org.apache.flume.sink.hdfs.BucketWriter.*append*(BucketWriter.java:378)] Caught IOException writing to HDFSWriter (write beyond end of stream).
> *Closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) and rethrowing exception.*
> 2013-05-20 02:36:16,934 (hdfs-hdfsSink1-call-runner-8) [WARN - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:384)] Caught IOException while closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp). *Exception follows.*
> *java.io.IOException: DFSOutputStream is closed*
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3669)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.sync(HDFSCompressedDataStream.java:96)
>     at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:345)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:53)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:310)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:308)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>     at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:308)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:257)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:382)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> 2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] HDFS IO error
> java.io.IOException: write beyond end of stream
>     at com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
>     at java.io.OutputStream.write(OutputStream.java:58)
>     at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> 2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] *HDFS IO error*
> *java.io.IOException: write beyond end of stream*
>     at com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
>     at java.io.OutputStream.write(OutputStream.java:58)
>     at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>     at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> 2013-05-20 02:36:20,988 (hdfs-hdfsSink1-call-runner-7) [WARN - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:378)] *Caught IOException writing to HDFSWriter (Error Recovery for block blk_8857674139042547711_2143611 failed because recovery from primary datanode <datanode2_ip>:60010 failed 6 times. Pipeline was <datanode2_ip>:60010. Aborting...). Closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) and rethrowing exception.*
> ..
> ..
> ..
> Then it keeps repeating these log messages.
>
> *Flume agent HDFS sink properties*
>
> agent.sinks.hdfsSink1.type = hdfs
> agent.sinks.hdfsSink1.hdfs.path=hdfs://<namenode_ip>:64310/flume/tracker_data
> #agent.sinks.hdfsSink1.hdfs.fileType =DataStream
> agent.sinks.hdfsSink1.hdfs.fileType =CompressedStream
> agent.sinks.hdfsSink1.hdfs.filePrefix=Flume_raw_1_
> agent.sinks.hdfsSink1.hdfs.rollSize=0
> agent.sinks.hdfsSink1.hdfs.codeC=LZopCodec
> agent.sinks.hdfsSink1.hdfs.rollCount=0
> agent.sinks.hdfsSink1.hdfs.batchSize=5000
> agent.sinks.hdfsSink1.hdfs.rollInterval=60
> agent.sinks.hdfsSink1.channel= fileChannel
> agent.sinks.hdfsSink1.hdfs.callTimeout = 240000
> agent.sinks.hdfsSink1.hdfs.idleTimeout = 300
>
> Thanks,
> Ashish
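
For reference, the control flow in the stack traces above reads roughly like the sketch below: append() fails, the writer tries to close the underlying stream and rethrows, but the close itself also fails because the DFSOutputStream is already dead, so the sink only sees "HDFS IO error" and retries the same writer on the next transaction. This is a minimal illustration of that pattern, not the actual BucketWriter source; SimpleBucketWriter and FailingStream are made-up names.

import java.io.Closeable;
import java.io.IOException;

// Minimal sketch of the append -> close-on-error -> rethrow control flow seen in the
// BucketWriter log lines above. SimpleBucketWriter and FailingStream are illustrative
// stand-ins, not Flume classes.
public class SimpleBucketWriter {

    // Stand-in for an HDFS output stream whose write pipeline has already died.
    static class FailingStream implements Closeable {
        void write(byte[] data) throws IOException {
            throw new IOException("write beyond end of stream");
        }
        @Override
        public void close() throws IOException {
            throw new IOException("DFSOutputStream is closed");
        }
    }

    private final FailingStream stream = new FailingStream();

    public void append(byte[] event) throws IOException {
        try {
            stream.write(event);
        } catch (IOException e) {
            // "Caught IOException writing to HDFSWriter (...). Closing file (...) and
            // rethrowing exception."
            System.err.println("append failed: " + e.getMessage() + "; closing file and rethrowing");
            try {
                close();
            } catch (IOException closeEx) {
                // "Caught IOException while closing file (...). Exception follows."
                System.err.println("close also failed: " + closeEx.getMessage());
            }
            // The original failure is rethrown to the sink, which logs "HDFS IO error"
            // and retries the same (still cached) writer on the next transaction.
            throw e;
        }
    }

    public void close() throws IOException {
        stream.close();
    }

    public static void main(String[] args) {
        SimpleBucketWriter writer = new SimpleBucketWriter();
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                writer.append("event".getBytes());
            } catch (IOException e) {
                System.err.println("attempt " + attempt + ": " + e.getMessage());
            }
        }
    }
}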
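
As for the *maxOpenFiles* remark: if the sink hands out writers from a size-bounded, access-ordered map keyed by bucket path and only closes/evicts the eldest entry once the bound is exceeded, then a writer that fails on both append and close simply stays cached and is retried forever as long as the number of open buckets stays under that limit. The sketch below illustrates that assumed mechanism; it is not the exact Flume implementation, and WriterCache and Writer are made-up names.

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch (not the actual Flume code): a size-bounded, access-ordered
// cache of bucket writers. A writer is only closed and evicted once the cache grows
// past maxOpenFiles, so a broken writer below that limit is never forced out.
public class WriterCache {

    interface Writer {
        void append(byte[] event) throws IOException;
        void close() throws IOException;
    }

    private final Map<String, Writer> writers;

    public WriterCache(final int maxOpenFiles) {
        this.writers = new LinkedHashMap<String, Writer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
                if (size() <= maxOpenFiles) {
                    return false;              // under the limit: nothing is ever evicted
                }
                try {
                    eldest.getValue().close();  // eviction is the only forced close here
                } catch (IOException e) {
                    System.err.println("close on eviction failed: " + e.getMessage());
                }
                return true;
            }
        };
    }

    // The same (possibly broken) writer instance is handed back for the same bucket
    // path on every transaction, as long as it has not been evicted.
    public Writer writerFor(String bucketPath, Supplier<Writer> factory) {
        return writers.computeIfAbsent(bucketPath, path -> factory.get());
    }
}

Since hdfs.maxOpenFiles is not lowered in the config above, eviction would essentially never kick in here, which would match the endless retry loop in the logs.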
