Hi,

  I'm a newbie trying out Kafka as an alternative to AWS SQS. The motivation 
primarily is to improve performance, where Kafka would eliminate the constraint 
of pulling 10 messages at a time with a cap of 256 KB. Here's a high-level 
scenario of my use case. I have a bunch of crawlers which are sending documents 
for indexing. The size of the payload is around 1 MB on average. The crawlers 
call a SOAP endpoint which in turn runs producer code to submit the messages 
to a Kafka topic. The consumer app picks up the messages and processes them. 
For my test box, I've configured the topic with 30 partitions and a replication 
factor of 2. The two Kafka broker instances are running with 1 ZooKeeper instance. 
The Kafka version is 0.10.0.

For my testing, I published 7 million messages to the topic. I created a 
consumer group with 30 consumer threads, one per partition. I was initially 
under the impression that this would substantially speed up the processing 
compared to what I was getting via SQS. Unfortunately, that turned out not to 
be the case. In my case, the processing of the data is complex and takes 1-2 
minutes on average to complete. That led to a flurry of partition rebalances, 
as the threads were not able to heartbeat in time. I could see a bunch of 
messages in the log citing "Auto offset commit failed for group full_group: 
Commit cannot be completed since the group has already rebalanced and assigned 
the partitions to another member. This means that the time between subsequent 
calls to poll() was longer than the configured session.timeout.ms, which 
typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in the poll() with 
max.poll.records." This led to the same messages being processed multiple 
times. I tried playing around with the session timeout, max.poll.records and the 
poll timeout to avoid this, but that slowed down the overall processing big time. 
Here are some of the configuration parameters:

metadata.max.age.ms = 300000 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] 
enable.auto.commit = true 
max.poll.records = 10000 
request.timeout.ms = 310000 
heartbeat.interval.ms = 100000 
auto.commit.interval.ms = 1000 
receive.buffer.bytes = 65536 
fetch.min.bytes = 1 
send.buffer.bytes = 131072 
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer 
group.id = full_group 
retry.backoff.ms = 100 
fetch.max.wait.ms = 500 
connections.max.idle.ms = 540000 
session.timeout.ms = 300000 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
metrics.sample.window.ms = 30000 
auto.offset.reset = latest
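
For reference, the kind of adjustment I've been experimenting with on top of 
the above looks roughly like this (the values here are only illustrative, not 
something I've settled on):

Properties props = new Properties();
props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
props.put("group.id", "full_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer");
props.put("enable.auto.commit", "false");    // commit manually, only after a record is fully processed
props.put("max.poll.records", "2");          // at 1-2 minutes per record, more than a couple per poll blows past the session timeout
props.put("session.timeout.ms", "300000");   // 5 minutes
props.put("request.timeout.ms", "310000");   // has to stay above session.timeout.ms
props.put("heartbeat.interval.ms", "3000");  // well below the session timeout

KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);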

I reduced the consumer poll timeout to 100 ms. That reduced the rebalancing 
issues and eliminated the duplicate processing, but slowed down the overall 
process significantly. It ended up taking 35 hours to finish processing all 6 
million messages, compared to 25 hours with the SQS-based solution. Each 
consumer thread retrieved 50-60 messages per poll on average, though some of 
them polled 0 records at times. I'm not sure about this behavior when there is 
a huge number of messages available in the partition. The same thread was able 
to pick up messages during the subsequent iteration. Could this be due to 
rebalancing?
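
To check whether those empty polls line up with rebalances, I was thinking of 
attaching a rebalance listener purely for logging, something along these lines 
(my own sketch; the topic name here is just a placeholder):

consumer.subscribe(Arrays.asList("my_topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.info("Partitions revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.info("Partitions assigned: " + partitions);
    }
});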

Here's my consumer code:

while (true) { 
    try { 
        // Poll with a 100 ms timeout
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100); 
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) { 
            TextAnalysisRequest textAnalysisObj = record.value(); 
            if (textAnalysisObj != null) { 
                // Process the record; this is the expensive step (1-2 minutes on average)
                PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); 
            } 
        } 
    } catch (Exception ex) { 
        LOGGER.error("Error in Full Consumer group worker", ex); 
    } 
}

I understand that the record processing is the bottleneck in my case. But I'm 
sure a few folks here have a similar use case dealing with large processing 
times. I thought of doing asynchronous processing by spinning up each processor 
in its own dedicated thread, or using a thread pool with a large capacity, but 
I'm not sure if that would create a big load on the system. At the same time, 
I've seen a couple of instances where people have used the pause and resume 
APIs to perform the processing in order to avoid the rebalancing issue.
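
The pause/resume pattern, as I understand it, would look roughly like this per 
consumer thread (my own untested sketch; it assumes enable.auto.commit is set 
to false, and error handling is omitted):

ExecutorService executor = Executors.newSingleThreadExecutor();

while (true) { 
    ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100); 
    if (!records.isEmpty()) { 
        // Stop fetching more data until the current batch is done
        consumer.pause(consumer.assignment()); 
        Future<?> batch = executor.submit(() -> { 
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) { 
                if (record.value() != null) { 
                    PreProcessorUtil.submitPostProcessRequest(record.value()); 
                } 
            } 
        }); 
        // Keep calling poll() so the consumer keeps heartbeating; it returns 
        // nothing while the partitions are paused
        while (!batch.isDone()) { 
            consumer.poll(100); 
        } 
        consumer.resume(consumer.assignment()); 
        consumer.commitSync(); // manual commit, since auto commit is assumed off
    } 
}

But I'm not sure whether that kind of busy polling while paused is actually the 
recommended approach, or whether it just hides the problem.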

I'm really looking for some advice / best practices for this scenario, 
particularly the recommended configuration settings around heartbeat, request 
timeout, max poll records, auto commit interval, poll interval, etc. If Kafka 
is not the right tool for my use case, please let me know as well.

Any pointers will be appreciated. 

-Thanks,
Shamik
