I have investigated the HDFSEventSink source code and found that when the failure is an IOException, the exception is not rethrown to the upper layer, so FailoverSinkProcessor never marks this sink as dead:
    } catch (IOException eIO) {
      // An IOException is swallowed here: the transaction is rolled back and
      // BACKOFF is returned instead of rethrowing, so the sink processor only
      // ever sees a backoff status, not a delivery failure.
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      // Only non-IOException throwables propagate as Error or
      // EventDeliveryException, which is what can trigger failover.
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    }
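For comparison, below is a simplified, self-contained sketch of the failover logic as I understand it (it is not the actual Flume source; the SimpleSink interface and class names are stand-ins for illustration). The point it shows: the processor only demotes the active sink when process() throws, while a Status.BACKOFF return value is handed straight back to the caller, so the IOException path above never causes a switch to the lower-priority sink.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class FailoverSketch {

        enum Status { READY, BACKOFF }

        // Simplified stand-in for org.apache.flume.Sink
        interface SimpleSink {
            Status process() throws Exception; // throws on delivery failure
            String name();
        }

        private SimpleSink active;
        private final Deque<SimpleSink> standbys = new ArrayDeque<>();

        FailoverSketch(SimpleSink primary, SimpleSink... backups) {
            this.active = primary;
            for (SimpleSink s : backups) {
                standbys.add(s);
            }
        }

        // Only a thrown exception moves the active sink aside;
        // a BACKOFF return is simply passed through to the caller.
        Status process() throws Exception {
            while (active != null) {
                try {
                    return active.process();   // BACKOFF passes straight through
                } catch (Exception e) {
                    System.err.println("Sink " + active.name() + " failed, failing over");
                    active = standbys.poll();  // demote and try the next sink
                }
            }
            throw new Exception("All sinks failed");
        }

        public static void main(String[] args) throws Exception {
            SimpleSink primary = new SimpleSink() {
                public Status process() { return Status.BACKOFF; } // like the HDFS IOException path
                public String name() { return "sinks1"; }
            };
            SimpleSink backup = new SimpleSink() {
                public Status process() { return Status.READY; }
                public String name() { return "sinks5"; }
            };
            FailoverSketch processor = new FailoverSketch(primary, backup);
            // Prints BACKOFF: the backup sink is never consulted.
            System.out.println(processor.process());
        }
    }

Running this prints BACKOFF, which mirrors what I observe when the HDFS cluster behind sinks1 goes down: the backup sink sinks5 is never consulted.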
From: Wan Yi (武汉_技术部_搜索与精准化_万毅) [mailto:[email protected]]
Sent: 27 November 2014 16:02
To: [email protected]
Subject: re: Why failover sink processor does not work
By the way, I am using Flume 1.4.0.
Wayne Wan
From: Wan Yi (武汉_技术部_搜索与精准化_万毅) [mailto:[email protected]]
Sent: 27 November 2014 15:57
To: [email protected]
Subject: Why failover sink processor does not work
I use HDFS to store our logs, but the failover sink processor does not seem to work
when I kill the HDFS cluster used by the high-priority sink (sinks1).
Below is my config:
#### define agent
a1.sources = src1
a1.sinks = sinks1 sinks5
a1.channels = ch1
a1.sinkgroups = g1
#### define the sink group
a1.sinkgroups.g1.sinks = sinks1 sinks5
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.sinks1 = 5
a1.sinkgroups.g1.processor.priority.sinks5 = 1
a1.sinkgroups.g1.processor.maxpenalty = 1000
#### define http source
a1.sources.src1.type = **
a1.sources.src1.port = 8081
a1.sources.src1.contextPath = /
a1.sources.src1.urlPattern = /t
a1.sources.src1.handler = **
a1.sources.src1.channels = ch1
#### define hdfs sink
a1.sinks.sinks1.type = hdfs
a1.sinks.sinks1.channel = ch1
a1.sinks.sinks1.hdfs.path = hdfs://host1:9000/user/hadoop/flume/ds=%y-%m-%d
a1.sinks.sinks1.hdfs.filePrefix = %{host}
a1.sinks.sinks1.hdfs.batchSize = 1000
a1.sinks.sinks1.hdfs.rollCount = 0
a1.sinks.sinks1.hdfs.rollSize = 0
a1.sinks.sinks1.hdfs.rollInterval = 300
a1.sinks.sinks1.hdfs.idleTimeout = 1800000
a1.sinks.sinks1.hdfs.callTimeout = 20000
a1.sinks.sinks1.hdfs.threadsPoolSize = 250
a1.sinks.sinks1.hdfs.writeFormat = Text
a1.sinks.sinks1.hdfs.fileType = DataStream
#### define hdfs sink
a1.sinks.sinks5.type = hdfs
a1.sinks.sinks5.channel = ch1
a1.sinks.sinks5.hdfs.path = hdfs://host2:8020/user/hadoop/flume/ds=%y-%m-%d
a1.sinks.sinks5.hdfs.filePrefix = %{host}
a1.sinks.sinks5.hdfs.batchSize = 1000
a1.sinks.sinks5.hdfs.rollCount = 0
a1.sinks.sinks5.hdfs.rollSize = 0
a1.sinks.sinks5.hdfs.rollInterval = 300
a1.sinks.sinks5.hdfs.idleTimeout = 1800000
a1.sinks.sinks5.hdfs.callTimeout = 20000
a1.sinks.sinks5.hdfs.threadsPoolSize = 250
a1.sinks.sinks5.hdfs.writeFormat = Text
a1.sinks.sinks5.hdfs.fileType = DataStream
#### define memory channel1
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 10000
a1.channels.ch1.transactionCapacity = 1000
Wayne Wan