This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9273cdc491d MINOR: Bytes lexicographic comparator could use compiler builtin (#21073)
9273cdc491d is described below

commit 9273cdc491d88c008c68c280c1af7395ab89c6ef
Author: Steven Schlansker <[email protected]>
AuthorDate: Fri Dec 12 12:32:44 2025 -0800

    MINOR: Bytes lexicographic comparator could use compiler builtin (#21073)
    
    The `Arrays.compare` family of methods (here, `Arrays.compareUnsigned`)
    has a vectorized compiler intrinsic available and should offer better
    performance than a hand-rolled loop.
    
    The JDK implementation is at least able to compare 8 bytes at a time
    instead of 1 byte at a time, and avoids repeated bounds-checking.
    
    This operation is performance-sensitive if you use in-memory stores,
    since it is used to navigate the TreeMap backing the store.
    
    Reviewers: Sean Quah <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../java/org/apache/kafka/common/utils/Bytes.java  |  10 +-
 .../org/apache/kafka/common/utils/BytesTest.java   |  35 +++++++
 .../kafka/jmh/util/BytesCompareBenchmark.java      | 105 +++++++++++++++++++++
 3 files changed, 141 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 5955312dbb0..67195fcf131 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -194,17 +194,9 @@ public class Bytes implements Comparable<Bytes> {
                 return 0;
             }
 
-            // similar to Arrays.compare() but considers offset and length
             int end1 = offset1 + length1;
             int end2 = offset2 + length2;
-            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
-                int a = buffer1[i] & 0xff;
-                int b = buffer2[j] & 0xff;
-                if (a != b) {
-                    return a - b;
-                }
-            }
-            return length1 - length2;
+            return Arrays.compareUnsigned(buffer1, offset1, end1, buffer2, offset2, end2);
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
index 8bf8d872262..2c8b591c8db 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
 
 import org.junit.jupiter.api.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -25,6 +26,7 @@ import java.util.TreeMap;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class BytesTest {
 
@@ -81,4 +83,37 @@ public class BytesTest {
 
         assertEquals(subMapExpected.keySet(), subMapResults.keySet());
     }
+
+    @Test
+    public void testBytesLexicographicCases() {
+        assertEquals(0, cmp("", ""));
+        assertTrue(cmp("", "aaa") < 0);
+        assertTrue(cmp("aaa", "") > 0);
+
+        assertEquals(0, cmp("aaa", "aaa"));
+        assertTrue(cmp("aaa", "bbb") < 0);
+        assertTrue(cmp("bbb", "aaa") > 0);
+
+        assertTrue(cmp("aaaaaa", "bbb") < 0);
+        assertTrue(cmp("aaa", "bbbbbb") < 0);
+        assertTrue(cmp("bbbbbb", "aaa") > 0);
+        assertTrue(cmp("bbb", "aaaaaa") > 0);
+
+        assertTrue(cmp("common_prefix_aaa", "common_prefix_bbb") < 0);
+        assertTrue(cmp("common_prefix_bbb", "common_prefix_aaa") > 0);
+
+        assertTrue(cmp("common_prefix_aaaaaa", "common_prefix_bbb") < 0);
+        assertTrue(cmp("common_prefix_aaa", "common_prefix_bbbbbb") < 0);
+        assertTrue(cmp("common_prefix_bbbbbb", "common_prefix_aaa") > 0);
+        assertTrue(cmp("common_prefix_bbb", "common_prefix_aaaaaa") > 0);
+
+        assertTrue(cmp("common_prefix", "common_prefix_aaa") < 0);
+        assertTrue(cmp("common_prefix_aaa", "common_prefix") > 0);
+    }
+
+    private int cmp(String l, String r) {
+        return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+            l.getBytes(StandardCharsets.UTF_8),
+            r.getBytes(StandardCharsets.UTF_8));
+    }
 }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
new file mode 100644
index 00000000000..c994560f69e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BytesCompareBenchmark.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(2)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public class BytesCompareBenchmark {
+    private static final int TREE_SIZE = 10240;
+
+    @Param({"8", "16", "32", "128", "1024"})
+    private int bytes;
+
+    private byte[][] tv;
+    private TreeMap<byte[], Integer> oldMap = new TreeMap<>(new HandwrittenLexicoComparator());
+    private TreeMap<byte[], Integer> newMap = new TreeMap<>(Bytes.BYTES_LEXICO_COMPARATOR);
+
+    @Setup
+    public void setup() {
+        tv = new byte[TREE_SIZE][bytes];
+        for (int i = 0; i < TREE_SIZE; i++) {
+            tv[i][bytes - 4] = (byte) (i >>> 24);
+            tv[i][bytes - 3] = (byte) (i >>> 16);
+            tv[i][bytes - 2] = (byte) (i >>> 8);
+            tv[i][bytes - 1] = (byte) i;
+            oldMap.put(tv[i], i);
+            newMap.put(tv[i], i);
+        }
+    }
+
+    @Benchmark
+    public void samePrefixLexicoCustom(Blackhole bh) {
+        for (int i = 0; i < TREE_SIZE; i++) {
+            bh.consume(oldMap.get(tv[i]));
+        }
+    }
+
+    @Benchmark
+    public void samePrefixLexicoJdk(Blackhole bh) {
+        for (int i = 0; i < TREE_SIZE; i++) {
+            bh.consume(newMap.get(tv[i]));
+        }
+    }
+
+    static class HandwrittenLexicoComparator implements Bytes.ByteArrayComparator {
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+        }
+
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                    offset1 == offset2 &&
+                    length1 == length2) {
+                return 0;
+            }
+
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+                int a = buffer1[i] & 0xff;
+                int b = buffer2[j] & 0xff;
+                if (a != b) {
+                    return a - b;
+                }
+            }
+            return length1 - length2;
+        }
+    }
+}

Reply via email to