Hi Wayne,

Thanks for doing the investigation; this looks like a legitimate problem. I have created an issue to track it (FLUME-2564 <https://issues.apache.org/jira/browse/FLUME-2564>). If you already have a patch that addresses this problem, please post it on the Jira.
Regards,
Arvind Prabhakar

On Thu, Nov 27, 2014 at 1:05 AM, Wan Yi(武汉_技术部_搜索与精准化_万毅) <[email protected]> wrote:

> I have investigated the HDFSEventSink source code and found that if the
> exception is an IOException, it is not thrown to the upper layer, so the
> FailoverSinkProcessor never marks this sink as dead.
>
>     } catch (IOException eIO) {
>       transaction.rollback();
>       LOG.warn("HDFS IO error", eIO);
>       return Status.BACKOFF;
>     } catch (Throwable th) {
>       transaction.rollback();
>       LOG.error("process failed", th);
>       if (th instanceof Error) {
>         throw (Error) th;
>       } else {
>         throw new EventDeliveryException(th);
>       }
>     }
>
> From: Wan Yi(武汉_技术部_搜索与精准化_万毅) [mailto:[email protected]]
> Sent: 27 November 2014 16:02
> To: [email protected]
> Subject: re: Why failover sink processor does not work
>
> By the way, I use flume-1.4.0.
>
> Wayne Wan
>
> From: Wan Yi(武汉_技术部_搜索与精准化_万毅) [mailto:[email protected]]
> Sent: 27 November 2014 15:57
> To: [email protected]
> Subject: Why failover sink processor does not work
>
> I use HDFS to store our logs, but the failover processor does not seem to
> work when I kill the HDFS cluster used by the high-priority sink (sinks1).
>
> Below is my config:
>
> #### define agent
> a1.sources = src1
> a1.sinks = sinks1 sinks5
> a1.channels = ch1
> a1.sinkgroups = g1
>
> #### define the sink group
> a1.sinkgroups.g1.sinks = sinks1 sinks5
> a1.sinkgroups.g1.processor.type = failover
> a1.sinkgroups.g1.processor.priority.sinks1 = 5
> a1.sinkgroups.g1.processor.priority.sinks5 = 1
> a1.sinkgroups.g1.processor.maxpenalty = 1000
>
> #### define http source
> a1.sources.src1.type = **
> a1.sources.src1.port = 8081
> a1.sources.src1.contextPath = /
> a1.sources.src1.urlPattern = /t
> a1.sources.src1.handler = **
> a1.sources.src1.channels = ch1
>
> #### define hdfs sink
> a1.sinks.sinks1.type = hdfs
> a1.sinks.sinks1.channel = ch1
> a1.sinks.sinks1.hdfs.path = hdfs://host1:9000/user/hadoop/flume/ds=%y-%m-%d
> a1.sinks.sinks1.hdfs.filePrefix = %{host}
> a1.sinks.sinks1.hdfs.batchSize = 1000
> a1.sinks.sinks1.hdfs.rollCount = 0
> a1.sinks.sinks1.hdfs.rollSize = 0
> a1.sinks.sinks1.hdfs.rollInterval = 300
> a1.sinks.sinks1.hdfs.idleTimeout = 1800000
> a1.sinks.sinks1.hdfs.callTimeout = 20000
> a1.sinks.sinks1.hdfs.threadsPoolSize = 250
> a1.sinks.sinks1.hdfs.writeFormat = Text
> a1.sinks.sinks1.hdfs.fileType = DataStream
>
> #### define hdfs sink
> a1.sinks.sinks5.type = hdfs
> a1.sinks.sinks5.channel = ch1
> a1.sinks.sinks5.hdfs.path = hdfs://host2:8020/user/hadoop/flume/ds=%y-%m-%d
> a1.sinks.sinks5.hdfs.filePrefix = %{host}
> a1.sinks.sinks5.hdfs.batchSize = 1000
> a1.sinks.sinks5.hdfs.rollCount = 0
> a1.sinks.sinks5.hdfs.rollSize = 0
> a1.sinks.sinks5.hdfs.rollInterval = 300
> a1.sinks.sinks5.hdfs.idleTimeout = 1800000
> a1.sinks.sinks5.hdfs.callTimeout = 20000
> a1.sinks.sinks5.hdfs.threadsPoolSize = 250
> a1.sinks.sinks5.hdfs.writeFormat = Text
> a1.sinks.sinks5.hdfs.fileType = DataStream
>
> #### define memory channel1
> a1.channels.ch1.type = memory
> a1.channels.ch1.capacity = 10000
> a1.channels.ch1.transactionCapacity = 1000
>
> Wayne Wan
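The gap Wayne describes can be sketched in a few lines. The code below is a hypothetical simplification, not the actual Flume classes: `FailoverSketch`, its nested `Sink` interface, and `runOnce` are stand-ins invented for illustration. Like Flume's FailoverSinkProcessor, the loop demotes a sink only when `process()` throws; a sink that swallows IOException and returns `Status.BACKOFF` still "succeeds", so the lower-priority sink is never tried.

```java
import java.util.ArrayList;
import java.util.List;

public class FailoverSketch {
    enum Status { READY, BACKOFF }

    // Stand-in for org.apache.flume.Sink; the real process() throws
    // EventDeliveryException.
    interface Sink {
        Status process() throws Exception;
    }

    // Simplified failover loop: try sinks in priority order and demote a
    // sink to the failed list ONLY when process() throws. A BACKOFF return
    // counts as success, which is exactly the gap described above.
    static String runOnce(List<Sink> byPriority, List<Sink> failed) {
        for (Sink s : byPriority) {
            if (failed.contains(s)) {
                continue;
            }
            try {
                s.process();  // a BACKOFF result still returns normally
                return "delivered by sink " + byPriority.indexOf(s);
            } catch (Exception e) {
                failed.add(s);  // only an exception demotes the sink
            }
        }
        return "all sinks failed";
    }

    public static void main(String[] args) {
        Sink swallowsIO = () -> Status.BACKOFF;  // like HDFSEventSink catching IOException
        Sink throwsIO   = () -> { throw new Exception("HDFS down"); };
        Sink backup     = () -> Status.READY;

        // The swallowing sink "succeeds" forever; the backup is never used.
        System.out.println(runOnce(List.of(swallowsIO, backup), new ArrayList<>()));
        // prints "delivered by sink 0"

        // A sink that rethrows is demoted and the backup takes over.
        System.out.println(runOnce(List.of(throwsIO, backup), new ArrayList<>()));
        // prints "delivered by sink 1"
    }
}
```

This is why rethrowing the IOException as an EventDeliveryException (as the `Throwable` branch already does) would let the failover processor see the failure and fall back to sinks5.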
