We are using the new Kafka consumer with the following config (as logged by Kafka):
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = myGroup.id
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 2097152
bootstrap.servers = [myBrokerList]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 1000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class sf.kafka.VoidDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 512
send.buffer.bytes = 131072
auto.offset.reset = earliest
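For completeness, we build the consumer from essentially the properties above; here is a trimmed sketch (the broker list is a placeholder, the remaining settings are as logged):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder for myBrokerList
props.put("group.id", "myGroup.id");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("fetch.min.bytes", "512");
props.put("fetch.max.wait.ms", "1000");
props.put("max.partition.fetch.bytes", "2097152");
props.put("key.deserializer", "sf.kafka.VoidDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<Void, byte[]> consumer = new KafkaConsumer<>(props);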
We use the consumer.assign() feature to assign a list of partitions and call poll in a loop (a rough sketch of the assignment code follows the list below). We have the following setup:
1. The messages have no key, and we use the byte array deserializer (as configured above) to get raw byte arrays out of the consumer.
2. The messages themselves are on average about 75 bytes. We get this number by dividing the Kafka broker's bytes-in metric by its messages-in metric.
3. Each consumer is assigned about 64 partitions of the same topic spread
across three brokers.
4. We currently get very few messages per second, maybe around 1-2 messages across all partitions on a client.
5. We have no compression on the topic.
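The assignment itself looks roughly like this (the topic name and the exact partition numbers are illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

List<TopicPartition> partitions = new ArrayList<>();
for (int p = 0; p < 64; p++) {   // about 64 partitions of the same topic per consumer
    partitions.add(new TopicPartition("myTopic", p));
}
consumer.assign(partitions);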
Our run loop looks something like this:

while (isRunning()) {
    ConsumerRecords<Void, byte[]> records = null;
    try {
        // Here timeout is about 10 seconds, so it is pretty big.
        records = consumer.poll(timeout);
    } catch (Exception e) {
        logger.error("Exception polling Kafka ", e);
        records = null;
    }
    if (records != null) {
        for (ConsumerRecord<Void, byte[]> record : records) {
            // The handler puts the byte array on a very fast ring buffer,
            // so it barely takes any time.
            handler.handleMessage(ByteBuffer.wrap(record.value()));
        }
    }
}
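For context, the handler is our own interface that just hands the payload off to a ring buffer; its shape is roughly the following (the interface name here is made up):

import java.nio.ByteBuffer;

interface MessageHandler {
    // Puts the payload on a pre-allocated ring buffer; effectively non-blocking.
    void handleMessage(ByteBuffer message);
}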
With this setup, our performance took a horrendous hit as soon as we started this one thread that just polls Kafka in a loop.
I profiled the application using Java Mission Control and have a few
insights.
1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handling such a low number of messages. Our process was using 16% CPU before we added a single consumer, and it went to 25% and above after. That's an increase of over 50% from a single consumer getting a single-digit number of small messages per second. Here is an attachment of the CPU usage breakdown in the consumer (the namespace is different because we shade the Kafka jar before using it) - http://imgur.com/tHjdVnM
We've tried bigger timeouts (around 100 seconds) and that doesn't seem to make much of a difference either.
2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected, but it seems like it would completely kill performance. Here is the exception tab of Java Mission Control: http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 minutes, which is about 10 thousand exceptions per second! The exception stack traces show that they originate from the poll call. I don't understand how it can throw so many exceptions given that I call poll with a timeout of 10 seconds and get messages at about 1 per second.
3. This single thread seems to allocate a lot too. It is responsible for 17.87% of our entire JVM allocation rate, and most of what it allocates seems to be those same EOFExceptions. Here is a chart showing the single thread's allocation proportion: http://imgur.com/GNUJQsz Here is a chart that shows a breakdown of the allocations: http://imgur.com/YjCXljE About 20% of the allocations are for the EOFExceptions. This seems kind of crazy, especially given that it happens about 10 thousand times a second. The rest of the allocations are spread all over, but again seem excessive given how few messages we are getting.
As a comparison, we also run a wrapper over the old SimpleConsumer that gets a lot more data (10-15 thousand 70-byte messages/sec on a different topic), and it is able to handle that load without much trouble. At this moment we are completely puzzled by this performance. Most of it does seem to be due to the crazy volume of exceptions. Note: our messages all seem to be making it through; the exceptions are caught inside Kafka's stack and never bubble through to us.
Are we doing anything wrong with how we are using the new consumer (longer timeouts of around 100 seconds don't seem to help)?
Thanks in advance,
Rajiv