Hi,
I'm creating a proof of concept of a Flume agent that buffers events and
stops consuming events from the source when the sink is unavailable. Only when
the sink is available again should the buffered events be processed, after
which the source resumes consumption.
For this I've created a simple agent, which reads from a SpoolDir and writes to
a file. To simulate that the sink service is down, I change the file
permissions so Flume can't write to it. When I then start Flume, some events
are buffered in the memory channel and the source stops consuming events once
the channel capacity is full, as expected. As soon as the file becomes
writable again, the sink is able to process the events and Flume recovers.
However, that only works as long as the transaction capacity is not exceeded.
Once the transaction capacity is exceeded, Flume never recovers and keeps
logging the following error:
2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 4 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
        ... 3 more
As soon as the number of events buffered in memory exceeds the transaction
capacity (4), this error occurs. I don't understand why, because the batchSize
of the fileout sink is 1, so it should take the events out of the channel one
by one.
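To convince myself of what the exception itself means, I wrote a minimal
sketch that reproduces the same ChannelException directly against a
MemoryChannel by doing more takes in a single transaction than the transaction
capacity allows. This assumes Flume 1.6 (flume-ng-core on the classpath); the
class name TakeListDemo is just for illustration:

import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

public class TakeListDemo {
    public static void main(String[] args) {
        // Same channel settings as in my agent config below.
        MemoryChannel channel = new MemoryChannel();
        channel.setName("mem-channel");
        Context context = new Context();
        context.put("capacity", "10");
        context.put("transactionCapacity", "4");
        Configurables.configure(channel, context);
        channel.start();

        // Put 5 events, one transaction each.
        for (int i = 0; i < 5; i++) {
            Transaction putTx = channel.getTransaction();
            putTx.begin();
            channel.put(EventBuilder.withBody(("event-" + i).getBytes()));
            putTx.commit();
            putTx.close();
        }

        // Taking all 5 events inside a *single* transaction overflows the
        // take list (size = transactionCapacity = 4) on the 5th take and
        // throws the same ChannelException as in the log above.
        Transaction takeTx = channel.getTransaction();
        takeTx.begin();
        try {
            for (int i = 0; i < 5; i++) {
                channel.take();
            }
            takeTx.commit();
        } catch (Exception e) {
            e.printStackTrace();
            takeTx.rollback();
        } finally {
            takeTx.close();
            channel.stop();
        }
    }
}

That matches the error message, but it doesn't explain why the sink would do
more than one take per transaction when its batchSize is 1, so I may be
misreading how the sink drives the channel.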
This is the config I'm using:
agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout
agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1
agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4
agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1
I've tested this config with different values for the channel capacity and
transaction capacity (e.g., 3 and 3), but I haven't found a configuration with
which Flume is able to recover once the channel capacity has been reached. Any
ideas on how to achieve this?
--
Kind regards,
Balthasar Schopman
LeaseWeb CDN Innovation Engineer