divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250526734


##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();

Review Comment:
   > Source ByteBuffer has an underlying array but it is shared with other byte 
buffers (e.g. this source bytebuffer is carved out using slice)
   
   This test case is not covered here. 
   You can use something like:
   ```
   val garbageBytes = "garbage bytes".getBytes(Charset.defaultCharset())
   val extraBytes = "extra bytes".getBytes(Charset.defaultCharset())
   val bigReadBuffer = ByteBuffer.allocate(channel.size().toInt + garbageBytes.length + extraBytes.length)
   bigReadBuffer.put(extraBytes)
   val readBuffer = bigReadBuffer.slice()
   serializer.serialize(topic, readBuffer)
   ```
   
   The difference from duplicate() is that a duplicate keeps the source 
   buffer's arrayOffset() and position, so nothing moves with respect to the 
   underlying array. Using duplicate() therefore won't catch bugs where some 
   code incorrectly assumes that offset 0 of the underlying array is the same 
   as pos=0. For the duplicates in this test that assumption holds, but for 
   slice() it may not, because the slice's arrayOffset() absorbs the source's 
   position.
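
To make the difference concrete, here is a minimal stdlib-only sketch. The class and helper names are hypothetical and not part of the PR; it only inspects `arrayOffset()` and `position()` and does not call the serializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the Kafka code base.
final class SliceVsDuplicate {

    /** Returns a heap buffer positioned just after prefix, with room for payloadLength more bytes. */
    static ByteBuffer positionedAfterPrefix(byte[] prefix, int payloadLength) {
        ByteBuffer big = ByteBuffer.allocate(prefix.length + payloadLength);
        big.put(prefix); // position is now prefix.length
        return big;
    }

    public static void main(String[] args) {
        byte[] prefix = "extra bytes".getBytes(StandardCharsets.UTF_8);
        ByteBuffer big = positionedAfterPrefix(prefix, 5);

        ByteBuffer dup = big.duplicate();
        ByteBuffer slice = big.slice();

        // duplicate(): same arrayOffset and same position as the source buffer.
        System.out.println("duplicate: arrayOffset=" + dup.arrayOffset()
                + ", position=" + dup.position());
        // slice(): position restarts at 0 and arrayOffset absorbs the source position.
        System.out.println("slice:     arrayOffset=" + slice.arrayOffset()
                + ", position=" + slice.position());
    }
}
```

Code that indexes `array()` starting at 0 would read the prefix bytes for the slice, which is the kind of bug a slice-based test case is meant to expose.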



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();

Review Comment:
   nit: To get consistent results across benchmark runs, it is preferable to 
   seed the random number generator with a constant value.
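
A minimal sketch of what seeding could look like, assuming a hypothetical constant `SEED` (any fixed value would do) and a hypothetical helper that mirrors the fill loop in `setup()`:

```java
import java.util.Arrays;
import java.util.SplittableRandom;

// Hypothetical demo class; SEED is an arbitrary constant chosen for illustration.
final class SeededRandomBytes {
    static final long SEED = 42L;

    /** Fills a new array with pseudo-random bytes drawn from a generator seeded with seed. */
    static byte[] randomBytes(long seed, int length) {
        SplittableRandom random = new SplittableRandom(seed);
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) {
            out[i] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] first = randomBytes(SEED, 128);
        byte[] second = randomBytes(SEED, 128);
        // Same seed, same sequence: every run hashes identical input.
        System.out.println(Arrays.equals(first, second));
    }
}
```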



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();
+        bytesArray = new byte[bytes];
+        byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+        for (int i = 0; i < bytes; i++) {
+            final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+            byteBuffer.put(i, b);
+            bytesArray[i] = b;
+        }
+
+        if (readonly) {
+            byteBuffer = byteBuffer.asReadOnlyBuffer();
+        }
+    }
+
+    @Benchmark
+    public int byteBufferMurmur2() {
+        return Utils.murmur2(byteBuffer);

Review Comment:
   It is preferable to use a Blackhole [1] to consume the returned value. If we 
   don't, we risk the compiler optimising this call away on the assumption that 
   the result is never used.
   
   [1] https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_09_Blackholes.java



##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
+        final ByteBuffer heapBuffer5 = heapBuffer2.duplicate();
+        final ByteBuffer heapBuffer6 = heapBuffer0.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer7 = heapBuffer1.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer8 = heapBuffer2.asReadOnlyBuffer();
         final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
         final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
         try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+            assertNull(serializer.serialize(topic, null));
+            assertNull(serializer.serializeToByteBuffer(topic, null));
+            assertArrayEquals(new byte[0], serializer.serialize(topic, ByteBuffer.allocate(0)));
+
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer1);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer2);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer3);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer4);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer5);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer6);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer7);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer8);
+
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer1);
+        }
+    }
+
+    private void testByteBufferSerCompatibility0(ByteBufferSerializer serializer,
+                                                 byte[] expectedBytes,
+                                                 ByteBuffer buffer) {
+        final ByteBuffer duplicatedBuf0 = buffer.duplicate();
+        final ByteBuffer duplicatedBuf1 = buffer.duplicate();
+        assertEquals(duplicatedBuf0, duplicatedBuf1);

Review Comment:
   Isn't this assertion unnecessary? We duplicated the buffer in the two lines 
   above, so this assertion really tests whether duplicate() works correctly. 
   We don't need to test that.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {
+        final int length = data.remaining();
+        final int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        final int length4 = length / 4;
+        data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);

Review Comment:
   I understand that you are doing this because murmur2 operates on data in 
   little-endian order. Please add a comment explaining that here.
   
   However (and correct me if I am wrong), this line says, "if the current 
   buffer does not interpret the underlying data as little endian, then create 
   a new buffer which will interpret the underlying data as little endian". 
   This logic seems incorrect to me: what if the underlying data is in big 
   endian format? Using a ByteBuffer to forcefully view it as little endian 
   won't change the underlying data.
   
   Alternatively, let's do two things:
   1. If the incoming ByteBuffer in the Serializer is not little endian, 
   convert the underlying data into little endian (and yes, that will be a 
   data copy).
   2. Remove this statement and replace it with a check that throws an 
   exception if it receives a ByteBuffer configured as little endian.
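
For reference while discussing this, a small stdlib-only sketch of what `order()` does and does not change. The class and method names are hypothetical; the example only shows that the byte order is a property of the view used for multi-byte reads, and that the underlying bytes stay untouched.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical demo class, not part of the PR.
final class ByteOrderView {

    /** Reads the first four bytes of raw as an int using the given byte order. */
    static int readInt(byte[] raw, ByteOrder order) {
        return ByteBuffer.wrap(raw).order(order).getInt(0);
    }

    public static void main(String[] args) {
        byte[] raw = {0x01, 0x02, 0x03, 0x04};
        byte[] before = raw.clone();

        // The same four bytes yield different ints depending on the view's order.
        System.out.printf("big endian:    0x%08x%n", readInt(raw, ByteOrder.BIG_ENDIAN));
        System.out.printf("little endian: 0x%08x%n", readInt(raw, ByteOrder.LITTLE_ENDIAN));

        // Changing the order never rewrites the backing array.
        System.out.println("array unchanged: " + Arrays.equals(before, raw));
    }
}
```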



##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();

Review Comment:
   We should use a parameterized test (`@ParameterizedTest`) here for all the 
   different buffer variants that we are testing.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   It seems we have a 13% regression for the non-direct buffer cases and a 
   similar improvement for the direct buffer cases. The additional computation 
   we perform for non-direct buffers is `data.arrayOffset() + data.position()`, 
   which is done once (hopefully the compiler is intelligent enough to do 
   that). Could we take it out of the loop just to be sure?
   
   Any other thoughts on why we see a perf difference between direct and 
   non-direct buffers here? I would expect a difference when GC is involved, 
   but in this case we are not creating more objects (except when calling 
   slice()).
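
A minimal sketch of the hoisting I have in mind for the heap-buffer path. This is not the PR's murmur2 code: `xorWords` is a hypothetical stand-in loop that just XORs 4-byte words, and `xorWordsViaGetInt` is a hypothetical reference version used to check it.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class; the loop body is a stand-in, not the murmur2 mixing step.
final class HoistedOffset {

    /** XORs all complete 4-byte little-endian words in the readable region of a heap buffer. */
    static int xorWords(ByteBuffer data) {
        final byte[] array = data.array();
        // Computed once, before the loop, instead of on every iteration.
        final int base = data.arrayOffset() + data.position();
        final int words = data.remaining() / 4;
        int acc = 0;
        for (int i = 0; i < words; i++) {
            final int i4 = base + i * 4;
            acc ^= (array[i4] & 0xff)
                 | ((array[i4 + 1] & 0xff) << 8)
                 | ((array[i4 + 2] & 0xff) << 16)
                 | ((array[i4 + 3] & 0xff) << 24);
        }
        return acc;
    }

    /** Reference version that reads the same words through a little-endian view. */
    static int xorWordsViaGetInt(ByteBuffer data) {
        final ByteBuffer view = data.slice().order(ByteOrder.LITTLE_ENDIAN);
        final int words = view.remaining() / 4;
        int acc = 0;
        for (int i = 0; i < words; i++) {
            acc ^= view.getInt(i * 4);
        }
        return acc;
    }
}
```

Both methods should agree for any heap buffer, including sliced ones with a non-zero `arrayOffset()`, which makes the pair usable as a quick sanity check before benchmarking.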



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

