Hi Jeff,

We are using flume-ng version 1.3.

-Ashish


On Wednesday 29 May 2013 03:08 AM, Jeff Lord wrote:
Hi Ashish,

What version of flume are you running?

flume-ng version

-Jeff


On Fri, May 24, 2013 at 3:38 AM, Ashish Tadose <[email protected]> wrote:

    Hi All,

    We are facing this issue in our production Flume setup.

    The issue starts when the HDFS sink's BucketWriter fails to append a
    batch to a file because of a Hadoop datanode problem. The sink then
    tries to close that file, but close() also throws an exception, and
    the HDFS sink does not remove that BucketWriter instance. [The cause
    may be that the number of open BucketWriter instances is still below
    maxOpenFiles, so the writer is never evicted.]

    What makes this worse is that the HDFS sink never lets go of that
    BucketWriter instance and keeps trying to append to and close that
    file indefinitely, which disrupts all event processing towards HDFS.
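
    To make the behaviour easier to follow, here is a minimal Java sketch of
    the loop we believe we are seeing. It is not the actual Flume source:
    StuckBucketWriterSketch, BrokenWriter, openWriters and process() are
    simplified stand-in names, and the exception messages are copied from the
    logs below only to make the correspondence clear.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch, NOT actual Flume source: simplified stand-ins showing how
    // a cached writer whose append() and close() both throw can stay in the
    // sink's table and fail again on every later batch.
    public class StuckBucketWriterSketch {

        /** Stand-in for a bucket writer whose underlying DFS stream has died. */
        static class BrokenWriter {
            void append(byte[] event) throws IOException {
                // Mirrors the repeated "write beyond end of stream" in the logs.
                throw new IOException("write beyond end of stream");
            }
            void close() throws IOException {
                // Mirrors "java.io.IOException: DFSOutputStream is closed".
                throw new IOException("DFSOutputStream is closed");
            }
        }

        /** Stand-in for the sink's table of open writers, keyed by file path. */
        private final Map<String, BrokenWriter> openWriters =
                new HashMap<String, BrokenWriter>();

        void process(String path, byte[] event) throws IOException {
            BrokenWriter writer = openWriters.get(path);
            if (writer == null) {
                writer = new BrokenWriter();
                openWriters.put(path, writer);
            }
            try {
                writer.append(event);
            } catch (IOException appendFailure) {
                try {
                    writer.close();      // attempted, but it throws as well
                } catch (IOException closeFailure) {
                    // Nothing removes the entry from openWriters here, so the
                    // next batch for the same path reuses the broken writer and
                    // the append/close failures repeat indefinitely.
                }
                throw appendFailure;     // rethrown, the transaction rolls back
            }
        }
    }

    In the real sink the writer table is bounded by maxOpenFiles, which is why
    we suspect the broken writer is never evicted while the number of open
    writers stays below that limit.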

    Logs from the flume agent:
    2013-05-20 02:32:40,669 (ResponseProcessor for block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:3015)] DFSOutputStream ResponseProcessor exception for block blk_8857674139042547711_2143611
    java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41129 remote=/<datanode2_ip>:60010]
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
            at java.io.DataInputStream.readFully(DataInputStream.java:178)
            at java.io.DataInputStream.readLong(DataInputStream.java:399)
            at org.apache.hadoop.hdfs.protocol.DataTransferProtocol$PipelineAck.readFields(DataTransferProtocol.java:124)
            at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:2967)

    2013-05-20 02:32:56,321 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3051)] Error Recovery for block blk_8857674139042547711_2143611 bad datanode[0] <datanode2_ip>:60010
    2013-05-20 02:32:56,322 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3102)] Error Recovery for block blk_8857674139042547711_2143611 in pipeline <datanode2_ip>:60010, <datanode3_ip>:60010, <datanode4_ip>:60010: bad datanode <datanode2_ip>:60010
    2013-05-20 02:32:56,538 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:109)] Start checkpoint for /home/flume/flume_channel/checkpointDir15/checkpoint, elements to sync = 42000
    2013-05-20 02:32:56,634 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:117)] Updating checkpoint metadata: logWriteOrderID: 1370969225649, queueSize: 140156, queueHead: 72872626
    2013-05-20 02:32:56,722 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)] Updating log-543.meta currentPosition = 743847065, logWriteOrderID = 1370969225649
    2013-05-20 02:32:56,759 (Log-BackgroundWorker-fileChannel) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:886)] Updated checkpoint for file: /home/flume/flume_channel/dataDir15/log-543 position: 743847065 logWriteOrderID: 1370969225649
    2013-05-20 02:33:56,331 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode <datanode3_ip>:60010
    java.net.SocketTimeoutException: Call to /<datanode3_ip>:60021 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41526 remote=/<datanode3_ip>:60021]
    ..
    ..
    ..
    2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode <datanode2_ip>:60010
    org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.ipc.RemoteException: java.io.IOException: blk_8857674139042547711_2143611 is already commited, storedBlock == null.
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
            at org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
            at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:396)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

            at org.apache.hadoop.ipc.Client.call(Client.java:1070)
            at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
            at $Proxy5.nextGenerationStamp(Unknown Source)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:396)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] Failed recovery attempt #0 from primary datanode <datanode2_ip>:60010
    org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.ipc.RemoteException: java.io.IOException: blk_8857674139042547711_2143611 is already commited, storedBlock == null.
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
            at org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
            at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:396)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

            at org.apache.hadoop.ipc.Client.call(Client.java:1070)
            at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
            at $Proxy5.nextGenerationStamp(Unknown Source)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
            at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:396)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

            at org.apache.hadoop.ipc.Client.call(Client.java:1070)
            at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
            at $Proxy7.recoverBlock(Unknown Source)
            at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3121)
            at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2100(DFSClient.java:2589)
            at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2793)
    2013-05-20 02:36:00,249 (DataStreamer for file /flume/data/Flume_raw_1_.1368960029025.lzo.tmp block blk_8857674139042547711_2143611) [WARN - org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3162)] Error Recovery for block blk_8857674139042547711_2143611 failed because recovery from primary datanode <datanode2_ip>:60010 failed 1 times.  Pipeline was <datanode2_ip>:60010. Will retry...
    ..
    ..
    ..
    2013-05-20 02:36:16,933 (hdfs-hdfsSink1-call-runner-8) [WARN - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:378)] Caught IOException writing to HDFSWriter (write beyond end of stream). Closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) and rethrowing exception.
    2013-05-20 02:36:16,934 (hdfs-hdfsSink1-call-runner-8) [WARN - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:384)] Caught IOException while closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp). Exception follows.
    java.io.IOException: DFSOutputStream is closed
            at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3669)
            at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
            at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.sync(HDFSCompressedDataStream.java:96)
            at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:345)
            at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:53)
            at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:310)
            at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:308)
            at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
            at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:308)
            at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:257)
            at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:382)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            at java.lang.Thread.run(Thread.java:662)
    2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] HDFS IO error
    java.io.IOException: write beyond end of stream
            at com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
            at java.io.OutputStream.write(OutputStream.java:58)
            at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
            at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            at java.lang.Thread.run(Thread.java:662)
    2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] HDFS IO error
    java.io.IOException: write beyond end of stream
            at com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
            at java.io.OutputStream.write(OutputStream.java:58)
            at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
            at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
            at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            at java.lang.Thread.run(Thread.java:662)
    2013-05-20 02:36:20,988 (hdfs-hdfsSink1-call-runner-7) [WARN - org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:378)] Caught IOException writing to HDFSWriter (Error Recovery for block blk_8857674139042547711_2143611 failed because recovery from primary datanode <datanode2_ip>:60010 failed 6 times.  Pipeline was <datanode2_ip>:60010. Aborting...). Closing file (hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) and rethrowing exception.
    ..
    ..
    ..
    After this, the agent keeps repeating these log messages.

    Flume agent HDFS sink properties:

    agent.sinks.hdfsSink1.type = hdfs
    agent.sinks.hdfsSink1.hdfs.path=hdfs://<namenode_ip>:64310/flume/tracker_data
    #agent.sinks.hdfsSink1.hdfs.fileType =DataStream
    agent.sinks.hdfsSink1.hdfs.fileType =CompressedStream
    agent.sinks.hdfsSink1.hdfs.filePrefix=Flume_raw_1_
    agent.sinks.hdfsSink1.hdfs.rollSize=0
    agent.sinks.hdfsSink1.hdfs.codeC=LZopCodec
    agent.sinks.hdfsSink1.hdfs.rollCount=0
    agent.sinks.hdfsSink1.hdfs.batchSize=5000
    agent.sinks.hdfsSink1.hdfs.rollInterval=60
    agent.sinks.hdfsSink1.channel= fileChannel
    agent.sinks.hdfsSink1.hdfs.callTimeout = 240000
    agent.sinks.hdfsSink1.hdfs.idleTimeout = 300
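
    For reference, the maxOpenFiles limit mentioned above corresponds to the
    HDFS sink's hdfs.maxOpenFiles property, which we have left at its default
    (it does not appear in the config above). If we were to set it explicitly,
    it would look like the line below; the value 50 is purely illustrative,
    not a recommendation:

    agent.sinks.hdfsSink1.hdfs.maxOpenFiles = 50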

    Thanks,
    Ashish


