[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-3159:
---------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Issue resolved by pull request 884
[https://github.com/apache/kafka/pull/884]

> Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-3159
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3159
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.0
>         Environment: Linux, Oracle JVM 8.
>            Reporter: Rajiv Kurian
>            Assignee: Jason Gustafson
>             Fix For: 0.9.0.1
>
>         Attachments: Memory-profile-patched-client.png, Screen Shot 2016-02-01 at 11.09.32 AM.png
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3159:
-------------------------------
    Reviewer: Jun Rao
      Status: Patch Available  (was: Open)
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:
--------------------------------
    Attachment: Screen Shot 2016-02-01 at 11.09.32 AM.png

CPU breakdown of the patched client. Some notes:
1. 40.58% of the process' CPU profile is on these poll calls, which are done with a timeout of 5 seconds.
2. A lot of CPU is spent on hash map operations.
3. The rest of the CPU seems to be spent mostly in NetworkClient.poll().
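Note 1 in this update reports a large share of CPU inside poll calls that are supposed to block for seconds. One way to sanity-check that kind of claim outside a profiler is to compare the CPU time a thread consumes with the wall-clock time a blocking call takes. The sketch below is only an illustration using the JDK's ThreadMXBean; the class name BlockingCallCpuSketch and the measure helper are hypothetical, and Thread.sleep stands in for consumer.poll(timeout) so the example runs without a Kafka dependency.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

class BlockingCallCpuSketch {
    // Returns {cpuNanos, wallNanos} used by the current thread while running `call`.
    // getCurrentThreadCpuTime() may throw UnsupportedOperationException on JVMs
    // without thread CPU time support, and returns -1 if measurement is disabled.
    static long[] measure(Runnable call) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long cpuStart = threads.getCurrentThreadCpuTime();
        long wallStart = System.nanoTime();
        call.run();
        long wall = System.nanoTime() - wallStart;
        long cpu = threads.getCurrentThreadCpuTime() - cpuStart;
        return new long[] {cpu, wall};
    }

    public static void main(String[] args) {
        // Stand-in for a blocking call such as consumer.poll(timeout) (assumption).
        long[] t = measure(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.printf("cpu=%d ms, wall=%d ms%n", t[0] / 1_000_000, t[1] / 1_000_000);
    }
}
```

A call that truly blocks should show CPU time far below wall time; a ratio close to 1 would suggest the call is spinning rather than waiting.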
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:
--------------------------------
    Attachment: Memory-profile-patched-client.png

Memory profile of the patched client. Notes:
1. A lot of it is in clients.consumer.internals.Fetcher.createFetchRequests(). Again quite a bit of hash map allocations.
2. The majority of the rest of the allocations seems to be in NetworkClient.poll().
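To make the remark about hash map allocations concrete, here is a hypothetical sketch of why grouping partitions by broker on every pass can allocate a lot. It is not the Kafka Fetcher code and not what the fix changed; the class FetchGroupingSketch, the leaderOf mapping, and both grouping methods are assumptions made for illustration. It contrasts building a fresh map on each call with refilling collections the caller owns, using the 64 partitions and three brokers from the report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FetchGroupingSketch {
    // Hypothetical stand-in: partition p is led by broker (p % brokerCount).
    static int leaderOf(int partition, int brokerCount) {
        return partition % brokerCount;
    }

    // Allocating variant: a new map and new lists are created on every call.
    static Map<Integer, List<Integer>> groupFresh(int partitions, int brokers) {
        Map<Integer, List<Integer>> byBroker = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            byBroker.computeIfAbsent(leaderOf(p, brokers), b -> new ArrayList<>()).add(p);
        }
        return byBroker;
    }

    // Reusing variant: clears and refills the caller's map, so repeated calls
    // do not create new maps or lists once the first call has populated them.
    static void groupInto(Map<Integer, List<Integer>> reuse, int partitions, int brokers) {
        for (List<Integer> list : reuse.values()) {
            list.clear();
        }
        for (int p = 0; p < partitions; p++) {
            reuse.computeIfAbsent(leaderOf(p, brokers), b -> new ArrayList<>()).add(p);
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> fresh = groupFresh(64, 3);
        Map<Integer, List<Integer>> reused = new HashMap<>();
        groupInto(reused, 64, 3);
        groupInto(reused, 64, 3); // second pass reuses the same map and lists
        System.out.println(fresh.equals(reused)); // prints "true"
        System.out.println(fresh.get(0).size());  // prints "22"
    }
}
```

Both variants produce the same grouping; they differ only in how much garbage a tight loop generates, which is the kind of cost an allocation profile would surface.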
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3159:
-------------------------------
    Fix Version/s: 0.9.0.1
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:
--------------------------------
    Description:
We are using the new kafka consumer with the following config (as logged by kafka)

        metric.reporters = []
        metadata.max.age.ms = 30
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = myGroup.id
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 2097152
        bootstrap.servers = [myBrokerList]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 1000
        sasl.kerberos.min.time.before.relogin = 6
        connections.max.idle.ms = 54
        ssl.truststore.password = null
        session.timeout.ms = 3
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class sf.kafka.VoidDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 4
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 3
        fetch.min.bytes = 512
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

We use the consumer.assign() feature to assign a list of partitions and call poll in a loop. We have the following setup:

1. The messages have no key and we use the byte array deserializer to get byte arrays from the config.
2. The messages themselves are on average about 75 bytes. We get this number by dividing the Kafka broker bytes-in metric by the messages-in metric.
3. Each consumer is assigned about 64 partitions of the same topic spread across three brokers.
4. We get very few messages per second, maybe around 1-2 messages across all partitions on a client right now.
5. We have no compression on the topic.

Our run loop looks something like this

    while (isRunning()) {
        ConsumerRecords<?, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
            // This never hits for us
            logger.error("Exception polling Kafka ", e);
            records = null;
        }

        if (records != null) {
            for (ConsumerRecord<?, byte[]> record : records) {
                // The handler puts the byte array on a very fast ring buffer so it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
    }

With this setup our performance has taken a horrendous hit as soon as we started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handling such a low number of messages. Our process was using 16% CPU before we added a single consumer and it went to 25% and above after. That's an increase of over 50% from a single consumer getting a single-digit number of small messages per second. Here is an attachment of the CPU usage breakdown in the consumer (the namespace is different because we shade the kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire process CPU is used on polling these 64 partitions (across 3 brokers) with a single-digit number of 70-80 byte messages. We've used bigger timeouts (around 100 seconds) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected but this seems like it would completely kill performance. Here is the exception tab of Java Mission Control. http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 minutes, which is about 10 thousand exceptions per second! The exception stack trace shows that it originates from the poll call. I don't understand how it can throw so many exceptions given I call poll with a
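
The two derived figures in the description (an increase of "over 50%" and "about 10 thousand exceptions per second") follow directly from the numbers quoted there. The short sketch below just reproduces that arithmetic; the class name ProfileArithmetic and its helper methods are hypothetical and exist only for this check.

```java
class ProfileArithmetic {
    // Relative increase, in percent, when a value goes from `before` to `after`.
    static double relativeIncreasePercent(double before, double after) {
        return (after - before) / before * 100.0;
    }

    // Average number of events per second over a window given in minutes.
    static double perSecond(double events, double minutes) {
        return events / (minutes * 60.0);
    }

    public static void main(String[] args) {
        // Process CPU went from 16% to 25% after adding one consumer.
        System.out.printf("CPU increase: %.2f%%%n", relativeIncreasePercent(16.0, 25.0)); // 56.25%
        // 1.8 million EOFExceptions were recorded over 3 minutes.
        System.out.printf("Exceptions per second: %.0f%n", perSecond(1_800_000, 3)); // 10000
    }
}
```

Going from 16% to 25% is nine percentage points, which is 56.25% of the original 16%, and 1.8 million exceptions in 180 seconds averages 10,000 per second, so both statements in the report are consistent with the raw numbers.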
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajiv Kurian updated KAFKA-3159: Description: We are using the new kafka consumer with the following config (as logged by kafka) metric.reporters = [] metadata.max.age.ms = 30 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = myGroup.id partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 2097152 bootstrap.servers = [myBrokerList] retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX enable.auto.commit = false ssl.key.password = null fetch.max.wait.ms = 1000 sasl.kerberos.min.time.before.relogin = 6 connections.max.idle.ms = 54 ssl.truststore.password = null session.timeout.ms = 3 metrics.num.samples = 2 client.id = ssl.endpoint.identification.algorithm = null key.deserializer = class sf.kafka.VoidDeserializer ssl.protocol = TLS check.crcs = true request.timeout.ms = 4 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 32768 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = PLAINTEXT ssl.truststore.location = null ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 3 fetch.min.bytes = 512 send.buffer.bytes = 131072 auto.offset.reset = earliest We use the consumer.assign() feature to assign a list of partitions and call poll in a loop. We have the following setup: 1. The messages have no key and we use the byte array deserializer to get byte arrays from the config. 2. The messages themselves are on an average about 75 bytes. 
We get this number by dividing the Kafka broker bytes-in metric by the messages-in metric. 3. Each consumer is assigned about 64 partitions of the same topic spread across three brokers. 4. We get very few messages per second maybe around 1-2 messages across all partitions on a client right now. 5. We have no compression on the topic. Our run loop looks something like this while (isRunning()) { ConsumerRecordsrecords = null; try { // Here timeout is about 10 seconds, so it is pretty big. records = consumer.poll(timeout); } catch (Exception e) { // This never hits for us logger.error("Exception polling Kafka ", e); records = null; } if (records != null) { for (ConsumerRecord record : records) { // The handler puts the byte array on a very fast ring buffer so it barely takes any time. handler.handleMessage(ByteBuffer.wrap(record.value())); } } } With this setup our performance has taken a horrendous hit as soon as we started this one thread that just polls Kafka in a loop. I profiled the application using Java Mission Control and have a few insights. 1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handing such a low number of messages. Our process was using 16% CPU before we added a single consumer and it went to 25% and above after. That's an increase of over 50% from a single consumer getting a single digit number of small messages per second. Here is an attachment of the cpu usage breakdown in the consumer (the namespace is different because we shade the kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire process CPU is used on polling these 64 partitions (across 3 brokers) with single digit number of 70-80 byte odd messages. We've used bigger timeouts (100 seconds odd) and that doesn't seem to make much of a difference either. 2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected but this seems like it would completely kill performance. 
Here is the exception tab of Java mission control. http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3 minutes which is about 10 thousand exceptions per second! The exception stack trace shows that it originates from the poll call. I don't understand how it can throw so many exceptions given I call poll it with a
[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajiv Kurian updated KAFKA-3159:
    Description:

We are using the new Kafka consumer with the following config (as logged by Kafka):

    metric.reporters = []
    metadata.max.age.ms = 30
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = myGroup.id
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 2097152
    bootstrap.servers = [myBrokerList]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 1000
    sasl.kerberos.min.time.before.relogin = 6
    connections.max.idle.ms = 54
    ssl.truststore.password = null
    session.timeout.ms = 3
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class sf.kafka.VoidDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 4
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 3
    fetch.min.bytes = 512
    send.buffer.bytes = 131072
    auto.offset.reset = earliest

We use the consumer.assign() feature to assign a list of partitions and call poll in a loop. We have the following setup:

1. The messages have no key, and we use the byte array deserializer from the config above to get the values as byte arrays.
2. The messages themselves are on average about 75 bytes. We get this number by dividing the Kafka broker bytes-in metric by the messages-in metric.
3. Each consumer is assigned about 64 partitions of the same topic, spread across three brokers.
4. We get very few messages per second, maybe around 1-2 messages across all partitions on a client right now.
5. We have no compression on the topic.

Our run loop looks something like this:

    while (isRunning()) {
        ConsumerRecords<Void, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
            // This never hits for us
            logger.error("Exception polling Kafka ", e);
            records = null;
        }

        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
                // The handler puts the byte array on a very fast ring buffer so it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
    }

With this setup our performance has taken a horrendous hit as soon as we started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handling such a low number of messages. Our process was using 16% CPU before we added a single consumer and it went to 25% and above after. That's an increase of over 50% from a single consumer getting a single-digit number of small messages per second. Here is an attachment of the CPU usage breakdown in the consumer (the namespace is different because we shade the Kafka jar before using it): http://imgur.com/BxWs9Q0 So 20.54% of our entire process CPU is used on polling these 64 partitions (across 3 brokers) with a single-digit number of messages of roughly 70-80 bytes each. We've used bigger timeouts (around 100 seconds) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected, but this seems like it would completely kill performance. Here is the exception tab of Java Mission Control: http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 minutes, which is about 10 thousand exceptions per second! The exception stack trace shows that it originates from the poll call. I don't understand how it can throw so many exceptions given I call poll it with a
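
For reference, the figures quoted in this report can be re-derived with a few lines of arithmetic. This is only a sketch: the class and method names are made up for illustration, the 180-second window assumes the "3 minutes" quoted above is exact, and the bytes-in and messages-in values used for the average message size are hypothetical placeholders, not metrics taken from this report.

```java
public class ReportArithmetic {

    /** Relative increase from one CPU percentage to another, in percent. */
    static double relativeIncreasePercent(double before, double after) {
        return (after - before) / before * 100.0;
    }

    /** Average number of events per second over a window of the given length. */
    static double perSecond(double count, double windowSeconds) {
        return count / windowSeconds;
    }

    /** Average message size, computed as broker bytes-in divided by messages-in. */
    static double averageMessageBytes(double bytesIn, double messagesIn) {
        return bytesIn / messagesIn;
    }

    public static void main(String[] args) {
        // Process CPU went from 16% to 25% after adding one consumer.
        System.out.println(relativeIncreasePercent(16.0, 25.0)); // 56.25

        // 1.8 million exceptions over 3 minutes (assumed to be exactly 180 s).
        System.out.println(perSecond(1_800_000, 180)); // 10000.0

        // Hypothetical metric values chosen only to illustrate the division.
        System.out.println(averageMessageBytes(7_500, 100)); // 75.0
    }
}
```

The first two results match the "over 50%" and "about 10 thousand exceptions per second" statements in the description.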