[ 
https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876
 ] 

Kirill Rodionov commented on KAFKA-5761:
----------------------------------------

If we're talking about pooled/reused buffers (like Netty's ByteBuf), then
allocating them in the serializer is not going to achieve much, because the
data is copied to the accumulator (in KafkaProducer.doSend) after the
serializer is invoked, and there's no way to release the buffer back to the
pool other than in the finalize() method.
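
Something like this, to illustrate (BufferPool and MyEvent are made-up
names; only Serializer is the real Kafka interface):

    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    public class PooledSerializer implements Serializer<MyEvent> {
        private final BufferPool pool = new BufferPool(); // hypothetical pool

        @Override
        public byte[] serialize(String topic, MyEvent data) {
            byte[] buf = pool.acquire(data.sizeInBytes()); // hypothetical
            data.writeTo(buf);
            // The producer copies these bytes into the accumulator some time
            // after we return, but never signals completion, so there is no
            // safe point at which to call pool.release(buf).
            return buf;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {}

        @Override
        public void close() {}
    }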

There appears to be more sense in explicit support for ByteBuffer as a value
type, so that user code can allocate the buffer before calling producer.send
and release it right afterwards, since by then the contents are guaranteed
to have been copied into the producer's accumulator.
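
Usage would look roughly like this (the ByteBuffer-valued send is the
proposed behaviour, not the current API; pool and writeEventTo are
placeholders):

    ByteBuffer buf = pool.acquire();   // buffer from a user-managed pool
    writeEventTo(event, buf);          // serialize the event into it
    buf.flip();                        // limit now marks the end of the data
    producer.send(new ProducerRecord<>("my-topic", key, buf));
    pool.release(buf);                 // safe: send() has already copied the
                                       // bytes into the accumulator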

The only hiccup here is that the "value.serializer" property is mandatory,
and its presence is checked at KafkaProducer construction time, when there's
no value whose type could be inspected.

If you make the value.serializer property optional, then there's a
possibility of an error at the first send() invocation if the value happens
to be anything other than a ByteBuffer.

 

The other option is to keep the value serializer mandatory, as it is now,
but ignore it when the value is a ByteBuffer, whose contents can be copied
without any serializer.
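
Roughly, the check inside KafkaProducer.doSend could look like this
(illustrative only, not the actual code path):

    byte[] serializedValue;
    if (record.value() instanceof ByteBuffer) {
        // bypass value.serializer: copy straight out of the buffer
        ByteBuffer bb = ((ByteBuffer) record.value()).duplicate();
        serializedValue = new byte[bb.remaining()];
        bb.get(serializedValue);
    } else {
        serializedValue = valueSerializer.serialize(record.topic(), record.value());
    }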

valueBytes would have to be removed from the Partitioner's signature
(no implementation I know of uses that argument anyway).

 

WDYT?

> Serializer API should support ByteBuffer
> ----------------------------------------
>
>                 Key: KAFKA-5761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5761
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.11.0.0
>            Reporter: Bhaskar Gollapudi
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>              Labels: features, performance
>
> Consider the Serializer. Its main method is:
> byte[] serialize(String topic, T data);
> Producer applications create an implementation that takes in an instance
> (of T) and converts it to a byte[]. This byte array is allocated anew for
> each message and is then handed over to the Kafka Producer API internals,
> which write the bytes to the buffer/network socket. When the next message
> arrives, the serializer, instead of creating a new byte[], should try to
> reuse the existing byte[] for the new message. This requires two things:
> 1. The process of handing off the bytes to the buffer/socket and reusing
> the byte[] must happen on the same thread.
> 2. There should be a way to mark the end of the available bytes in the
> byte[].
> The first is reasonably simple to understand. If it does not happen, and
> without other necessary synchronization, the byte[] gets corrupted, and so
> does the message written to the buffer/socket. However, this requirement
> is easy for a producer application to meet, because it controls the
> threads on which the serializer is invoked.
> The second is where the problem lies with the current API. It does not
> allow a variable number of bytes to be read from a container; it is
> limited by the byte[]'s length. This forces the producer to either
> 1. create a new byte[] for a message that is bigger than the previous one,
> or
> 2. decide on a max size and use padding.
> Both are cumbersome and error prone, and may waste network bandwidth.
> Instead, if there were a Serializer with this method:
> ByteBuffer serialize(String topic, T data);
> it would help clients implement a reusable bytes container and avoid
> allocations for each message.
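
For reference, a serializer written against the proposed signature might
reuse one growable buffer like this (MyEvent and its methods are made up;
single-threaded use is assumed, per point 1 above):

    import java.nio.ByteBuffer;

    public class ReusingSerializer {
        private ByteBuffer buf = ByteBuffer.allocate(4096);

        // Proposed signature: ByteBuffer instead of byte[].
        public ByteBuffer serialize(String topic, MyEvent data) {
            if (data.sizeInBytes() > buf.capacity())
                buf = ByteBuffer.allocate(data.sizeInBytes()); // grow as needed
            buf.clear();
            data.writeTo(buf);
            buf.flip(); // position..limit covers exactly the valid bytes
            return buf;
        }
    }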



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
