Yeah this is the org.apache.kafka.producer client in the clients/ directory.

-Jay

On Fri, Oct 24, 2014 at 9:54 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Thanks!
>
> On Fri, Oct 24, 2014 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I think 0.8.2 already used the new producer as the standard client.
> >
> > Guozhang
> >
> > On Fri, Oct 24, 2014 at 8:51 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Thanks I'll take a look at both. Just to be sure we are talking about
> > > client version 0.82 right?
> > >
> > >
> > >
> > > On Fri, Oct 24, 2014 at 8:39 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Rajiv,
> > > >
> > > > The new producer does maintain a buffer per partition, but you need
> to
> > > > consider synchronizing the access to the buffer since it can take
> data
> > > from
> > > > multiple caller threads. I think Jay's suggestion 1) does the same
> > thing
> > > > for your purpose if you already have the data buffer storing your
> data:
> > > by
> > > > creating a ProduceRecord it would not incur copying the data into a
> > > > temporary buffer, but instead mark the offset / length of in the byte
> > > > buffer that user specifies, the when producer.send() is called the
> > > > underlying buffer is copied to the producer buffer. If you do not
> have
> > > the
> > > > data buffer but just want to write directly to the producer buffer
> that
> > > is
> > > > one step further.
> > > >
> > > > As for the consumer, you can take a look at the MemoryRecord's
> iterator
> > > > implementation, which I think already implements what you want.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Oct 23, 2014 at 6:13 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > wrote:
> > > >
> > > > > I want to avoid allocations since I am using Java in a C mode. Even
> > > > though
> > > > > creating objects is a mere thread local pointer bump in Java,
> freeing
> > > > them
> > > > > is not so cheap and causes  uncontrollable jitter. The second
> > > motivation
> > > > is
> > > > > to avoid copying of data. Since I have objects which really look
> > like C
> > > > > structs that can be sent over the wire it's most efficient for me
> to
> > > > write
> > > > > them out in the very exact buffer that will be sent over the wire.
> > > > >
> > > > > As for the bad API I completely agree -  it is a very C style API
> and
> > > > > definitely not usable in a productive way by most developers. My
> > point
> > > > was
> > > > > that this work is done by the protocol handling layer in any case,
> > > maybe
> > > > it
> > > > > can be extended to allow a user access to it's internals in a safe
> > way
> > > > both
> > > > > during writing and reading. The present API then can be written as
> a
> > > > layer
> > > > > over this "ugly" non allocating API.
> > > > >
> > > > > Re (1) and (2). Instead of giving out keys, values as bytes which
> > > implies
> > > > > copies, I'd ideally like to scribble them straight into the buffer
> > that
> > > > you
> > > > > are accumulating data onto before sending it. I am guessing you
> > already
> > > > > need a single buffer per partition or you have a single buffer per
> > > > broker.
> > > > > All of this probably implies a single threaded producer where I can
> > be
> > > in
> > > > > charge of the event loop.
> > > > >
> > > > > Right now my data is within ByteBuffer/Unsafe buffer based data
> > > > structures.
> > > > > They can be put on the wire without any serialization step if I was
> > > using
> > > > > Java NIO. Similarly they can be consumed on the other side without
> > any
> > > > > deserialization step. But with the current kafka API I have to:
> > > > >   i) Copy data from my ByteBuffers onto new byte arrays.
> > > > >  ii) Wrap byte arrays from (i) in a new object. I can't even re-use
> > > this
> > > > > object since I don't know when kafka's send thread/serialization
> > thread
> > > > is
> > > > > really done with it.
> > > > >  iii) Write an encoder that just takes the byte array from this
> > wrapper
> > > > > object and hands it to Kafka.
> > > > >
> > > > > Similarly on the consumer:
> > > > >   i) Kafka will make copies of slices (representing user values) of
> > the
> > > > > ByteBuffer that was transferred from a broker into byte arrays.
> > > > >  ii) Allocate an object  (using the decoder) that wraps these byte
> > > arrays
> > > > > and hand them to me.
> > > > >
> > > > > My imaginary (admittedly non-java-esque manual allocation style)
> API
> > > > would
> > > > > give me a pointer to Kafka's ByteBuffer that it has been
> accumulating
> > > > > protocol messages on for either writing (on producer) or reading
> (on
> > > > > consumer). I know it's a long shot but I still wanted to get the
> > team's
> > > > > thoughts on it. I'd be happy to contribute if we can come to an
> > > agreement
> > > > > on the API design. My hypothesis is that if the internal protocol
> > > parsing
> > > > > and buffer creation logic is written like this, it wouldn't be too
> > > tough
> > > > to
> > > > > expose it's innards and have the current encoding/decoding APIs
> just
> > > use
> > > > > this low level API/
> > > > >
> > > > > Thanks for listening to my rant.
> > > > >
> > > > >
> > > > > On Thu, Oct 23, 2014 at 5:19 PM, Jay Kreps <jay.kr...@gmail.com>
> > > wrote:
> > > > >
> > > > > > It sounds like you are primarily interested in optimizing the
> > > producer?
> > > > > >
> > > > > > There is no way to produce data without any allocation being done
> > > and I
> > > > > > think getting to that would be pretty hard and lead to bad apis,
> > but
> > > > > > avoiding memory allocation entirely shouldn't be necessary. Small
> > > > > transient
> > > > > > objects in java are pretty cheap to allocate and deallocate. The
> > new
> > > > > Kafka
> > > > > > producer API that is on trunk and will be in 0.8.2 is much more
> > > > > disciplined
> > > > > > in it's usage of memory though there is still some allocation.
> The
> > > goal
> > > > > is
> > > > > > to avoid copying the *data* multiple times, even if we do end up
> > > > creating
> > > > > > some small helper objects along the way (the idea is that the
> data
> > > may
> > > > be
> > > > > > rather large).
> > > > > >
> > > > > > If you wanted to further optimize the new producer there are two
> > > things
> > > > > > that could be done that would help:
> > > > > > 1. Avoid the copy when creating the ProducerRecord instance. This
> > > could
> > > > > be
> > > > > > done by accepting a length/offset along with the key and value
> and
> > > > making
> > > > > > use of this when writing to the records instance. As it is your
> key
> > > and
> > > > > > value need to be complete byte arrays.
> > > > > > 2. Avoid the copy during request serialization. This is a little
> > > > > trickier.
> > > > > > During request serialization we need to take the records for each
> > > > > partition
> > > > > > and create a request that contains all of them. It is possible to
> > do
> > > > this
> > > > > > with no further recopying of data but somewhat tricky.
> > > > > >
> > > > > > My recommendation would be to try the new producer api and see
> how
> > > that
> > > > > > goes. If you need to optimize further we would definitely take
> > > patches
> > > > > for
> > > > > > (1) and (2).
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian <
> > ra...@signalfuse.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I have a flyweight style protocol that I use for my messages.
> > Thus
> > > > they
> > > > > > > require no serialization/deserialization to be processed. The
> > > > messages
> > > > > > are
> > > > > > > just offset, length pairs within a ByteBuffer.
> > > > > > >
> > > > > > > Is there a producer and consumer API that forgoes allocation? I
> > > just
> > > > > want
> > > > > > > to give the kakfa producer offsets from a ByteBuffer. Similarly
> > it
> > > > > would
> > > > > > be
> > > > > > > ideal if I could get a ByteBuffer and offsets into it from the
> > > > > consumer.
> > > > > > > Even if I could get byte arrays (implying a copy but no
> decoding
> > > > phase)
> > > > > > on
> > > > > > > the consumer that would be great. Right now it seems to me that
> > the
> > > > > only
> > > > > > > way to get messages from Kafka is through a message object,
> which
> > > > > implies
> > > > > > > Kafka allocates these messages all the time. I am willing to
> use
> > > the
> > > > > > > upcoming 0.9 API too.
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to