[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/28/15 7:40 PM:
--------------------------------------------------------------

[~jkreps] Totally agree with you on the concerns with a re-write. I am sure 
I'll end up re-using most of the code, otherwise it will take too long in any 
case. But given this is just a prototype, I want the freedom to make changes 
without being bound by the existing architecture and class hierarchy of the 
client. Even if I do re-implement some of the parts, I'll make sure that the 
client can (a) do metadata requests so it can react to leaders moving etc. and 
(b) actually read from multiple topic/partitions spread across multiple brokers 
and not just a single broker. Since this is a rewrite with the sole purpose of 
exploring possible performance improvements, there are really only two 
outcomes:
i) It shows no improvements: in that case we end up not spending too much time 
changing the current code, and the hacky code just gets us to this conclusion 
faster.
ii) It shows interesting improvements: if so, we can afford to spend some time 
seeing which changes actually improved performance and make a call on how best 
to integrate them.

It might be counterproductive to look at the current client implementation and 
measure the % of time spent in each bottleneck, because those numbers are a 
consequence of the current memory layout. For example, if we do an on-the-fly 
CRC check and decompression, CRC check time might go up a bit because we are no 
longer striding over a contiguous ByteBuffer in one sweep. Right now the 
current client has this pattern: CRC check on Message1 --> CRC check on 
Message2 --> ... --> CRC check on MessageN --> hand Message1 to consumer --> 
... --> hand MessageN to consumer. With the current proposal we would instead 
have: do CRC on Message1 --> hand Message1 to consumer --> do CRC on Message2 
--> hand Message2 to consumer --> ... So the CRC checks are separated by 
potential (certain?) cache thrashing while the client handles each message.
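
To make the two orderings concrete, here is a minimal sketch of the difference 
(hypothetical types and method names, not the actual client code):

{code:java}
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.CRC32;

class IterationOrderSketch {
    // Hypothetical record: a payload plus the CRC stored alongside it.
    static final class Record {
        final byte[] payload;
        final long expectedCrc;
        Record(byte[] payload, long expectedCrc) {
            this.payload = payload;
            this.expectedCrc = expectedCrc;
        }
    }

    static void checkCrc(Record r) {
        CRC32 crc = new CRC32();
        crc.update(r.payload, 0, r.payload.length);
        if (crc.getValue() != r.expectedCrc) throw new IllegalStateException("CRC mismatch");
    }

    // Current pattern: validate the whole batch, then hand everything out.
    // Good striding for the CRC loop, but early records may be cold in cache
    // by the time the application sees them.
    static void validateThenHandOff(List<Record> batch, Consumer<Record> handler) {
        for (Record r : batch) checkCrc(r);
        for (Record r : batch) handler.accept(r);
    }

    // Proposed pattern: validate and hand off each record as it is decoded.
    // The record is still hot in cache when the application processes it, at
    // the cost of interleaving CRC work with arbitrary application code.
    static void validateAndHandOff(List<Record> batch, Consumer<Record> handler) {
        for (Record r : batch) {
            checkCrc(r);
            handler.accept(r);
        }
    }
}
{code}
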
From the consuming application's perspective, the current pattern looks like: 
do CRC and validation on all messages 1 to N --> hand messages 1 to N to the 
application. By the time the Kafka consumer is done validating and 
deserializing message N, message 1 is possibly already out of the cache. With 
the new approach, since we hand over a message right after validating it, we 
give the consumer a hot-in-cache message, which might improve the consumer's 
processing enough to offset the loss in CRC striding efficiency. Or it may not: 
it might turn out that doing the CRC validation upfront is a pure win, since 
all the CRC tables stay in cache and the striding access for the CRC math is 
worth an extra iteration over the ByteBuffer contents. But it might still be 
more profitable to elide copies and avoid creating objects by doing on-the-fly 
decoding and handing out indexes into the actual response ByteBuffer.
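
Roughly what I mean by handing out indexes instead of materialized objects is 
something like this (a sketch only; the field layout and names are made up):

{code:java}
import java.nio.ByteBuffer;

// Flyweight "view" over the fetch response buffer: instead of allocating a new
// object per message, the consumer is handed an offset into the response
// ByteBuffer and reads fields in place.
final class RecordView {
    private ByteBuffer response;  // the raw fetch response, never copied
    private int offset;           // start of the current record within the buffer

    RecordView wrap(ByteBuffer response, int offset) {
        this.response = response;
        this.offset = offset;
        return this;
    }

    // Hypothetical fixed layout: [4-byte size][8-byte offset][4-byte crc][key/value...]
    int sizeInBytes()    { return response.getInt(offset); }
    long messageOffset() { return response.getLong(offset + 4); }
    int crc()            { return response.getInt(offset + 12); }

    // The next record starts right after this one; the caller can re-wrap the
    // same RecordView instance, so iterating the response allocates nothing.
    int nextOffset()     { return offset + 4 + sizeInBytes(); }
}
{code}
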
The result will also be affected by how expensive deserializing and processing 
each message is. If the message is a bloated JSON-encoded object that is 
deserialized into a POJO and then processed really slowly, none of this will 
likely matter. On the other hand, if the message is compact, binary encoded, 
and can be processed with minimal cache misses, this stuff might add up. My 
point is that basing the TODOs on the current profile may not be optimal, 
because the profile is largely a consequence of the current layout and 
allocation patterns. Also, the profile gives percentages; we might keep roughly 
the same percentages yet still reduce the overall time taken for the entire 
consumer processing cycle. To belabor the point even further, the current hash 
map implementations might suffer so many cache misses that they mask an 
underlying improvement opportunity for the data in the maps. Switching to 
compact, primitive-array based open hash maps might surface that opportunity 
again.
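
For instance, something along these lines, a flat primitive-array backed 
open-addressing map instead of a HashMap<Integer, Long> (sketch only, e.g. for 
partition-id -> offset style lookups):

{code:java}
import java.util.Arrays;

// Minimal open-addressing int -> long map backed by flat primitive arrays
// (linear probing, no deletion, power-of-two capacity). Assumes it is sized
// with headroom so probing always finds an empty slot; just a layout sketch.
final class IntLongOpenHashMap {
    private static final int EMPTY = -1;   // assumes keys are non-negative (e.g. partition ids)
    private final int[] keys;
    private final long[] values;
    private final int mask;

    IntLongOpenHashMap(int capacityPowerOfTwo) {
        keys = new int[capacityPowerOfTwo];
        values = new long[capacityPowerOfTwo];
        Arrays.fill(keys, EMPTY);
        mask = capacityPowerOfTwo - 1;
    }

    void put(int key, long value) {
        int slot = key & mask;
        while (keys[slot] != EMPTY && keys[slot] != key) {
            slot = (slot + 1) & mask;       // linear probe; data stays in two flat arrays
        }
        keys[slot] = key;
        values[slot] = value;
    }

    long get(int key, long missingValue) {
        int slot = key & mask;
        while (keys[slot] != EMPTY) {
            if (keys[slot] == key) return values[slot];
            slot = (slot + 1) & mask;
        }
        return missingValue;
    }
}
{code}
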

Is there a performance test that is used to keep track of the new Consumer's 
performance? If so, maybe I can wrap it in a JMH suite and re-use it to test 
improvements?
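
Something like this JMH skeleton is what I have in mind (the benchmark body is 
a placeholder; whatever the existing consumer perf test does would go inside):

{code:java}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Fork(1)
public class ConsumerFetchBenchmark {

    @Setup
    public void setup() {
        // Build whatever fixture the existing consumer perf test uses,
        // e.g. pre-serialized fetch responses to iterate over.
    }

    @Benchmark
    public long pollAndIterate() {
        // Placeholder: run one fetch/deserialize/hand-off cycle here and
        // return something derived from it so the JIT can't eliminate the work.
        return 0L;
    }
}
{code}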


> Memory Management on the consumer
> ---------------------------------
>
>                 Key: KAFKA-2045
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2045
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>
> We need to add memory management to the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



