[ 
https://issues.apache.org/jira/browse/FLUME-2451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169844#comment-14169844
 ] 

Hari Shreedharan commented on FLUME-2451:
-----------------------------------------

Roshan:

- Do you think we should close *all* bucket writers on an HDFS IOException? Is 
there a specific way to tell that the NameNode is down? We don't need to close 
every bucket writer in every case.
- The sfWritersMap is guarded by a lock, so this method should acquire that 
lock too.
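
To make both points concrete, here is a minimal sketch, not the actual patch. 
It assumes a connection failure can be recognized by a ConnectException 
somewhere in the cause chain (as in the traces quoted below), which is only a 
heuristic and does not prove the NameNode itself is down. The names 
WriterCacheSketch, Writer, writersLock and onIoError are hypothetical 
stand-ins, not Flume classes or methods.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's writer cache; not Flume code.
class WriterCacheSketch {
    /** Hypothetical stand-in for a bucket writer; only close() matters here. */
    interface Writer {
        void close() throws IOException;
    }

    private final Object writersLock = new Object();
    private final Map<String, Writer> writers = new LinkedHashMap<>();

    void put(String path, Writer writer) {
        synchronized (writersLock) {
            writers.put(path, writer);
        }
    }

    int size() {
        synchronized (writersLock) {
            return writers.size();
        }
    }

    /** Heuristic (an assumption): a ConnectException anywhere in the cause chain. */
    static boolean isConnectionFailure(Throwable error) {
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            if (cause instanceof ConnectException) {
                return true;
            }
        }
        return false;
    }

    /** Evicts and closes all writers only on a connection failure; returns how many were evicted. */
    int onIoError(IOException error) {
        if (!isConnectionFailure(error)) {
            return 0; // other I/O errors leave the cached writers alone
        }
        List<Writer> toClose;
        synchronized (writersLock) {
            // Mutate the map only while holding the lock that guards it.
            toClose = new ArrayList<>(writers.values());
            writers.clear();
        }
        // Close outside the lock so a slow close() does not block other threads.
        for (Writer writer : toClose) {
            try {
                writer.close();
            } catch (IOException ignored) {
                // The writer is already evicted; a fresh one is created on the next append.
            }
        }
        return toClose.size();
    }
}
```

Snapshotting the map under the lock and closing afterwards is one possible 
design; closing while holding the lock would also be correct, but a hung 
close() would then stall every thread that needs the map.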

Also, the 2nd patch only has the unit test, not the code fix. Can you merge the 
two patches? Thanks!

> HDFS Sink Cannot Reconnect After NameNode Restart
> -------------------------------------------------
>
>                 Key: FLUME-2451
>                 URL: https://issues.apache.org/jira/browse/FLUME-2451
>             Project: Flume
>          Issue Type: Bug
>          Components: File Channel, Sinks+Sources
>    Affects Versions: v1.4.0
>         Environment: 8 node CDH 4.2.2 (2.0.0-cdh4.2.2) cluster
> All cluster machines are running Ubuntu 12.04 x86_64
>            Reporter: Andrew O'Neill
>            Assignee: Roshan Naik
>              Labels: HDFS, Sink
>         Attachments: FLUME-2451.patch, FLUME-2451.v2.patch
>
>
> I am testing a simple flume setup with a Sequence Generator Source, a File 
> Channel, and an HDFS Sink (see my flume.conf below). This configuration works 
> as expected until I reboot the cluster’s NameNode or until I restart the HDFS 
> service on the cluster. At this point, it appears that the Flume Agent cannot 
> reconnect to HDFS and must be manually restarted.
> Here is our flume.conf:
>     appserver.sources = rawtext
>     appserver.channels = testchannel
>     appserver.sinks = test_sink
>     appserver.sources.rawtext.type = seq
>     appserver.sources.rawtext.channels = testchannel
>     appserver.channels.testchannel.type = file
>     appserver.channels.testchannel.capacity = 10000000
>     appserver.channels.testchannel.minimumRequiredSpace = 214748364800
>     appserver.channels.testchannel.checkpointDir = /Users/aoneill/Desktop/testchannel/checkpoint
>     appserver.channels.testchannel.dataDirs = /Users/aoneill/Desktop/testchannel/data
>     appserver.channels.testchannel.maxFileSize = 20000000
>     appserver.sinks.test_sink.type = hdfs
>     appserver.sinks.test_sink.channel = testchannel
>     appserver.sinks.test_sink.hdfs.path = hdfs://cluster01:8020/user/aoneill/flumetest
>     appserver.sinks.test_sink.hdfs.closeTries = 3
>     appserver.sinks.test_sink.hdfs.filePrefix = events-
>     appserver.sinks.test_sink.hdfs.fileSuffix = .avro
>     appserver.sinks.test_sink.hdfs.fileType = DataStream
>     appserver.sinks.test_sink.hdfs.writeFormat = Text
>     appserver.sinks.test_sink.hdfs.inUsePrefix = inuse-
>     appserver.sinks.test_sink.hdfs.inUseSuffix = .avro
>     appserver.sinks.test_sink.hdfs.rollCount = 100000
>     appserver.sinks.test_sink.hdfs.rollInterval = 30
>     appserver.sinks.test_sink.hdfs.rollSize = 10485760
> These are the two error messages that the Flume Agent outputs constantly after 
> the restart:
>     2014-08-26 10:47:24,572 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.AbstractHDFSWriter.isUnderReplicated(AbstractHDFSWriter.java:96)] Unexpected error while checking replication factor
>     java.lang.reflect.InvocationTargetException
>         at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.flume.sink.hdfs.AbstractHDFSWriter.getNumCurrentReplicas(AbstractHDFSWriter.java:162)
>         at org.apache.flume.sink.hdfs.AbstractHDFSWriter.isUnderReplicated(AbstractHDFSWriter.java:82)
>         at org.apache.flume.sink.hdfs.BucketWriter.shouldRotate(BucketWriter.java:452)
>         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:387)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:744)
>     Caused by: java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>         at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:207)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:525)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1253)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:891)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:881)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:982)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:779)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
> and
>     2014-08-26 10:47:29,592 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)] HDFS IO error
>     java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>         at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:207)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:525)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1253)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:891)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:881)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:982)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:779)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
> I can provide additional information if needed. Thank you very much for any 
> insight you are able to provide into this problem.


