[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-09 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140049#comment-15140049
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I am running a patched broker with a consumer consuming partitions that have no 
messages, and it seems to be working fine. So it looks good so far. I'll run it 
for longer and then finally run it with real messages to make sure there is no 
regression. Thanks, everyone!

> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
> Fix For: 0.9.0.1
>
> Attachments: Memory-profile-patched-client.png, Screen Shot 
> 2016-02-01 at 11.09.32 AM.png
>
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second, maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
>     ConsumerRecords records = null;
>     try {
>         // Here timeout is about 10 seconds, so it is pretty big.
>         records = consumer.poll(timeout);
>     } catch (Exception e) {
>         // This never hits for us.
>         logger.error("Exception polling Kafka ", e);
>         records = null;
>     }
>     if (records != null) {
>         for (ConsumerRecord record : records) {
>             // The handler puts the byte array on a very fast ring buffer so it barely takes any time.
>             handler.handleMessage(ByteBuffer.wrap(record.value()));
>         }
>     }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer, and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single-digit 
> number of small messages per second. Here is an attachment of the CPU usage 
> breakdown in the consumer (the namespace is different because we shade the 
> kafka jar before using it).
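A minimal, self-contained sketch of the setup described above: manual assignment of 64 partitions of one topic plus a long-poll loop. The broker list, topic name, and handler are placeholders, and ByteArrayDeserializer stands in for the reporter's custom key deserializer, so this illustrates the reported usage pattern rather than the reporter's actual code.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
        props.put("enable.auto.commit", "false");
        props.put("fetch.min.bytes", "512");
        props.put("fetch.max.wait.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manually assign 64 partitions of one topic, as in the report.
            List<TopicPartition> partitions = new ArrayList<>();
            for (int p = 0; p < 64; p++) {
                partitions.add(new TopicPartition("my-topic", p)); // hypothetical topic
            }
            consumer.assign(partitions);

            while (true) {
                // Long poll timeout (about 10 seconds), matching the loop above.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000L);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    handle(ByteBuffer.wrap(record.value()));
                }
            }
        }
    }

    private static void handle(ByteBuffer payload) {
        // Stand-in for the ring-buffer handler described in the report.
    }
}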

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-08 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138038#comment-15138038
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I'll try it tomorrow for sure.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135586#comment-15135586
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Thanks Jason. I'll try to apply this patch early next week. Should I build 
trunk + patch or 0.9.0 + patch?


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134813#comment-15134813
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Though I should mention that I've seen the same issue with older brokers 
(0.8.2.x, etc.) too, if I remember correctly, so it doesn't seem exclusive to 0.9.x.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134776#comment-15134776
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

It does seem like it is related, if not the same problem.


[jira] [Comment Edited] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134562#comment-15134562
 ] 

Rajiv Kurian edited comment on KAFKA-3159 at 2/5/16 5:49 PM:
-

I think I've found the underlying issue (it might not be the only one in play). It 
turns out that when I don't have any messages in the log, the Kafka broker 
sends back a reply with no messages immediately instead of respecting 
fetch_max_wait_ms or fetch_min_bytes. The EOFExceptions were probably just 
raised from parsing empty message sets. I can reproduce this consistently. 
Steps:
1. Create a topic with a small retention, say 5 minutes, or wait for said topic 
to have all its logs cleaned.
2. Start consuming the topic without any messages being sent to it.
3. Observe that Kafka sends back an empty reply to every fetch request almost 
immediately. This can be observed with tcpdump, by monitoring network 
in/out, with ngrep, etc. I also verified it by writing my own client and 
observing that my requests get immediate replies when the log is empty.
4. As soon as you start sending messages to the topic, the problem goes away.

We've actually hit this problem in the past (seeing a massive amount of network 
traffic) when we were subscribed to a single topic that gets no messages. We 
didn't know the underlying issue then, but I am pretty sure it was this problem.

This is a problem whenever a consumer sends fetch requests to a broker that is 
the leader for the queried partitions but has no messages retained in its log. 
It can also be a problem in real life. Here are a few use cases:
i) Metadata-like topics that are always consumed but very rarely written 
to. We've run into this in the past, like I said.
ii) During feature development one can switch on the consumers and put the 
producers behind a feature flag. This was the problem we ran into: the consumer 
code went out before the producer code was integrated/switched on, and we had 
to roll back because of the massive regression.

Moreover, it goes against all intuition that fetch requests against an 
empty topic-partition should be more expensive than actually getting data.
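A minimal sketch of the reproduction steps above, assuming a local broker and a hypothetical topic named "empty-topic" whose log has been fully cleaned. It polls an assigned empty partition for a minute and then prints the consumer's own fetch-related metrics (the metric names and the Metric.value() accessor are as in the 0.9-era client and may differ in later versions). If the broker answers empty fetch requests immediately instead of honoring fetch.max.wait.ms / fetch.min.bytes, the reported fetch rate comes out far above the expected roughly one request per second.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;

public class EmptyTopicFetchChurn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("enable.auto.commit", "false");
        props.put("fetch.min.bytes", "512");
        props.put("fetch.max.wait.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Step 2 of the repro: consume a partition that has no retained messages.
            consumer.assign(Collections.singletonList(new TopicPartition("empty-topic", 0)));

            long end = System.currentTimeMillis() + 60_000;
            while (System.currentTimeMillis() < end) {
                consumer.poll(1000L); // empty results are expected here
            }

            // With fetch.max.wait.ms = 1000 and an idle topic, the fetch rate should
            // stay close to 1 request/sec; the behavior described above shows up as a
            // much larger number (and as network traffic visible in tcpdump).
            for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
                if (entry.getKey().name().contains("fetch")) {
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
                }
            }
        }
    }
}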


was (Author: ra...@signalfx.com):
I think I've found the issue. It turns out that when I don't have any messages 
in the log, the kafka broker sends back a reply with no messages immediately 
instead of respecting the fetch_max_wait_ms or the fetch_min_bytes. The 
EOFExceptions were probably just raised from parsing empty message sets. I can 
reproduce this consistently. Steps:
1. Create a topic with a small retention say 5 minutes or wait for said topic 
to have all its logs cleaned.
2. Start consuming on the topic without any messages being sent to the topic.
3. Observe that Kafka sends back an empty reply to every fetch request almost 
immediately. This can be observed with tcp-dump, or monitoring the 
networking-in/out or ngrep etc. I also verified it by writing my own client and 
observing that my requests get immediate replies when the log is empty.
4. As soon as you start sending messages to the topic, the problem goes away.

We've actually hit this problem in the past (seeing massive number of network 
traffic) when we were subscribed to a single topic that gets no messages. We 
didn't know the underlying issue then but I am pretty sure it is this problem.

This is a problem if any consumer is sending fetch requests to at least one 
broker that is a leader for the partitions being queried but has no messages 
retained in its log. In real life it can also be a problem. Here are a few use 
cases:
i) Metadata like topics that are always consumed but very rarely ever written 
to. We've run into this in the past like I said.
ii) During feature development one can switch on the consumers, and put the 
producers behind a feature flag. This was the problem we ran into. The consumer 
code went ahead before the producer code was integrated/switched on and we had 
to roll back because of the massive regression.

Moreover it goes against all intuition that doing fetch requests against an 
empty topic-partition should not be more expensive than actually getting data.


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-05 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134562#comment-15134562
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I think I've found the issue. It turns out that when I don't have any messages 
in the log, the kafka broker sends back a reply with no messages immediately 
instead of respecting the fetch_max_wait_ms or the fetch_min_bytes. The 
EOFExceptions were probably just raised from parsing empty message sets. I can 
reproduce this consistently. Steps:
1. Create a topic with a small retention say 5 minutes or wait for said topic 
to have all its logs cleaned.
2. Start consuming on the topic without any messages being sent to the topic.
3. Observe that Kafka sends back an empty reply to every fetch request almost 
immediately. This can be observed with tcp-dump, or monitoring the 
networking-in/out or ngrep etc. I also verified it by writing my own client and 
observing that my requests get immediate replies when the log is empty.
4. As soon as you start sending messages to the topic, the problem goes away.

We've actually hit this problem in the past (seeing massive number of network 
traffic) when we were subscribed to a single topic that gets no messages. We 
didn't know the underlying issue then but I am pretty sure it is this problem.

This is a problem if any consumer is sending fetch requests to at least one 
broker that is a leader for the partitions being queried but has no messages 
retained in its log. In real life it can also be a problem. Here are a few use 
cases:
i) Metadata like topics that are always consumed but very rarely ever written 
to. We've run into this in the past like I said.
ii) During feature development one can switch on the consumers, and put the 
producers behind a feature flag. This was the problem we ran into. The consumer 
code went ahead before the producer code was integrated/switched on and we had 
to roll back because of the massive regression.

Moreover it goes against all intuition that doing fetch requests against an 
empty topic-partition should not be more expensive than actually getting data.


[jira] [Commented] (KAFKA-3200) MessageSet from broker seems invalid

2016-02-03 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131155#comment-15131155
 ] 

Rajiv Kurian commented on KAFKA-3200:
-

Got it. Yeah, it would be ideal not to have this problem. It would also be good 
to mention this in the guide for people writing new clients.






[jira] [Commented] (KAFKA-3200) MessageSet from broker seems invalid

2016-02-03 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131150#comment-15131150
 ] 

Rajiv Kurian commented on KAFKA-3200:
-

Not attaching code anymore since this seems known. Glad I am not the first one 
facing this.






[jira] [Commented] (KAFKA-3200) MessageSet from broker seems invalid

2016-02-03 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131148#comment-15131148
 ] 

Rajiv Kurian commented on KAFKA-3200:
-

Yeah, I have a similar workaround, but this seems unfortunate.






[jira] [Created] (KAFKA-3200) MessageSet from broker seems invalid

2016-02-03 Thread Rajiv Kurian (JIRA)
Rajiv Kurian created KAFKA-3200:
---

 Summary: MessageSet from broker seems invalid
 Key: KAFKA-3200
 URL: https://issues.apache.org/jira/browse/KAFKA-3200
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.0
 Environment: Linux,  running Oracle JVM 1.8
Reporter: Rajiv Kurian


I am writing a Java consumer client for Kafka, using the protocol guide at 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 
to parse buffers. I am currently running into a problem parsing certain fetch 
responses. Many times it works fine, but some other times it does not. It might 
just be a bug in my implementation, in which case I apologize.

My messages are uncompressed, exactly 23 bytes in length, and have null keys. 
So each Message in my MessageSet is exactly 4 (crc) + 1 (magic_bytes) + 1 
(attributes) + 4 (key_num_bytes) + 0 (key is null) + 4 (num_value_bytes) + 
23 (value_bytes) = 37 bytes.

So each element of the MessageSet itself is exactly 37 (size of message) + 8 
(offset) + 4 (message_size) = 49 bytes.

In comparison, an empty message set element should be 8 (offset) + 4 
(message_size) + 4 (crc) + 1 (magic_bytes) + 1 (attributes) + 4 (key_num_bytes) + 
0 (key is null) + 4 (num_value_bytes) + 0 (value is null) = 26 bytes.

I occasionally receive a MessageSet whose reported size is 1000 bytes. A size of 
1000 is not divisible by my MessageSet element size, which is 49 bytes. When I 
parse such a message set I can read 20 message set elements (49 bytes each), 
which is 980 bytes. I then have 20 extra bytes to parse, which is less than 
even an empty message (26 bytes). At this point I don't know how to parse the 
messages any more.

I will attach a file for a response that actually causes me to run into this 
problem, as well as the sample code.
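For context, the wire-protocol guide's note that a broker may return a partial message at the end of a message set (the response is cut at the requested fetch size, and clients are expected to discard the partial tail) would explain the 20 leftover bytes described above. Below is a rough sketch of that workaround for an uncompressed v0 message set held in a ByteBuffer, using the field sizes from this description; it is an illustration, not the official parser.

import java.nio.ByteBuffer;

public class MessageSetParser {

    // Every entry in a v0 message set is preceded by offset (8) + message_size (4).
    private static final int LOG_OVERHEAD = 12;

    // Walks an uncompressed v0 message set, printing what it finds and discarding
    // any partial message left at the end of the buffer.
    static void parse(ByteBuffer messageSet) {
        ByteBuffer buf = messageSet.slice();
        while (buf.remaining() >= LOG_OVERHEAD) {
            long offset = buf.getLong();
            int messageSize = buf.getInt();
            if (buf.remaining() < messageSize) {
                // Trailing partial message: the broker truncated the set at the
                // fetch size limit, so the leftover bytes cannot be parsed.
                break;
            }
            ByteBuffer message = buf.slice();
            message.limit(messageSize);
            buf.position(buf.position() + messageSize);

            int crc = message.getInt();
            byte magic = message.get();
            byte attributes = message.get();
            int keySize = message.getInt();   // -1 means a null key
            if (keySize > 0) {
                message.position(message.position() + keySize);
            }
            int valueSize = message.getInt(); // -1 means a null value
            System.out.println("offset=" + offset + " crc=" + crc + " magic=" + magic
                    + " attributes=" + attributes + " valueSize=" + valueSize);
        }
    }
}

With the 1000-byte response described here, this reads 20 complete 49-byte entries and then drops the 20-byte remainder.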





[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126877#comment-15126877
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

[~hachikuji] I tried your patch. The exceptions are now gone, but the CPU has 
remained high (25%+, from 17% before the new client was added). I have attached 
the CPU breakdown and the allocation breakdown screenshots and comments.
Some notes:
1. The exceptions seem to be gone completely. The overall CPU has gone down to 
25% odd from the 27% before, so it has gotten a bit better. But the percentage 
of CPU used by the Kafka part of the code has gone up, to 40.58% of the total 
used by my process. Most of the CPU is now spent in hash map code. Again, I 
don't understand why so much CPU is being used to get a single-digit number of 
60-byte messages per second (64 partitions striped across 3 brokers).

2. The allocation percentage has, believe it or not, gone up even more, to about 
31.26% of my entire process's allocations. Again, it is baffling that it 
allocates so much to get so few messages. The total allocations from the TLAB 
over the 5-minute period have gone up to 14.05 GB, from the 6.95 GB done by the 
client which threw a lot of exceptions. That seems to be a staggering amount of 
allocation for something that handles about one message a second.

My polls are done with a 5-second timeout, which seems high enough.

Let me know if I can do more profiling or provide other info.


[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 Thread Rajiv Kurian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajiv Kurian updated KAFKA-3159:

Attachment: Memory-profile-patched-client.png

Memory profile of the patched client. Notes:

1. A lot of it is in clients.consumer.internals.Fetcher.createFetchRequests(); 
again, quite a bit of hash map allocation.
2. The majority of the remaining allocations seem to be in NetworkClient.poll().


[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-02-01 Thread Rajiv Kurian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajiv Kurian updated KAFKA-3159:

Attachment: Screen Shot 2016-02-01 at 11.09.32 AM.png

CPU breakdown of the patched client. Some notes:
1. 40.58% of the process's CPU profile is in these poll calls, which are done 
with a timeout of 5 seconds.
2. A lot of CPU is spent on hash map operations.
3. The rest of the CPU seems to be spent mostly in NetworkClient.poll().


[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122659#comment-15122659
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Thanks Jason. I can try to do that early next week. I have a lot of deadlines 
this week, so I might not get a chance to get to it.

> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords records = null;
> try {
> // Here timeout is about 10 seconds, so it is pretty big.
> records = consumer.poll(timeout);
> } catch (Exception e) {
>// This never hits for us
> logger.error("Exception polling Kafka ", e);
> records = null;
> }
> if (records != null) {
> for (ConsumerRecord record : records) {
>// The handler puts the byte array on a very fast ring buffer 
> so it barely takes any time.
> handler.handleMessage(ByteBuffer.wrap(record.value()));
> }
> }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the cpu usage 
> breakdown in the consumer (the namespace is different because we shade the 
> kafka jar before using it) - ht

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122614#comment-15122614
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

Actually, I managed to dig through the logs and find the producer config as 
logged by the producer:
2016-01-26T02:53:31.497Z INFO  [PathChildrenCache-0] 
[s.o.a.k.c.producer.ProducerConfig   ] {}: ProducerConfig values: 
compression.type = none
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
acks = 1
batch.size = 10240
reconnect.backoff.ms = 10
bootstrap.servers = [our-kafka-brokers]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 2097152
timeout.ms = 3
key.serializer = class sf.disco.kafka.VoidSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = false
value.serializer = class 
sf.org.apache.kafka.common.serialization.ByteArraySerializer
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 100
client.id = 

I don't explicitly set compression, and it appears from the config that no 
compression was enabled.

> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords records = null;
> try {
> // Here timeout is about 10 seconds, so it is pretty big.
> records = consumer.poll(timeout);
> } catch (Exception e) {
>// This never hits for us
> logger.error("Exception polling Kafka ", e);
> records = null

[jira] [Commented] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122594#comment-15122594
 ] 

Rajiv Kurian commented on KAFKA-3159:
-

I don't enable compression on the topic. However, the producer (0.8.2) might 
just decide to compress all the same. How can I tell?
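
One way to remove the ambiguity, as a sketch rather than a definitive answer: pin 
compression.type explicitly when constructing the producer and confirm it in the 
ProducerConfig values the client logs at startup. The broker list and serializer 
classes below are placeholders, not the actual application settings.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class CompressionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker list
        props.put("compression.type", "none");            // explicit, so the logged value is unambiguous
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");   // placeholder
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");   // placeholder

        // ProducerConfig values, including compression.type, are logged when the producer starts.
        Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}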

> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> --
>
> Key: KAFKA-3159
> URL: https://issues.apache.org/jira/browse/KAFKA-3159
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: Linux, Oracle JVM 8.
>Reporter: Rajiv Kurian
>Assignee: Jason Gustafson
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
> metadata.max.age.ms = 30
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = myGroup.id
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 2097152
> bootstrap.servers = [myBrokerList]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 1000
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> session.timeout.ms = 3
> metrics.num.samples = 2
> client.id = 
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class sf.kafka.VoidDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> fetch.min.bytes = 512
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords records = null;
> try {
> // Here timeout is about 10 seconds, so it is pretty big.
> records = consumer.poll(timeout);
> } catch (Exception e) {
>// This never hits for us
> logger.error("Exception polling Kafka ", e);
> records = null;
> }
> if (records != null) {
> for (ConsumerRecord record : records) {
>// The handler puts the byte array on a very fast ring buffer 
> so it barely takes any time.
> handler.handleMessage(ByteBuffer.wrap(record.value()));
> }
> }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handling such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the cpu usage 
> breakdown in the consumer (the namespace is different because we shade the 
> kafka jar before using it) - 

[jira] [Updated] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-27 Thread Rajiv Kurian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajiv Kurian updated KAFKA-3159:

Description: 
We are using the new kafka consumer with the following config (as logged by 
kafka)

metric.reporters = []

metadata.max.age.ms = 30

value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

group.id = myGroup.id

partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

reconnect.backoff.ms = 50

sasl.kerberos.ticket.renew.window.factor = 0.8

max.partition.fetch.bytes = 2097152

bootstrap.servers = [myBrokerList]

retry.backoff.ms = 100

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

ssl.keystore.type = JKS

ssl.trustmanager.algorithm = PKIX

enable.auto.commit = false

ssl.key.password = null

fetch.max.wait.ms = 1000

sasl.kerberos.min.time.before.relogin = 6

connections.max.idle.ms = 54

ssl.truststore.password = null

session.timeout.ms = 3

metrics.num.samples = 2

client.id = 

ssl.endpoint.identification.algorithm = null

key.deserializer = class sf.kafka.VoidDeserializer

ssl.protocol = TLS

check.crcs = true

request.timeout.ms = 4

ssl.provider = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.keystore.location = null

heartbeat.interval.ms = 3000

auto.commit.interval.ms = 5000

receive.buffer.bytes = 32768

ssl.cipher.suites = null

ssl.truststore.type = JKS

security.protocol = PLAINTEXT

ssl.truststore.location = null

ssl.keystore.password = null

ssl.keymanager.algorithm = SunX509

metrics.sample.window.ms = 3

fetch.min.bytes = 512

send.buffer.bytes = 131072

auto.offset.reset = earliest


We use the consumer.assign() feature to assign a list of partitions and call 
poll in a loop.  We have the following setup:

1. The messages have no key and we use the byte array deserializer to get byte 
arrays from the config.

2. The messages themselves are on an average about 75 bytes. We get this number 
by dividing the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic spread 
across three brokers.

4. We get very few messages per second maybe around 1-2 messages across all 
partitions on a client right now.

5. We have no compression on the topic.
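
As a reference point, here is a minimal sketch of the assign-and-poll setup described 
above; the broker list, topic name, partition count and deserializers are placeholders, 
not the actual application code, and the real poll loop is shown below.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
        props.put("group.id", "myGroup.id");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");   // placeholder
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // Manually assign ~64 partitions of one topic instead of subscribing to it.
        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < 64; p++) {
            partitions.add(new TopicPartition("myTopic", p)); // topic name is a placeholder
        }
        consumer.assign(partitions);
        // ... poll(timeout) loop as shown below ...
    }
}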

Our run loop looks something like this

while (isRunning()) {
    ConsumerRecords records = null;
    try {
        // Here timeout is about 10 seconds, so it is pretty big.
        records = consumer.poll(timeout);
    } catch (Exception e) {
        // This never hits for us
        logger.error("Exception polling Kafka ", e);
        records = null;
    }

    if (records != null) {
        for (ConsumerRecord record : records) {
            // The handler puts the byte array on a very fast ring buffer so it barely takes any time.
            handler.handleMessage(ByteBuffer.wrap(record.value()));
        }
    }
}


With this setup our performance has taken a horrendous hit as soon as we 
started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a 
lot of CPU for handling such a low number of messages. Our process was using 16% 
CPU before we added a single consumer and it went to 25% and above after. 
That's an increase of over 50% from a single consumer getting a single digit 
number of small messages per second. Here is an attachment of the cpu usage 
breakdown in the consumer (the namespace is different because we shade the 
kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire 
process CPU is used on polling these 64 partitions (across 3 brokers) with 
single digit number of 70-80 byte odd messages.  We've used bigger timeouts 
(100 seconds odd) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
whether this is expected but this seems like it would completely kill 
performance. Here is the exception tab of Java mission control. 
http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3 minutes 
which is about 10 thousand exceptions per second! The exception stack trace 
shows that it originates from the poll call. I don't understand how it can 
throw so many exceptions given I call poll with a timeout of 10 seconds and 
get a s

[jira] [Created] (KAFKA-3159) Kafka consumer 0.9.0.0 client poll is very CPU intensive under certain conditions

2016-01-27 Thread Rajiv Kurian (JIRA)
Rajiv Kurian created KAFKA-3159:
---

 Summary: Kafka consumer 0.9.0.0  client poll is very CPU intensive 
under certain conditions
 Key: KAFKA-3159
 URL: https://issues.apache.org/jira/browse/KAFKA-3159
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.9.0.0
 Environment: Linux, Oracle JVM 8.
Reporter: Rajiv Kurian


We are using the new kafka consumer with the following config (as logged by 
kafka)

metric.reporters = []

metadata.max.age.ms = 30

value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

group.id = myGroup.id

partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

reconnect.backoff.ms = 50

sasl.kerberos.ticket.renew.window.factor = 0.8

max.partition.fetch.bytes = 2097152

bootstrap.servers = [myBrokerList]

retry.backoff.ms = 100

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

ssl.keystore.type = JKS

ssl.trustmanager.algorithm = PKIX

enable.auto.commit = false

ssl.key.password = null

fetch.max.wait.ms = 1000

sasl.kerberos.min.time.before.relogin = 6

connections.max.idle.ms = 54

ssl.truststore.password = null

session.timeout.ms = 3

metrics.num.samples = 2

client.id = 

ssl.endpoint.identification.algorithm = null

key.deserializer = class sf.kafka.VoidDeserializer

ssl.protocol = TLS

check.crcs = true

request.timeout.ms = 4

ssl.provider = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.keystore.location = null

heartbeat.interval.ms = 3000

auto.commit.interval.ms = 5000

receive.buffer.bytes = 32768

ssl.cipher.suites = null

ssl.truststore.type = JKS

security.protocol = PLAINTEXT

ssl.truststore.location = null

ssl.keystore.password = null

ssl.keymanager.algorithm = SunX509

metrics.sample.window.ms = 3

fetch.min.bytes = 512

send.buffer.bytes = 131072

auto.offset.reset = earliest


We use the consumer.assign() feature to assign a list of partitions and call 
poll in a loop.  We have the following setup:

1. The messages have no key and we use the byte array deserializer to get byte 
arrays from the config.

2. The messages themselves are on an average about 75 bytes. We get this number 
by dividing the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic spread 
across three brokers.

4. We get very few messages per second maybe around 1-2 messages across all 
partitions on a client right now.

5. We have no compression on the topic.

Our run loop looks something like this

while (isRunning()) {
    ConsumerRecords records = null;
    try {
        // Here timeout is about 10 seconds, so it is pretty big.
        records = consumer.poll(timeout);
    } catch (Exception e) {
        // This never hits for us
        logger.error("Exception polling Kafka ", e);
        records = null;
    }

    if (records != null) {
        for (ConsumerRecord record : records) {
            // The handler puts the byte array on a very fast ring buffer so it barely takes any time.
            handler.handleMessage(ByteBuffer.wrap(record.value()));
        }
    }
}


With this setup our performance has taken a horrendous hit as soon as we 
started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a 
lot of CPU for handling such a low number of messages. Our process was using 16% 
CPU before we added a single consumer and it went to 25% and above after. 
That's an increase of over 50% from a single consumer getting a single digit 
number of small messages per second. Here is an attachment of the cpu usage 
breakdown in the consumer (the namespace is different because we shade the 
kafka jar before using it) - http://imgur.com/tHjdVnM So 20.54% of our entire 
process CPU is used on polling these 64 partitions (across 3 brokers) with 
single digit number of 70-80 byte odd messages.  We've used bigger timeouts 
(100 seconds odd) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
whether this is expected but this seems like it would completely kill 
performance. Here is the exception tab of Java mission control. 
http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a

[jira] [Comment Edited] (KAFKA-2045) Memory Management on the consumer

2015-03-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/28/15 7:40 PM:
--

[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to be able to make 
changes without being bound by the existing architecture  and class hierarchy 
of the client. Even if I do re-implement some of the parts I'll make sure that 
the client can (a) Do metadata requests so it can react to leaders moving etc. 
(b) Actually read from multiple topic/partitions spread across multiple brokers 
and not just a single broker. Again since this is just a rewrite with the sole 
purpose of exploring possible performance improvements there can be mainly two 
consequences:
i) It shows no improvements: In that case we end up not spending too much time 
changing the current code, and the hacky code just gets us to this conclusion 
faster.
ii) It shows interesting improvements: If this were true, we can afford to 
spend some time seeing which things actually improved performance and make a 
call on how to integrate best.

It might be counterproductive to look at the current client implementation and 
look at the % of time spent in each of the bottlenecks because those numbers 
are a consequence of the current memory layout. For example if we do an on the 
fly CRC check and decompression - CRC check time might go up a bit because now 
we are not striding over a contiguous ByteBuffer in one sweep. Right now the 
current client has this pattern ---  CRC check on Message1 --> CRC check on 
Message2  --> CRC check on MessageN --> Hand message 1 to consumer  --> 
Hand message N to consumer. Instead with the current proposal we will have a 
pattern of  -  Do CRC on a Message1 --> Hand Message1 to consumer --> Do 
CRC on a Message2 --> Hand Message2 to the consumer . So the CRC checks are 
separated by potential (certain?) cache floundering during the handling of the 
message by the consumer. On the other hand from the perspective of the 
consumer, the pattern looks like this -- Do CRC and validation on all messages 
starting with 1 to  N --> Hand messages 1 to N to client. Now by the time the 
Kafka consumer is done with validating and deserializing message N, message 1 
is possibly already out of the cache. With the new approach since we hand over 
a message right after validating it, we give the consumer a hot in cache 
message, which might improve the consumer processing enough to offset for the 
loss in CRC striding efficiency. Or it may not. It might just turn out that 
doing the CRC validation upfront is just a pure win since all the CRC tables 
will be in cache etc and striding access for the CRC math is worth an extra 
iteration of the ByteBuffer contents. But it might still be more profitable 
to elide copies and prevent creation of objects by doing on the fly decoding 
and handing out indexes into the actual response ByteBuffer. This result might 
further be affected by how expensive the deserialization and processing of the 
message is. If the message is a bloated JSON encoded object that is 
de-serialized into a POJO and then processed really slowly then none of this 
will probably matter. On the other hand if the message is a compact and binary 
encoded and can be processed with minimal cache misses, this stuff might add 
up. My point is that basing the TODOs on the current profile may not be optimal 
because the profile is a massive consequence of the current layout and 
allocation patterns. Also the profile will give %s and we might be able to keep 
the same %s but just still reduce the overall time taken for the entire 
consumer processing cycle. Just to belabor the point even further, the current 
hash map implementations might suffer so many cache misses that they mask an 
underlying improvement opportunity for the data in the maps. Switching to 
compact primitive arrays based open hash maps might surface that opportunity 
again.
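
To make the two orderings concrete, here is a rough, hypothetical sketch using plain 
java.util.zip.CRC32 over ByteBuffer slices (not the actual client code) of 
validate-everything-then-dispatch versus validate-and-dispatch interleaved; the 
Handler interface stands in for the ring-buffer handler.

import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.CRC32;

final class CrcOrderingSketch {
    interface Handler { void handle(ByteBuffer message); }

    // Roughly the current pattern: CRC every message first, then hand them all out.
    // By the time message 1 is handled it may already have fallen out of cache.
    static void validateAllThenDispatch(List<ByteBuffer> messages, Handler handler) {
        CRC32 crc = new CRC32();
        for (ByteBuffer m : messages) {
            crc.reset();
            crc.update(m.duplicate()); // strides over the batch in one contiguous sweep
        }
        for (ByteBuffer m : messages) {
            handler.handle(m.duplicate()); // messages may no longer be hot in cache
        }
    }

    // Roughly the proposed pattern: validate a message and hand it out immediately,
    // trading CRC striding efficiency for a hot-in-cache message at the consumer.
    static void validateAndDispatchInterleaved(List<ByteBuffer> messages, Handler handler) {
        CRC32 crc = new CRC32();
        for (ByteBuffer m : messages) {
            crc.reset();
            crc.update(m.duplicate());
            handler.handle(m.duplicate());
        }
    }
}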

Is there a performance test that is used to keep track of the new Consumer's 
performance? If so maybe I can wrap that in a JMH suite and re-use that to test 
improvements?


was (Author: rzidane):
[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to be able to make 
changes without being bound by the existing architecture  and class hierarchy 
of the client. Even if I do re-implement some of the parts I'll make sure that 
the client can (a) Do metadata requests so it can react to leaders moving etc. 
(b) Actually read from multiple topic/partitions 

[jira] [Comment Edited] (KAFKA-2045) Memory Management on the consumer

2015-03-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/28/15 7:40 PM:
--

[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to be able to make 
changes without being bound by the existing architecture  and class hierarchy 
of the client. Even if I do re-implement some of the parts I'll make sure that 
the client can (a) Do metadata requests so it can react to leaders moving etc. 
(b) Actually read from multiple topic/partitions spread across multiple brokers 
and not just a single broker. Again since this is just a rewrite with the sole 
purpose of exploring possible performance improvements there can be mainly two 
consequences:
i) It shows no improvements: In that case we end up not spending too much time 
changing the current code, and the hacky code just gets us to this conclusion 
faster.
ii) It shows interesting improvements: If this were true, we can afford to 
spend some time seeing which things actually improved performance and make a 
call on how to integrate best.

It might be counterproductive to look at the current client implementation and 
look at the % of time spent in each of the bottlenecks because those numbers 
are a consequence of the current memory layout. For example if we do an on the 
fly CRC check and decompression - CRC check time might go up a bit because now 
we are not striding over a contiguous ByteBuffer in one sweep. Right now the 
current client has this pattern ---  CRC check on Message1 --> CRC check on 
Message2  --> CRC check on MessageN --> Hand message 1 to consumer  --> 
Hand message N to consumer. Instead with the current proposal we will have a 
pattern of  -  Do CRC on a Message1 --> Hand Message1 to consumer --> Do 
CRC on a Message2 --> Hand Message2 to the consumer . So the CRC checks are 
separated by potential (certain?) cache floundering during the handling of the 
message by the client. On the other hand from the perspective of the consumer, 
the pattern looks like this -- Do CRC and validation on all messages starting 
with 1 to  N --> Hand messages 1 to N to client. Now by the time the Kafka 
consumer is done with validating and deserializing message N, message 1 is 
possibly already out of the cache. With the new approach since we hand over a 
message right after validating it, we give the consumer a hot in cache message, 
which might improve the consumer processing enough to offset for the loss in 
CRC striding efficiency. Or it may not. It might just turn out that doing the 
CRC validation upfront is just a pure win since all the CRC tables will be in 
cache etc and striding access for the CRC math is worth an extra iteration of 
the ByteBuffer contents. But it might still be more profitable to elide 
copies and prevent creation of objects by doing on the fly decoding and handing 
out indexes into the actual response ByteBuffer. This result might further be 
affected by how expensive the deserialization and processing of the message is. 
If the message is a bloated JSON encoded object that is de-serialized into a 
POJO and then processed really slowly then none of this will probably matter. 
On the other hand if the message is a compact and binary encoded and can be 
processed with minimal cache misses, this stuff might add up. My point is that 
basing the TODOs on the current profile may not be optimal because the profile 
is a massive consequence of the current layout and allocation patterns. Also 
the profile will give %s and we might be able to keep the same %s but just 
still reduce the overall time taken for the entire consumer processing cycle. 
Just to belabor the point even further, the current hash map implementations 
might suffer so many cache misses that they mask an underlying improvement 
opportunity for the data in the maps. Switching to compact primitive arrays 
based open hash maps might surface that opportunity again.

Is there a performance test that is used to keep track of the new Consumer's 
performance? If so maybe I can wrap that in a JMH suite and re-use that to test 
improvements?


was (Author: rzidane):
[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to be able to make 
changes without being bound by the existing architecture  and class hierarchy 
of the client. Even if I do re-implement some of the parts I'll make sure that 
the client can (a) Do metadata requests so it can react to leaders moving etc. 
(b) Actually read from multiple topic/partitions spr

[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-28 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500
 ] 

Rajiv Kurian commented on KAFKA-2045:
-

[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to be able to make 
changes without being bound by the existing architecture  and class hierarchy 
of the client. Even if I do re-implement some of the parts I'll make sure that 
the client can (a) Do metadata requests so it can react to leaders moving etc. 
(b) Actually read from multiple topic/partitions spread across multiple brokers 
and not just a single broker. Again since this is just a rewrite with the sole 
purpose of exploring possible performance improvements there can be mainly two 
consequences:
i) It shows no improvements: In that case we end up not spending too much time 
changing the current code, and the hacky code just gets us to this conclusion 
faster.
ii) It shows interesting improvements: If this were true, we can afford to 
spend some time seeing which things actually improved performance and make a 
call on how to integrate best.

It might be counterproductive to look at the current client implementation and 
look at the % of time spent in each of the bottlenecks because those numbers 
are a consequence of the current memory layout. For example if we do an on the 
fly CRC check and decompression - CRC check time might go up a bit because now 
we are not striding over a contiguous ByteBuffer in one sweep. Right now the 
current client has this pattern ---  CRC check on Message1--> CRC check on 
Message2 --> CRC check on MessageN --> Hand message 1 to consumer --> 
Hand message N to consumer. Instead with the current proposal we will have a 
pattern of  -  Do CRC on a Message1 --> Hand Message1 to consumer --> Do 
CRC on a Message2 --> Hand Message2 to the consumer . So the CRC checks are 
separated by potential (certain?) cache floundering during the handling of the 
message by the client. On the other hand from the perspective of the consumer, 
the pattern looks like this -- Do CRC and validation on all messages starting 
with 1 to  N --> Hand messages 1 to N to client. Now by the time the Kafka 
consumer is done with validating and deserializing message N, message 1 is 
possibly already out of the cache. With the new approach since we hand over a 
message right after validating it, we give the consumer a hot in cache message, 
which might improve the consumer processing enough to offset for the loss in 
CRC striding efficiency. Or it may not. It might just turn out that doing the 
CRC validation upfront is just a pure win since all the CRC tables will be in 
cache etc and striding access for the CRC math is worth an extra iteration of 
the ByteBuffer contents. But it might still be more profitable to elide 
copies and prevent creation of objects by doing on the fly decoding and handing 
out indexes into the actual response ByteBuffer. This result might further be 
affected by how expensive the deserialization and processing of the message is. 
If the message is a bloated JSON encoded object that is de-serialized into a 
POJO and then processed really slowly then none of this will probably matter. 
On the other hand if the message is a compact and binary encoded and can be 
processed with minimal cache misses, this stuff might add up. My point is that 
basing the TODOs on the current profile may not be optimal because the profile 
is a massive consequence of the current layout and allocation patterns. Also 
the profile will give %s and we might be able to keep the same %s but just 
still reduce the overall time taken for the entire consumer processing cycle. 
Just to belabor the point even further, the current hash map implementations 
might suffer so many cache misses that they mask an underlying improvement 
opportunity for the data in the maps. Switching to compact primitive arrays 
based open hash maps might surface that opportunity again.

Is there a performance test that is used to keep track of the new Consumer's 
performance? If so maybe I can wrap that in a JMH suite and re-use that to test 
improvements?

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2045) Memory Management on the consumer

2015-03-27 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384660#comment-14384660
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/27/15 9:07 PM:
--

1. "We can actually make serious performance improvements by improving memory 
allocation patterns" - Yeah this is definitely the crux of it. Any 
performance improvements should also look at long term effects like GC 
activity, longest GC pause etc in addition to just throughput. Even the 
throughput and latency numbers will have to be looked at for a long time 
especially in an application where things don't fit in the L1 or L2 caches. I 
have usually found that with Java most benchmarks (even ones conducted with 
JMH) lie because of how short in duration they are. Since Java has a Thread 
Local Allocation Buffer, objects  allocated in quick succession get allocated 
next to each other in memory too. So even though an ArrayList of objects is an 
array of pointers to objects, the fact that these objects were allocated next 
to each other means they get 95% (hand wave hand wave) of the benefits of an 
equivalent std::vector of structs in C++. The nice memory-striding effects of 
sequential buffers holds even if it is a linked list of Objects again given 
that the Objects themselves were next to each other. But over time even if a 
single Object is actually not deleted/shuffled in the ArrayList,  a garbage 
collection is very likely to move them around in memory and when this happens 
they don't move as an entire unit but separately. Now what began as sequential 
access degenerates into an array of pointers to randomly laid out objects. And 
performance of these is an order of magnitude lower than arrays of sequentially 
laid out structs in C. A ByteBuffer/sun.misc.Unsafe based approach on the other 
hand never changes memory layout so the benefits continue to hold. This is why 
in my experience the 99.99th and above percentiles of typical POJO based 
solutions tank and are orders of magnitude worse than the 99th etc, whereas 
solutions based on ByteBuffers and sun.misc.Unsafe have 99.99s that are maybe 
4-5 times worse than the 99th. But again there might (will?) be 
other bottlenecks like the network or CRC that might show up before one can get 
the max out of such a design.

2. "We don't mangle the code to badly in doing so" - I am planning to write a 
prototype using my own code from scratch that would include things like on the 
fly protocol parsing, buffer management and socket management. I'll  keep 
looking at /copy  the existing code to ensure that I handle errors correctly. 
It is just easier to start from fresh - that way I can work solely on getting 
this to work rather than worrying about how to fit this design in the current 
class hierarchy. A separate prototype will also probably provide the best 
platform for a performance demo since I can use things like primitive array 
based open hash-maps and other non-allocating primitives based data structures 
for metadata management. I can also use char sequences instead of Java's 
allocating strings for topics and such just to see how much of a difference 
they make. It just gives me a lot of options without messing with trunk. If 
this works out and we see an improvement in performance that seems interesting, 
we can work on how best to not mangle the code and/or decide which parts are 
worth mangling for the extra performance. Thoughts?



was (Author: rzidane):
1. "We can actually make serious performance improvements by improving memory 
allocation patterns" - Yeah this is definitely the crux of it. Any 
performance improvements should also look at long term effects like GC 
activity, longest GC pause etc in addition to just throughput. Even the 
throughput and latency numbers will have to be looked at for a long time 
especially in an application where things don't fit in the L1 or L2 caches. I 
have usually found that with Java most benchmarks (even ones conducted with 
JMH) lie because of how short in duration they are. Since Java has a Thread 
Local Allocation Buffer, objects  allocated in quick succession get allocated 
next to each other in memory too. So even though an ArrayList of objects is an 
array of pointers to objects, the fact that these objects were allocated next 
to each other means they get 95% (hand wave hand wave) of the benefits of an 
equivalent std::vector of structs in C++. The nice memory-striding effects of 
sequential buffers holds even if it is a linked list of Objects again given 
that the Objects themselves were next to each other. But over time even if a 
single Object is actually not deleted/shuffled in the ArrayList,  a garbage 
collection is very likely to move them around in memory and when this happens 
they don't move as an entire unit but separately. Now what began as sequential 
access degenerates into an arra

[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-27 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384660#comment-14384660
 ] 

Rajiv Kurian commented on KAFKA-2045:
-

1. "We can actually make serious performance improvements by improving memory 
allocation patterns" - Yeah this is definitely the crux of it. Any 
performance improvements should also look at long term effects like GC 
activity, longest GC pause etc in addition to just throughput. Even the 
throughput and latency numbers will have to be looked at for a long time 
especially in an application where things don't fit in the L1 or L2 caches. I 
have usually found that with Java most benchmarks (even ones conducted with 
JMH) lie because of how short in duration they are. Since Java has a Thread 
Local Allocation Buffer, objects  allocated in quick succession get allocated 
next to each other in memory too. So even though an ArrayList of objects is an 
array of pointers to objects, the fact that these objects were allocated next 
to each other means they get 95% (hand wave hand wave) of the benefits of an 
equivalent std::vector of structs in C++. The nice memory-striding effects of 
sequential buffers holds even if it is a linked list of Objects again given 
that the Objects themselves were next to each other. But over time even if a 
single Object is actually not deleted/shuffled in the ArrayList,  a garbage 
collection is very likely to move them around in memory and when this happens 
they don't move as an entire unit but separately. Now what began as sequential 
access degenerates into an array of pointers to randomly laid out objects. And 
performance of these is an order of magnitude lower than arrays of sequentially 
laid out structs in C. A ByteBuffer/sun.misc.Unsafe based approach on the other 
hand never changes memory layout so the benefits continue to hold. This is why 
in my experience the 99.99th and above percentiles of typical POJO based 
solutions tank and are orders of magnitude worse than the 99th etc, whereas 
solutions based on ByteBuffers and sun.misc.Unsafe have 99.99s that are maybe 
4-5 times worse than the 99th. But again there might (will?) be 
other bottlenecks like the network or CRC that might show up before one can get 
the max out of such a design.

2. "We don't mangle the code to badly in doing so" - I am planning to write a 
prototype using my own code from scratch that would include things like on the 
fly protocol parsing, buffer management and socket management. I'll  keep 
looking at /copy  the existing code to ensure that I handle errors correctly. 
It is just easier to start from fresh - that way I can work solely on getting 
this to work rather than worrying about how to fit this design in the current 
class hierarchy. A separate no strings prototype will also probably provide the 
best platform for a performance demo since I can use things like primitive 
array based open hash-maps and other non-allocating primitives based data 
structures for metadata management. It just gives me a lot of options without 
messing with trunk. If this works out and we see an improvement in performance 
that seems interesting, we can work on how best to not mangle the code etc. 
Thoughts?


> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-27 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384544#comment-14384544
 ] 

Rajiv Kurian commented on KAFKA-2045:
-

[~jkreps] the simple pool of ByteBuffers definitely sounds like an easier thing 
to start out with. Like you said, a nice thing that a single buffer offers is 
absolute memory bounds, but I am sure there are other ways to tackle that. I 
could just have a setting for the highest number of concurrent requests, which 
would equal the highest number of concurrent buffers per broker. We can then 
create buffers lazily (up to the max) and rotate between them in order, so for 
3 buffers we would go 0 -> 1 -> 2 -> 0 etc. The consumer would still have an 
index into this pool, as would the network producer. The network producer will 
not be able to re-use a response buffer that is still being iterated upon, so 
consumption of a response cannot be delayed forever without causing poll calls 
to run out of buffers and just return empty iterators.
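
A minimal sketch of such a rotating, lazily created pool, assuming a fixed 
maximum number of concurrent requests per broker (the class and method names 
are hypothetical, not an existing API):

import java.nio.ByteBuffer;

// Per-broker pool of response buffers, created lazily and handed out in
// round-robin order (0 -> 1 -> 2 -> 0 ...). acquire() returns null when every
// buffer is still being iterated on, which poll() could surface as an empty
// record set.
final class ResponseBufferPool {
    private final ByteBuffer[] buffers;
    private final boolean[] inUse;
    private final int bufferSize;
    private int next = 0;

    ResponseBufferPool(int maxConcurrentRequests, int bufferSize) {
        this.buffers = new ByteBuffer[maxConcurrentRequests];
        this.inUse = new boolean[maxConcurrentRequests];
        this.bufferSize = bufferSize;
    }

    ByteBuffer acquire() {
        for (int i = 0; i < buffers.length; i++) {
            int idx = (next + i) % buffers.length;
            if (!inUse[idx]) {
                if (buffers[idx] == null) {
                    buffers[idx] = ByteBuffer.allocate(bufferSize); // lazy creation
                }
                inUse[idx] = true;
                next = (idx + 1) % buffers.length;
                buffers[idx].clear();
                return buffers[idx];
            }
        }
        return null;   // all buffers are still held by in-flight iterations
    }

    void release(ByteBuffer buffer) {
        for (int i = 0; i < buffers.length; i++) {
            if (buffers[i] == buffer) {
                inUse[i] = false;
                return;
            }
        }
    }
}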

Your proposed API for ConsumerRecords reuse sounds fine.

This gives me enough to work on a prototype, which I hope I can do soon with 
permission from the bosses.

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2045) Memory Management on the consumer

2015-03-26 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383418#comment-14383418
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/27/15 6:54 AM:
--

Copying from the email list and expanding here.

My proposal is a single RequestBuffer and a single ResponseBuffer per broker 
per Consumer. We also need another ByteBuffer to write decompressed message 
sets (only one message set at a time) to. Another part of the proposal is that 
when we get a complete response we iterate through the ResponseBuffer and hand 
out pointers into the buffer to the main low level iterator. 

The workflow will look a bit like this:

i) Re-use the same request buffer to create a request and write it to the 
socket.
ii) On poll, re-use the same response buffer to read in the response till it is 
complete.
iii) When the response is complete, respond with an iterator into the response 
ByteBuffer. The consumer must now consume the entire ByteBuffer on this thread, 
since we use a single mutable iterator to go through the ByteBuffer.

It is trickier when we consider that during iteration the consumer might send 
more Kafka requests and call poll again. I have a proposal to handle this and 
still allow requests/responses to be pipelined. I have written something like 
this for another application, and since this is all happening in a single 
thread it is a bit easier. Here is my proposed design:

The response buffer looks a bit like this:
{___:___}_+

: is the consumer iterator, i.e. the position of the next message to be 
consumed. This is always at the start of a new response, a new message set, a 
new message within a message set, the end of a response, etc., because 
iterating on the fly means we go from one token to the next.
} is the network producer iterator, i.e. the position of the next byte from the 
broker. This can be any arbitrary byte boundary.
+ is the end of the buffer.

Some details:
i) Most of the time the consumer iterator (:) remains behind the network 
iterator (}). It catches up when we have consumed all messages.

ii) Sometimes we will have fewer bytes than required for a complete response at 
the end of the buffer. In that case we have to wait until there is enough space 
at the front of the buffer, i.e. until the consumer iterator has moved on far 
enough to free the space. We also write a special value at the index where we 
skipped to the end, which lets the consumer know that it needs to skip ahead to 
the front of the buffer. This means that every response HAS to be prepended by 
a special header (it can be a single byte) which says whether the following 
bytes are a valid response or not, say 1 for valid and 0 for invalid. The 
consumer only knows that there is more to read when the network-producer 
sequence has gone ahead of the consumer sequence, and it will either read the 
response right there (if the header says 1) or skip to the beginning of the 
buffer (if the header says 0).

iii) Every time the network producer prepares to write a new response at an 
index in the buffer, it needs to ensure that there are at least 4 bytes (the 
size field) + 1 byte for the header + some other minimum amount we can use as a 
heuristic before it considers the buffer slice usable. If the buffer slice is 
not usable it has to write the skip-ahead header (0) and increment its sequence 
to point exactly to the end of the buffer. Once the network producer finds a 
usable slice, it should wait till at least 4 bytes have been read so that it 
definitively knows the response size. Once it reads the size it knows exactly 
how many contiguous bytes are required (size of the response + 1 byte for the 
header). Now it can decide with certainty whether it can continue with the 
slice of the buffer it has (i.e. from the current position to the end of the 
buffer) or whether it has to write the skip-ahead header (0) and wait till it 
gets more contiguous space. If it can continue, it waits till the entire 
response has been read into the buffer (i.e. bytes read == size of response). 
When this happens, it increments its sequence by the size of the response + 1 
(for the header) and also sets the header to 1 to indicate that there is a 
readable response. A sketch of this placement logic follows these points.

iv)  A ConsumerRecordIterator is only reset/created once we have an entire 
contiguous response. Each ConsumerRecordIterator will have a pointer to the 
beginning of the response and its size. The iterator will hand out 
ConsumerRecord messages (or reuse them). Each ConsumerRecord also has a  
pointer to the beginning of the message it is pointing to and a size/pointer to 
the end. It can also have a mutable reference field for the Topic and an int 
for the partition. All fields are mutable so that these flyweights can be 
re-used.

v) Once an entire response has been iterated through (i.e. bytes iterated == si
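
Below is a rough, hypothetical sketch of the write-side placement logic 
described in ii) and iii): a single byte header per response (1 = a valid 
response follows, 0 = skip to the start of the buffer), a heuristic minimum 
before the size is known, and an exact contiguity check once the 4-byte size 
field has been read. The class, the MIN_USABLE constant and the sequence 
bookkeeping are all made up for illustration, not a proposed API:

// Single response ring buffer per broker. Sequences increase monotonically;
// (seq % capacity) is the physical index. Assumes a response always fits in
// the buffer and that everything runs on the consumer's single thread.
final class ResponseRing {
    static final byte VALID = 1;             // header: a readable response follows
    static final byte SKIP  = 0;             // header: wrap to the start of the buffer
    static final int SIZE_FIELD = 4;         // bytes needed to learn the response size
    static final int HEADER = 1;
    static final int MIN_USABLE = HEADER + SIZE_FIELD + 64;   // heuristic minimum

    final byte[] buf;
    long producerSeq;                        // next byte the network producer publishes
    long consumerSeq;                        // next byte the consumer iterator reads

    ResponseRing(int capacity) { this.buf = new byte[capacity]; }

    int tailSpace()  { return buf.length - (int) (producerSeq % buf.length); }
    long freeSpace() { return buf.length - (producerSeq - consumerSeq); }

    // Before the response size is known: is the tail slice even worth starting in?
    boolean tailUsable() { return tailSpace() >= MIN_USABLE; }

    // Once the 4-byte size field has been read and the exact need is known:
    // pick a contiguous slot and return the physical index of the header byte
    // (the response body is written starting one byte after it).
    int reserve(int responseSize) {
        int needed = HEADER + responseSize;
        if (tailSpace() < needed) {
            // Not enough contiguous room at the tail: tell the consumer to wrap
            // and move the producer sequence exactly to the end of the buffer.
            buf[(int) (producerSeq % buf.length)] = SKIP;
            producerSeq += tailSpace();
        }
        // A full implementation would also wait here until freeSpace() >= needed,
        // i.e. until the consumer iterator has freed enough room at the front.
        return (int) (producerSeq % buf.length);
    }

    // Called only after the whole response has been copied in past the header
    // byte: flip the header to VALID and publish it by advancing the producer
    // sequence, so the consumer never sees a half-written response.
    void commit(int headerIndex, int responseSize) {
        buf[headerIndex] = VALID;
        producerSeq += HEADER + responseSize;
    }
}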


[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-26 Thread Rajiv Kurian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383147#comment-14383147
 ] 

Rajiv Kurian commented on KAFKA-2045:
-

Bounding the ByteBuffers and statically allocating them would be great. On 
consumers, do we need any more than one ByteBuffer per broker that the client 
is talking to? Why do we need a buffer per topic/partition? Even if the leader 
for a topic/partition changes, we will ultimately learn about it and ask the 
new leader for data. That data will still come after the previous data for the 
topic/partition that moved, so to the consumer it will just look like another 
message set, and order per topic/partition is still maintained.

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)