Re: Kafka sending messages with zero copy
Thanks Rajiv, looking forward to your prototype.

Guozhang
Re: Kafka sending messages with zero copy
Hi Guozhang,

I am a bit busy at work. When I get the chance I'll definitely try to get a proof of concept going. Not the Kafka protocol, but just the buffering and threading structures; maybe just write to another socket. I think it would be useful just to get the queueing and buffer management going and prove that it can be done in a zero-copy way in a multi-producer, single-consumer environment. If that is working, then the single consumer can be the Kafka network sync thread.

Thanks,
Rajiv
Re: Kafka sending messages with zero copy
Hi Rajiv,

Thanks for this proposal. It would be great if you could upload an implementation patch for the CAS idea and show some memory usage / perf differences.

Guozhang
Re: Kafka sending messages with zero copy
Resuscitating this thread. I've done some more experiments and profiling. My messages are very tiny (currently 25 bytes per message) and creating multiple objects per message leads to a lot of churn. The memory churn from creating convenience objects is more than the memory being used by my objects right now. I could probably batch my messages further to make this effect less pronounced. I did some rather unscientific experiments with a flyweight approach on top of a ByteBuffer for a simple messaging API (peer-to-peer NIO based, so not a real comparison) and the numbers were very satisfactory: there is no garbage created in steady state at all. Though I don't expect such good numbers from actually going through the broker plus all the other extra work a real producer would do, I think there is great potential here.

The general mechanism for me is this:

i) A buffer (I used Unsafe, but I imagine ByteBuffer has similar performance) is created per partition.

ii) A CAS loop (in Java 7 and below) or, even better, unsafe.getAndAddInt() in Java 8 can be used to claim a chunk of bytes on the per-topic buffer. This code can be invoked from multiple threads in a wait-free manner (wait-free in Java 8, since getAndAddInt() is wait-free). Once a region in the buffer is claimed, it can be operated on using the flyweight method that we talked about. If the buffer doesn't have enough space then we can drop the message or move on to a new buffer. Further, this creates absolutely zero objects in steady state (only a few objects created in the beginning). Even if the flyweight method is not desired, the API can just take byte arrays or objects that need to be serialized and copy them onto the per-topic buffers in a similar way. This API has been validated in Aeron too, so I am pretty confident that it will work well. For the zero-copy technique, here is a link to the Aeron API with zero copy: https://github.com/real-logic/Aeron/issues/18. The regular one copies byte arrays but without any object creation.

iii) The producer send thread can then just go in FIFO order through the buffer, sending messages that have been committed, using NIO to rotate between brokers. We might need a background thread to zero out used buffers too.

I've left out some details, but again none of this is very revolutionary - it's mostly the same techniques used in Aeron. I really think that we can keep the API garbage-free and wait-free (even in the multi-producer case) without compromising how pretty it looks - the total zero-copy API will be low level, but it should only be used by advanced users. Moreover the usual producer.send(msg, topic, partition) can use the efficient ByteBuffer offset API internally without itself creating any garbage. With the technique I talked about there is no need for an intermediate queue of any kind, since the underlying ByteBuffer per partition acts as the queue.

I can do more experiments with some real producer code instead of my toy code to further validate the idea, but I am pretty sure that both throughput and jitter characteristics will improve thanks to lower contention (wait-free in Java 8, with a single getAndAddInt() operation for synchronization) and better cache locality (C-like buffers and a constant number of objects per partition). If you guys are interested, I'd love to talk more. Again, just to reiterate, I don't think the API will suffer at all - most of this can be done under the covers. Additionally it will open things up so that a low-level zero-copy API is possible.

Thanks,
Rajiv
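The claim-and-write mechanism in (i)-(iii) above can be sketched in plain Java. This is a hypothetical toy, not code from the Kafka producer: an AtomicInteger stands in for unsafe.getAndAddInt(), and the commit protocol, per-broker rotation, and NIO drain are omitted.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimDemo {
    static final int MSG = 25;                                  // tiny fixed-size message
    static final ByteBuffer buf = ByteBuffer.allocateDirect(1 << 16); // per-partition buffer
    static final AtomicInteger tail = new AtomicInteger();      // stand-in for getAndAddInt()

    // Wait-free claim: one fetch-and-add per message. Returns -1 when the buffer is full,
    // at which point the caller drops the message or rotates to a fresh buffer.
    static int claim(int len) {
        int start = tail.getAndAdd(len);
        return (start + len <= buf.capacity()) ? start : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                int at = claim(MSG);
                if (at < 0) return;            // buffer exhausted: drop or move on
                buf.putInt(at, i);             // flyweight-style absolute writes into the
                buf.put(at + 4, (byte) 1);     // claimed region; no per-message object
            }
        };
        Thread a = new Thread(producer), b = new Thread(producer);
        a.start(); b.start(); a.join(); b.join();
        // A single send thread would now walk [0, tail) in FIFO order and hand
        // committed regions straight to NIO.
        System.out.println(tail.get() == 2 * 1000 * MSG);   // prints true
    }
}
```

The only synchronization among producers is the single fetch-and-add in claim(), which is the wait-free property the message describes.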
Re: Kafka sending messages with zero copy
Yeah, this is the org.apache.kafka.producer client in the clients/ directory.

-Jay

On Fri, Oct 24, 2014 at 9:54 AM, Rajiv Kurian wrote:
> Thanks!

On Fri, Oct 24, 2014 at 9:03 AM, Guozhang Wang wrote:
> I think 0.8.2 already used the new producer as the standard client.

On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian wrote:
> Thanks, I'll take a look at both. Just to be sure, we are talking about
> client version 0.8.2 right?

On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:
> Rajiv,
>
> The new producer does maintain a buffer per partition, but you need to
> consider synchronizing access to the buffer since it can take data from
> multiple caller threads. I think Jay's suggestion 1) does the same thing
> for your purpose if you already have the data buffer storing your data: by
> creating a ProducerRecord it would not incur copying the data into a
> temporary buffer, but instead mark the offset / length in the byte buffer
> that the user specifies; then when producer.send() is called the underlying
> buffer is copied to the producer buffer. If you do not have the data buffer
> but just want to write directly to the producer buffer, that is one step
> further.
>
> As for the consumer, you can take a look at the MemoryRecords iterator
> implementation, which I think already implements what you want.
>
> Guozhang

On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
> I want to avoid allocations since I am using Java in a C mode. Even though
> creating objects is a mere thread-local pointer bump in Java, freeing them
> is not so cheap and causes uncontrollable jitter. The second motivation is
> to avoid copying of data. Since I have objects which really look like C
> structs that can be sent over the wire, it's most efficient for me to write
> them out in the very exact buffer that will be sent over the wire.
>
> As for the bad API I completely agree - it is a very C-style API and
> definitely not usable in a productive way by most developers. My point was
> that this work is done by the protocol handling layer in any case; maybe it
> can be extended to allow a user access to its internals in a safe way, both
> during writing and reading. The present API can then be written as a layer
> over this "ugly" non-allocating API.
>
> Re (1) and (2). Instead of giving out keys and values as bytes, which
> implies copies, I'd ideally like to scribble them straight into the buffer
> that you are accumulating data onto before sending it. I am guessing you
> already need a single buffer per partition, or you have a single buffer per
> broker. All of this probably implies a single-threaded producer where I can
> be in charge of the event loop.
>
> Right now my data is within ByteBuffer/Unsafe-buffer-based data structures.
> They could be put on the wire without any serialization step if I were
> using Java NIO. Similarly they can be consumed on the other side without
> any deserialization step. But with the current Kafka API I have to:
> i) Copy data from my ByteBuffers onto new byte arrays.
> ii) Wrap byte arrays from (i) in a new object. I can't even re-use this
> object since I don't know when Kafka's send thread/serialization thread is
> really done with it.
> iii) Write an encoder that just takes the byte array from this wrapper
> object and hands it to Kafka.
>
> Similarly on the consumer:
> i) Kafka will make copies of slices (representing user values) of the
> ByteBuffer that was transferred from a broker into byte arrays.
> ii) Allocate an object (using the decoder) that wraps these byte arrays
> and hand them to me.
>
> My imaginary (admittedly non-Java-esque, manual-allocation-style) API would
> give me a pointer to Kafka's ByteBuffer that it has been accumulating
> protocol messages on, for either writing (on the producer) or reading (on
> the consumer). I know it's a long shot but I still wanted to get the team's
> thoughts on it. I'd be happy to contribute if we can come to an agreement
> on the API design. My hypothesis is that if the internal protocol parsing
> and buffer creation logic is written like this, it wouldn't be too tough to
> expose its innards and have the current encoding/decoding APIs just use
> this low-level API.
>
> Thanks for listening to my rant.
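Guozhang's idea of marking an offset/length in the user's buffer, rather than copying at record-creation time, might look roughly like this. The BufferRecord type and all names here are illustrative assumptions, not the actual Kafka client API; the sketch just shows where the single copy happens.

```java
import java.nio.ByteBuffer;

// Hypothetical "view" record: remembers offset/length in a user-owned buffer
// instead of copying into a temporary byte array when the record is created.
final class BufferRecord {
    final ByteBuffer source;
    final int offset;
    final int length;
    BufferRecord(ByteBuffer source, int offset, int length) {
        this.source = source;
        this.offset = offset;
        this.length = length;
    }
}

public class OneCopySend {
    public static void main(String[] args) {
        ByteBuffer user = ByteBuffer.allocate(64);
        user.putInt(16, 42);                          // user's C-struct-like data in place

        BufferRecord rec = new BufferRecord(user, 16, 4);  // no copy happens here

        // The single copy happens at "send" time: straight from the user's
        // buffer into the per-partition accumulator, with no intermediate array.
        ByteBuffer accumulator = ByteBuffer.allocate(1024);
        ByteBuffer view = rec.source.duplicate();     // shares content, independent indices
        view.position(rec.offset).limit(rec.offset + rec.length);
        accumulator.put(view);

        System.out.println(accumulator.getInt(0));    // prints 42
    }
}
```

Compared with the copy path Rajiv lists (buffer -> byte[] -> wrapper object -> encoder), this collapses everything down to one bulk copy and one small record object.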
Re: Kafka sending messages with zero copy
Thanks!
Re: Kafka sending messages with zero copy
I think 0.8.2 already used the new producer as the standard client.

Guozhang

On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
> It sounds like you are primarily interested in optimizing the producer?
>
> There is no way to produce data without any allocation being done, and I
> think getting to that would be pretty hard and lead to bad APIs, but
> avoiding memory allocation entirely shouldn't be necessary. Small transient
> objects in Java are pretty cheap to allocate and deallocate. The new Kafka
> producer API that is on trunk and will be in 0.8.2 is much more disciplined
> in it
Re: Kafka sending messages with zero copy
Thanks, I'll take a look at both. Just to be sure, we are talking about client version 0.8.2, right?

On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:
> [...]
Re: Kafka sending messages with zero copy
Rajiv,

The new producer does maintain a buffer per partition, but you need to consider synchronizing access to the buffer since it can take data from multiple caller threads. I think Jay's suggestion (1) does the same thing for your purpose if you already have a data buffer storing your data: by creating a ProducerRecord it would not incur copying the data into a temporary buffer, but instead mark the offset/length in the byte buffer that the user specifies; then when producer.send() is called the underlying buffer is copied to the producer buffer. If you do not have the data buffer but just want to write directly to the producer buffer, that is one step further.

As for the consumer, you can take a look at the MemoryRecords iterator implementation, which I think already implements what you want.

Guozhang

On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
> [...]
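[Editor's sketch] The per-partition buffer Guozhang describes can accept writers from multiple caller threads without locks if each producer claims a disjoint region with a single atomic add, as discussed elsewhere in this thread. A minimal sketch follows; the class name, message size, and method names are all invented for illustration and are not part of any Kafka API:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class BufferClaim {
    static final int MSG_SIZE = 25; // tiny fixed-size messages, as in the thread

    final ByteBuffer buffer;
    private final AtomicInteger tail = new AtomicInteger(0);

    BufferClaim(int capacity) {
        buffer = ByteBuffer.allocateDirect(capacity);
    }

    /** Claim MSG_SIZE bytes with one atomic add; returns the start offset, or -1 when
     *  full. Once the buffer fills, every later claim also fails, so the producer can
     *  rotate to a fresh buffer (or drop the message), as described in the thread. */
    int tryClaim() {
        int start = tail.getAndAdd(MSG_SIZE);
        return (start + MSG_SIZE <= buffer.capacity()) ? start : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        BufferClaim partition = new BufferClaim(4 * MSG_SIZE); // room for exactly 4
        AtomicInteger won = new AtomicInteger();
        Thread[] writers = new Thread[6];
        for (int i = 0; i < writers.length; i++) {
            writers[i] = new Thread(() -> {
                int offset = partition.tryClaim();
                if (offset >= 0) {
                    // Each winner owns a disjoint region: no further locking needed.
                    partition.buffer.putInt(offset, 42);
                    won.incrementAndGet();
                }
            });
            writers[i].start();
        }
        for (Thread t : writers) t.join();
        System.out.println(won.get() + " of 6 claims fit"); // exactly 4 claims fit
    }
}
```

Because `getAndAdd` is a single atomic instruction on modern JVMs, exactly four of the six threads succeed regardless of interleaving, and no objects are allocated per message in steady state.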
Re: Kafka sending messages with zero copy
I want to avoid allocations since I am using Java in a C mode. Even though creating objects is a mere thread-local pointer bump in Java, freeing them is not so cheap and causes uncontrollable jitter. The second motivation is to avoid copying of data. Since I have objects which really look like C structs that can be sent over the wire, it's most efficient for me to write them out in the very exact buffer that will be sent over the wire.

As for the bad API I completely agree - it is a very C style API and definitely not usable in a productive way by most developers. My point was that this work is done by the protocol handling layer in any case; maybe it can be extended to allow a user access to its internals in a safe way, both during writing and reading. The present API can then be written as a layer over this "ugly" non-allocating API.

Re (1) and (2): instead of giving out keys and values as bytes, which implies copies, I'd ideally like to scribble them straight into the buffer that you are accumulating data onto before sending it. I am guessing you already need a single buffer per partition, or you have a single buffer per broker. All of this probably implies a single-threaded producer where I can be in charge of the event loop.

Right now my data is within ByteBuffer/Unsafe-buffer-based data structures. They could be put on the wire without any serialization step if I were using Java NIO. Similarly they can be consumed on the other side without any deserialization step. But with the current kafka API I have to:
i) Copy data from my ByteBuffers onto new byte arrays.
ii) Wrap the byte arrays from (i) in a new object. I can't even re-use this object since I don't know when kafka's send thread/serialization thread is really done with it.
iii) Write an encoder that just takes the byte array from this wrapper object and hands it to Kafka.

Similarly on the consumer:
i) Kafka will make copies of slices (representing user values) of the ByteBuffer that was transferred from a broker into byte arrays.
ii) Allocate an object (using the decoder) that wraps these byte arrays and hand them to me.

My imaginary (admittedly non-java-esque, manual-allocation-style) API would give me a pointer to Kafka's ByteBuffer that it has been accumulating protocol messages on, for either writing (on producer) or reading (on consumer). I know it's a long shot but I still wanted to get the team's thoughts on it. I'd be happy to contribute if we can come to an agreement on the API design. My hypothesis is that if the internal protocol parsing and buffer creation logic is written like this, it wouldn't be too tough to expose its innards and have the current encoding/decoding APIs just use this low-level API.

Thanks for listening to my rant.

On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
> [...]
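[Editor's sketch] The "C structs over the wire" idea above can be made concrete as a reusable flyweight over a ByteBuffer, in the SBE/Cap'nProto style Rajiv mentions: fields live at fixed offsets in the buffer, and one long-lived flyweight instance is re-pointed at each message, so steady state allocates nothing. The class name, field layout, and method names are invented for illustration; this is not a Kafka or Aeron API:

```java
import java.nio.ByteBuffer;

/** A fixed-layout "struct" of an int, a double and a long read and written
 *  directly in a ByteBuffer, with no serialization or deserialization step. */
public class MessageFlyweight {
    static final int VALUE_OFFSET = 0, PRICE_OFFSET = 4, TIME_OFFSET = 12;
    static final int SIZE = 4 + 8 + 8; // int + double + long

    private ByteBuffer buffer;
    private int offset;

    /** Re-point the flyweight at a region; no allocation, so one instance is reused. */
    MessageFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    void putValue(int v)    { buffer.putInt(offset + VALUE_OFFSET, v); }
    int value()             { return buffer.getInt(offset + VALUE_OFFSET); }
    void putPrice(double p) { buffer.putDouble(offset + PRICE_OFFSET, p); }
    double price()          { return buffer.getDouble(offset + PRICE_OFFSET); }
    void putTime(long t)    { buffer.putLong(offset + TIME_OFFSET, t); }
    long time()             { return buffer.getLong(offset + TIME_OFFSET); }

    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.allocate(2 * SIZE); // stands in for the send buffer
        MessageFlyweight msg = new MessageFlyweight();

        // Write the second "struct" in place; these bytes are already wire-ready.
        msg.wrap(wire, SIZE);
        msg.putValue(7);
        msg.putPrice(2.0);
        msg.putTime(5L);

        // The "consumer" reads the same bytes back with no deserialization step.
        System.out.println(msg.wrap(wire, SIZE).value()); // prints 7
    }
}
```

The absolute-index `get`/`put` overloads never touch the buffer's position, which is what lets many flyweights share one buffer safely.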
Re: Kafka sending messages with zero copy
My use case is that though I am using Java, I want to work in as close to a zero-garbage environment as possible. All my internal data structures are based on ByteBuffers or buffers allocated using Unsafe. Thus my objects are already in a state where they can be transmitted without any serialization step, i.e. my objects are just flyweights over buffers in the Cap'nProto or SBE style. The current API is forcing me to allocate to send my already serialized objects. I'd imagine something like this on a single-threaded producer:

// My message has a single int, double and long.
public static final int SIZE_PER_OBJECT = 4 + 8 + 8; // int + double + long.

KafkaPartition partition = kafkaProducer.getPartition("Topic", partitionNumber);
// I am imagining a single buffer per partition.
KafkaBuffer kafkaBuffer = partition.getBuffer();
ByteBuffer buffer = ByteBuffer.allocate(0);

// Set the appropriate headers based on the kafka protocol in the buffer and ensure
// that there is space in the buffer for a payload of SIZE_PER_OBJECT. Just set the
// address of the input ByteBuffer to the right pointer in kafka's protocol ByteBuffer
// and the capacity to SIZE_PER_OBJECT. One could use reflection to set its capacity
// and address.
kafkaBuffer.getNextFlyweight(SIZE_PER_OBJECT, buffer);

// I treat it like a byte buffer, getting appropriate exceptions if I try writing
// past the end marker.
buffer.putInt(1);
buffer.putDouble(2.0);
buffer.putLong(5L);

// I tell kafka that I am done with writing my object, i.e. it can calculate the
// CRC32 etc. of the latest write.
kafkaBuffer.commit();

partition.flush(); // Potentially flush to the broker.

Kafka probably has to lay out a ByteBuffer encoded as per its protocol. Right now it probably takes objects and serializes them into that ByteBuffer. It would be great, as an advanced API, to get an offset into that ByteBuffer and use that as a space to write my message there. This can be an API that is used by the simpler encoding producer behind the scenes.
The consumer API can similarly allow one to re-use a flyweight object to read messages. For example:

// To ensure I cannot scribble into the internal protocol bytes.
ByteBuffer myBuffer = ByteBuffer.allocate(0).asReadOnlyBuffer();
while (true) {
  KafkaBuffer kafkaBuffer = consumer.poll(timeout);
  while (kafkaBuffer.hasRemainingMessages()) {
    // Sets myBuffer to point at the right offset in the kafka ByteBuffer where
    // the message starts.
    kafkaBuffer.getNextMsgIntoBuffer(myBuffer);
    myBuffer.getInt();    // 1
    myBuffer.getDouble(); // 2.0
    myBuffer.getLong();   // 5L
  }
  // Done with processing messages from this set of buffers.
}

I know this is probably a very rare use case, but single-threaded producers (the consumer in 0.9 already seems single-threaded, which is great!) and an API for ByteBuffer manipulation would allow it to work extremely well in high-throughput environments where avoiding garbage creation/collection is really important.

Thanks!

On Thu, Oct 23, 2014 at 5:03 PM, Guozhang Wang wrote:
> [...]
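[Editor's sketch] The claim/write/commit flow imagined above can be approximated with a toy, single-threaded stand-in: claim a region, write fields in place, commit, then hand readers a read-only view over the committed bytes. Every name here (ToyKafkaBuffer, claim, commit, readOnlyView) is invented; real Kafka exposes no such API:

```java
import java.nio.ByteBuffer;

/** Toy accumulator mimicking the sketched KafkaBuffer: producers write straight
 *  into the buffer that would go on the wire, and readers get a read-only view. */
public class ToyKafkaBuffer {
    private final ByteBuffer buffer;
    private int tail = 0;      // next free byte (single-threaded for brevity)
    private int committed = 0; // bytes safe for the send/consumer side to read

    ToyKafkaBuffer(int capacity) {
        buffer = ByteBuffer.allocate(capacity);
    }

    /** Returns the start offset of a claimed region, or -1 when full. */
    int claim(int size) {
        if (tail + size > buffer.capacity()) return -1;
        int start = tail;
        tail += size;
        return start;
    }

    ByteBuffer raw() { return buffer; }

    /** Publish everything claimed so far (a real impl would fix up CRCs etc. here). */
    void commit() { committed = tail; }

    /** Read-only flyweight over the committed bytes, so readers can't scribble on
     *  the internal protocol bytes, as the sketch above requires. */
    ByteBuffer readOnlyView() {
        ByteBuffer view = buffer.asReadOnlyBuffer();
        view.limit(committed);
        return view;
    }

    public static void main(String[] args) {
        ToyKafkaBuffer kb = new ToyKafkaBuffer(64);
        int at = kb.claim(4 + 8 + 8);                             // int + double + long
        kb.raw().putInt(at, 1).putDouble(at + 4, 2.0).putLong(at + 12, 5L);
        kb.commit();
        ByteBuffer msg = kb.readOnlyView();
        System.out.println(msg.getInt(0) + " " + msg.getDouble(4) + " " + msg.getLong(12));
        // prints "1 2.0 5"
    }
}
```

Swapping the plain `tail` field for the atomic claim shown earlier in the thread would extend this to multiple producer threads.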
Re: Kafka sending messages with zero copy
It sounds like you are primarily interested in optimizing the producer?

There is no way to produce data without any allocation being done, and I think getting to that would be pretty hard and lead to bad APIs, but avoiding memory allocation entirely shouldn't be necessary. Small transient objects in Java are pretty cheap to allocate and deallocate. The new Kafka producer API that is on trunk and will be in 0.8.2 is much more disciplined in its usage of memory, though there is still some allocation. The goal is to avoid copying the *data* multiple times, even if we do end up creating some small helper objects along the way (the idea is that the data may be rather large).

If you wanted to further optimize the new producer there are two things that could be done that would help:
1. Avoid the copy when creating the ProducerRecord instance. This could be done by accepting a length/offset along with the key and value and making use of this when writing to the records instance. As it is, your key and value need to be complete byte arrays.
2. Avoid the copy during request serialization. This is a little trickier. During request serialization we need to take the records for each partition and create a request that contains all of them. It is possible to do this with no further recopying of data, but somewhat tricky.

My recommendation would be to try the new producer API and see how that goes. If you need to optimize further we would definitely take patches for (1) and (2).

-Jay

On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:
> [...]
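[Editor's sketch] Jay's suggestion (1) amounts to a record holder that remembers an offset/length into the caller's existing array instead of requiring a freshly copied byte[], deferring the one unavoidable copy to serialization time. The class below is hypothetical (the real ProducerRecord takes whole arrays, as Jay notes):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** A ProducerRecord-like holder that keeps an offset/length slice of the caller's
 *  buffer rather than copying the value into a new byte array up front. */
public class SlicedRecord {
    final byte[] backing;
    final int valueOffset, valueLength;

    SlicedRecord(byte[] backing, int valueOffset, int valueLength) {
        this.backing = backing; // no copy: just remember where the value lives
        this.valueOffset = valueOffset;
        this.valueLength = valueLength;
    }

    /** Only at serialization time are the bytes copied, straight into the wire buffer. */
    void writeTo(ByteBuffer wire) {
        wire.put(backing, valueOffset, valueLength);
    }

    public static void main(String[] args) {
        // One pooled array holding several messages; we slice out the middle one.
        byte[] pool = "headervaluetrailer".getBytes(StandardCharsets.US_ASCII);
        SlicedRecord record = new SlicedRecord(pool, 6, 5); // the "value" slice

        ByteBuffer wire = ByteBuffer.allocate(16);
        record.writeTo(wire);
        wire.flip();
        byte[] out = new byte[wire.remaining()];
        wire.get(out);
        System.out.println(new String(out, StandardCharsets.US_ASCII)); // prints "value"
    }
}
```

The caller must not reuse the backing array until the record has been serialized, which is exactly the lifecycle hazard Rajiv raises about not knowing when Kafka's send thread is done with a buffer.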
Re: Kafka sending messages with zero copy
Rajiv,

Could you let me know your use case? Are you sending a very large file and hence would prefer a streaming manner instead of messages?

Guozhang

On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:

> I have a flyweight style protocol that I use for my messages. Thus they require no serialization/deserialization to be processed. The messages are just offset, length pairs within a ByteBuffer.
>
> Is there a producer and consumer API that forgoes allocation? I just want to give the kafka producer offsets from a ByteBuffer. Similarly it would be ideal if I could get a ByteBuffer and offsets into it from the consumer. Even if I could get byte arrays (implying a copy but no decoding phase) on the consumer that would be great. Right now it seems to me that the only way to get messages from Kafka is through a message object, which implies Kafka allocates these messages all the time. I am willing to use the upcoming 0.9 API too.
>
> Thanks.

--
-- Guozhang