[
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14385500#comment-14385500
]
Rajiv Kurian edited comment on KAFKA-2045 at 3/28/15 7:40 PM:
--------------------------------------------------------------
[~jkreps] Totally agree with you on the concerns with a re-write. I am sure
I'll end up re-using most of the code; otherwise it would take too long in any
case. But given this is just a prototype, I want the freedom to make changes
without being bound by the existing architecture and class hierarchy of the
client. Even if I do re-implement some of the parts, I'll make sure the client
can (a) do metadata requests so it can react to leaders moving etc. and
(b) actually read from multiple topic/partitions spread across multiple
brokers, not just a single broker. Again, since this is just a rewrite with
the sole purpose of exploring possible performance improvements, there are
really only two possible outcomes:
i) It shows no improvement: in that case we avoid spending much time changing
the current code, and the hacky prototype just gets us to that conclusion
faster.
ii) It shows interesting improvements: if so, we can afford to spend some time
figuring out which changes actually improved performance and make a call on
how best to integrate them.
It might be counterproductive to look at the current client implementation and
measure the % of time spent in each bottleneck, because those numbers are
themselves a consequence of the current memory layout. For example, if we do
the CRC check and decompression on the fly, CRC check time might go up a bit
because we are no longer striding over a contiguous ByteBuffer in one sweep.
Right now the current client has this pattern: CRC check on Message1 -->
CRC check on Message2 ... --> CRC check on MessageN --> hand Message1 to
consumer ... --> hand MessageN to consumer. With the current proposal we would
instead have: do CRC on Message1 --> hand Message1 to consumer --> do CRC on
Message2 --> hand Message2 to consumer. So the CRC checks are separated by
potential (certain?) cache thrashing while the consumer handles each message.
On the other hand, from the perspective of the consumer, the current pattern
looks like: do CRC and validation on messages 1 to N --> hand messages 1 to N
to the consumer. By the time the Kafka consumer is done validating and
deserializing message N, message 1 is possibly already out of the cache. With
the new approach, since we hand over a message right after validating it, we
give the consumer a hot-in-cache message, which might improve consumer
processing enough to offset the loss in CRC striding efficiency. Or it may
not; it might turn out that doing the CRC validation upfront is a pure win,
since all the CRC tables will be in cache etc. and the striding access for the
CRC math is worth an extra iteration over the ByteBuffer contents. Even then,
it might still be more profitable to elide copies and avoid creating objects
by decoding on the fly and handing out indexes into the actual response
ByteBuffer.
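As a rough sketch of what handing out indexes could look like (made-up record
layout and names, not the real wire format; assumes a heap-backed buffer):

{code:java}
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical flyweight over a fetch-response-style buffer: the consumer gets
// offsets into the original ByteBuffer instead of freshly allocated objects.
// Toy layout per record: [crc:int][valueLength:int][value:bytes].
final class RecordView {
    private ByteBuffer buf;
    private int offset;

    // Point the view at a record; returns false if the stored CRC does not match.
    boolean wrap(ByteBuffer buf, int offset) {
        this.buf = buf;
        this.offset = offset;
        CRC32 crc = new CRC32();
        // Assumes a heap buffer so we can CRC the value bytes without copying.
        crc.update(buf.array(), buf.arrayOffset() + offset + 8, valueLength());
        return (int) crc.getValue() == buf.getInt(offset);
    }

    int valueLength() {
        return buf.getInt(offset + 4);
    }

    // The consumer reads the value in place; no byte[] copy, no object per record.
    ByteBuffer value() {
        ByteBuffer slice = buf.duplicate();
        slice.position(offset + 8);
        slice.limit(offset + 8 + valueLength());
        return slice;
    }

    // Offset of the next record, so the caller can walk the buffer on the fly.
    int nextOffset() {
        return offset + 8 + valueLength();
    }
}
{code}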
This result might further be affected by how expensive deserializing and
processing the message is. If the message is a bloated JSON-encoded object
that is deserialized into a POJO and then processed really slowly, probably
none of this will matter. On the other hand, if the message is compact, binary
encoded, and can be processed with minimal cache misses, this stuff might add
up. My point is that basing the TODOs on the current profile may not be
optimal, because the profile is largely a consequence of the current layout
and allocation patterns. Also, the profile gives percentages, and we might
keep roughly the same percentages while still reducing the overall time taken
for the entire consumer processing cycle. To belabor the point even further,
the current hash map implementations might suffer so many cache misses that
they mask an underlying improvement opportunity for the data in the maps.
Switching to compact open hash maps backed by primitive arrays might surface
that opportunity again.
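For example, a minimal sketch of the kind of primitive-array-backed open hash
map I mean (toy version: fixed power-of-two capacity, no resize or removal,
key 0 reserved as the empty marker):

{code:java}
// Open-addressed long -> long map backed by flat primitive arrays, so lookups
// walk contiguous memory instead of chasing per-Entry object pointers.
final class LongLongOpenHashMap {
    private final long[] keys;
    private final long[] values;
    private final int mask;

    LongLongOpenHashMap(int powerOfTwoCapacity) {
        keys = new long[powerOfTwoCapacity];   // zero-initialized; 0 marks an empty slot
        values = new long[powerOfTwoCapacity];
        mask = powerOfTwoCapacity - 1;
    }

    void put(long key, long value) {
        int idx = (int) (mix(key) & mask);
        while (keys[idx] != 0L && keys[idx] != key) {
            idx = (idx + 1) & mask;            // linear probe: often the same cache line
        }
        keys[idx] = key;
        values[idx] = value;
    }

    long get(long key, long missingValue) {
        int idx = (int) (mix(key) & mask);
        while (keys[idx] != 0L) {
            if (keys[idx] == key) {
                return values[idx];
            }
            idx = (idx + 1) & mask;
        }
        return missingValue;
    }

    private static long mix(long key) {        // cheap mixer to spread clustered keys
        key ^= key >>> 33;
        key *= 0xff51afd7ed558ccdL;
        return key ^ (key >>> 33);
    }
}
{code}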
Is there a performance test used to track the new Consumer's performance? If
so, maybe I can wrap it in a JMH suite and re-use it to test improvements?
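Roughly what I have in mind for the wrapper (skeleton only; ConsumerFixture
and pollOnce are placeholders for whatever the existing perf test drives):

{code:java}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

// Skeleton JMH wrapper around an existing consumer perf test.
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class ConsumerPollBenchmark {

    private ConsumerFixture fixture;

    @Setup
    public void setUp() {
        fixture = new ConsumerFixture();   // point at a broker with pre-loaded data
    }

    @TearDown
    public void tearDown() {
        fixture.close();
    }

    @Benchmark
    public int pollOnce() {
        return fixture.pollOnce();         // return a value so JMH can't dead-code it
    }

    // Placeholder so the skeleton compiles on its own; the real version would
    // drive whatever the existing consumer performance test does per iteration.
    static final class ConsumerFixture {
        int pollOnce() { return 0; }
        void close() { }
    }
}
{code}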
> Memory Management on the consumer
> ---------------------------------
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.