Hello all,

We have been implementing a custom partitioner for the producer in a simple 
Streams application. It should partition records by a member field of the 
value when sending them to the output topic. Judging by the contract of the 
partition() method in the Partitioner interface, it would seem that the value 
Object is in its deserialized form when this method is called:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes The serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes The serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
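To make the intent concrete, here is a minimal sketch of the partitioning logic we are after. It has no Kafka dependency so that it runs standalone. The Order class, its region field, and the partitionFor helper are hypothetical names used only for illustration, and the hash is a plain array hash rather than Kafka's default hashing. The fallback to partition 0 is an arbitrary placeholder for the case where the value is not in deserialized form.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MemberFieldPartitioning {

    // Hypothetical value type; "region" stands in for the member field we partition by.
    public static final class Order {
        public final String id;
        public final String region;

        public Order(String id, String region) {
            this.id = id;
            this.region = region;
        }
    }

    // Maps the member field of a deserialized value to a partition in [0, numPartitions).
    // Assumption: a simple array hash is good enough for the sketch; this is NOT
    // the hashing Kafka's default partitioner uses.
    public static int partitionFor(Object value, int numPartitions) {
        if (!(value instanceof Order)) {
            // The value is null or not deserialized (for example a byte[]),
            // so the member field is not available. Placeholder fallback.
            return 0;
        }
        byte[] fieldBytes = ((Order) value).region.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(fieldBytes), numPartitions);
    }
}
```

A partition() implementation could delegate to such a helper with the value argument and the partition count of the topic. The instanceof check is the part that matters for our question: if the value arrives as a byte[], the helper cannot read the field without deserializing again.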

For a regular producer that we instantiate ourselves, this seems to work 
correctly. However, in a Streams app, we found that the RecordCollectorImpl 
class serializes the record key and value before sending, as seen below:


final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

streamsProducer.send(serializedRecord, (metadata, exception) -> {

We would rather not deserialize the value object again inside the custom 
partitioner. Is there another way around this, or is this a bug in the 
Streams producer code?

Thanks in advance!
Upesh Desai

Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com