Thanks again for the help. As a final follow-up, we added a fetch.wait.max.ms
of 10ms and that got us to a throughput of about 800 KB/sec reading from the 20
topics, which is still a bit lower than I'd originally hoped, but should be
sufficient for our use case in the near term. Thank you for all of your input,
it's been very helpful for us.
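
For reference, in the Scala config map style from earlier in the thread, the
change was just this one extra entry (10ms is simply what worked for our
workload, not a general recommendation):

"fetch.wait.max.ms" -> "10"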

- CJ

> On Feb 10, 2015, at 12:10 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> 
> CJ,
> 
> On the consumer side there are two other configs, "fetch.min.bytes" and
> "fetch.wait.max.ms":
> 
> http://kafka.apache.org/documentation.html#consumerconfigs
> 
> They control how long a fetch request will wait for data.
> 
> What are the values on your consumers? You may want to try tuning these two
> configs as well.
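> 
> For example, in the same style as the Scala config map you posted (these
> are the 0.8 defaults and only a starting point for tuning, not a
> recommendation):
> 
> "fetch.min.bytes" -> "1",
> "fetch.wait.max.ms" -> "100"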
> 
> Guozhang
> 
>> On Mon, Feb 9, 2015 at 9:48 AM, CJ Woolard <cwool...@channeliq.com> wrote:
>> 
>> Thank you for your help, and I apologize for not including sufficient detail
>> in my original question. To elaborate on our use case: we are building a
>> system tracing/monitoring app (which is of high importance to our business)
>> that needs to read messages from all of our Kafka topics. We've been
>> profiling a handful of runs with varying permutations of Kafka config
>> settings, and are struggling to find a combination that gives us decent
>> throughput. Specifically, we're trying to have one application consume from
>> ~20 topics (which may increase over time); some of the topics have several
>> million messages while some are currently empty.
>> 
>> The behavior we're seeing (regardless of the config settings we've tried) is
>> that the consumer takes about 5 to 10 minutes creating its streams, after
>> which it pulls around ~20,000 messages at a decent rate, and then it starts
>> to throw consumer timeout exceptions for several minutes. (Depending on the
>> settings, it may then repeat that loop: pulling a batch of messages at a
>> decent rate and then stalling again on more timeouts.)
>> 
>> Our max message size is 20MB, so we set "fetch.message.max.bytes" to 20MB.
>> We then set "consumer.timeout.ms" to "60000" so that reading a 20MB message
>> doesn't time out, but that creates a bind: if we set the timeout too high it
>> appears to block threads on the empty topics for a long time, and if we set
>> it too low it times out while reading large messages.
>> 
>> We've tried increasing the number of streams (to 20, for example),
>> increasing the number of thread pool consumers (to 20, for example), and
>> increasing the number of consumer fetchers (to 4, for example, although in
>> profiling we appear to get 2x the number of threads we specify in config,
>> for what it's worth), but have yet to find a combination of settings that
>> works for us. Again, any direction here would be greatly appreciated.
>> 
>> Here is an example of a consumer config we've tried (again we've tried
>> several combinations in testing):
>> 
>> "group.id" -> Settings.Kafka.ConsumerGroupId,
>> "zookeeper.connect" -> Settings.Kafka.ZkConnectionString,
>> "num.consumer.fetchers" -> "4",
>> "consumer.timeout.ms" -> "60000",
>> "auto.offset.reset" -> "smallest",
>> "fetch.message.max.bytes" -> "20000000"
>> 
>> In terms of our code we've tried both the whitelist overload:
>> 
>> val numberOfStreams = 4  // (we've varied this number)
>> val decoder = new StringDecoder()
>> val filter = topics.mkString("|")
>> val topicFilter = new Whitelist(filter)
>> connector.createMessageStreamsByFilter(topicFilter, numberOfStreams,
>>   decoder, decoder)
>> 
>> And the topic map overload:
>> 
>> private def createMapOfStreams(topics: Seq[String], numberOfPartitions: Int) = {
>>   val connector = createKafkaConnector()
>>   val decoder = new StringDecoder()
>>   val topicsMap = topics.map(topic => topic -> numberOfPartitions).toMap
>>   connector.createMessageStreams(topicsMap, decoder, decoder)
>> }
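>> 
>> For completeness, the consumption side looks roughly like the sketch below
>> (simplified; handleMessage is a stand-in for our actual processing, and
>> catching ConsumerTimeoutException is how we currently cope with the empty
>> topics):
>> 
>> import java.util.concurrent.Executors
>> import kafka.consumer.ConsumerTimeoutException
>> 
>> val streams = createMapOfStreams(topics, numberOfPartitions)
>> val executor = Executors.newFixedThreadPool(topics.size * numberOfPartitions)
>> for ((topic, topicStreams) <- streams; stream <- topicStreams) {
>>   executor.submit(new Runnable {
>>     def run(): Unit = {
>>       try {
>>         val it = stream.iterator()
>>         // hasNext() throws ConsumerTimeoutException once
>>         // consumer.timeout.ms elapses with no new messages
>>         while (it.hasNext()) {
>>           handleMessage(topic, it.next().message())
>>         }
>>       } catch {
>>         case _: ConsumerTimeoutException => // idle/empty topic; thread exits
>>       }
>>     }
>>   })
>> }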
>> 
>> And our broker settings:
>> 
>> ############################# Socket Server Settings
>> #############################
>> 
>> # The port the socket server listens on
>> port=9092
>> 
>> # The number of threads handling network requests
>> num.network.threads=2
>> 
>> # The number of threads doing disk I/O
>> num.io.threads=8
>> 
>> # The send buffer (SO_SNDBUF) used by the socket server
>> socket.send.buffer.bytes=1048576
>> 
>> # The receive buffer (SO_RCVBUF) used by the socket server
>> socket.receive.buffer.bytes=1048576
>> 
>> # The maximum size of a request that the socket server will accept
>> # (protection against OOM)
>> socket.request.max.bytes=104857600
>> 
>> 
>> # Hostname the broker will bind to. If not set, the server will bind to
>> # all interfaces
>> #host.name=kafka-1
>> 
>> # Hostname the broker will advertise to producers and consumers. If not
>> # set, it uses the value for "host.name" if configured. Otherwise, it will
>> # use the value returned from java.net.InetAddress.getCanonicalHostName().
>> advertised.host.name=kafka-1.qa.ciq-internal.net
>> 
>> # The port to publish to ZooKeeper for clients to use. If this is not set,
>> # it will publish the same port that the broker binds to.
>> advertised.port=9092
>> 
>> 
>> Thank you again for your help.
>> CJ
>> 
>> ________________________________________
>> From: Guozhang Wang <wangg...@gmail.com>
>> Sent: Monday, February 9, 2015 10:38 AM
>> To: users@kafka.apache.org
>> Subject: Re: Poor performance consuming multiple topics
>> 
>> Hello CJ,
>> 
>> You have to set the fetch size to be >= the maximum message size possible;
>> otherwise the consumption will block upon encountering these large
>> messages.
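>> 
>> With a 20MB maximum message, for example, that means something like the
>> following in your consumer config (the exact byte count is up to you):
>> 
>> fetch.message.max.bytes=20971520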
>> 
>> When you say "poor performance", what do you mean exactly? Are you seeing
>> low throughput, and can you share your consumer config values?
>> 
>> Guozhang
>> 
>> 
>>> On Sun, Feb 8, 2015 at 7:39 AM, Cj <cjwool...@gmail.com> wrote:
>>> 
>>> 
>>> 
>>> Hi Kafka team,
>>> 
>>> We have a use case where we need to consume from ~20 topics (each with 24
>>> partitions). We have a potential max message size of 20MB, so we've set our
>>> consumer fetch.size to 20MB, but that's causing very poor performance on
>>> our consumer (most of our messages are in the 10-100k range). Is it
>>> possible to set the fetch size to a lower number than the max message size
>>> and gracefully handle larger messages (as a trapped exception, for example)
>>> in order to improve our throughput?
>>> 
>>> Thank you in advance for your help
>>> CJ Woolard
>> 
>> 
>> 
>> 
>> --
>> -- Guozhang
> 
> 
> 
> -- 
> -- Guozhang
