My use case is that though I am using Java, I want to work in as close to a zero-garbage environment as possible. All my internal data structures are based on ByteBuffers or buffers allocated using Unsafe. Thus my objects are already in a state where they can be transmitted without any serialization step, i.e. my objects are just flyweights over buffers in the Cap'n Proto or SBE style. The current API forces me to allocate in order to send my already-serialized objects.
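To make "flyweights over buffers" concrete, here is a minimal sketch using only standard java.nio. MsgFlyweight, its field layout, and the wrap method are illustrative names of mine, not Kafka or Cap'n Proto API:

```java
import java.nio.ByteBuffer;

// Illustrative flyweight: the message "object" owns no data, it is just a
// movable view (buffer + offset) over already-serialized bytes.
public class FlyweightDemo {
    // int + double + long.
    static final int SIZE_PER_OBJECT = 4 + 8 + 8;

    static final class MsgFlyweight {
        private ByteBuffer buf;
        private int offset;

        // Repoint the same flyweight instance at another message; no allocation.
        void wrap(ByteBuffer buf, int offset) {
            this.buf = buf;
            this.offset = offset;
        }

        int    intField()    { return buf.getInt(offset); }
        double doubleField() { return buf.getDouble(offset + 4); }
        long   longField()   { return buf.getLong(offset + 12); }
    }

    public static void main(String[] args) {
        // Two messages packed back to back in one buffer.
        ByteBuffer buf = ByteBuffer.allocate(2 * SIZE_PER_OBJECT);
        buf.putInt(0, 1).putDouble(4, 2.0).putLong(12, 5L);
        buf.putInt(20, 2).putDouble(24, 4.0).putLong(32, 10L);

        // A read-only view guards the underlying bytes against scribbling.
        ByteBuffer readOnly = buf.asReadOnlyBuffer();

        MsgFlyweight msg = new MsgFlyweight(); // reused for every message
        for (int i = 0; i < 2; i++) {
            msg.wrap(readOnly, i * SIZE_PER_OBJECT);
            System.out.println(msg.intField() + " " + msg.doubleField() + " " + msg.longField());
        }
        // prints:
        // 1 2.0 5
        // 2 4.0 10
    }
}
```

Reading a message is then just pointer arithmetic against bytes that are already laid out on the wire format; no message object is ever allocated per read.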
I'd imagine something like this for a single-threaded producer:

// My message has a single int, double and long.
public static final int SIZE_PER_OBJECT = 4 + 8 + 8; // int + double + long.

KafkaPartition partition = kafkaProducer.getPartition("Topic", partitionNumber);
KafkaBuffer kafkaBuffer = partition.getBuffer(); // I am imagining a single buffer per partition.
ByteBuffer buffer = ByteBuffer.allocate(0);
// Set the appropriate headers based on the Kafka protocol in the buffer and
// ensure that there is space in the buffer for a payload of SIZE_PER_OBJECT.
// This just sets the address of the input ByteBuffer to the right pointer in
// Kafka's protocol ByteBuffer and the capacity to SIZE_PER_OBJECT. One could
// use reflection to set its capacity and address.
kafkaBuffer.getNextFlyweight(SIZE_PER_OBJECT, buffer);
// I treat it like a ByteBuffer, getting appropriate exceptions if I try
// writing past the end marker.
buffer.putInt(1);
buffer.putDouble(2.0);
buffer.putLong(5L);
// I tell Kafka that I am done writing my object, i.e. it can calculate the
// CRC32 etc. of the latest write.
kafkaBuffer.commit();
partition.flush(); // Potentially flush to the broker.

Kafka presumably has to lay out a ByteBuffer encoded per its wire protocol. Right now it probably takes objects and serializes them into that ByteBuffer. It would be great, as an advanced API, to get an offset into that ByteBuffer and write my message directly there. This could be the API that the simpler encoding producer uses behind the scenes. The consumer API could similarly allow one to reuse a flyweight object to read messages. For example:

ByteBuffer myBuffer = ByteBuffer.allocate(0).asReadOnlyBuffer(); // To ensure I cannot scribble into the internal protocol bytes.
while (true) {
  KafkaBuffer kafkaBuffer = consumer.poll(timeout);
  while (kafkaBuffer.hasRemainingMessages()) {
    // Sets myBuffer to point at the right offset in the Kafka ByteBuffer
    // where the message starts.
    kafkaBuffer.getNextMsgIntoBuffer(myBuffer);
    myBuffer.getInt();    // 1
    myBuffer.getDouble(); // 2.0
    myBuffer.getLong();   // 5L
  }
  // Done with processing messages from this set of buffers.
}

I know this is probably a very rare use case, but single-threaded producers (the consumer in 0.9 already seems single-threaded, which is great!) and an API for ByteBuffer manipulation would allow Kafka to work extremely well in high-throughput environments where avoiding garbage creation/collection is really important.

Thanks!

On Thu, Oct 23, 2014 at 5:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Rajiv,
>
> Could you let me know your use case? Are you sending a very large file and
> hence would prefer streaming manner instead of messages?
>
> Guozhang
>
> On Thu, Oct 23, 2014 at 4:03 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I have a flyweight style protocol that I use for my messages. Thus they
> > require no serialization/deserialization to be processed. The messages
> > are just offset, length pairs within a ByteBuffer.
> >
> > Is there a producer and consumer API that forgoes allocation? I just want
> > to give the kakfa producer offsets from a ByteBuffer. Similarly it would
> > be ideal if I could get a ByteBuffer and offsets into it from the
> > consumer. Even if I could get byte arrays (implying a copy but no
> > decoding phase) on the consumer that would be great. Right now it seems
> > to me that the only way to get messages from Kafka is through a message
> > object, which implies Kafka allocates these messages all the time. I am
> > willing to use the upcoming 0.9 API too.
> >
> > Thanks.
> >
>
> --
> -- Guozhang