divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1248924148


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input 
ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   Please add the following cases in the unit test to validate the same 
behaviour before and after this PR:
   1. Source ByteBuffer has an underlying array but it is shared with other 
byte buffers (e.g. this source bytebuffer is carved out using slice)
   2. Source ByteBuffer has a non-zero position and limit = remaining.
   3. Source ByteBuffer has a non zero position and limit = position.
   4. Source Bytebuffer is backed by a direct buffer.
   5. Incoming Bytebuffer is a read-only buffer.
   



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
         }
     }
 
-    /*
+    /**
      * Default hashing function to choose a partition from the serialized key 
bytes
      */
     public static int partitionForKey(final byte[] serializedKey, final int 
numPartitions) {
         return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
     }
 
+    /**
+     * Default hashing function to choose a partition from the serialized key 
bytes
+     */
+    public static int partitionForKey(final ByteBuffer serializedKey, final 
int numPartitions) {
+        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;

Review Comment:
   murmur2() will change the position of the bytebuffer and (correct me if I am 
wrong) we want to read it again to actually get the key (using remaining()). 
Wouldn't we end up reading the wrong key since position has already been 
changed by this method?



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   This is a performance sensitive code. Can we have a microbenchmark to ensure 
that we haven't regressed here? You can use 
https://github.com/sangupta/murmur/blob/master/src/test/java/com/sangupta/murmur/MurmurPerformanceTests.java
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to