[
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14383418#comment-14383418
]
Rajiv Kurian edited comment on KAFKA-2045 at 3/27/15 6:53 AM:
--------------------------------------------------------------
Copying from the email list and expanding here.
My proposal is a single RequestBuffer and a single ResponseBuffer per broker
per Consumer. We also need one more ByteBuffer to write decompressed message
sets into (only one message set at a time). Another part of the proposal is
that when we get a complete response we iterate through the ResponseBuffer and
hand out pointers into the buffer through the main low-level iterator.
The work flow will look a bit like this:
i) Re-use the same request buffer to create a request and write it to the socket.
ii) On poll, re-use the same response buffer to read in the response till it is
complete.
iii) When the response is complete, hand back an iterator over the response
ByteBuffer. The consumer must then consume the entire ByteBuffer on this thread,
since we use a single mutable iterator to go through the ByteBuffer (a rough
sketch follows this list).
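To make this concrete, here is a minimal sketch of steps (i)-(iii). It assumes the
standard 4-byte size prefix on responses and a single request in flight;
SingleBufferExchange and its methods are names I am making up for illustration, not
existing consumer code.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class SingleBufferExchange {
    private final ByteBuffer requestBuffer = ByteBuffer.allocate(64 * 1024);
    private final ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);

    // (i) Re-use the same request buffer for every request to this broker.
    void send(SocketChannel channel, ByteBuffer serializedRequest) throws IOException {
        responseBuffer.clear();               // simplification: one request in flight at a time
        requestBuffer.clear();
        requestBuffer.put(serializedRequest);
        requestBuffer.flip();
        while (requestBuffer.hasRemaining()) {
            channel.write(requestBuffer);
        }
    }

    // (ii) Re-use the same response buffer on every poll; returns true once the full
    // response (4-byte size prefix plus body) has been read.
    boolean pollResponse(SocketChannel channel) throws IOException {
        channel.read(responseBuffer);
        if (responseBuffer.position() < 4) {
            return false;
        }
        int size = responseBuffer.getInt(0);
        return responseBuffer.position() >= 4 + size;
    }

    // (iii) Hand back a read-only view over the completed response. The caller must
    // iterate it fully on this thread before the buffer is re-used for the next response.
    ByteBuffer responseView() {
        ByteBuffer view = responseBuffer.duplicate();
        view.flip();
        return view.asReadOnlyBuffer();
    }
}
{code}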
It gets trickier when we consider that during iteration the consumer might send
more Kafka requests and call poll again. I have a proposal to handle this while
still allowing requests/responses to be pipelined. I have written something like
this for another application, and since this is all happening in a single thread
it is a bit easier. Here is my proposed design:
The response buffer looks a bit like this:
_____________________________________
{_______________:___________}_____________+
: is the consumer iterator, i.e. the position of the next message to be
consumed. This always sits at a boundary: the start of a new response, a new
message set, a new message within a message set, the end of a response, etc.,
because iterating on the fly moves us from one such token to the next.
} is the network producer iterator, i.e. the position of the next byte expected
from the broker. This can be at any arbitrary byte boundary.
+ is the end of the buffer.
Some details:
i) Most of the time the consumer iterator ( : ) remains behind the network
producer iterator ( } ). It catches up only when we have consumed all messages.
ii) Sometimes there will be fewer bytes than required for a complete response at
the end of the buffer. In that case we have to wait until there is enough space
at the front of the buffer, i.e. until the consumer iterator has moved on far
enough to free it. We also write a special value at the index from which we
skipped to the end, which tells the consumer that it needs to jump ahead to the
front of the buffer. This means that every response HAS to be prepended with a
special header (a single byte is enough) that says whether the following bytes
are a valid response or not, say 1 for valid and 0 for invalid. The consumer
only knows there is more to read when the network-producer sequence is ahead of
the consumer sequence, and it will then either read the response right there
(if the header says 1) or skip to the beginning of the buffer (if the header
says 0).
iii) Every time the network producer prepares to write a new response at an
index in the buffer, it needs to ensure that there are at least 4 bytes (for the
response size field) + 1 byte for the header + some other minimum amount we can
pick as a heuristic, before it considers that buffer slice usable. If the slice
is not usable it has to write the skip-ahead header (0) and increment its
sequence to point exactly to the end of the buffer. Once the network producer
finds enough space in the buffer it should wait until at least 4 bytes are read
so that it knows the response size definitively. Once it has the size it knows
exactly how many contiguous bytes are required (size of response + 1 byte for
the header). Now it can decide with certainty whether it can continue with the
slice it has (i.e. from the current position to the end of the buffer) or
whether it has to write the skip-ahead header (0) and wait until it gets more
contiguous space. If it can continue, it waits until the entire response is read
into the buffer (i.e. bytes read == size of response). When this happens, it
increments its sequence by the size of the response + 1 (1 for the header) and
also sets the header to 1 to indicate that there is a readable response.
iv) A ConsumerRecordIterator is only reset/created once we have an entire
contiguous response. Each ConsumerRecordIterator has a pointer to the beginning
of the response and its size. The iterator hands out ConsumerRecord messages
(or reuses them). Each ConsumerRecord in turn has a pointer to the beginning of
the message it refers to and a size/pointer to the end. It can also have a
mutable reference field for the topic and an int for the partition. All fields
are mutable so that these flyweights can be re-used.
v) Once an entire response has been iterated through (i.e. bytes iterated ==
size of response) we increment the consumer iterator by the size of the response
+ 1 (for the header). A sketch of this scheme follows this list.
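Here is a minimal single-threaded sketch of this bookkeeping (details ii, iii and v).
It assumes the 4-byte response size has been peeked before a slot is claimed, and all
names (ResponseRing, claim, publish, etc.) are illustrative rather than existing code;
partial reads and error handling are left out.
{code:java}
import java.nio.ByteBuffer;

final class ResponseRing {
    private static final byte HEADER_VALID = 1; // a complete response follows this byte
    private static final byte HEADER_SKIP  = 0; // dead space: skip to the start of the buffer
    private static final int  HEADER_BYTES = 1;

    private final ByteBuffer buffer;
    private final int capacity;
    private long producerSequence; // '}' in the diagram: next byte expected from the broker
    private long consumerSequence; // ':' in the diagram: next byte to be consumed

    ResponseRing(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
        this.capacity = capacity;
    }

    // Network-producer side: called once the 4-byte size of the incoming response is
    // known. Returns the index at which the response (header excluded) may be written,
    // or -1 if the consumer has not yet freed enough contiguous space.
    int claim(int responseSize) {
        int needed = HEADER_BYTES + responseSize;
        int writeIndex = (int) (producerSequence % capacity);
        int tailSpace = capacity - writeIndex;
        long used = producerSequence - consumerSequence;
        if (tailSpace < needed) {
            if (used >= capacity) {
                return -1;                       // buffer completely full, wait for the consumer
            }
            buffer.put(writeIndex, HEADER_SKIP); // mark the tail as dead space
            producerSequence += tailSpace;       // jump exactly to the end of the buffer
            used += tailSpace;
            writeIndex = 0;                      // continue from the front
        }
        if (capacity - used < needed) {
            return -1;                           // wait until the consumer frees enough space
        }
        return writeIndex + HEADER_BYTES;
    }

    // Network-producer side: called once the entire response has been read into the
    // buffer starting at the index returned by claim().
    void publish(int startIndex, int responseSize) {
        buffer.put(startIndex - HEADER_BYTES, HEADER_VALID);
        producerSequence += HEADER_BYTES + responseSize;
    }

    // Consumer side: returns the index of the next readable response, or -1 if the
    // consumer has caught up with the network producer.
    int nextResponse() {
        while (consumerSequence < producerSequence) {
            int readIndex = (int) (consumerSequence % capacity);
            if (buffer.get(readIndex) == HEADER_SKIP) {
                consumerSequence += capacity - readIndex; // skip the dead tail, wrap to the front
                continue;
            }
            return readIndex + HEADER_BYTES;
        }
        return -1;
    }

    // Consumer side: called once the response starting at the index returned by
    // nextResponse() has been iterated in full.
    void release(int responseSize) {
        consumerSequence += HEADER_BYTES + responseSize;
    }
}
{code}
The network producer would call claim()/publish() as bytes arrive from the socket, and
the consuming iterator would call nextResponse()/release() as it finishes each response,
so everything is managed by one buffer and two long sequences.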
This cycle continues, and all we need is a single buffer and a few longs to
manage the sequences. I might not have been very clear with the explanation,
but the tl;dr version is that it is a circular buffer with a twist: it handles
variable-sized messages. Since that means we may not always have enough
contiguous space in the buffer to hold an entire response, we need some
trickery to communicate this scenario to the consumer and instruct it to skip
past the temporarily wasted space. That trickery is implemented in the form of
a header per response. I will be happy to answer questions about the design and
provide some diagrams showing typical runs.
Like Jay suggested, if application-level deserialization is a bottleneck that
needs to be solved by passing slices of these ByteBuffers out to a pool of
threads, then this approach WILL NOT work, since we expect the ByteBuffer to be
iterated linearly in one go. If we want slices to be passed to a pool of
threads, then copying each individual message into a new ByteBuffer is probably
the only good option. For my application that is definitely not the case, since
deserialization is free and the cache friendliness of iterating over a hot
buffer trumps every other factor.
To solve for this use case, the lower-level iterator (discussed in the previous
paragraphs) can be wrapped in a higher-level iterator that simply copies the
bytes of each message into a new ByteBuffer and hands it over. The low-level
iterator is still responsible for buffer management; the higher-level iterator
is just a wrapper that consumes a message by copying its bytes into a new
ByteBuffer and handing that ByteBuffer to the application. The application is
then free to transfer these to other threads for processing/deserialization.
There is still a caveat with the higher-level iterator: it MUST be consumed in
its entirety on the thread that called poll(), even though the ByteBuffers it
hands out can be sent to other threads. This doesn't seem like much of a
limitation, especially if the API is very clear that the iterator itself is not
thread safe. A rough sketch of such a wrapper is below.
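In this sketch FlyweightRecord stands in for the ConsumerRecord flyweight described
above, and the method names (sizeInBytes, copyBytesTo, drain) are made up for
illustration, not existing API.
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in for the flyweight ConsumerRecord that points into the response buffer.
interface FlyweightRecord {
    String topic();
    int partition();
    int sizeInBytes();
    void copyBytesTo(ByteBuffer destination); // copies this record's bytes out of the response buffer
}

final class CopyingIterator {
    // Must be called on the thread that called poll(), before the next poll();
    // the returned buffers are independent copies and may be handed to other threads.
    static List<ByteBuffer> drain(Iterator<? extends FlyweightRecord> lowLevel) {
        List<ByteBuffer> copies = new ArrayList<>();
        while (lowLevel.hasNext()) {
            FlyweightRecord record = lowLevel.next();     // flyweight pointing into the response buffer
            ByteBuffer copy = ByteBuffer.allocate(record.sizeInBytes());
            record.copyBytesTo(copy);
            copy.flip();
            copies.add(copy);                             // safe to hand to another thread
        }
        return copies;
    }
}
{code}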
I understand that this is a big, big change, but based on your feedback I'd be
happy to work on a prototype once I have the license to do so from my bosses.
Given our priorities I think this will happen soon. Would love to get more
feedback here.
> Memory Management on the consumer
> ---------------------------------
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.