My use case: though I am using Java, I want to work in as close to a
zero-garbage environment as possible. All my internal data structures are
based on ByteBuffers or buffers allocated using Unsafe, so my objects are
already in a state where they can be transmitted without any serialization
step, i.e. my objects are just flyweights over buffers, in the Cap'n Proto
or SBE style. The current API forces me to allocate just to send my already
serialized objects.
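
By "flyweight over a buffer" I mean something like this (a minimal sketch
with a made-up int/double/long field layout; the class and method names are
illustrative, not my actual code):

```java
import java.nio.ByteBuffer;

// Minimal flyweight sketch: the object holds no data of its own, just a
// window (buffer + offset) onto pre-allocated bytes. Field layout here is
// int at 0, double at 4, long at 12 -- a 20-byte message.
final class MessageFlyweight {
    private ByteBuffer buffer;
    private int offset;

    // Re-point this same instance at another message; no per-message allocation.
    MessageFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    int    intField()    { return buffer.getInt(offset); }
    double doubleField() { return buffer.getDouble(offset + 4); }
    long   longField()   { return buffer.getLong(offset + 12); }

    void intField(int v)       { buffer.putInt(offset, v); }
    void doubleField(double v) { buffer.putDouble(offset + 4, v); }
    void longField(long v)     { buffer.putLong(offset + 12, v); }
}
```

The same instance gets re-wrapped over each message, so steady-state reads
and writes allocate nothing.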

I'd imagine something like this for a single-threaded producer:

// My message has a single int, double and long.
public static final int SIZE_PER_OBJECT = 4 + 8 + 8; // int + double + long.

KafkaPartition partition = kafkaProducer.getPartition("Topic", partitionNumber);
// I am imagining a single buffer per partition.
KafkaBuffer kafkaBuffer = partition.getBuffer();
ByteBuffer buffer = ByteBuffer.allocate(0);
// Set the appropriate headers based on the Kafka protocol in the buffer and
// ensure that there is space in the buffer for a payload of SIZE_PER_OBJECT.
// getNextFlyweight() would just set the address of the input ByteBuffer to
// the right pointer in Kafka's protocol ByteBuffer and its capacity to
// SIZE_PER_OBJECT. One could use reflection to set its capacity and address.
kafkaBuffer.getNextFlyweight(SIZE_PER_OBJECT, buffer);
// I treat it like a regular ByteBuffer, getting the appropriate exceptions
// if I try writing past the end marker.
buffer.putInt(1);
buffer.putDouble(2.0);
buffer.putLong(5L);
// Tell Kafka that I am done writing my object, i.e. it can calculate the
// CRC32 etc. of the latest write.
kafkaBuffer.commit();
partition.flush(); // Potentially flush to the broker.
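
The reflection trick mentioned in the comment above could look roughly like
this on today's JDK 7/8. It pokes at the private address/capacity fields of
java.nio.Buffer, so it is implementation-specific, only valid for direct
buffers, and very much a hack rather than a supported API:

```java
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;

// Re-point an existing direct ByteBuffer at an arbitrary (address, length)
// window without allocating. Relies on the private 'address' and 'capacity'
// fields of java.nio.Buffer -- JDK-implementation-specific, for illustration.
final class BufferRepointer {
    private static final Field ADDRESS, CAPACITY;
    static {
        try {
            ADDRESS = Buffer.class.getDeclaredField("address");
            ADDRESS.setAccessible(true);
            CAPACITY = Buffer.class.getDeclaredField("capacity");
            CAPACITY.setAccessible(true);
        } catch (NoSuchFieldException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Point 'view' (a direct buffer) at 'length' bytes starting at 'address'.
    static void repoint(ByteBuffer view, long address, int length) {
        try {
            ADDRESS.setLong(view, address);
            CAPACITY.setInt(view, length);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        view.clear(); // position = 0, limit = new capacity
    }
}
```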

Kafka probably has to lay out a ByteBuffer encoded per its protocol. Right
now it probably takes objects and serializes them into that ByteBuffer. It
would be great if an advanced API could hand out an offset into that
ByteBuffer for me to write my message into directly. The simpler encoding
producer could then be built on top of this API behind the scenes.
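
Even without reflection, plain java.nio can hand out a bounded writable
window at an offset into a larger protocol buffer, which is roughly what
such an API could do internally (the names here are illustrative; no Kafka
involved):

```java
import java.nio.ByteBuffer;

// Sketch of handing out a window over [offset, offset + length) of a larger
// protocol buffer. Writes through the window land directly in the protocol
// buffer's memory; no payload bytes are copied.
final class WindowExample {
    static ByteBuffer payloadWindow(ByteBuffer protocolBuffer, int offset, int length) {
        ByteBuffer window = protocolBuffer.duplicate(); // shares the same bytes
        window.position(offset);
        window.limit(offset + length);
        return window.slice(); // indexes from 0, capacity == length
    }
}
```

The duplicate()/slice() calls do allocate small wrapper objects per message,
which is why I'd still prefer a re-pointable flyweight, but it shows the
offset-window idea using only supported APIs.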

The consumer API could similarly allow one to re-use a flyweight object to
read messages. For example:

// Read-only, to ensure I cannot scribble into the internal protocol bytes.
ByteBuffer myBuffer = ByteBuffer.allocate(0).asReadOnlyBuffer();
while (true) {
  KafkaBuffer kafkaBuffer = consumer.poll(timeout);
  while (kafkaBuffer.hasRemainingMessages()) {
    // Sets myBuffer to point at the right offset in the Kafka ByteBuffer
    // where the message starts.
    kafkaBuffer.getNextMsgIntoBuffer(myBuffer);
    myBuffer.getInt();     // 1
    myBuffer.getDouble();  // 2.0
    myBuffer.getLong();    // 5L
  }
  // Done with processing messages from this set of buffers.
}
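
On the read-only point: a read-only view shares the underlying bytes with
the original buffer but rejects any put, which is exactly the protection I
want on the consumer side. A quick illustration in plain java.nio:

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// A read-only view reads the same memory as the original buffer, but every
// put() throws ReadOnlyBufferException, so consumer code cannot corrupt the
// protocol bytes it is handed.
final class ReadOnlyGuard {
    static boolean rejectsWrites(ByteBuffer original) {
        ByteBuffer view = original.asReadOnlyBuffer();
        try {
            view.putInt(0, 99); // attempt to scribble over protocol bytes
            return false;       // unreachable for a read-only view
        } catch (ReadOnlyBufferException expected) {
            return true;
        }
    }
}
```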

I know this is probably a very rare use case, but with single-threaded
producers (the consumer in 0.9 already seems single-threaded, which is
great!) and an API for ByteBuffer manipulation, Kafka would work extremely
well in high-throughput environments where avoiding garbage
creation/collection is really important.


Thanks!


On Thu, Oct 23, 2014 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Rajiv,
>
> Could you let me know your use case? Are you sending a very large file and
> hence would prefer streaming manner instead of messages?
>
> Guozhang
>
> On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I have a flyweight style protocol that I use for my messages. Thus they
> > require no serialization/deserialization to be processed. The messages
> are
> > just offset, length pairs within a ByteBuffer.
> >
> > Is there a producer and consumer API that forgoes allocation? I just want
> > to give the kakfa producer offsets from a ByteBuffer. Similarly it would
> be
> > ideal if I could get a ByteBuffer and offsets into it from the consumer.
> > Even if I could get byte arrays (implying a copy but no decoding phase)
> on
> > the consumer that would be great. Right now it seems to me that the only
> > way to get messages from Kafka is through a message object, which implies
> > Kafka allocates these messages all the time. I am willing to use the
> > upcoming 0.9 API too.
> >
> > Thanks.
> >
>
>
>
> --
> -- Guozhang
>