[ https://issues.apache.org/jira/browse/FLUME-2307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14205225#comment-14205225 ]

Hari Shreedharan commented on FLUME-2307:
-----------------------------------------

Your checkpoint interval is set to 27+ hours (that value is in seconds). Flume 
deletes old data files only when a checkpoint is written. In fact, once the 
events in a file are removed, the file itself is deleted only on the 2nd 
checkpoint after that one (not the one immediately after). So in your case it 
needs to wait about 54 hours before deleting the file. Even after a replay, it 
waits for another checkpoint before deleting the file. Did you wait that long 
and verify?
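
If the intent was a much shorter cleanup cycle, here is a minimal config 
sketch in the style of the channel config quoted below. The property name 
(checkpointInterval) is real; the value shown is just the documented default, 
and the unit it expects should be double-checked against the docs for your 
Flume version.

{code}
# Illustration only: set an explicit checkpointInterval so that data files
# whose events have all been taken are removed within roughly two checkpoints.
# 30000 is the documented default; verify the unit your Flume version expects.
agent.channels.fileChannel.checkpointInterval = 30000
{code}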

> Remove Log writetimeout
> -----------------------
>
>                 Key: FLUME-2307
>                 URL: https://issues.apache.org/jira/browse/FLUME-2307
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.4.0
>            Reporter: Steve Zesch
>            Assignee: Hari Shreedharan
>             Fix For: v1.5.0
>
>         Attachments: FLUME-2307-1.patch, FLUME-2307.patch
>
>
> I've observed Flume failing to clean up old log data in FileChannels. The 
> amount of old log data can range anywhere from tens to hundreds of GB. I was 
> able to confirm that the channels were in fact empty. This behavior always 
> occurs after lock timeouts when attempting to put, take, rollback, or commit 
> to a FileChannel. Once the timeout occurs, Flume stops cleaning up the old 
> files. I was able to confirm that the Log's writeCheckpoint method was still 
> being called and successfully obtaining a lock from tryLockExclusive(), but I 
> was not able to confirm that removeOldLogs was being called. The application 
> log did not include the "Removing old file: log-xyz" messages that the Log 
> class would output if the old files were being removed correctly. I suspect 
> the lock timeouts were due to high I/O load at the time.
> Some stack traces:
> {code}
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:478)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doCommit(FileChannel.java:594)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at dataxu.flume.plugins.avro.AsyncAvroSink.process(AsyncAvroSink.java:548)
>         at dataxu.flume.plugins.ClassLoaderFlumeSink.process(ClassLoaderFlumeSink.java:33)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:619)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:621)
>         at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
>         at dataxu.flume.plugins.avro.AvroSource.appendBatch(AvroSource.java:209)
>         at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:151)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:220)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:619)
> {code}
> Channel Config:
> {code}
> agent.channels.fileChannel.type = file
> agent.channels.fileChannel.checkpointDir = /var/log/flume-ng/channels/fileChannel/checkpoint
> agent.channels.fileChannel.dataDirs = /var/log/flume-ng/channels/fileChannel/data
> agent.channels.fileChannel.capacity = 100000000
> agent.channels.fileChannel.transactionCapacity = 100000000
> agent.channels.fileChannel.maxFileSize = 104857600 
> {code}
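
For readers of the stack traces quoted above: they all originate from the 
timeout-based write lock in the file channel's Log, which is what this ticket 
removes. Below is a minimal, self-contained Java sketch contrasting the two 
locking patterns involved. It is not Flume's actual implementation; the class 
and method names are invented for illustration.

{code}
// Illustrative sketch only -- NOT Flume's Log class. It contrasts a
// timeout-based write lock (which can fail under heavy I/O, producing
// "Failed to obtain lock for writing to the log" style errors) with a
// plain blocking acquire, which is the direction "Remove Log writetimeout"
// points in.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Timeout-based variant: if the lock is not obtained in time, the
    // transaction fails with an exception, and housekeeping that shares
    // the same lock can end up being skipped.
    void appendWithTimeout(long timeoutSeconds) throws InterruptedException {
        if (!lock.writeLock().tryLock(timeoutSeconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException(
                "Failed to obtain lock for writing to the log.");
        }
        try {
            // ... write the event to the data file ...
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Blocking variant: waits however long it takes instead of failing.
    void appendBlocking() {
        lock.writeLock().lock();
        try {
            // ... write the event to the data file ...
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WriteLockSketch sketch = new WriteLockSketch();
        sketch.appendWithTimeout(10); // succeeds immediately when uncontended
        sketch.appendBlocking();
        System.out.println("both variants acquired and released the write lock");
    }
}
{code}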



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
