Hi Guozhang,

I am a bit busy at work, but when I get the chance I'll definitely try to get a proof of concept going. Not the Kafka protocol, just the buffering and threading structures, maybe writing to another socket. I think it would be useful to get the queueing and buffer management working first and prove that it can be done in a zero-copy way in a multi-producer, single-consumer environment. If that works, the single consumer can be the Kafka network sync thread.
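A minimal sketch of the claim/commit/drain structure described in this thread, assuming Java 9+ (VarHandle acquire/release access stands in for the Unsafe calls mentioned below) and a single-use buffer with no wraparound, rotation, or zeroing. PartitionLogBuffer, offer, drain, and RecordHandler are hypothetical names, not Kafka or Aeron APIs. Producers claim space with AtomicInteger.getAndAdd, write the payload in place, and publish by writing a length header last; the single consumer walks the buffer in claim order and stops at the first header that is still zero.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: single-use log buffer for one partition; many producers, one consumer.
 *  Record layout: [int header = 4 + payload length][payload][padding to 4 bytes]. */
final class PartitionLogBuffer {
    /** Hypothetical callback that sees one committed record in place (no copy). */
    @FunctionalInterface
    interface RecordHandler {
        void onRecord(ByteBuffer buffer, int offset, int length);
    }

    private static final int HEADER = 4;
    private static final int ALIGN = 4;
    // Acquire/release access to ints stored inside a ByteBuffer (Java 9+).
    private static final VarHandle INT =
            MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());

    private final ByteBuffer buffer;
    private final AtomicInteger tail = new AtomicInteger(); // next unclaimed offset
    private int head;                                        // used only by the consumer

    PartitionLogBuffer(int capacity) {
        if (capacity % ALIGN != 0) {
            throw new IllegalArgumentException("capacity must be a multiple of " + ALIGN);
        }
        // allocateDirect zero-fills the memory, and in practice its base address is
        // aligned, which the atomic VarHandle access modes below require.
        this.buffer = ByteBuffer.allocateDirect(capacity);
    }

    private static int align(int n) {
        return (n + ALIGN - 1) & ~(ALIGN - 1);
    }

    /** Producer side; false means the buffer is full (drop, or move to a new buffer). */
    boolean offer(byte[] payload) {
        int recordLength = align(HEADER + payload.length);
        // Claim a region with one fetch-and-add. On x86, HotSpot typically compiles this
        // to a single LOCK XADD, so no producer retries. A real version would also
        // guard tail against int overflow after many failed claims.
        int offset = tail.getAndAdd(recordLength);
        if (offset + recordLength > buffer.capacity()) {
            return false;
        }
        // Absolute puts never move the buffer's position, so producers writing
        // disjoint regions do not interfere with each other.
        for (int i = 0; i < payload.length; i++) {
            buffer.put(offset + HEADER + i, payload[i]);
        }
        // Commit: write the header last with release semantics, so a consumer that
        // reads a non-zero header with acquire semantics also sees the payload.
        INT.setRelease(buffer, offset, HEADER + payload.length);
        return true;
    }

    /** Consumer side: hands committed records to the handler in claim (FIFO) order. */
    int drain(RecordHandler handler) {
        int count = 0;
        while (head + HEADER <= buffer.capacity()) {
            int header = (int) INT.getAcquire(buffer, head);
            if (header == 0) {
                break; // not committed yet, never claimed, or a failed claim at the end
            }
            // The handler reads the payload in place; a network thread could write
            // this range straight to a SocketChannel instead of copying it.
            handler.onRecord(buffer, head + HEADER, header - HEADER);
            head += align(header);
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        PartitionLogBuffer log = new PartitionLogBuffer(1 << 16);
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            final int id = t;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    log.offer(("p" + id + "-" + i).getBytes(StandardCharsets.US_ASCII));
                }
            });
            producers[t].start();
        }
        for (Thread p : producers) p.join();
        int[] payloadBytes = {0};
        int records = log.drain((buf, offset, length) -> payloadBytes[0] += length);
        System.out.println(records + " records, " + payloadBytes[0] + " payload bytes"); // prints: 400 records, 1960 payload bytes
    }
}
```

Because the consumer stops at the first record whose header is still zero, one slow producer delays delivery of everything claimed after it; that is the cost of keeping FIFO order without a separate queue. The demo's string building allocates, but offer and drain themselves are intended to be allocation-free in steady state.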
Thanks,
Rajiv

On Fri, Jan 16, 2015 at 11:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Rajiv,
>
> Thanks for this proposal. It would be great if you could upload an
> implementation patch for the CAS idea and show some memory usage / perf
> differences.
>
> Guozhang
>
> On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Resuscitating this thread. I've done some more experiments and
> > profiling. My messages are very tiny (currently 25 bytes per message),
> > and creating multiple objects per message leads to a lot of churn. The
> > memory churn from creating convenience objects is greater than the
> > memory used by my own objects right now. I could probably batch my
> > messages further to make this effect less pronounced. I did some
> > rather unscientific experiments with a flyweight approach on top of a
> > ByteBuffer for a simple messaging API (peer-to-peer and NIO-based, so
> > not a real comparison), and the numbers were very satisfactory, with
> > no garbage created at all in steady state. Though I don't expect such
> > good numbers once messages actually go through the broker and all the
> > other extra work a real producer does, I think there is great
> > potential here.
> >
> > The general mechanism for me is this:
> > i) A buffer (I used Unsafe, but I imagine a ByteBuffer would have
> > similar performance) is created per partition.
> > ii) A CAS loop (in Java 7 and earlier), or even better
> > unsafe.getAndAddInt() in Java 8, can be used to claim a chunk of bytes
> > on the per-topic buffer. This code can be invoked from multiple
> > threads in a wait-free manner (wait-free in Java 8, since
> > getAndAddInt() is wait-free). Once a region in the buffer is claimed,
> > it can be operated on using the flyweight method that we talked
> > about. If the buffer doesn't have enough space, then we can drop the
> > message or move on to a new buffer.
> > Further, this creates absolutely zero objects in steady state (only a
> > few objects are created at the beginning). Even if the flyweight
> > method is not desired, the API can just take byte arrays, or objects
> > that need to be serialized, and copy them onto the per-topic buffers
> > in a similar way. This API has been validated in Aeron too, so I am
> > pretty confident that it will work well. For the zero-copy technique,
> > here is a link to the Aeron API with zero copy:
> > https://github.com/real-logic/Aeron/issues/18. The regular API copies
> > byte arrays, but without any object creation.
> > iii) The producer send thread can then just go through the buffer in
> > FIFO order, sending the messages that have been committed and using
> > NIO to rotate between brokers. We might also need a background thread
> > to zero out used buffers.
> >
> > I've left out some details, but again, none of this is very
> > revolutionary; it's mostly the same techniques used in Aeron. I really
> > think that we can keep the API garbage-free and wait-free (even in the
> > multi-producer case) without compromising how pretty it looks. The
> > fully zero-copy API will be low level, but it should only be used by
> > advanced users. Moreover, the usual
> > producer.send(msg, topic, partition) can use the efficient ByteBuffer
> > offset API internally without itself creating any garbage. With this
> > technique there is no need for an intermediate queue of any kind,
> > since the underlying ByteBuffer per partition acts as the queue.
> >
> > I can do more experiments with some real producer code instead of my
> > toy code to further validate the idea, but I am pretty sure that both
> > throughput and jitter characteristics will improve thanks to lower
> > contention (wait-free in Java 8, with a single getAndAddInt() operation
> > for synchronization) and better cache locality (C-like buffers and a
> > small, constant number of objects per partition).
> > If you guys are interested, I'd love to talk more. Again, just to
> > reiterate, I don't think the API will suffer at all; most of this can
> > be done under the covers. Additionally, it will open things up so that
> > a low-level zero-copy API is possible.
> >
> > Thanks,
> > Rajiv
>
> --
> -- Guozhang
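The flyweight access mentioned in step ii) of the quoted proposal can be illustrated with a minimal sketch. The 24-byte layout, the class SampleFlyweight, and its field names are hypothetical (not from Kafka or Aeron); in the proposal, the wrapped region would be a chunk claimed from the per-partition buffer rather than a standalone ByteBuffer. A single instance is re-pointed at each record with wrap(), and every field is read or written in place with absolute get/put calls, so encoding and decoding create no per-message objects.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical flyweight over a fixed 24-byte record:
 *   [0, 8)   timestamp (long)
 *   [8, 16)  sourceId  (long)
 *   [16, 24) value     (double)
 * One instance is reused for every record; wrap() just re-points it.
 */
final class SampleFlyweight {
    static final int TIMESTAMP_OFFSET = 0;
    static final int SOURCE_ID_OFFSET = 8;
    static final int VALUE_OFFSET = 16;
    static final int LENGTH = 24;

    private ByteBuffer buffer;
    private int offset;

    /** Points this flyweight at the record starting at offset; allocates nothing. */
    SampleFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    // Absolute get/put calls access the fields in place: no copy into a message
    // object and no change to the buffer's position.
    SampleFlyweight timestamp(long v) { buffer.putLong(offset + TIMESTAMP_OFFSET, v); return this; }
    SampleFlyweight sourceId(long v)  { buffer.putLong(offset + SOURCE_ID_OFFSET, v); return this; }
    SampleFlyweight value(double v)   { buffer.putDouble(offset + VALUE_OFFSET, v); return this; }

    long timestamp() { return buffer.getLong(offset + TIMESTAMP_OFFSET); }
    long sourceId()  { return buffer.getLong(offset + SOURCE_ID_OFFSET); }
    double value()   { return buffer.getDouble(offset + VALUE_OFFSET); }

    public static void main(String[] args) {
        ByteBuffer region = ByteBuffer.allocate(100 * LENGTH);
        SampleFlyweight sample = new SampleFlyweight(); // created once, reused below

        // Encode 100 records back to back without creating per-record objects.
        for (int i = 0; i < 100; i++) {
            sample.wrap(region, i * LENGTH).timestamp(1_000L + i).sourceId(i % 7).value(i * 0.5);
        }

        // Decode them again with the same instance.
        double sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += sample.wrap(region, i * LENGTH).value();
        }
        System.out.println(sum); // prints: 2475.0
    }
}
```

The trade-off of this style is that a flyweight is only valid until it is re-pointed, so callers must not hold on to it as if it were an independent message object.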