[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yongxi Zhang updated FLUME-3106:
--------------------------------
    Description:

Flume can produce endless duplicate data when the sink's batch size is greater than the channel's transaction capacity, as with the following configuration:

{code:xml}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 100
agent.channels.ch2.transactionCapacity = 5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval = 1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{code}

The agent then logs exceptions like this:

{code}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
{code}

When the memory channel's takeList fills up, a ChannelException is thrown and the transaction is rolled back: the events in the takeList are returned to the channel's queue even though the sink has already written them. On the next attempt the same events are taken and written again, so the same data is delivered endlessly. This is not reasonable.


> When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3106
>                 URL: https://issues.apache.org/jira/browse/FLUME-3106
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: 1.7.0
>            Reporter: Yongxi Zhang
>             Fix For: 1.8.0
>
>         Attachments: FLUME-3106-0.patch
>


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
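The duplication mechanism can be sketched in plain Java. The classes below are an illustrative toy model, not Flume's actual implementation: a channel whose per-transaction take list is capped at transactionCapacity, and a sink that writes each event downstream before the batch commits. Because the sink's batchSize exceeds the cap, the batch can never commit; the rollback requeues events that were already written, so every cycle rewrites them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical names, not Flume classes) of the takeList overflow.
public class TakeListDemo {
    static class ToyChannel {
        final Deque<String> queue = new ArrayDeque<>();   // channel queue
        final List<String> takeList = new ArrayList<>();  // per-transaction takes
        final int transactionCapacity;

        ToyChannel(int transactionCapacity) {
            this.transactionCapacity = transactionCapacity;
        }

        String take() {
            if (takeList.size() >= transactionCapacity) {
                // Mirrors MemoryChannel's "Take list ... full" ChannelException.
                throw new IllegalStateException(
                        "Take list full, capacity " + transactionCapacity);
            }
            String e = queue.poll();
            if (e != null) takeList.add(e);
            return e;
        }

        void rollback() {
            // Taken events go back onto the queue, even though the sink may
            // already have written them downstream -> duplicates on retry.
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.remove(takeList.size() - 1));
            }
        }

        void commit() { takeList.clear(); }
    }

    // Everything the "sink" has written downstream (e.g. to HDFS).
    static final List<String> written = new ArrayList<>();

    // Like a sink draining one batch: write each event as it is taken,
    // commit only when the whole batch succeeds.
    static boolean drainBatch(ToyChannel ch, int batchSize) {
        try {
            for (int i = 0; i < batchSize; i++) {
                String e = ch.take();
                if (e == null) break;
                written.add(e);          // written BEFORE the commit
            }
            ch.commit();
            return true;
        } catch (IllegalStateException ex) {
            ch.rollback();               // redelivers already-written events
            return false;
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel(5);          // transactionCapacity = 5
        for (int i = 1; i <= 6; i++) ch.queue.add("e" + i);

        boolean ok = drainBatch(ch, 10);            // sink batchSize = 10 > 5
        // Batch fails, 5 events were written anyway, and all 6 are still
        // queued, so the next cycle writes the same 5 events again.
        System.out.println("committed=" + ok
                + " written=" + written.size()
                + " queued=" + ch.queue.size());
    }
}
```

With the configuration from this report, hdfs.batchSize=10 against transactionCapacity=5 keeps the agent in exactly this loop; as a workaround, setting the sink's batch size no larger than the channel's transactionCapacity avoids the overflow in the first place.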