I was using a custom source. It's a TailSource but you can't configure batchSize on it.
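A source whose batch size is fixed and cannot be tuned can cap throughput. A rough amortization model shows why: every channel transaction pays a fixed commit cost (for example, waiting for Kafka to acknowledge a write) plus a per-event cost, so larger batches spread the fixed part over more events. The sketch below is only that model. `events_per_second`, `per_event_us` and `per_commit_us` are hypothetical names, and the costs are made-up placeholders, not measurements of Flume, Kafka or this cluster.

```python
def events_per_second(batch_size: int, per_event_us: float, per_commit_us: float) -> float:
    """Estimated events/sec when each transaction carries `batch_size` events.

    Hypothetical model, not Flume's actual transaction code: one transaction
    costs batch_size * per_event_us + per_commit_us microseconds.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    transaction_us = batch_size * per_event_us + per_commit_us
    return batch_size * 1_000_000 / transaction_us


if __name__ == "__main__":
    # Placeholder costs (assumed, not measured): 10 us per event, 5 ms per commit.
    for batch in (1, 100, 10_000):
        rate = events_per_second(batch, per_event_us=10, per_commit_us=5_000)
        print(f"batchSize={batch:>6}: ~{rate:,.0f} events/s")
```

With these placeholder costs, the estimate rises from about 200 events/s at a batch of 1, to about 16,700 at 100, to about 95,000 at 10,000. Only the shape is meaningful: once the batch is large, the per-event cost dominates and bigger batches help less.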
First I checked SpoolDir and TailDir with 1M messages of 1100 bytes each, one message per line:

TailDir -> MemoryChannel -> Null sink: 20K messages/sec
SpoolDir -> MemoryChannel -> Null sink: 20K messages/sec as well

I ran the test on another machine. With SpoolDir -> KafkaChannel I tried a source batchSize of 100 (10K messages/sec) and 10000 (12K messages/sec). The same test with TailDir -> KafkaChannel gives 5K messages/sec (this source has no batchSize option). It seems that the source batchSize was the problem.

agent.sources = seqGenSrc
agent.channels = kafkaChannel

# Describe/configure the source
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /var/tmp/
agent.sources.seqGenSrc.batchSize = 10000

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = kafkaChannel
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.brokerList=ose10kafkaelk:9092,ose11kafkaelk:9092,ose12kafkaelk:9092
agent.channels.kafkaChannel.topic=kafka-topic4
agent.channels.kafkaChannel.zookeeperConnect=ose10kafkaelk:2181

***

I also tried SpoolDir -> MemoryChannel -> KafkaSink and got 13.3K messages/sec, while TailDir -> MemoryChannel -> KafkaSink gives 20K messages/sec. I don't understand why there is such a difference; I ran the test twice with the same results.

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink

# Describe/configure the source
# (type is spooldir, or the custom TailDir source for the second test)
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /var/tmp/
agent.sources.seqGenSrc.batchSize = 10000

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.batchSize = 10000
agent.sinks.kafkaSink.brokerList = ose10kafkaelk:9092,ose11kafkaelk:9092,ose12kafkaelk:9092
agent.sinks.kafkaSink.topic = kafka-topic
agent.sinks.kafkaSink.requiredAcks = -1

#Specify the channel the sink should use
agent.sinks.kafkaSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 10000

2015-11-12 20:23 GMT+01:00 Hari Shreedharan <[email protected]>:

> I think your batch sizes are the key. What is your batch size from your
> source?
>
> Thanks,
> Hari Shreedharan
>
>
> On Nov 12, 2015, at 4:06 AM, Guillermo Ortiz <[email protected]> wrote:
>
> Yes, I also tried changing the capacity of the KafkaChannel, because
> there is an example in the documentation, although the documentation
> doesn't say anything about what it means.
>
> Anyway, in the end I write messages to Kafka either from the SpoolDir
> source (through the KafkaChannel) or from a KafkaSink, and I take the
> measurement in Kafka. Maybe writing from a sink is not the same as writing
> directly from a channel. I thought it would be faster, since there are
> fewer pieces in the complete flow.
>
> Another theory I have: I took a quick look at the code of MemoryChannel
> and KafkaChannel, and I saw that KafkaChannel has to serialize the events
> with Avro, while in MemoryChannel I didn't see that transformation. There
> is a method doCommit, but I'm not sure when it is called.
>
>
> 2015-11-12 12:39 GMT+01:00 Gonzalo Herreros <[email protected]>:
>
>> I think your expectations are not realistic.
>> The MemoryChannel adds minimal overhead but is not as reliable as the
>> KafkaChannel.
>> In the first case you can lose 10k messages if you are unlucky, while with
>> the KafkaChannel you won't lose a single one.
>> More reliability normally comes with a small performance hit.
>>
>> However, the differences you are seeing are too great, so I also believe
>> it's related to the batch size.
>> While the sink is using 10k batches, there is nothing configured for
>> the KafkaChannel (it could be committing every message, or something like
>> 100). I'm not sure what the default batch size is there.
>> In the documentation there are no properties for batch or
>> transactionCapacity, but the example does set capacity and
>> transactionCapacity. Not sure if they apply to this channel.
>>
>> Regards,
>> Gonzalo
>>
>>
>> On 12 November 2015 at 11:23, Ahmed Vila <[email protected]> wrote:
>>
>>> Hi Guillermo,
>>>
>>> With KafkaSink you're passing 10k events at once to Kafka, because
>>> batchSize (the transaction size) is that big.
>>>
>>> So it's important to know how big the batchSize in your source is in
>>> order to be able to compare. Set it to 10k and check its performance again.
>>>
>>> Please keep in mind that Flume has to keep track of transactions and
>>> other housekeeping within any channel, so in my opinion it's supposed to be
>>> slower than a sink for the same output (Kafka, file or whatever).
>>>
>>>
>>>
>>> On Thu, Nov 12, 2015 at 12:05 PM, Guillermo Ortiz <[email protected]>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm using Flume with Kafka and I don't understand some performance
>>>> results that I'm getting.
>>>>
>>>> I have a topic on 3 nodes, with 6 partitions and replication factor 2.
>>>> I'm ingesting messages of 1100 bytes each with a spoolDir
>>>> source.
>>>>
>>>> With Source-MemoryChannel-KafkaSink I get about
>>>> 50K messages/second - 54Mb/s in Kafka.
>>>>
>>>> If I use Source-KafkaChannel I only get about 1K messages/second -
>>>> 1.2Mb/s in Kafka.
>>>>
>>>> I thought that I was going to get better performance with the
>>>> KafkaChannel, but I'm getting 50x better throughput with KafkaSink.
>>>>
>>>> The first configuration is:
>>>> agent.sources = seqGenSrc
>>>> agent.channels = memoryChannel
>>>> agent.sinks = kafkaSink
>>>>
>>>> #Source configuration
>>>> ...
>>>>
>>>> agent.sources.seqGenSrc.channels = memoryChannel
>>>> agent.sinks.kafkaSink.channel = memoryChannel
>>>> agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
>>>> agent.sinks.kafkaSink.batchSize = 10000
>>>> agent.sinks.kafkaSink.brokerList = ose10kafkaelk:9092,ose11kafkaelk:9092,ose12kafkaelk:9092
>>>> agent.sinks.kafkaSink.topic = kafka-topic
>>>> agent.sinks.kafkaSink.requiredAcks = -1
>>>> agent.sinks.kafkaSink.channel = memoryChannel
>>>>
>>>> agent.channels.memoryChannel.type = memory
>>>> agent.channels.memoryChannel.capacity = 100000
>>>> agent.channels.memoryChannel.transactionCapacity = 10000
>>>>
>>>>
>>>> The second is:
>>>> agent.sources = seqGenSrc
>>>> agent.channels = kafkaChannel
>>>>
>>>>
>>>> # Describe/configure the source
>>>> ###Configuration spoolDir source...
>>>> ...
>>>>
>>>> # The channel can be defined as follows.
>>>> agent.sources.seqGenSrc.channels = kafkaChannel
>>>>
>>>> agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
>>>>
>>>> agent.channels.kafkaChannel.brokerList=ose10kafkaelk:9092,ose11kafkaelk:9092,ose12kafkaelk:9092
>>>> agent.channels.kafkaChannel.topic=kafka-topic3
>>>> agent.channels.kafkaChannel.zookeeperConnect=ose10kafkaelk:2181
>>>>
>>>
>>>
>>> --
>>>
>>> Best regards,
>>> Ahmed Vila | Senior software developer
>>> DevLogic | Sarajevo | Bosnia and Herzegovina
>>>
>>> Office : +387 33 942 123
>>> Mobile: +387 62 139 348
>>>
>>> Website: www.devlogic.eu
>>> E-mail : [email protected]
>>> ---------------------------------------------------------------------
>>> This e-mail and any attachment is for authorised use by the intended
>>> recipient(s) only. This email contains confidential information. It should
>>> not be copied, disclosed to, retained or used by, any party other than the
>>> intended recipient. Any unauthorised distribution, dissemination or copying
>>> of this E-mail or its attachments, and/or any use of any information
>>> contained in them, is strictly prohibited and may be illegal. If you are
>>> not an intended recipient then please promptly delete this e-mail and any
>>> attachment and all copies and inform the sender directly via email. Any
>>> emails that you send to us may be monitored by systems or persons other
>>> than the named communicant for the purposes of ascertaining whether the
>>> communication complies with the law and company policies.
>>
>>
>
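The rates in this thread switch between messages/sec and "Mb/s". Assuming "Mb/s" means megabytes per second (10^6 bytes) and ignoring Flume/Kafka framing overhead, each byte rate is just the message rate times the 1100-byte message size. The hypothetical helper below only re-derives numbers already quoted in the thread; it measures nothing.

```python
MESSAGE_BYTES = 1100  # message size reported in the thread


def megabytes_per_second(messages_per_second: float, message_bytes: int = MESSAGE_BYTES) -> float:
    """Payload throughput in decimal MB/s (1 MB = 10**6 bytes); framing overhead ignored."""
    return messages_per_second * message_bytes / 1_000_000


if __name__ == "__main__":
    # Message rates as reported in the thread.
    reported = {
        "MemoryChannel -> KafkaSink (first mail)": 50_000,
        "KafkaChannel only (first mail)": 1_000,
        "TailDir/SpoolDir -> MemoryChannel -> Null": 20_000,
        "TailDir -> MemoryChannel -> KafkaSink": 20_000,
        "SpoolDir -> MemoryChannel -> KafkaSink": 13_300,
    }
    for label, rate in reported.items():
        print(f"{label}: {rate:,} msg/s ~ {megabytes_per_second(rate):.1f} MB/s")
```

The first two come out at about 55 MB/s and 1.1 MB/s, close to the 54 Mb/s and 1.2 Mb/s reported. Both units therefore show the same 50x gap between the two setups.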
