Hi, I'm a newbie trying out Kafka as an alternative to AWS SQS. The primary motivation is performance: Kafka would remove SQS's constraint of pulling at most 10 messages at a time with a 256 KB cap. Here's a high-level view of my use case. I have a bunch of crawlers that send documents for indexing. The payload is around 1 MB on average. The crawlers call a SOAP endpoint, which in turn runs producer code to submit the messages to a Kafka topic. The consumer app picks up the messages and processes them. On my test box I've configured the topic with 30 partitions and a replication factor of 2. The two Kafka brokers run against a single ZooKeeper instance. The Kafka version is 0.10.0.
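For context, the producer side inside the SOAP endpoint is roughly this shape (the topic name, serializer class, and variable names here are placeholders rather than my actual code):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Placeholder: a Kryo-based serializer matching the KryoObjectSerializer used on the consumer side.
    props.put("value.serializer", "com.autodesk.preprocessor.producer.serializer.KryoObjectSerializer");
    // Documents average ~1 MB; the producer's default max.request.size is exactly 1 MB (1048576),
    // so anything larger needs this (and the broker's message.max.bytes) raised.
    props.put("max.request.size", "2097152");

    Producer<String, TextAnalysisRequest> producer = new KafkaProducer<String, TextAnalysisRequest>(props);
    producer.send(new ProducerRecord<String, TextAnalysisRequest>("full_topic", documentId, textAnalysisRequest));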
For my testing, I published 7 million messages to the topic. I created a consumer group with 30 consumer threads, one per partition. I was initially under the impression that this would substantially speed up processing compared to what I was getting via SQS. Unfortunately, that was not the case. In my case the processing of the data is complex and takes 1-2 minutes on average to complete. That led to a flurry of partition rebalances, as the threads were not able to heartbeat in time. I could see a bunch of messages in the log citing:

    "Auto offset commit failed for group full_group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records."

This led to the same messages being processed multiple times. I tried playing around with the session timeout, max.poll.records and the poll time to avoid this, but that slowed down the overall processing big time. Here are some of the configuration parameters:

    metadata.max.age.ms = 300000
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    enable.auto.commit = true
    max.poll.records = 10000
    request.timeout.ms = 310000
    heartbeat.interval.ms = 100000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    fetch.max.wait.ms = 500
    connections.max.idle.ms = 540000
    session.timeout.ms = 300000
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

I reduced the consumer poll timeout to 100 ms. That reduced the rebalancing issues and eliminated the duplicate processing, but it slowed down the overall process significantly: it ended up taking 35 hours to process all 6 million messages, compared to 25 hours with the SQS-based solution. Each consumer thread retrieved 50-60 messages per poll on average, though some polls returned 0 records at times. I'm not sure why that happens when there is a huge number of messages available in the partition; the same thread was able to pick up messages in the subsequent iteration. Could this be due to rebalancing?

Here's my consumer code:

    while (true) {
        try {
            ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    // Process record (this is the 1-2 minute step)
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

I understand that the record processing itself is one bottleneck in my case, but I'm sure a few folks here have a similar use case of dealing with long processing times. I thought of doing the processing asynchronously by spinning up each processor in its own dedicated thread, or by using a thread pool with a large capacity, but I'm not sure whether that would create a big load on the system.
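To make that concrete, here's a rough sketch of what I had in mind (not tested; the pool size and poll timeouts are guesses, and it assumes I would switch enable.auto.commit to false so offsets only get committed after a batch is fully processed). It hands each polled batch to a bounded thread pool, pauses the assigned partitions so that the continued poll() calls keep the consumer alive in the group without fetching more data, then commits and resumes once the batch is done:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    ExecutorService pool = Executors.newFixedThreadPool(8); // pool size is a guess

    while (true) {
        try {
            ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
            if (records.isEmpty()) {
                continue;
            }

            // Hand the batch off so the polling thread isn't blocked for 1-2 minutes per record.
            final List<Future<?>> inFlight = new ArrayList<Future<?>>();
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                final TextAnalysisRequest req = record.value();
                if (req == null) {
                    continue;
                }
                inFlight.add(pool.submit(new Runnable() {
                    public void run() {
                        PreProcessorUtil.submitPostProcessRequest(req);
                    }
                }));
            }

            // Pause all assigned partitions: poll() now returns nothing, but on 0.10.0
            // it still sends the heartbeats that keep this consumer in the group.
            consumer.pause(consumer.assignment());
            for (Future<?> f : inFlight) {
                while (!f.isDone()) {
                    consumer.poll(100); // empty polls while paused, just to stay alive
                }
            }

            // Commit only after the whole batch has actually been processed
            // (assumes enable.auto.commit=false).
            consumer.commitSync();
            consumer.resume(consumer.assignment());
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

The idea is that the empty poll() calls while paused keep the session alive during the long processing, so I wouldn't have to crank session.timeout.ms up so high, and nothing gets committed until the records are actually done. Does something like this look like a sane direction?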
At the same time, I've seen a couple of instances where people have used the pause and resume APIs to perform the processing without triggering a rebalance, which is what the sketch above is based on. I'm really looking for some advice / best practices in this situation, particularly the recommended configuration settings around heartbeat interval, request timeout, max poll records, auto commit interval, poll interval, etc. If Kafka is not the right tool for my use case, please let me know as well. Any pointers will be appreciated.

-Thanks, Shamik