[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2017-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022604#comment-16022604
 ] 

ASF GitHub Bot commented on KAFKA-3159:
---

Github user felixgborrego closed the pull request at:

https://github.com/apache/kafka/pull/3127


> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
> Fix For: 0.9.0.1
>
> Attachments: Memory-profile-patched-client.png, Screen Shot 
> 2016-02-01 at 11.09.32 AM.png
>
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this:
> while (isRunning()) {
>     ConsumerRecords records = null;
>     try {
>         // Here timeout is about 10 seconds, so it is pretty big.
>         records = consumer.poll(timeout);
>     } catch (Exception e) {
>         // This never hits for us
>         logger.error("Exception polling Kafka ", e);
>         records = null;
>     }
>     if (records != null) {
>         for (ConsumerRecord record : records) {
>             // The handler puts the byte array on a very fast ring buffer,
>             // so it barely takes any time.
>             handler.handleMessage(ByteBuffer.wrap(record.value()));
>         }
>     }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the cpu usage 
> breakdown in the consumer (the namespace is different because we shad
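
A minimal sketch of the manual-assignment setup described above (the topic name
is a placeholder and the partition count of 64 mirrors the report; the config is
whatever was logged above):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class ManualAssignmentExample {
        // Builds a consumer with a fixed set of partitions, as in the report above.
        // "my-topic" and the partition count of 64 are placeholders.
        static KafkaConsumer<byte[], byte[]> assignedConsumer(Properties config) {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
            List<TopicPartition> partitions = new ArrayList<>();
            for (int p = 0; p < 64; p++) {
                partitions.add(new TopicPartition("my-topic", p));
            }
            // assign() bypasses group rebalancing; the application owns the partition list.
            consumer.assign(partitions);
            return consumer;
        }
    }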

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2017-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021140#comment-16021140
 ] 

ASF GitHub Bot commented on KAFKA-3159:
---

GitHub user felixgborrego opened a pull request:

https://github.com/apache/kafka/pull/3127

Add sleep between empty polls to avoid burning CPU

Workaround for  https://issues.apache.org/jira/browse/KAFKA-3159

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Nitro/kafka 0.9.0.2-NITRO

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3127


commit 88cfff0660bd726ab5cd11ceee79c5cc35ddce18
Author: Felix 
Date:   2017-05-23T12:35:14Z

Add sleep between empty polls to avoid burning CPU
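
The PR title describes a client-side backoff: sleep briefly whenever poll()
returns nothing. A rough sketch of that idea applied in application code (this
is not the patch itself, which changes the consumer internals; the 100 ms delay
and the handler interface are placeholders):

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    final class BackoffPollLoop {
        interface MessageHandler {
            void handleMessage(ByteBuffer message);
        }

        // If poll() comes back empty, sleep briefly before polling again so an idle
        // consumer does not spin. The 100 ms backoff is an arbitrary placeholder.
        static void run(KafkaConsumer<byte[], byte[]> consumer, MessageHandler handler,
                        long pollTimeoutMs) throws InterruptedException {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeoutMs);
                if (records.isEmpty()) {
                    Thread.sleep(100L);
                    continue;
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    handler.handleMessage(ByteBuffer.wrap(record.value()));
                }
            }
        }
    }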





[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-10-20 Thread constantine (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592137#comment-15592137
 ] 

constantine commented on KAFKA-3159:


Hi everyone. We use kafka-10.0.0 (in which this bug should be fixed), but we have 
hit the same problem: high CPU usage (close to 50% for a dummy consumer with no 
code in it) and tons of EOFExceptions while polling a topic that has no data in it.
We use an AWS m3.medium instance (1 core) for testing.
Our consumer parameters:
metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 10485760
bootstrap.servers = [broker-eventbus1.aws:6667, 
broker-eventbus2.aws:6667, broker-eventbus3.aws:6667]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 50
check.crcs = true
request.timeout.ms = 4
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 10485760
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 10485760
send.buffer.bytes = 131072
value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = context-facade-consumer
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 1
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
session.timeout.ms = 3
metrics.num.samples = 2
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
auto.offset.reset = latest
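
A minimal sketch of the kind of dummy consumer described above, useful for
reproducing the idle-topic CPU burn (the broker address, topic name, and poll
count are placeholders; the group id and deserializers mirror the config):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class DummyConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker; group id and deserializers mirror the config above.
            props.put("bootstrap.servers", "broker-eventbus1.aws:6667");
            props.put("group.id", "context-facade-consumer");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("empty-topic"));
                long polls = 0;
                long emptyPolls = 0;
                while (polls < 10_000) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                    polls++;
                    if (records.isEmpty()) {
                        emptyPolls++;
                    }
                }
                System.out.printf("empty polls: %d of %d%n", emptyPolls, polls);
            }
        }
    }

If the bug is in play, the loop finishes much faster than the poll timeout would
suggest, because most polls return immediately with nothing.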


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140054#comment-15140054
 ] 

Ismael Juma commented on KAFKA-3159:


Good news [~ra...@signalfx.com], thanks for reporting back.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-09 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140049#comment-15140049
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I am running a patched broker with a consumer consuming partitions that have no 
messages, and it seems to be working fine. So it looks good so far. I'll run it 
for longer and then finally run it with real messages to make sure there is no 
regression. Thanks, everyone!


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139950#comment-15139950
 ] 

ASF GitHub Bot commented on KAFKA-3159:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/884



[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-08 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138038#comment-15138038
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I'll try it tomorrow for sure.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138031#comment-15138031
 ] 

Ismael Juma commented on KAFKA-3159:


[~ra...@signalfx.com], when you get a chance to try this, try the following 
branch: https://github.com/hachikuji/kafka/tree/K3159

It would be great if you could try it soon, as this is the only blocker left 
before we can release 0.9.0.1.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15137342#comment-15137342
 ] 

ASF GitHub Bot commented on KAFKA-3159:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/884

KAFKA-3159: stale high watermark segment offset causes early fetch return



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka K3159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/884.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #884


commit b23447db22aa5eaf6992d37f11ae31627598175b
Author: Jason Gustafson 
Date:   2016-02-06T00:38:00Z

KAFKA-3159: stale high watermark segment offset causes early fetch return





[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-06 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136071#comment-15136071
 ] 

Jason Gustafson commented on KAFKA-3159:


Either should work, but perhaps it would be most useful at the moment to try 
against 0.9.0.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135586#comment-15135586
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Thanks Jason. I'll try to apply this patch early next week. Should I build 
trunk + patch or 0.9.0 + patch?


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135356#comment-15135356
 ] 

Jason Gustafson commented on KAFKA-3159:


[~ra...@signalfx.com] We're still discussing the best way to address this 
issue, but for now, would you mind trying this patch: 
https://github.com/hachikuji/kafka/commit/34158c835668f9780f65ab527ade160d9e19c87c?
From what I've been able to reproduce locally, it does fix the problem.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135007#comment-15135007
 ] 

Ismael Juma commented on KAFKA-3159:


For reference, Jun also added a comment to the original PR:

https://github.com/apache/kafka/pull/688#issuecomment-180551456

It may be worth adding a comment to that PR if we are looking to fix the 
remaining issue as part of this JIRA (to avoid duplicated work).


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134991#comment-15134991
 ] 

Jason Gustafson commented on KAFKA-3159:


Talked with [~junrao] offline about this. It seems the fix for KAFKA-3003 may 
have been incomplete. It doesn't appear to handle the case where there is only 
1 replica in the ISR set. If the high watermark doesn't get updated when the 
segment is rolled, then the fetch would return immediately.
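
To make that concrete, here is a purely illustrative sketch of the scenario (the real broker code is Scala; the variable names and offsets below are made up): if the log rolls but the high watermark stays on the old segment, the fetch position and the high watermark can resolve to different segments even though no new data exists, and the delayed-fetch check treats that as data being available.

{code}
// Purely illustrative sketch of the condition described above; not the actual
// broker code. All names and numbers are made up.
long fetchOffset   = 5000L;   // consumer is caught up to the log end offset
long highWatermark = 5000L;   // nothing new to read

// Segment base offsets the two positions resolve to. After a roll, the fetch
// position can land on the new (empty) active segment while the high
// watermark, if it was never moved, still points at the old segment.
long fetchSegmentBase = 5000L;  // new, empty active segment
long hwSegmentBase    = 1000L;  // old segment

// The delayed-fetch check effectively asks "are these on different segments?"
// and, if so, completes the request right away. Here that means an immediate
// empty response instead of waiting fetch.max.wait.ms, so the consumer spins
// issuing fetch after fetch.
boolean completeImmediately = (fetchSegmentBase != hwSegmentBase);
System.out.println("complete immediately = " + completeImmediately); // true
{code}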


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134813#comment-15134813
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Though I should mention that, if I remember correctly, I've seen the same issue 
with older brokers (0.8.2.x) as well, so it doesn't seem exclusive to 0.9.x.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134809#comment-15134809
 ] 

Jason Gustafson commented on KAFKA-3159:


[~granthenke] Thanks for the suggestion, but it appears to be unrelated. I've 
been able to reproduce the problem off trunk with that patch included.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134776#comment-15134776
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

It does seem related, if not the same problem.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134768#comment-15134768
 ] 

Grant Henke commented on KAFKA-3159:


Could this be related to KAFKA-3003? A fix for KAFKA-3003 just got committed. 


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134562#comment-15134562
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I think I've found the issue. It turns out that when I don't have any messages 
in the log, the Kafka broker sends back a reply with no messages immediately 
instead of respecting fetch.max.wait.ms or fetch.min.bytes. The 
EOFExceptions were probably just raised from parsing empty message sets. I can 
reproduce this consistently. Steps:
1. Create a topic with a small retention, say 5 minutes, or wait for said topic 
to have all its logs cleaned.
2. Start consuming on the topic without any messages being sent to the topic.
3. Observe that Kafka sends back an empty reply to every fetch request almost 
immediately. This can be observed with tcpdump, by monitoring network in/out, 
with ngrep, etc. I also verified it by writing my own client and observing 
that my requests get immediate replies when the log is empty.
4. As soon as you start sending messages to the topic, the problem goes away.

We've actually hit this problem in the past (seeing a massive amount of network 
traffic) when we were subscribed to a single topic that gets no messages. We 
didn't know the underlying issue then, but I am pretty sure it is this problem.

This is a problem whenever a consumer sends fetch requests to at least one 
broker that is the leader for the partitions being queried but has no messages 
retained in its log. It also comes up in real-world use cases:
i) Metadata-like topics that are always consumed but very rarely written to. 
We've run into this in the past, as I said.
ii) During feature development one can switch on the consumers and put the 
producers behind a feature flag. This is what we ran into: the consumer code 
went out before the producer code was integrated/switched on, and we had to 
roll back because of the massive regression.

Moreover, it goes against all intuition that fetch requests against an 
empty topic-partition should be more expensive than actually getting data.
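
For anyone who wants to see this quickly, below is a minimal probe sketch (not from our codebase; the broker address, topic name, and group id are placeholders) that assigns one empty partition and counts how many fetch round trips an otherwise idle consumer makes per second.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative probe using only the public 0.9 consumer API.
public class EmptyTopicProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "empty-topic-probe");         // placeholder
        props.put("enable.auto.commit", "false");
        props.put("fetch.min.bytes", "512");
        props.put("fetch.max.wait.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.assign(Collections.singletonList(new TopicPartition("empty-topic", 0)));

        long polls = 0;
        long windowStart = System.currentTimeMillis();
        while (true) {
            consumer.poll(10000);  // should block up to 10s when there is no data
            polls++;
            long now = System.currentTimeMillis();
            if (now - windowStart >= 1000) {
                // With the behavior described above, this prints a very large
                // number, because every fetch returns an empty response immediately.
                System.out.println("polls/sec = " + polls);
                polls = 0;
                windowStart = now;
            }
        }
    }
}
{code}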


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-02 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15129287#comment-15129287
 ] 

Jason Gustafson commented on KAFKA-3159:


[~ra...@signalfx.com] I think we'll probably need some more information to 
investigate this further. It appears that the EOFExceptions were only symptoms 
of some other problem that is causing the high CPU utilization. It might be 
helpful to see some of the logs so we know what the consumer was doing during 
that time. Can you turn on TRACE-level logging and attach a sample to this 
ticket?
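
For reference, something along the lines of the snippet below in the client's log4j configuration should be enough to capture consumer TRACE output without turning on TRACE globally. The logger names assume the standard unshaded package; adjust the prefix if the kafka jar is shaded.

{code}
# Illustrative log4j snippet; adjust the package prefix for a shaded jar.
log4j.logger.org.apache.kafka.clients.consumer=TRACE
log4j.logger.org.apache.kafka.clients.consumer.internals=TRACE
{code}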


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126877#comment-15126877
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

[~hachikuji] I tried your patch. The exceptions are now gone, but the CPU has 
remained high (25%+, up from 17% before the new client was added). I have 
attached the CPU breakdown and allocation breakdown screenshots and comments.
Some notes:
1. The exceptions seem to be gone completely. The overall CPU has gone down to 
about 25% from the 27% before, so it has gotten a bit better. But the percentage 
of CPU used by the Kafka part of the code has gone up to 40.58% of the total 
used by my process. Most of the CPU is now spent in hash map code. Again, I 
don't understand why so much CPU is being used to fetch a single-digit number 
of 60-byte messages per second (64 partitions striped across 3 brokers).

2. The allocation percentage has, believe it or not, gone up even more, to about 
31.26% of my entire process's allocations. Again, it is baffling that it 
allocates so much to get so few messages. The total TLAB allocation over the 
5-minute period has gone up to 14.05 GB from the 6.95 GB done by the client 
that threw a lot of exceptions. That seems like a staggering amount of 
allocation for something that handles about 1 message a second.

My polls are done with a 5-second timeout, which seems high enough.

Let me know if I can do more profiling or provide other info.
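
One more data point that might help correlate the JMC numbers with what the client is doing on the wire: dumping the consumer's own fetch/request metrics periodically. A rough sketch follows; it uses only the public KafkaConsumer#metrics() API, and the substring filter is just illustrative.

{code}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Rough sketch: inside the run loop, e.g. once every few seconds, print the
// consumer's fetch/request related metrics so the request rate can be
// compared against the CPU/allocation profile.
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    MetricName name = entry.getKey();
    if (name.name().contains("fetch") || name.name().contains("request")) {
        System.out.println(name.group() + "/" + name.name() + " = " + entry.getValue().value());
    }
}
{code}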


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122659#comment-15122659
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Thanks Jason. I can try to do that early next week. I have a lot of deadlines 
this week, so I might not get a chance to get to it.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122640#comment-15122640
 ] 

Jason Gustafson commented on KAFKA-3159:


[~ra...@signalfx.com] Here's a patch I've been messing with: 
https://github.com/hachikuji/kafka/commit/69485add2119d523a1b3c93373eb20923a98320e.
 Any chance you could give it a try? You only need to update the client. This 
should address the cause of the EOFs reported above, but it's tough to know for 
sure since I haven't seen anywhere near the number of exceptions reported.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122614#comment-15122614
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Actually, I managed to dig through the logs and found the producer config as 
logged by the producer:
2016-01-26T02:53:31.497Z INFO  [PathChildrenCache-0] 
[s.o.a.k.c.producer.ProducerConfig   ] {}: ProducerConfig values: 
compression.type = none
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
acks = 1
batch.size = 10240
reconnect.backoff.ms = 10
bootstrap.servers = [our-kafka-brokers]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 2097152
timeout.ms = 3
key.serializer = class sf.disco.kafka.VoidSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = false
value.serializer = class 
sf.org.apache.kafka.common.serialization.ByteArraySerializer
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 100
client.id = 

I don't explicitly set compression, and it appears from the config that no 
compression was set.
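
For what it's worth, pinning it explicitly would remove any doubt. A minimal sketch, assuming the new (0.8.2) producer API; the broker list and serializers below are placeholders rather than our exact shaded classes.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Minimal sketch: set compression.type explicitly on the producer so there is
// no ambiguity about what ends up in the log. "none" is also the default.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "our-kafka-brokers");   // placeholder
producerProps.put("compression.type", "none");                 // or "gzip" / "snappy"
producerProps.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
{code}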


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122607#comment-15122607
 ] 

Jason Gustafson commented on KAFKA-3159:


You can use the DumpLogSegments tool. The output should show you whether the 
messages are compressed or not. Sample usage below:

{code}
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
/tmp/kafka/foo-0/.log
{code}


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122594#comment-15122594
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I don't enable compression on the topic. However, the producer (0.8.2) might 
just decide to compress all the same. How can I tell?

> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords records = null;
> try {
> // Here timeout is about 10 seconds, so it is pretty big.
> records = consumer.poll(timeout);
> } catch (Exception e) {
>// This never hits for us
> logger.error("Exception polling Kafka ", e);
> records = null;
> }
> if (records != null) {
> for (ConsumerRecord record : records) {
>// The handler puts the byte array on a very fast ring buffer 
> so it barely takes any time.
> handler.handleMessage(ByteBuffer.wrap(record.value()));
> }
> }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the CPU usage 
> breakdown in the consumer (the namespace is different because we shade the 
> kafka jar before using it) - 
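
For reference, a minimal, self-contained sketch of the assign-and-poll setup described above, written against the 0.9 consumer API. The broker list, topic name, and partition count are placeholders, both key and value use ByteArrayDeserializer (the report uses a custom void key deserializer), and the ring-buffer handler is reduced to a no-op stub; only the configs relevant to fetching are carried over.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
        props.put("group.id", "myGroup.id");
        props.put("enable.auto.commit", "false");
        props.put("fetch.min.bytes", "512");
        props.put("fetch.max.wait.ms", "1000");
        props.put("max.partition.fetch.bytes", "2097152");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // Manually assign ~64 partitions of one topic, as in the report.
        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < 64; p++) {
            partitions.add(new TopicPartition("myTopic", p)); // topic name is a placeholder
        }
        consumer.assign(partitions);

        long timeout = 10000L; // ~10 second poll timeout, as in the report
        try {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    handle(ByteBuffer.wrap(record.value()));
                }
            }
        } finally {
            consumer.close();
        }
    }

    // Stand-in for the ring-buffer handler from the report.
    private static void handle(ByteBuffer message) {
        // no-op stub
    }
}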

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122507#comment-15122507
 ] 

Jason Gustafson commented on KAFKA-3159:


However, I do see significantly more exceptions when the topic has been 
compressed (I tried snappy locally). Are you sure that the topic is not 
compressed?
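
One way a topic can end up carrying compressed message sets even when nothing is configured on the topic itself is producer-side compression. A hedged sketch of producer settings that would write snappy-compressed batches (broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");          // placeholder
        props.put("compression.type", "snappy");                 // batches are written compressed
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("myTopic", "hello".getBytes()));
        producer.close();
    }
}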

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122467#comment-15122467
 ] 

Jason Gustafson commented on KAFKA-3159:


[~ra...@signalfx.com] Looks like these EOFExceptions are avoidable by checking 
whether the underlying buffer has data remaining. However, I'm still a bit 
puzzled by the number reported. In the current implementation, I would expect 
to see at most one EOFException for each partition in every fetch response. If 
there are about 64 partitions and "fetch.max.wait.ms" is 1000, then we should 
see about 64 exceptions raised each second (when there is not much data to 
fetch). Perhaps most of the exceptions occurred during a load spike or maybe 
when it was catching up initially?
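
To illustrate the kind of check being suggested (a standalone sketch, not the actual fetcher code): before parsing another size-prefixed entry out of a response buffer, verify that enough bytes remain, rather than attempting the read and treating an EOFException/underflow as the end-of-data signal.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RemainingCheckSketch {

    // Parses [int length][payload] entries, stopping cleanly when the buffer
    // does not contain a complete entry instead of throwing on underflow.
    static List<byte[]> parseRecords(ByteBuffer buffer) {
        List<byte[]> records = new ArrayList<byte[]>();
        while (buffer.remaining() >= 4) {           // enough bytes left for a length prefix?
            buffer.mark();
            int size = buffer.getInt();
            if (buffer.remaining() < size) {        // trailing partial entry: leave it for later
                buffer.reset();
                break;
            }
            byte[] payload = new byte[size];
            buffer.get(payload);
            records.add(payload);
        }
        return records;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(3).put(new byte[] {1, 2, 3});    // one complete entry
        buf.putInt(9);                              // length prefix of a truncated entry
        buf.flip();
        System.out.println("complete entries: " + parseRecords(buf).size()); // prints 1
    }
}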
