divijvaidya commented on code in PR #12685: URL: https://github.com/apache/kafka/pull/12685#discussion_r1248924148
########## clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java: ########## @@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) { data.flip(); return Utils.toArray(data); } + + /** + * Note that this method will modify the position and limit of the input ByteBuffer. + * + * @param topic topic associated with data + * @param data typed data + * @return serialized ByteBuffer + */ + @Override + public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) { Review Comment: Please add the following cases in the unit test to validate the same behaviour before and after this PR: 1. Source ByteBuffer has an underlying array but it is shared with other byte buffers (e.g. this source bytebuffer is carved out using slice) 2. Source ByteBuffer has a non-zero position and limit = remaining. 3. Source ByteBuffer has a non zero position and limit = position. 4. Source Bytebuffer is backed by a direct buffer. 5. Incoming Bytebuffer is a read-only buffer. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ########## @@ -321,13 +322,20 @@ public int partition() { } } - /* + /** * Default hashing function to choose a partition from the serialized key bytes */ public static int partitionForKey(final byte[] serializedKey, final int numPartitions) { return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; } + /** + * Default hashing function to choose a partition from the serialized key bytes + */ + public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) { + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; Review Comment: murmur2() will change the position of the bytebuffer and (correct me if I am wrong) we want to read it again to actually get the key (using remaining()). Wouldn't we end up reading the wrong key since position has already been changed by this method? ########## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ########## @@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) { return h; } + /** + * Generates 32 bit murmur2 hash from ByteBuffer + * @param data ByteBuffer to hash + * @return 32 bit hash of the given ByteBuffer + */ + @SuppressWarnings("fallthrough") + public static int murmur2(ByteBuffer data) { Review Comment: This is a performance sensitive code. Can we have a microbenchmark to ensure that we haven't regressed here? You can use https://github.com/sangupta/murmur/blob/master/src/test/java/com/sangupta/murmur/MurmurPerformanceTests.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org