[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383418#comment-14383418
 ] 

Rajiv Kurian edited comment on KAFKA-2045 at 3/27/15 6:54 AM:
--------------------------------------------------------------

Copying from the email list and expanding here.

My proposal is a single RequestBuffer and a single ResponseBuffer per broker 
per Consumer. We also need one more ByteBuffer to write decompressed message 
sets into (only one message set at a time). Another part of the proposal is that 
when we get a complete response we iterate through the ResponseBuffer and hand 
out pointers into the buffer through the main low level iterator.

The workflow will look a bit like this:

i) Re-use the same request buffer to create a request and write it to the socket.
ii) On poll, re-use the same response buffer to read in the response till it is 
complete.
iii) When the response is complete, hand back an iterator over the response 
ByteBuffer. The consumer must now consume the entire ByteBuffer on this thread 
since we use a single mutable iterator to go through the ByteBuffer.
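
Below is a rough sketch of steps (i)-(iii) in code. Everything here (BrokerBuffers, 
the buffer sizes, the 4-byte size prefix check) is illustrative only, not the actual 
new-consumer network code:

{code:java}
// Illustrative only: one request buffer and one response buffer per broker,
// reused across every send/poll. Names and sizes are made up.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class BrokerBuffers {
    private final ByteBuffer requestBuffer  = ByteBuffer.allocateDirect(64 * 1024);
    private final ByteBuffer responseBuffer = ByteBuffer.allocateDirect(4 * 1024 * 1024);

    // i) Serialize the request into the reused request buffer and write it out.
    void send(SocketChannel channel, byte[] serializedRequest) throws IOException {
        requestBuffer.clear();
        requestBuffer.put(serializedRequest);
        requestBuffer.flip();
        while (requestBuffer.hasRemaining()) {
            channel.write(requestBuffer);
        }
    }

    // ii) On each poll, keep reading into the same response buffer until the
    //     4-byte size prefix says the whole response has arrived.
    boolean readMore(SocketChannel channel) throws IOException {
        channel.read(responseBuffer);
        if (responseBuffer.position() < 4) {
            return false;
        }
        int responseSize = responseBuffer.getInt(0);
        return responseBuffer.position() >= 4 + responseSize;
    }

    // iii) Once complete, the caller iterates the response buffer in place on
    //      this thread before the next poll reuses it.
    ByteBuffer completedResponse() {
        responseBuffer.flip();
        return responseBuffer;
    }
}
{code}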

It gets trickier when we consider that during iteration the consumer might send 
more Kafka requests and call poll again. I have a proposal to handle this and 
still allow requests/responses to be pipelined. I have written something like 
this for another application, and since this is all happening in a single thread 
it is a bit easier. Here is my proposed design:

The response buffer looks a bit like this:
 _____________________________________
_______________:___________}_____________+

: is the consumer iterator, i.e. the position of the next message to be 
consumed. This is always at the start of a new response, a new message set, a 
new message within a message set, the end of a response, etc., because iterating 
on the fly means we move from one token to the next.
} is the network producer iterator, i.e. the position of the next byte from the 
broker. This can be any arbitrary byte boundary.
+ is the end of the buffer.
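
To make the pointers concrete, here is a minimal sketch of the state behind the 
diagram. ResponseRing and its field names are invented for illustration; a real 
implementation would likely use monotonically increasing long sequences (as 
mentioned below) rather than raw positions, to distinguish an empty buffer from a 
full one:

{code:java}
// Illustrative state for the diagram: one buffer, the consumer position (:),
// the network producer position (}), and the capacity (+).
import java.nio.ByteBuffer;

final class ResponseRing {
    static final byte HEADER_READABLE = 1; // bytes after the header are a complete response
    static final byte HEADER_SKIP     = 0; // tail is wasted; consumer should wrap to index 0

    final ByteBuffer buffer;
    int consumerPos;  // ':' next byte the consumer will read
    int producerPos;  // '}' next byte the network producer will fill

    ResponseRing(int capacity) {
        this.buffer = ByteBuffer.allocateDirect(capacity); // '+' is buffer.capacity()
    }

    boolean hasUnconsumed() {
        // Good enough for a sketch; monotonic long sequences would remove the
        // ambiguity between a completely empty and a completely full buffer.
        return consumerPos != producerPos;
    }
}
{code}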

Some details:
i) Most of the time the consumer iterator ( : ) remains behind the network 
iterator ( } ). It catches up when we have consumed all messages.

ii) Sometimes we will have fewer bytes than required for a complete response at 
the end of the buffer. In that case we have to wait till there is enough space 
at the front of the buffer, i.e. till the consumer iterator has moved on far 
enough. We also write a special value at the index where we skipped to the end, 
which lets the consumer know that it needs to skip ahead to the front of the 
buffer. This means that every response HAS to be prefixed with a special header 
(it can be a single byte) which says whether the following bytes are a valid 
response or not. Say 1 means valid and 0 means invalid. The consumer only knows 
that there is more to read when the network-producer sequence has gone ahead of 
the consumer sequence, and it will either read the response right there (if the 
header says 1) or skip to the beginning of the buffer (if the header says 0).
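
A hedged sketch of the consumer side of (ii), continuing the illustrative 
ResponseRing above (none of this is existing consumer code):

{code:java}
// Consumer side of detail (ii): only read once the producer has moved ahead,
// and honour the skip-ahead header by wrapping to the front of the buffer.
final class ResponseReader {
    // Returns the index of the first body byte of the next readable response,
    // or -1 if nothing is readable yet.
    static int nextResponseStart(ResponseRing ring) {
        if (!ring.hasUnconsumed()) {
            return -1;                                   // caught up with the producer
        }
        if (ring.buffer.get(ring.consumerPos) == ResponseRing.HEADER_SKIP) {
            ring.consumerPos = 0;                        // tail was too small; wrap around
            if (!ring.hasUnconsumed()) {
                return -1;                               // front response not published yet
            }
        }
        // The producer only moves past a response after setting its header to 1,
        // so whatever is at consumerPos now is a complete, readable response.
        return ring.consumerPos + 1;                     // body begins after the header byte
    }
}
{code}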

iii) Every time the network producer prepares to write a new response at an 
index in the buffer it needs to ensure that there are at least 4 bytes (for the 
size field) + 1 byte for the header + some minimum amount we can use as a 
heuristic before it considers the buffer slice usable. If the buffer slice is 
not usable it has to write the skip-ahead header (0) and increment its sequence 
to point exactly to the end of the buffer. Once the network producer finds 
enough space in the buffer it should wait till at least 4 bytes are read so 
that it knows the response size definitively. When it has read the size it 
knows exactly how many contiguous bytes are required (size of response + 1 byte 
for the header). Now it can decide with certainty whether it can continue with 
the slice of the buffer it has (i.e. from the current position till the end of 
the buffer) or whether it has to write the skip-ahead header (0) and wait till 
it gets more contiguous space. If it can continue, it waits till the entire 
response has been read into the buffer (i.e. bytes read == size of response). 
When this happens it increments its sequence by size of response + 1 (1 for the 
header) and sets the header to 1 to indicate that there is a readable response.
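
And the producer side of (iii), again purely illustrative (the 64-byte minimum is 
an arbitrary stand-in for the heuristic mentioned above, and a real version would 
also have to wait for the consumer to free space before reusing the front):

{code:java}
// Network-producer side of detail (iii): reserve contiguous space for the
// header + size field (+ a heuristic minimum), or mark the tail as skipped.
final class ResponseWriter {
    static final int HEADER_BYTES = 1;
    static final int SIZE_FIELD_BYTES = 4;
    static final int MIN_USEFUL_BYTES = 64;  // arbitrary heuristic

    // Returns the index where the next response body may be written; wraps to
    // the front if the tail slice cannot possibly be useful. Once the 4-byte
    // size field has been read, the same contiguous-space check is repeated
    // with the exact response size before committing to this slice.
    static int reserve(ResponseRing ring) {
        int tail = ring.buffer.capacity() - ring.producerPos;
        if (tail < HEADER_BYTES + SIZE_FIELD_BYTES + MIN_USEFUL_BYTES) {
            ring.buffer.put(ring.producerPos, ResponseRing.HEADER_SKIP);
            ring.producerPos = 0;  // real code must also wait for the consumer to free space here
        }
        return ring.producerPos + HEADER_BYTES;  // body starts right after the header byte
    }

    // Called once bytes read == size of response: publish the response by
    // setting its header and advancing the producer past it.
    static void publish(ResponseRing ring, int responseSize) {
        ring.buffer.put(ring.producerPos, ResponseRing.HEADER_READABLE);
        ring.producerPos += HEADER_BYTES + responseSize;
    }
}
{code}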

iv) A ConsumerRecordIterator is only reset/created once we have an entire 
contiguous response. Each ConsumerRecordIterator has a pointer to the beginning 
of the response and its size. The iterator hands out ConsumerRecord messages 
(or reuses them). Each ConsumerRecord also has a pointer to the beginning of 
the message it refers to and a size/pointer to the end. It can also have a 
mutable reference field for the topic and an int for the partition. All fields 
are mutable so that these flyweights can be re-used.
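
A sketch of the flyweights in (iv). The class and field names, and the assumption 
that each message is framed by a 4-byte length, are made up for illustration; the 
real fetch response layout (topics, partitions, message sets) is elided:

{code:java}
// Flyweight record + iterator from detail (iv): all fields are mutable so the
// same objects can be re-pointed at each message in the shared buffer.
import java.nio.ByteBuffer;

final class ReusableConsumerRecord {
    ByteBuffer buffer;  // the shared response buffer
    int offset;         // start of this message within the buffer
    int length;         // size of this message in bytes
    String topic;       // mutable reference field so the flyweight can be reused
    int partition;

    // Re-point the same flyweight at a new message instead of allocating.
    void pointTo(ByteBuffer buffer, int offset, int length, String topic, int partition) {
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
        this.topic = topic;
        this.partition = partition;
    }
}

final class ReusableRecordIterator {
    private final ReusableConsumerRecord record = new ReusableConsumerRecord();
    private ByteBuffer buffer;
    private int end;       // one past the last byte of the response
    private int position;  // next unread byte within the response

    // Reset rather than allocate: called once an entire contiguous response is available.
    void reset(ByteBuffer buffer, int responseStart, int responseSize) {
        this.buffer = buffer;
        this.position = responseStart;
        this.end = responseStart + responseSize;
    }

    boolean hasNext() {
        return position < end;
    }

    // Assume, for illustration only, that each message is preceded by a 4-byte length.
    ReusableConsumerRecord next(String topic, int partition) {
        int messageLength = buffer.getInt(position);
        record.pointTo(buffer, position + 4, messageLength, topic, partition);
        position += 4 + messageLength;
        return record;
    }
}
{code}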

v) Once an entire response has been iterated through (i.e. bytes iterated == 
size of response) we increment the consumer iterator by the size of the 
response + 1 (for the header).
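
Detail (v) then amounts to a single step on the same illustrative sketch, e.g. 
another method on the ResponseReader above:

{code:java}
// Detail (v): once the iterator has walked the whole response, jump the
// consumer position past the 1-byte header and the response body.
static void onResponseConsumed(ResponseRing ring, int responseSize) {
    ring.consumerPos += 1 + responseSize;
}
{code}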

This cycle continues, and all we need is a single buffer and a few longs to 
manage all the sequences. I might not have been very clear with the 
explanation, but the tl;dr version is that it is a circular buffer with a 
twist: it handles variable sized messages. Because of that, depending on the 
sequence of messages we may not have enough contiguous space in the buffer to 
hold an entire response, so we need some trickery to communicate this to the 
consumer and instruct it to skip past the temporarily wasted space. That 
trickery takes the form of a header per response. I will be happy to answer 
questions about the design and provide some diagrams showing some typical runs.

As Jay suggested, if application level deserialization is a bottleneck that 
needs to be solved by passing slices of these ByteBuffers out to a pool of 
threads, then this approach WILL NOT work, since we expect the ByteBuffer to be 
linearly iterated in one go. If we want slices to be passed to a pool of 
threads then copying each individual message into a new ByteBuffer is probably 
the only good option. For my application that is definitely not the case, since 
deserialization is free and the cache friendliness of iterating over a hot 
buffer trumps every other factor.

To solve for this use case, the low level iterator (discussed in the previous 
paragraphs) can be wrapped in a higher level iterator that just copies the 
bytes of each message into a new ByteBuffer and hands them over. The low level 
iterator is still responsible for buffer management; the higher level iterator 
is just a wrapper over it that consumes a message by copying its bytes into a 
new ByteBuffer and handing that ByteBuffer to the application. The application 
is then free to transfer these to other threads for processing/deserialization. 
There is still a caveat with the higher level iterator: it MUST be consumed in 
its entirety on the thread that called poll(). The ByteBuffers it hands out can 
then be sent to other threads. This doesn't seem like much of a limitation, 
especially if the API is very clear that the iterator itself is not thread safe.
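
A sketch of that higher level wrapper, assuming the hypothetical 
ReusableConsumerRecord flyweight from the earlier sketch; again just an 
illustration of the idea, not a proposed API:

{code:java}
// Wraps the low-level flyweight iterator and hands out fresh ByteBuffers that
// are safe to pass to other threads. The wrapper itself must still be drained
// in its entirety on the thread that called poll().
import java.nio.ByteBuffer;
import java.util.Iterator;

final class CopyingRecordIterator implements Iterator<ByteBuffer> {
    private final Iterator<ReusableConsumerRecord> lowLevel;

    CopyingRecordIterator(Iterator<ReusableConsumerRecord> lowLevel) {
        this.lowLevel = lowLevel;
    }

    @Override
    public boolean hasNext() {
        return lowLevel.hasNext();
    }

    @Override
    public ByteBuffer next() {
        ReusableConsumerRecord record = lowLevel.next();
        ByteBuffer copy = ByteBuffer.allocate(record.length);
        // Copy the message bytes out of the shared response buffer; the copy
        // owns its bytes and can cross thread boundaries.
        ByteBuffer view = record.buffer.duplicate();
        view.position(record.offset);
        view.limit(record.offset + record.length);
        copy.put(view);
        copy.flip();
        return copy;
    }
}
{code}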

I understand that this is a big big change, but based on your feedback I'd be 
happy to work on a prototype once I have the license to do so from my bosses. 
Given our priorities I think this will happen soon. Would love to get more 
feedback here.


> Memory Management on the consumer
> ---------------------------------
>
>                 Key: KAFKA-2045
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2045
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.


