[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-09 - Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-3159:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 884
[https://github.com/apache/kafka/pull/884]

> Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
> Reporter: Rajiv Kurian
> Assignee: Jason Gustafson
> Fix For: 0.9.0.1
>
> Attachments: Memory-profile-patched-client.png, Screen Shot 2016-02-01 at 11.09.32 AM.png
>

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-08 - Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3159:
---
Reviewer: Jun Rao
  Status: Patch Available  (was: Open)

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 - Rajiv Kurian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:

Attachment: Screen Shot 2016-02-01 at 11.09.32 AM.png

CPU breakdown of the patched client. Some notes:
1. 40.58% of the process's CPU profile is in these poll calls, which are made with a timeout of 5 seconds.
2. A lot of CPU is spent on hash map operations.
3. The rest of the CPU seems to be spent mostly in NetworkClient.poll().

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 - Rajiv Kurian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:

Attachment: Memory-profile-patched-client.png

Memory profile of the patched client. Notes:

1. A lot of the allocation is in clients.consumer.internals.Fetcher.createFetchRequests(); again, quite a bit of it is hash map allocation.
2. The majority of the remaining allocations seem to be in NetworkClient.poll().
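The pattern being profiled can be sketched in a few lines. The class below is illustrative only: FetchRequestAllocationSketch, PartitionState and this createFetchRequests() are invented stand-ins, not the real Kafka 0.9.0.0 internals. It just shows the kind of per-poll HashMap allocation such a profile points at: every pass that builds fetch requests allocates fresh maps keyed by broker and partition, even when no partition has new data.

import java.util.HashMap;
import java.util.Map;

// Illustrative only -- NOT the actual Kafka client code.
public class FetchRequestAllocationSketch {

    static class PartitionState {
        final int leaderBrokerId;  // broker that leads this partition
        final long fetchOffset;    // next offset we want to fetch

        PartitionState(int leaderBrokerId, long fetchOffset) {
            this.leaderBrokerId = leaderBrokerId;
            this.fetchOffset = fetchOffset;
        }
    }

    // Called once per poll. With 64 assigned partitions across 3 brokers this
    // allocates 3 inner maps and 64 entries per call, which adds up quickly when
    // polling in a tight loop for only a handful of tiny messages.
    static Map<Integer, Map<Integer, Long>> createFetchRequests(Map<Integer, PartitionState> assigned) {
        Map<Integer, Map<Integer, Long>> requestsByBroker = new HashMap<>();
        for (Map.Entry<Integer, PartitionState> entry : assigned.entrySet()) {
            int partition = entry.getKey();
            PartitionState state = entry.getValue();
            requestsByBroker
                    .computeIfAbsent(state.leaderBrokerId, broker -> new HashMap<>())
                    .put(partition, state.fetchOffset);
        }
        return requestsByBroker;
    }

    public static void main(String[] args) {
        // 64 partitions spread over brokers 0..2, mirroring the setup in the report.
        Map<Integer, PartitionState> assigned = new HashMap<>();
        for (int p = 0; p < 64; p++) {
            assigned.put(p, new PartitionState(p % 3, 0L));
        }
        // Each call below stands in for one pass of request building inside poll().
        System.out.println(createFetchRequests(assigned).size() + " broker-level fetch requests built");
    }
}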

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 - Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3159:
---
Fix Version/s: 0.9.0.1

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-27 - Rajiv Kurian (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:

Description: 
We are using the new Kafka consumer with the following config (as logged by Kafka):

metric.reporters = []
metadata.max.age.ms = 30
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = myGroup.id
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 2097152
bootstrap.servers = [myBrokerList]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 1000
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
ssl.truststore.password = null
session.timeout.ms = 3
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class sf.kafka.VoidDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 4
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
fetch.min.bytes = 512
send.buffer.bytes = 131072
auto.offset.reset = earliest


We use the consumer.assign() feature to assign a list of partitions and call poll in a loop (a minimal construction sketch follows the list below). We have the following setup:

1. The messages have no key, and we use the byte array deserializer from the config to get byte arrays.
2. The messages themselves are on average about 75 bytes. We get this number by dividing the Kafka broker bytes-in metric by the messages-in metric.
3. Each consumer is assigned about 64 partitions of the same topic, spread across three brokers.
4. We get very few messages per second, maybe around 1-2 messages across all partitions on a client right now.
5. We have no compression on the topic.
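For concreteness, here is a minimal sketch of how a consumer like this might be constructed and assigned its partitions. The broker addresses, the topic name "myTopic" and the partition count are placeholders, and the stock ByteArrayDeserializer stands in for our private sf.kafka.VoidDeserializer key deserializer:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list; group id and fetch settings match the logged config above.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("group.id", "myGroup.id");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("fetch.min.bytes", "512");
        props.put("fetch.max.wait.ms", "1000");
        props.put("max.partition.fetch.bytes", "2097152");
        // Stand-in deserializers; we actually use a private VoidDeserializer for keys.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // Manually assign 64 partitions of one topic instead of subscribing,
        // so no consumer-group rebalancing is involved.
        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < 64; p++) {
            partitions.add(new TopicPartition("myTopic", p));
        }
        consumer.assign(partitions);
    }
}

Because assign() is used rather than subscribe(), there is no group rebalancing involved; the consumer simply fetches from the 64 partitions it was handed.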

Our run loop looks something like this:

while (isRunning()) {
    ConsumerRecords records = null;
    try {
        // Here timeout is about 10 seconds, so it is pretty big.
        records = consumer.poll(timeout);
    } catch (Exception e) {
        // This never hits for us.
        logger.error("Exception polling Kafka ", e);
        records = null;
    }

    if (records != null) {
        for (ConsumerRecord record : records) {
            // The handler puts the byte array on a very fast ring buffer, so it barely takes any time.
            handler.handleMessage(ByteBuffer.wrap(record.value()));
        }
    }
}


With this setup, our performance has taken a horrendous hit as soon as we started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handling such a low number of messages. Our process was using 16% CPU before we added a single consumer, and it went to 25% and above after; that is a relative increase of over 50% (9 points on a 16% base, roughly 56%) from a single consumer receiving a single-digit number of small messages per second. Here is an attachment of the CPU usage breakdown in the consumer (the namespace is different because we shade the kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire process CPU is used on polling these 64 partitions (across 3 brokers) with a single-digit number of messages of roughly 70-80 bytes. We've used bigger timeouts (on the order of 100 seconds) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected, but it seems like it would completely kill performance. Here is the exception tab of Java Mission Control: http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 minutes, which is about 10 thousand exceptions per second! The exception stack trace shows that it originates from the poll call. I don't understand how it can throw so many exceptions given that I call poll with a
