Rob Underwood created KAFKA-3627:
------------------------------------
Summary: New consumer doesn't run delayed tasks while under load
Key: KAFKA-3627
URL: https://issues.apache.org/jira/browse/KAFKA-3627
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.9.0.1
Reporter: Rob Underwood
Assignee: Neha Narkhede
If the new consumer receives a steady flow of fetch responses it will not run
delayed tasks, which means it will not heartbeat or perform automatic offset
commits.
The main cause is the code that attempts to pipeline fetch responses and keep
the consumer fed. Specifically, in KafkaConsumer::pollOnce() there is a check
that skips calling client.poll() if there are fetched records ready (line 903
in the 0.9.0 branch as of this writing). Then in KafkaConsumer::poll(), if
records are returned it will initiate another fetch and perform a quick poll,
which will send/receive fetch requests/responses but will not run delayed tasks.
If the timing works out, and the consumer is consistently receiving fetched
records, it won't run delayed tasks until it doesn't receive a fetch response
during its quick poll. That leads to a rebalance since the consumer isn't
heartbeating, and typically means all the consumed records will be re-delivered
since the automatic offset commit wasn't able to run either.
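The starvation described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not the actual KafkaConsumer code: ToyClient, quickPoll(), delayedTaskRuns and run() are hypothetical names invented for the illustration, and "delayed tasks" are reduced to a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DelayedTaskStarvation {

    /** Hypothetical stand-in for the network client; not the real Kafka class. */
    static class ToyClient {
        final Deque<String> readyRecords = new ArrayDeque<>();
        final boolean brokerHasData;
        int delayedTaskRuns = 0; // stands in for heartbeats and automatic offset commits

        ToyClient(boolean brokerHasData) {
            this.brokerHasData = brokerHasData;
        }

        /** Full poll: network I/O plus delayed tasks. */
        void poll() {
            receiveFetchResponse();
            delayedTaskRuns++;
        }

        /** Quick poll: network I/O only, delayed tasks are not run. */
        void quickPoll() {
            receiveFetchResponse();
        }

        private void receiveFetchResponse() {
            if (brokerHasData) {
                readyRecords.add("record");
            }
        }
    }

    /** Mirrors the check in pollOnce(): skip client.poll() if records are ready. */
    static String pollOnce(ToyClient client) {
        if (!client.readyRecords.isEmpty()) {
            return client.readyRecords.poll();
        }
        client.poll();
        return client.readyRecords.poll(); // null when nothing arrived
    }

    /** Mirrors poll(): if records were returned, pipeline the next fetch with a quick poll. */
    static String poll(ToyClient client) {
        String record = pollOnce(client);
        if (record != null) {
            client.quickPoll();
        }
        return record;
    }

    /** Runs the loop and reports how many times delayed tasks were executed. */
    static int run(int iterations, boolean steadyData) {
        ToyClient client = new ToyClient(steadyData);
        for (int i = 0; i < iterations; i++) {
            poll(client);
        }
        return client.delayedTaskRuns;
    }

    public static void main(String[] args) {
        System.out.println("steady fetch responses: " + run(1000, true));
        System.out.println("no fetch responses:     " + run(1000, false));
    }
}
```

In this model, a steady flow of fetch responses means the full poll runs only on the first iteration; every later iteration finds records already buffered by the previous quick poll, so the delayed-task counter never advances again.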
h5. Steps to reproduce
# Start up a cluster with *at least 2 brokers*. This seems to be required to
reproduce the issue; I'm guessing that's because the fetch responses all arrive
together when using a single broker.
# Create a topic with multiple partitions (10 in this example)
#* bq. bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic
delayed-task-bug --partitions 10 --replication-factor 1
# Generate some test data so the consumer has plenty to consume. In this case
I'm just using UUIDs.
#* bq. for ((i=0;i<100;++i)); do cat /proc/sys/kernel/random/uuid >>
/tmp/test-messages; done
#* bq. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
delayed-task-bug < /tmp/test-messages
# Start up a consumer with a small max fetch size to ensure it only pulls a few
records at a time. The consumer can simply sleep for a moment when it receives
a record.
#* I'll attach an example in Java
# There's a timing aspect to this issue, so it may take a few attempts to
reproduce.
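
As a rough illustration of the consumer setup in the steps above, the following sketch builds a configuration with a small max fetch size. It is not the reporter's attached example: the class name, the group id and all numeric values are assumptions chosen for illustration, while the property keys are standard new-consumer settings.

```java
import java.util.Properties;

public class DelayedTaskBugConsumerConfig {

    /** Builds consumer properties for the reproduction; all values are illustrative guesses. */
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "delayed-task-bug-group"); // hypothetical group name
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Automatic offset commits are one of the delayed tasks that get starved.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000"); // assumed value
        // Small fetch size so each response carries only a few UUID-sized records;
        // the exact number is an assumption and may need tuning.
        props.put("max.partition.fetch.bytes", "200");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties could be passed to new KafkaConsumer<String, String>(props), with the consumer subscribed to delayed-task-bug and sleeping briefly for each record it receives.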