Re: Kafka sending messages with zero copy

2015-01-27 Thread Guozhang Wang
Thanks Rajiv, looking forward to your prototype.

Guozhang

On Mon, Jan 26, 2015 at 2:30 PM, Rajiv Kurian wrote:

> Hi Guozhang,
>
> I am a bit busy at work. When I get the chance I'll definitely try to get a
> proof of concept going. Not the kafka protocol, but just the buffering and
> threading structures, maybe just write to another socket. I think it would
> be useful just to get the queueing and buffer management going and prove
> that it can be done in a zero copy way in a multi producer single consumer
> environment. If that is working, then the single consumer can be the kafka
> network sync thread.
>
> Thanks,
> Rajiv
>
> On Fri, Jan 16, 2015 at 11:58 AM, Guozhang Wang wrote:
>
> > Hi Rajiv,
> >
> > Thanks for this proposal, it would be great if you could upload an
> > implementation patch for the CAS idea and show some memory usage / perf
> > differences.
> >
> > Guozhang
> >
> > On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian wrote:
> >
> > > Resuscitating this thread. I've done some more experiments and
> > > profiling. My messages are very tiny (currently 25 bytes each), and
> > > creating multiple objects per message leads to a lot of churn. The
> > > memory churn from creating convenience objects exceeds the memory used
> > > by my actual messages right now. I could probably batch my messages
> > > further to make this effect less pronounced. I did some rather
> > > unscientific
> > > experiments with a flyweight approach on top of the ByteBuffer for a
> > simple
> > > messaging API (peer to peer NIO based so not a real comparison) and the
> > > numbers were very satisfactory and there is no garbage created in
> steady
> > > state at all. Though I don't expect such good numbers from actually
> going
> > > through the broker + all the other extra stuff that a real producer
> would
> > > do, I think there is great potential here.
> > >
> > > The general mechanism for me is this:
> > > i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
> > > performance) is created per partition.
> > > ii) A CAS loop (in Java 7 and below) or, even better,
> > > unsafe.getAndAddInt() in Java 8 can be used to claim a chunk of bytes
> > > on the per topic buffer. This code can be invoked from multiple threads
> > > in a wait-free manner (wait-free in Java 8, since getAndAddInt() is
> > > wait-free). Once a region in the buffer is claimed, it can be operated
> > > on using the flyweight method that we talked
> > > about. If the buffer doesn't have enough space then we can drop the
> > message
> > > or move onto a new buffer. Further this creates absolutely zero objects
> > in
> > > steady state (only a few objects created in the beginning). Even if the
> > > flyweight method is not desired, the API can just take byte arrays or
> > > objects that need to be serialized and copy them onto the per topic
> > buffers
> > > in a similar way. This API has been validated in Aeron too, so I am
> > pretty
> > > confident that it will work well. For the zero copy technique here is a
> > > link to Aeron API with zero copy -
> > > https://github.com/real-logic/Aeron/issues/18. The regular one copies
> > byte
> > > arrays but without any object creation.
> > > iii) The producer send thread can then just go in FIFO order through
> > > the buffer, sending messages that have been committed, using NIO to
> > > rotate
> > > between brokers. We might need a background thread to zero out used
> > buffers
> > > too.
> > >
> > > I've left out some details, but again none of this is very
> > > revolutionary - it's mostly the same techniques used in Aeron. I really
> > > think that we can keep the API garbage free and wait-free (even in the
> > > multi producer case) without compromising how pretty it looks - the
> > > total zero copy API will be low level, but it should only be used by
> > > advanced users. Moreover the usual
> > > producer.send(msg, topic, partition) can use the efficient ByteBuffer
> > > offset API internally without itself creating any garbage. With the
> > > technique I talked about there is no need for an intermediate queue of
> > any
> > > kind since the underlying ByteBuffer per partition acts as the queue.
> > >
> > > I can do more experiments with some real producer code instead of my
> toy
> > > code to further validate the idea, but I am pretty sure that both
> > > throughput and jitter characteristics will improve thanks to lower
> > > contention (wait-free in Java 8 with a single getAndAddInt() operation
> > > for sync) and better cache locality (C-like buffers and a constant
> > > number of objects per partition). If you guys are interested, I'd love
> > > to talk
> > > more. Again just to reiterate, I don't think the API will suffer at
> all -
> > > most of this can be done under the covers. Additionally it will open up
> > > things so that a low level zero copy API is possible.
> > >
> > > Thanks,
> > > Rajiv
> > >
> >
> >
> >
> > --
> > -- Guozhang
>

Re: Kafka sending messages with zero copy

2015-01-26 Thread Rajiv Kurian
Hi Guozhang,

I am a bit busy at work. When I get the chance I'll definitely try to get a
proof of concept going. Not the kafka protocol, but just the buffering and
threading structures, maybe just write to another socket. I think it would
be useful just to get the queueing and buffer management going and prove
that it can be done in a zero copy way in a multi producer single consumer
environment. If that is working, then the single consumer can be the kafka
network sync thread.
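
A minimal sketch of that single-consumer drain, assuming one shared
ByteBuffer whose producers publish a committed watermark (all names here are
invented for illustration, not a real Kafka API):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: the single consumer drains bytes that producers have already
// committed and writes them to a socket; the shared buffer itself is the queue.
final class SendLoop {
    private final ByteBuffer buffer;       // shared with the producer threads
    private final AtomicInteger committed; // watermark published by producers
    private int consumed = 0;              // touched only by this thread

    SendLoop(ByteBuffer buffer, AtomicInteger committed) {
        this.buffer = buffer;
        this.committed = committed;
    }

    void drainOnce(SocketChannel channel) throws IOException {
        int limit = committed.get();
        if (limit == consumed) {
            return;                           // nothing new to send
        }
        ByteBuffer view = buffer.duplicate(); // no copy of the underlying bytes
        view.position(consumed);
        view.limit(limit);
        while (view.hasRemaining()) {
            channel.write(view);
        }
        consumed = limit;
    }
}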

Thanks,
Rajiv

On Fri, Jan 16, 2015 at 11:58 AM, Guozhang Wang wrote:

> Hi Rajiv,
>
> Thanks for this proposal, it would be great if you could upload an
> implementation patch for the CAS idea and show some memory usage / perf
> differences.
>
> Guozhang
>
> On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian wrote:
>
> > Resuscitating this thread. I've done some more experiments and profiling.
> > My messages are very tiny (currently 25 bytes each), and creating
> > multiple objects per message leads to a lot of churn. The memory churn
> > from creating convenience objects exceeds the memory used by my actual
> > messages right now. I could probably batch my messages further to
> > make this effect less pronounced. I did some rather unscientific
> > experiments with a flyweight approach on top of the ByteBuffer for a
> simple
> > messaging API (peer to peer NIO based so not a real comparison) and the
> > numbers were very satisfactory and there is no garbage created in steady
> > state at all. Though I don't expect such good numbers from actually going
> > through the broker + all the other extra stuff that a real producer would
> > do, I think there is great potential here.
> >
> > The general mechanism for me is this:
> > i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
> > performance) is created per partition.
> > ii) A CAS loop (in Java 7 and below) or, even better,
> > unsafe.getAndAddInt() in Java 8 can be used to claim a chunk of bytes on
> > the per topic buffer. This code can be invoked from multiple threads in a
> > wait-free manner (wait-free in Java 8, since getAndAddInt() is wait-free).
> > Once a region in the buffer is claimed, it can be operated on using the
> > flyweight method that we talked
> > about. If the buffer doesn't have enough space then we can drop the
> message
> > or move onto a new buffer. Further this creates absolutely zero objects
> in
> > steady state (only a few objects created in the beginning). Even if the
> > flyweight method is not desired, the API can just take byte arrays or
> > objects that need to be serialized and copy them onto the per topic
> buffers
> > in a similar way. This API has been validated in Aeron too, so I am
> pretty
> > confident that it will work well. For the zero copy technique here is a
> > link to Aeron API with zero copy -
> > https://github.com/real-logic/Aeron/issues/18. The regular one copies
> byte
> > arrays but without any object creation.
> > iii) The producer send thread can then just go in FIFO order through the
> > buffer, sending messages that have been committed, using NIO to rotate
> > between brokers. We might need a background thread to zero out used
> buffers
> > too.
> >
> > I've left out some details, but again none of this is very revolutionary -
> > it's mostly the same techniques used in Aeron. I really think that we can
> > keep the API garbage free and wait-free (even in the multi producer case)
> > without compromising how pretty it looks - the total zero copy API will
> > be low level, but it should only be used by advanced users. Moreover the
> > usual
> > producer.send(msg, topic, partition) can use the efficient ByteBuffer
> > offset API internally without itself creating any garbage. With the
> > technique I talked about there is no need for an intermediate queue of
> any
> > kind since the underlying ByteBuffer per partition acts as the queue.
> >
> > I can do more experiments with some real producer code instead of my toy
> > code to further validate the idea, but I am pretty sure that both
> > throughput and jitter characteristics will improve thanks to lower
> > contention (wait-free in Java 8 with a single getAndAddInt() operation
> > for sync) and better cache locality (C-like buffers and a constant
> > number
> > of objects per partition). If you guys are interested, I'd love to talk
> > more. Again just to reiterate, I don't think the API will suffer at all -
> > most of this can be done under the covers. Additionally it will open up
> > things so that a low level zero copy API is possible.
> >
> > Thanks,
> > Rajiv
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka sending messages with zero copy

2015-01-16 Thread Guozhang Wang
Hi Rajiv,

Thanks for this proposal, it would be great if you could upload an
implementation patch for the CAS idea and show some memory usage / perf
differences.

Guozhang

On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian wrote:

> Resuscitating this thread. I've done some more experiments and profiling.
> My messages are very tiny (currently 25 bytes each), and creating
> multiple objects per message leads to a lot of churn. The memory churn
> from creating convenience objects exceeds the memory used by my actual
> messages right now. I could probably batch my messages further to
> make this effect less pronounced. I did some rather unscientific
> experiments with a flyweight approach on top of the ByteBuffer for a simple
> messaging API (peer to peer NIO based so not a real comparison) and the
> numbers were very satisfactory and there is no garbage created in steady
> state at all. Though I don't expect such good numbers from actually going
> through the broker + all the other extra stuff that a real producer would
> do, I think there is great potential here.
>
> The general mechanism for me is this:
> i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
> performance) is created per partition.
> ii) A CAS loop (in Java 7 and below) or, even better, unsafe.getAndAddInt()
> in Java 8 can be used to claim a chunk of bytes on the per topic buffer. This
> code can be invoked from multiple threads in a wait-free manner (wait-free
> in Java 8, since getAndAddInt() is wait-free). Once a region in the buffer
> is claimed, it can be operated on using the flyweight method that we talked
> about. If the buffer doesn't have enough space then we can drop the message
> or move onto a new buffer. Further this creates absolutely zero objects in
> steady state (only a few objects created in the beginning). Even if the
> flyweight method is not desired, the API can just take byte arrays or
> objects that need to be serialized and copy them onto the per topic buffers
> in a similar way. This API has been validated in Aeron too, so I am pretty
> confident that it will work well. For the zero copy technique here is a
> link to Aeron API with zero copy -
> https://github.com/real-logic/Aeron/issues/18. The regular one copies byte
> arrays but without any object creation.
> iii) The producer send thread can then just go in FIFO order through the
> buffer, sending messages that have been committed, using NIO to rotate
> between brokers. We might need a background thread to zero out used buffers
> too.
>
> I've left out some details, but again none of this is very revolutionary -
> it's mostly the same techniques used in Aeron. I really think that we can
> keep the API garbage free and wait-free (even in the multi producer case)
> without compromising how pretty it looks - the total zero copy API will be
> low level, but it should only be used by advanced users. Moreover the usual
> producer.send(msg, topic, partition) can use the efficient ByteBuffer
> offset API internally without itself creating any garbage. With the
> technique I talked about there is no need for an intermediate queue of any
> kind since the underlying ByteBuffer per partition acts as the queue.
>
> I can do more experiments with some real producer code instead of my toy
> code to further validate the idea, but I am pretty sure that both
> throughput and jitter characteristics will improve thanks to lower
> contention (wait-free in Java 8 with a single getAndAddInt() operation for
> sync) and better cache locality (C-like buffers and a constant number
> of objects per partition). If you guys are interested, I'd love to talk
> more. Again just to reiterate, I don't think the API will suffer at all -
> most of this can be done under the covers. Additionally it will open up
> things so that a low level zero copy API is possible.
>
> Thanks,
> Rajiv
>



-- 
-- Guozhang


Re: Kafka sending messages with zero copy

2014-12-14 Thread Rajiv Kurian
Resuscitating this thread. I've done some more experiments and profiling.
My messages are very tiny (currently 25 bytes each), and creating
multiple objects per message leads to a lot of churn. The memory churn
from creating convenience objects exceeds the memory used by my actual
messages right now. I could probably batch my messages further to
make this effect less pronounced. I did some rather unscientific
experiments with a flyweight approach on top of the ByteBuffer for a simple
messaging API (peer to peer NIO based so not a real comparison) and the
numbers were very satisfactory and there is no garbage created in steady
state at all. Though I don't expect such good numbers from actually going
through the broker + all the other extra stuff that a real producer would
do, I think there is great potential here.

The general mechanism for me is this:
i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
performance) is created per partition.
ii) A CAS loop (in Java 7 and below) or, even better, unsafe.getAndAddInt()
in Java 8 can be used to claim a chunk of bytes on the per topic buffer (see
the sketch after this list). This code can be invoked from multiple threads
in a wait-free manner (wait-free in Java 8, since getAndAddInt() is
wait-free). Once a region in the buffer
is claimed, it can be operated on using the flyweight method that we talked
about. If the buffer doesn't have enough space then we can drop the message
or move onto a new buffer. Further this creates absolutely zero objects in
steady state (only a few objects created in the beginning). Even if the
flyweight method is not desired, the API can just take byte arrays or
objects that need to be serialized and copy them onto the per topic buffers
in a similar way. This API has been validated in Aeron too, so I am pretty
confident that it will work well. For the zero copy technique here is a
link to Aeron API with zero copy -
https://github.com/real-logic/Aeron/issues/18. The regular one copies byte
arrays but without any object creation.
iii) The producer send thread can then just go in FIFO order through the
buffer, sending messages that have been committed, using NIO to rotate
between brokers. We might need a background thread to zero out used buffers
too.
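
As a rough illustration of the claim step in (ii) - class and method names
here are invented, and a real implementation would also need a per-message
commit marker so the send thread only reads fully written regions:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: one pre-allocated buffer per partition, with a wait-free claim.
final class PartitionBuffer {
    private final ByteBuffer buffer;
    private final AtomicInteger tail = new AtomicInteger(0); // next free offset

    PartitionBuffer(int capacity) {
        buffer = ByteBuffer.allocateDirect(capacity);
    }

    // On Java 8, getAndAdd uses the getAndAddInt intrinsic (LOCK XADD on x86),
    // so claiming is wait-free; on Java 7 the same thing can be written as a
    // compareAndSet loop, which is lock-free rather than wait-free.
    // Returns the claimed offset, or -1 if the buffer is full (drop the
    // message or rotate to a fresh buffer).
    int claim(int size) {
        int offset = tail.getAndAdd(size);
        return (offset + size <= buffer.capacity()) ? offset : -1;
    }

    // Flyweight-style write: fields go straight into the claimed region, so
    // no per-message objects are created in steady state.
    void writeMessage(int offset, int id, double value, long timestamp) {
        buffer.putInt(offset, id);
        buffer.putDouble(offset + 4, value);
        buffer.putLong(offset + 12, timestamp);
    }
}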

I've left out some details, but again none of this is very revolutionary -
it's mostly the same techniques used in Aeron. I really think that we can
keep the API garbage free and wait-free (even in the multi producer case)
without compromising how pretty it looks - the total zero copy API will be
low level, but it should only be used by advanced users. Moreover the usual
producer.send(msg, topic, partition) can use the efficient ByteBuffer
offset API internally without itself creating any garbage. With the
technique I talked about there is no need for an intermediate queue of any
kind since the underlying ByteBuffer per partition acts as the queue.

I can do more experiments with some real producer code instead of my toy
code to further validate the idea, but I am pretty sure that both
throughput and jitter characteristics will improve thanks to lower
contention (wait-free in Java 8 with a single getAndAddInt() operation for
sync) and better cache locality (C-like buffers and a constant number
of objects per partition). If you guys are interested, I'd love to talk
more. Again just to reiterate, I don't think the API will suffer at all -
most of this can be done under the covers. Additionally it will open up
things so that a low level zero copy API is possible.

Thanks,
Rajiv


Re: Kafka sending messages with zero copy

2014-10-24 Thread Jay Kreps
Yeah, this is the org.apache.kafka.clients.producer client in the clients/
directory.

-Jay

On Fri, Oct 24, 2014 at 9:54 AM, Rajiv Kurian wrote:

> Thanks!
>
> On Fri, Oct 24, 2014 at 9:03 AM, Guozhang Wang wrote:
>
> > I think 0.8.2 already uses the new producer as the standard client.
> >
> > Guozhang
> >
> > On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian wrote:
> >
> > > Thanks, I'll take a look at both. Just to be sure, we are talking about
> > > client version 0.8.2, right?
> > >
> > >
> > >
> > > On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:
> > >
> > > > Rajiv,
> > > >
> > > > The new producer does maintain a buffer per partition, but you need to
> > > > consider synchronizing access to the buffer since it can take data
> > > > from multiple caller threads. I think Jay's suggestion 1) does the
> > > > same thing for your purpose if you already have a data buffer storing
> > > > your data: by creating a ProducerRecord it would not incur copying the
> > > > data into a temporary buffer, but would instead mark the offset /
> > > > length in the byte buffer that the user specifies; then when
> > > > producer.send() is called, the underlying buffer is copied to the
> > > > producer buffer. If you do not have the data buffer but just want to
> > > > write directly to the producer buffer, that is one step further.
> > > >
> > > > As for the consumer, you can take a look at the MemoryRecords iterator
> > > > implementation, which I think already implements what you want.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
> > > >
> > > > > I want to avoid allocations since I am using Java in a C mode. Even
> > > > though
> > > > > creating objects is a mere thread local pointer bump in Java,
> freeing
> > > > them
> > > > > is not so cheap and causes  uncontrollable jitter. The second
> > > motivation
> > > > is
> > > > > to avoid copying of data. Since I have objects which really look
> > like C
> > > > > structs that can be sent over the wire it's most efficient for me
> to
> > > > write
> > > > > them out in the very exact buffer that will be sent over the wire.
> > > > >
> > > > > As for the bad API I completely agree -  it is a very C style API
> and
> > > > > definitely not usable in a productive way by most developers. My
> > point
> > > > was
> > > > > that this work is done by the protocol handling layer in any case,
> > > maybe
> > > > it
> > > > > can be extended to allow a user access to its internals in a safe
> > way
> > > > both
> > > > > during writing and reading. The present API then can be written as
> a
> > > > layer
> > > > > over this "ugly" non allocating API.
> > > > >
> > > > > Re (1) and (2). Instead of giving out keys, values as bytes which
> > > implies
> > > > > copies, I'd ideally like to scribble them straight into the buffer
> > that
> > > > you
> > > > > are accumulating data onto before sending it. I am guessing you
> > already
> > > > > need a single buffer per partition or you have a single buffer per
> > > > broker.
> > > > > All of this probably implies a single threaded producer where I can
> > be
> > > in
> > > > > charge of the event loop.
> > > > >
> > > > > Right now my data is within ByteBuffer/Unsafe buffer based data
> > > > structures.
> > > > > They can be put on the wire without any serialization step if I was
> > > using
> > > > > Java NIO. Similarly they can be consumed on the other side without
> > any
> > > > > deserialization step. But with the current kafka API I have to:
> > > > >   i) Copy data from my ByteBuffers onto new byte arrays.
> > > > >  ii) Wrap byte arrays from (i) in a new object. I can't even re-use
> > > this
> > > > > object since I don't know when kafka's send thread/serialization
> > thread
> > > > is
> > > > > really done with it.
> > > > >  iii) Write an encoder that just takes the byte array from this
> > wrapper
> > > > > object and hands it to Kafka.
> > > > >
> > > > > Similarly on the consumer:
> > > > >   i) Kafka will make copies of slices (representing user values) of
> > the
> > > > > ByteBuffer that was transferred from a broker into byte arrays.
> > > > >  ii) Allocate an object  (using the decoder) that wraps these byte
> > > arrays
> > > > > and hand them to me.
> > > > >
> > > > > My imaginary (admittedly non-java-esque manual allocation style)
> API
> > > > would
> > > > > give me a pointer to Kafka's ByteBuffer that it has been
> accumulating
> > > > > protocol messages on for either writing (on producer) or reading
> (on
> > > > > consumer). I know it's a long shot but I still wanted to get the
> > team's
> > > > > thoughts on it. I'd be happy to contribute if we can come to an
> > > agreement
> > > > > on the API design. My hypothesis is that if the internal protocol
> > > > > parsing and buffer creation logic is written like this, it wouldn't
> > > > > be too tough to expose its innards and have the current
> > > > > encoding/decoding APIs just use this low level API.

Re: Kafka sending messages with zero copy

2014-10-24 Thread Rajiv Kurian
Thanks!

On Fri, Oct 24, 2014 at 9:03 AM, Guozhang Wang  wrote:

> I think 0.8.2 already uses the new producer as the standard client.
>
> Guozhang
>
> > On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian wrote:
>
> > Thanks, I'll take a look at both. Just to be sure, we are talking about
> > client version 0.8.2, right?
> >
> >
> >
> > On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:
> >
> > > Rajiv,
> > >
> > > The new producer does maintain a buffer per partition, but you need to
> > > consider synchronizing access to the buffer since it can take data from
> > > multiple caller threads. I think Jay's suggestion 1) does the same
> > > thing for your purpose if you already have a data buffer storing your
> > > data: by creating a ProducerRecord it would not incur copying the data
> > > into a temporary buffer, but would instead mark the offset / length in
> > > the byte buffer that the user specifies; then when producer.send() is
> > > called, the underlying buffer is copied to the producer buffer. If you
> > > do not have the data buffer but just want to write directly to the
> > > producer buffer, that is one step further.
> > >
> > > As for the consumer, you can take a look at the MemoryRecords iterator
> > > implementation, which I think already implements what you want.
> > >
> > > Guozhang
> > >
> > > On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
> > >
> > > > I want to avoid allocations since I am using Java in a C mode. Even
> > > though
> > > > creating objects is a mere thread local pointer bump in Java, freeing
> > > them
> > > > is not so cheap and causes  uncontrollable jitter. The second
> > motivation
> > > is
> > > > to avoid copying of data. Since I have objects which really look
> like C
> > > > structs that can be sent over the wire it's most efficient for me to
> > > write
> > > > them out in the very exact buffer that will be sent over the wire.
> > > >
> > > > As for the bad API I completely agree -  it is a very C style API and
> > > > definitely not usable in a productive way by most developers. My
> point
> > > was
> > > > that this work is done by the protocol handling layer in any case,
> > maybe
> > > it
> > > > can be extended to allow a user access to its internals in a safe
> way
> > > both
> > > > during writing and reading. The present API then can be written as a
> > > layer
> > > > over this "ugly" non allocating API.
> > > >
> > > > Re (1) and (2). Instead of giving out keys, values as bytes which
> > implies
> > > > copies, I'd ideally like to scribble them straight into the buffer
> that
> > > you
> > > > are accumulating data onto before sending it. I am guessing you
> already
> > > > need a single buffer per partition or you have a single buffer per
> > > broker.
> > > > All of this probably implies a single threaded producer where I can
> be
> > in
> > > > charge of the event loop.
> > > >
> > > > Right now my data is within ByteBuffer/Unsafe buffer based data
> > > structures.
> > > > They can be put on the wire without any serialization step if I was
> > using
> > > > Java NIO. Similarly they can be consumed on the other side without
> any
> > > > deserialization step. But with the current kafka API I have to:
> > > >   i) Copy data from my ByteBuffers onto new byte arrays.
> > > >  ii) Wrap byte arrays from (i) in a new object. I can't even re-use
> > this
> > > > object since I don't know when kafka's send thread/serialization
> thread
> > > is
> > > > really done with it.
> > > >  iii) Write an encoder that just takes the byte array from this
> wrapper
> > > > object and hands it to Kafka.
> > > >
> > > > Similarly on the consumer:
> > > >   i) Kafka will make copies of slices (representing user values) of
> the
> > > > ByteBuffer that was transferred from a broker into byte arrays.
> > > >  ii) Allocate an object  (using the decoder) that wraps these byte
> > arrays
> > > > and hand them to me.
> > > >
> > > > My imaginary (admittedly non-java-esque manual allocation style) API
> > > would
> > > > give me a pointer to Kafka's ByteBuffer that it has been accumulating
> > > > protocol messages on for either writing (on producer) or reading (on
> > > > consumer). I know it's a long shot but I still wanted to get the
> team's
> > > > thoughts on it. I'd be happy to contribute if we can come to an
> > agreement
> > > > on the API design. My hypothesis is that if the internal protocol
> > > > parsing and buffer creation logic is written like this, it wouldn't
> > > > be too tough to expose its innards and have the current
> > > > encoding/decoding APIs just use this low level API.
> > > >
> > > > Thanks for listening to my rant.
> > > >
> > > >
> > > > On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
> > > >
> > > > > It sounds like you are primarily interested in optimizing the
> > producer?
> > > > >
> > > > > There is no way to produce data without any allocation being done
> > and I
> > > > > think getting to that would be pretty hard and lead to bad apis,
> > > > > but avoiding memory allocation entirely shouldn't be necessary.

Re: Kafka sending messages with zero copy

2014-10-24 Thread Guozhang Wang
I think 0.8.2 already uses the new producer as the standard client.

Guozhang

On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian wrote:

> Thanks, I'll take a look at both. Just to be sure, we are talking about
> client version 0.8.2, right?
>
>
>
> On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:
>
> > Rajiv,
> >
> > The new producer does maintain a buffer per partition, but you need to
> > consider synchronizing access to the buffer since it can take data from
> > multiple caller threads. I think Jay's suggestion 1) does the same thing
> > for your purpose if you already have a data buffer storing your data: by
> > creating a ProducerRecord it would not incur copying the data into a
> > temporary buffer, but would instead mark the offset / length in the byte
> > buffer that the user specifies; then when producer.send() is called, the
> > underlying buffer is copied to the producer buffer. If you do not have
> > the data buffer but just want to write directly to the producer buffer,
> > that is one step further.
> >
> > As for the consumer, you can take a look at the MemoryRecords iterator
> > implementation, which I think already implements what you want.
> >
> > Guozhang
> >
> > On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
> >
> > > I want to avoid allocations since I am using Java in a C mode. Even
> > though
> > > creating objects is a mere thread local pointer bump in Java, freeing
> > them
> > > is not so cheap and causes  uncontrollable jitter. The second
> motivation
> > is
> > > to avoid copying of data. Since I have objects which really look like C
> > > structs that can be sent over the wire it's most efficient for me to
> > write
> > > them out in the very exact buffer that will be sent over the wire.
> > >
> > > As for the bad API I completely agree -  it is a very C style API and
> > > definitely not usable in a productive way by most developers. My point
> > was
> > > that this work is done by the protocol handling layer in any case,
> maybe
> > it
> > > can be extended to allow a user access to its internals in a safe way
> > both
> > > during writing and reading. The present API then can be written as a
> > layer
> > > over this "ugly" non allocating API.
> > >
> > > Re (1) and (2). Instead of giving out keys, values as bytes which
> implies
> > > copies, I'd ideally like to scribble them straight into the buffer that
> > you
> > > are accumulating data onto before sending it. I am guessing you already
> > > need a single buffer per partition or you have a single buffer per
> > broker.
> > > All of this probably implies a single threaded producer where I can be
> in
> > > charge of the event loop.
> > >
> > > Right now my data is within ByteBuffer/Unsafe buffer based data
> > structures.
> > > They can be put on the wire without any serialization step if I was
> using
> > > Java NIO. Similarly they can be consumed on the other side without any
> > > deserialization step. But with the current kafka API I have to:
> > >   i) Copy data from my ByteBuffers onto new byte arrays.
> > >  ii) Wrap byte arrays from (i) in a new object. I can't even re-use
> this
> > > object since I don't know when kafka's send thread/serialization thread
> > is
> > > really done with it.
> > >  iii) Write an encoder that just takes the byte array from this wrapper
> > > object and hands it to Kafka.
> > >
> > > Similarly on the consumer:
> > >   i) Kafka will make copies of slices (representing user values) of the
> > > ByteBuffer that was transferred from a broker into byte arrays.
> > >  ii) Allocate an object  (using the decoder) that wraps these byte
> arrays
> > > and hand them to me.
> > >
> > > My imaginary (admittedly non-java-esque manual allocation style) API
> > would
> > > give me a pointer to Kafka's ByteBuffer that it has been accumulating
> > > protocol messages on for either writing (on producer) or reading (on
> > > consumer). I know it's a long shot but I still wanted to get the team's
> > > thoughts on it. I'd be happy to contribute if we can come to an
> agreement
> > > on the API design. My hypothesis is that if the internal protocol
> parsing
> > > and buffer creation logic is written like this, it wouldn't be too
> > > tough to expose its innards and have the current encoding/decoding
> > > APIs just use this low level API.
> > >
> > > Thanks for listening to my rant.
> > >
> > >
> > > On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
> > >
> > > > It sounds like you are primarily interested in optimizing the
> producer?
> > > >
> > > > There is no way to produce data without any allocation being done
> and I
> > > > think getting to that would be pretty hard and lead to bad apis, but
> > > > avoiding memory allocation entirely shouldn't be necessary. Small
> > > transient
> > > > objects in java are pretty cheap to allocate and deallocate. The new
> > > Kafka
> > > > producer API that is on trunk and will be in 0.8.2 is much more
> > > > disciplined in its usage of memory, though there is still some
> > > > allocation.

Re: Kafka sending messages with zero copy

2014-10-24 Thread Rajiv Kurian
Thanks, I'll take a look at both. Just to be sure, we are talking about
client version 0.8.2, right?



On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang wrote:

> Rajiv,
>
> The new producer does maintain a buffer per partition, but you need to
> consider synchronizing access to the buffer since it can take data from
> multiple caller threads. I think Jay's suggestion 1) does the same thing
> for your purpose if you already have a data buffer storing your data: by
> creating a ProducerRecord it would not incur copying the data into a
> temporary buffer, but would instead mark the offset / length in the byte
> buffer that the user specifies; then when producer.send() is called, the
> underlying buffer is copied to the producer buffer. If you do not have the
> data buffer but just want to write directly to the producer buffer, that is
> one step further.
>
> As for the consumer, you can take a look at the MemoryRecords iterator
> implementation, which I think already implements what you want.
>
> Guozhang
>
> On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:
>
> > I want to avoid allocations since I am using Java in a C mode. Even
> though
> > creating objects is a mere thread local pointer bump in Java, freeing
> them
> > is not so cheap and causes  uncontrollable jitter. The second motivation
> is
> > to avoid copying of data. Since I have objects which really look like C
> > structs that can be sent over the wire it's most efficient for me to
> write
> > them out in the very exact buffer that will be sent over the wire.
> >
> > As for the bad API I completely agree -  it is a very C style API and
> > definitely not usable in a productive way by most developers. My point
> was
> > that this work is done by the protocol handling layer in any case, maybe
> it
> > can be extended to allow a user access to its internals in a safe way
> both
> > during writing and reading. The present API then can be written as a
> layer
> > over this "ugly" non allocating API.
> >
> > Re (1) and (2). Instead of giving out keys, values as bytes which implies
> > copies, I'd ideally like to scribble them straight into the buffer that
> you
> > are accumulating data onto before sending it. I am guessing you already
> > need a single buffer per partition or you have a single buffer per
> broker.
> > All of this probably implies a single threaded producer where I can be in
> > charge of the event loop.
> >
> > Right now my data is within ByteBuffer/Unsafe buffer based data
> structures.
> > They can be put on the wire without any serialization step if I was using
> > Java NIO. Similarly they can be consumed on the other side without any
> > deserialization step. But with the current kafka API I have to:
> >   i) Copy data from my ByteBuffers onto new byte arrays.
> >  ii) Wrap byte arrays from (i) in a new object. I can't even re-use this
> > object since I don't know when kafka's send thread/serialization thread
> is
> > really done with it.
> >  iii) Write an encoder that just takes the byte array from this wrapper
> > object and hands it to Kafka.
> >
> > Similarly on the consumer:
> >   i) Kafka will make copies of slices (representing user values) of the
> > ByteBuffer that was transferred from a broker into byte arrays.
> >  ii) Allocate an object  (using the decoder) that wraps these byte arrays
> > and hand them to me.
> >
> > My imaginary (admittedly non-java-esque manual allocation style) API
> would
> > give me a pointer to Kafka's ByteBuffer that it has been accumulating
> > protocol messages on for either writing (on producer) or reading (on
> > consumer). I know it's a long shot but I still wanted to get the team's
> > thoughts on it. I'd be happy to contribute if we can come to an agreement
> > on the API design. My hypothesis is that if the internal protocol parsing
> > and buffer creation logic is written like this, it wouldn't be too tough
> > to expose its innards and have the current encoding/decoding APIs just
> > use this low level API.
> >
> > Thanks for listening to my rant.
> >
> >
> > On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
> >
> > > It sounds like you are primarily interested in optimizing the producer?
> > >
> > > There is no way to produce data without any allocation being done and I
> > > think getting to that would be pretty hard and lead to bad apis, but
> > > avoiding memory allocation entirely shouldn't be necessary. Small
> > transient
> > > objects in java are pretty cheap to allocate and deallocate. The new
> > Kafka
> > > producer API that is on trunk and will be in 0.8.2 is much more
> > disciplined
> > > in its usage of memory though there is still some allocation. The goal
> > is
> > > to avoid copying the *data* multiple times, even if we do end up
> creating
> > > some small helper objects along the way (the idea is that the data may
> be
> > > rather large).
> > >
> > > If you wanted to further optimize the new producer there are two things
> > > that could be done that would help.

Re: Kafka sending messages with zero copy

2014-10-24 Thread Guozhang Wang
Rajiv,

The new producer does maintain a buffer per partition, but you need to
consider synchronizing access to the buffer since it can take data from
multiple caller threads. I think Jay's suggestion 1) does the same thing
for your purpose if you already have a data buffer storing your data: by
creating a ProducerRecord it would not incur copying the data into a
temporary buffer, but would instead mark the offset / length in the byte
buffer that the user specifies; then when producer.send() is called, the
underlying buffer is copied to the producer buffer. If you do not have the
data buffer but just want to write directly to the producer buffer, that is
one step further.

As for the consumer, you can take a look at the MemoryRecords iterator
implementation, which I think already implements what you want.
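
A rough sketch of the single copy that suggestion 1) leaves in place
(invented names, not the actual producer internals): the caller's
offset/length slice is appended into the shared per-partition buffer, with
synchronization because several caller threads may append at once:

import java.nio.ByteBuffer;

// Sketch only: appending an offset/length view of the caller's bytes into
// the per-partition buffer.
final class PartitionAppender {
    private final ByteBuffer partitionBuffer;

    PartitionAppender(int capacity) {
        partitionBuffer = ByteBuffer.allocate(capacity);
    }

    // Multiple caller threads can reach this, hence the synchronization.
    synchronized boolean append(byte[] value, int offset, int length) {
        if (partitionBuffer.remaining() < length) {
            return false;                 // full: flush to the broker first
        }
        partitionBuffer.put(value, offset, length);
        return true;
    }
}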

Guozhang

On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian wrote:

> I want to avoid allocations since I am using Java in a C mode. Even though
> creating objects is a mere thread local pointer bump in Java, freeing them
> is not so cheap and causes  uncontrollable jitter. The second motivation is
> to avoid copying of data. Since I have objects which really look like C
> structs that can be sent over the wire it's most efficient for me to write
> them out in the very exact buffer that will be sent over the wire.
>
> As for the bad API I completely agree -  it is a very C style API and
> definitely not usable in a productive way by most developers. My point was
> that this work is done by the protocol handling layer in any case, maybe it
> can be extended to allow a user access to its internals in a safe way both
> during writing and reading. The present API then can be written as a layer
> over this "ugly" non allocating API.
>
> Re (1) and (2). Instead of giving out keys, values as bytes which implies
> copies, I'd ideally like to scribble them straight into the buffer that you
> are accumulating data onto before sending it. I am guessing you already
> need a single buffer per partition or you have a single buffer per broker.
> All of this probably implies a single threaded producer where I can be in
> charge of the event loop.
>
> Right now my data is within ByteBuffer/Unsafe buffer based data structures.
> They can be put on the wire without any serialization step if I was using
> Java NIO. Similarly they can be consumed on the other side without any
> deserialization step. But with the current kafka API I have to:
>   i) Copy data from my ByteBuffers onto new byte arrays.
>  ii) Wrap byte arrays from (i) in a new object. I can't even re-use this
> object since I don't know when kafka's send thread/serialization thread is
> really done with it.
>  iii) Write an encoder that just takes the byte array from this wrapper
> object and hands it to Kafka.
>
> Similarly on the consumer:
>   i) Kafka will make copies of slices (representing user values) of the
> ByteBuffer that was transferred from a broker into byte arrays.
>  ii) Allocate an object  (using the decoder) that wraps these byte arrays
> and hand them to me.
>
> My imaginary (admittedly non-java-esque manual allocation style) API would
> give me a pointer to Kafka's ByteBuffer that it has been accumulating
> protocol messages on for either writing (on producer) or reading (on
> consumer). I know it's a long shot but I still wanted to get the team's
> thoughts on it. I'd be happy to contribute if we can come to an agreement
> on the API design. My hypothesis is that if the internal protocol parsing
> and buffer creation logic is written like this, it wouldn't be too tough to
> expose its innards and have the current encoding/decoding APIs just use
> this low level API.
>
> Thanks for listening to my rant.
>
>
On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:
>
> > It sounds like you are primarily interested in optimizing the producer?
> >
> > There is no way to produce data without any allocation being done and I
> > think getting to that would be pretty hard and lead to bad apis, but
> > avoiding memory allocation entirely shouldn't be necessary. Small
> transient
> > objects in java are pretty cheap to allocate and deallocate. The new
> Kafka
> > producer API that is on trunk and will be in 0.8.2 is much more
> disciplined
> > in its usage of memory though there is still some allocation. The goal
> is
> > to avoid copying the *data* multiple times, even if we do end up creating
> > some small helper objects along the way (the idea is that the data may be
> > rather large).
> >
> > If you wanted to further optimize the new producer there are two things
> > that could be done that would help:
> > 1. Avoid the copy when creating the ProducerRecord instance. This could
> be
> > done by accepting a length/offset along with the key and value and making
> > use of this when writing to the records instance. As it is your key and
> > value need to be complete byte arrays.
> > 2. Avoid the copy during request serialization. This is a little
> trickier.
>

Re: Kafka sending messages with zero copy

2014-10-23 Thread Rajiv Kurian
I want to avoid allocations since I am using Java in a C mode. Even though
creating objects is a mere thread local pointer bump in Java, freeing them
is not so cheap and causes uncontrollable jitter. The second motivation is
to avoid copying of data. Since I have objects which really look like C
structs that can be sent over the wire, it's most efficient for me to write
them out in the exact buffer that will be sent over the wire.
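
For context, a minimal flyweight in the SBE/Cap'nProto style (the field
layout is invented for illustration): the "object" is a reusable view over a
buffer, so reading a field decodes nothing and allocates nothing.

import java.nio.ByteBuffer;

// Sketch only: a reusable flyweight over a ByteBuffer. Sending the "struct"
// is a plain buffer write; reading it back allocates no objects.
final class QuoteFlyweight {
    private ByteBuffer buffer;
    private int offset;

    QuoteFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;  // re-point the same instance at a new region
        this.offset = offset;
        return this;
    }

    int id()        { return buffer.getInt(offset); }
    double price()  { return buffer.getDouble(offset + 4); }
    long sequence() { return buffer.getLong(offset + 12); }
}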

As for the bad API I completely agree - it is a very C style API and
definitely not usable in a productive way by most developers. My point was
that this work is done by the protocol handling layer in any case, maybe it
can be extended to allow a user access to its internals in a safe way both
during writing and reading. The present API can then be written as a layer
over this "ugly" non-allocating API.

Re (1) and (2). Instead of giving out keys and values as bytes, which implies
copies, I'd ideally like to scribble them straight into the buffer that you
are accumulating data onto before sending it. I am guessing you already
need a single buffer per partition or you have a single buffer per broker.
All of this probably implies a single threaded producer where I can be in
charge of the event loop.

Right now my data is within ByteBuffer/Unsafe buffer based data structures.
They can be put on the wire without any serialization step if I was using
Java NIO. Similarly they can be consumed on the other side without any
deserialization step. But with the current kafka API I have to:
  i) Copy data from my ByteBuffers onto new byte arrays.
 ii) Wrap byte arrays from (i) in a new object. I can't even re-use this
object since I don't know when kafka's send thread/serialization thread is
really done with it.
 iii) Write an encoder that just takes the byte array from this wrapper
object and hands it to Kafka.
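
Concretely, steps (i) and (ii) above amount to something like this per
message (names other than ByteBuffer are invented for illustration):

import java.nio.ByteBuffer;

// Sketch only: a fresh array per message, copied out of a buffer whose bytes
// were already wire-ready, just so an encoder can hand the array back to
// Kafka unchanged.
final class CurrentApiOverhead {
    static byte[] copyOut(ByteBuffer source, int offset, int length) {
        byte[] copy = new byte[length];       // (i) allocate and copy
        ByteBuffer view = source.duplicate(); // avoid disturbing source position
        view.position(offset);
        view.get(copy, 0, length);
        return copy;                          // (ii) wrapped in a new object,
                                              // (iii) whose encoder returns it
    }
}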

Similarly on the consumer:
  i) Kafka will make copies of slices (representing user values) of the
ByteBuffer that was transferred from a broker into byte arrays.
 ii) Allocate an object (using the decoder) that wraps these byte arrays
and hand them to me.

My imaginary (admittedly non-java-esque manual allocation style) API would
give me a pointer to Kafka's ByteBuffer that it has been accumulating
protocol messages on for either writing (on producer) or reading (on
consumer). I know it's a long shot but I still wanted to get the team's
thoughts on it. I'd be happy to contribute if we can come to an agreement
on the API design. My hypothesis is that if the internal protocol parsing
and buffer creation logic is written like this, it wouldn't be too tough to
expose its innards and have the current encoding/decoding APIs just use
this low level API.

Thanks for listening to my rant.


On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps wrote:

> It sounds like you are primarily interested in optimizing the producer?
>
> There is no way to produce data without any allocation being done and I
> think getting to that would be pretty hard and lead to bad apis, but
> avoiding memory allocation entirely shouldn't be necessary. Small transient
> objects in java are pretty cheap to allocate and deallocate. The new Kafka
> producer API that is on trunk and will be in 0.8.2 is much more disciplined
> in its usage of memory though there is still some allocation. The goal is
> to avoid copying the *data* multiple times, even if we do end up creating
> some small helper objects along the way (the idea is that the data may be
> rather large).
>
> If you wanted to further optimize the new producer there are two things
> that could be done that would help:
> 1. Avoid the copy when creating the ProducerRecord instance. This could be
> done by accepting a length/offset along with the key and value and making
> use of this when writing to the records instance. As it is your key and
> value need to be complete byte arrays.
> 2. Avoid the copy during request serialization. This is a little trickier.
> During request serialization we need to take the records for each partition
> and create a request that contains all of them. It is possible to do this
> with no further recopying of data but somewhat tricky.
>
> My recommendation would be to try the new producer api and see how that
> goes. If you need to optimize further we would definitely take patches for
> (1) and (2).
>
> -Jay
>
> On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:
>
> > I have a flyweight style protocol that I use for my messages. Thus they
> > require no serialization/deserialization to be processed. The messages
> are
> > just offset, length pairs within a ByteBuffer.
> >
> > Is there a producer and consumer API that forgoes allocation? I just want
> > to give the kafka producer offsets from a ByteBuffer. Similarly it would
> be
> > ideal if I could get a ByteBuffer and offsets into it from the consumer.
> > Even if I could get byte arrays (implying a copy but no decoding phase)
> on
> > the consumer that would be great. Right now it seems to me that the only
> > way to get messages from Kafka is through a message object, which implies
> > Kafka allocates these messages all the time.

Re: Kafka sending messages with zero copy

2014-10-23 Thread Rajiv Kurian
My use case is that though I am using Java, I want to work in as close to a
zero garbage environment as possible. All my internal data structures are
based on
ByteBuffers or buffers allocated using Unsafe. Thus my objects are already
in a state where they can be transmitted without any serialization step
i.e. my objects are just flyweights over buffers in the Cap'nProto or SBE
style. The current API is forcing me to allocate to send my already
serialized objects.

I'd imagine something like this on a single threaded producer:

// My message has a single int, double and long.
public static final int SIZE_PER_OBJECT = 4 + 8 + 8; // int + double + long.

KafkaPartition partition = kafkaProducer.getPartition("Topic",
partitionNumber);
KafkaBuffer kafkaBuffer = partition.getBuffer(); // I am imagining a single
// buffer per partition.
ByteBuffer buffer = ByteBuffer.allocate(0);
// Set the appropriate headers based on the kafka protocol in the buffer
// and ensure that there is space in the buffer for a payload of
// SIZE_PER_OBJECT.
kafkaBuffer.getNextFlyweight(SIZE_PER_OBJECT, buffer); // Just sets the
// address of the input ByteBuffer to the right pointer in kafka's protocol
// ByteBuffer and the capacity to SIZE_PER_OBJECT. One could use reflection
// to set its capacity and address.
// I treat it like a ByteBuffer, getting appropriate exceptions if I try
// writing past the end marker.
buffer.putInt(1);
buffer.putDouble(2.0);
buffer.putLong(5L);
kafkaBuffer.commit(); // I tell kafka that I am done with writing my
// object, i.e. it can calculate the CRC32 etc. of the latest write.
partition.flush(); // Potentially flush to the broker.

Kafka probably has to lay out a ByteBuffer encoded as per its protocol.
Right now it probably takes objects and serializes them into that
ByteBuffer. It would be great, as an advanced API, to get an offset into that
ByteBuffer and use it as a space to write my message. This can be
an API that the simpler encoding producer uses behind the scenes.

The consumer API can similarly allow one to re-use a flyweight object to
read messages. For example:

ByteBuffer myBuffer = ByteBuffer.allocate(0).asReadOnlyBuffer(); // To
// ensure I cannot scribble into the internal protocol bytes.
while (true) {
  KafkaBuffer kafkaBuffer = consumer.poll(timeout);
  while (kafkaBuffer.hasRemainingMessages()) {
    kafkaBuffer.getNextMsgIntoBuffer(myBuffer); // Sets myBuffer to point
    // at the right offset in the kafka ByteBuffer where the message starts.
    myBuffer.getInt();     // 1
    myBuffer.getDouble();  // 2.0
    myBuffer.getLong();    // 5L
  }
  // Done with processing messages from this set of buffers.
}

I know this is probably a very rare use case, but a single threaded producer
(the consumer in 0.9 already seems single threaded, which is great!) plus an
API for ByteBuffer manipulation would allow Kafka to work extremely well in
high throughput environments where avoiding garbage creation/collection is
really important.


Thanks!


On Thu, Oct 23, 2014 at 5:03 PM, Guozhang Wang wrote:

> Rajiv,
>
> Could you let me know your use case? Are you sending a very large file and
> hence would prefer a streaming manner instead of messages?
>
> Guozhang
>
> On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:
>
> > I have a flyweight style protocol that I use for my messages. Thus they
> > require no serialization/deserialization to be processed. The messages
> are
> > just offset, length pairs within a ByteBuffer.
> >
> > Is there a producer and consumer API that forgoes allocation? I just want
> > to give the kafka producer offsets from a ByteBuffer. Similarly it would
> be
> > ideal if I could get a ByteBuffer and offsets into it from the consumer.
> > Even if I could get byte arrays (implying a copy but no decoding phase)
> on
> > the consumer that would be great. Right now it seems to me that the only
> > way to get messages from Kafka is through a message object, which implies
> > Kafka allocates these messages all the time. I am willing to use the
> > upcoming 0.9 API too.
> >
> > Thanks.
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka sending messages with zero copy

2014-10-23 Thread Jay Kreps
It sounds like you are primarily interested in optimizing the producer?

There is no way to produce data without any allocation being done and I
think getting to that would be pretty hard and lead to bad APIs, but
avoiding memory allocation entirely shouldn't be necessary. Small transient
objects in Java are pretty cheap to allocate and deallocate. The new Kafka
producer API that is on trunk and will be in 0.8.2 is much more disciplined
in its usage of memory, though there is still some allocation. The goal is
to avoid copying the *data* multiple times, even if we do end up creating
some small helper objects along the way (the idea is that the data may be
rather large).

If you wanted to further optimize the new producer there are two things
that could be done that would help:
1. Avoid the copy when creating the ProducerRecord instance. This could be
done by accepting a length/offset along with the key and value and making
use of this when writing to the records instance. As it is your key and
value need to be complete byte arrays (a rough sketch of this idea follows below).
2. Avoid the copy during request serialization. This is a little trickier.
During request serialization we need to take the records for each partition
and create a request that contains all of them. It is possible to do this
with no further recopying of data, but it is somewhat tricky.
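
A hedged sketch of what (1) might look like - the offset/length constructor
below does not exist in 0.8.2 and is purely illustrative:

// Sketch only: a hypothetical offset/length variant of ProducerRecord. The
// real 0.8.2 constructor requires complete byte[] key and value arrays.
final class RangeProducerRecord {
    final String topic;
    final Integer partition;
    final byte[] value;
    final int valueOffset;
    final int valueLength;

    RangeProducerRecord(String topic, Integer partition,
                        byte[] value, int valueOffset, int valueLength) {
        this.topic = topic;
        this.partition = partition;
        this.value = value;        // no copy here: just remember the view
        this.valueOffset = valueOffset;
        this.valueLength = valueLength;
        // The one copy happens later, when the accumulator appends
        // value[valueOffset .. valueOffset + valueLength) into the
        // per-partition buffer during send().
    }
}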

My recommendation would be to try the new producer API and see how that
goes. If you need to optimize further we would definitely take patches for
(1) and (2).

-Jay

On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:

> I have a flyweight style protocol that I use for my messages. Thus they
> require no serialization/deserialization to be processed. The messages are
> just offset, length pairs within a ByteBuffer.
>
> Is there a producer and consumer API that forgoes allocation? I just want
> to give the kafka producer offsets from a ByteBuffer. Similarly it would be
> ideal if I could get a ByteBuffer and offsets into it from the consumer.
> Even if I could get byte arrays (implying a copy but no decoding phase) on
> the consumer that would be great. Right now it seems to me that the only
> way to get messages from Kafka is through a message object, which implies
> Kafka allocates these messages all the time. I am willing to use the
> upcoming 0.9 API too.
>
> Thanks.
>


Re: Kafka sending messages with zero copy

2014-10-23 Thread Guozhang Wang
Rajiv,

Could you let me know your use case? Are you sending a very large file and
hence would prefer a streaming manner instead of messages?

Guozhang

On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian wrote:

> I have a flyweight style protocol that I use for my messages. Thus they
> require no serialization/deserialization to be processed. The messages are
> just offset, length pairs within a ByteBuffer.
>
> Is there a producer and consumer API that forgoes allocation? I just want
> to give the kafka producer offsets from a ByteBuffer. Similarly it would be
> ideal if I could get a ByteBuffer and offsets into it from the consumer.
> Even if I could get byte arrays (implying a copy but no decoding phase) on
> the consumer that would be great. Right now it seems to me that the only
> way to get messages from Kafka is through a message object, which implies
> Kafka allocates these messages all the time. I am willing to use the
> upcoming 0.9 API too.
>
> Thanks.
>



-- 
-- Guozhang