This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1008dc85dc KAFKA-17747: [2/N] Add compute topic and group hash 
(#19523)
a1008dc85dc is described below

commit a1008dc85dcf1490a0e5ef3339f601705f194e4f
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 15 03:48:45 2025 -0500

    KAFKA-17747: [2/N] Add compute topic and group hash (#19523)
    
    * Add `com.dynatrace.hash4j:hash4j:0.22.0` to dependencies.
    * Add `computeTopicHash` to `org.apache.kafka.coordinator.group.Utils`.
      * If topic name is non-existent, return 0.
      * If topic name is existent, use streaming XXH3 to compute topic hash
    with magic byte, topic id, topic name, number of partitions, partition
    id and sorted racks.
    * Add `computeGroupHash` to `org.apache.kafka.coordinator.group.Utils`.
      * If topic map is empty, return 0.
      * If topic map is not empty, use streaming XXH3 to compute group
    metadata hash with sorted topic hashes by topic names.
    * Add related unit test.
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai 
<[email protected]>, Sean Quah <[email protected]>, David Jacot 
<[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 LICENSE-binary                                     |   1 +
 build.gradle                                       |   1 +
 checkstyle/import-control-group-coordinator.xml    |   2 +
 gradle/dependencies.gradle                         |   6 +-
 .../org/apache/kafka/coordinator/group/Utils.java  | 100 +++++++++
 .../apache/kafka/coordinator/group/UtilsTest.java  | 237 +++++++++++++++++++++
 6 files changed, 345 insertions(+), 2 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 7a35e39889e..09e226835e6 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -212,6 +212,7 @@ License Version 2.0:
 - commons-lang3-3.12.0
 - commons-logging-1.3.2
 - commons-validator-1.9.0
+- hash4j-0.22.0
 - jackson-annotations-2.16.2
 - jackson-core-2.16.2
 - jackson-databind-2.16.2
diff --git a/build.gradle b/build.gradle
index d2ac9aa1dc4..36ced29d0bd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1418,6 +1418,7 @@ project(':group-coordinator') {
     implementation libs.hdrHistogram
     implementation libs.re2j
     implementation libs.slf4jApi
+    implementation libs.hash4j
 
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control-group-coordinator.xml 
b/checkstyle/import-control-group-coordinator.xml
index 8b6a8d99f5e..1f0e91de144 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -51,6 +51,7 @@
 
     <subpackage name="coordinator">
         <subpackage name="group">
+            <allow pkg="net.jpountz.xxhash" />
             <allow pkg="org.apache.kafka.clients.consumer" />
             <allow pkg="org.apache.kafka.common.annotation" />
             <allow pkg="org.apache.kafka.common.config" />
@@ -76,6 +77,7 @@
             <allow pkg="org.apache.kafka.coordinator.common" />
             <allow pkg="org.apache.kafka.coordinator.common.runtime" />
             <allow pkg="com.google.re2j" />
+            <allow pkg="com.dynatrace.hash4j.hashing" />
             <allow pkg="org.apache.kafka.metadata" />
             <subpackage name="metrics">
                 <allow pkg="com.yammer.metrics"/>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index fba1023fe48..df2e7221731 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -127,7 +127,8 @@ versions += [
   // Also make sure the compression levels in 
org.apache.kafka.common.record.CompressionType are still valid
   zstd: "1.5.6-10",
   junitPlatform: "1.10.2",
-  hdrHistogram: "2.2.2"
+  hdrHistogram: "2.2.2",
+  hash4j: "0.22.0"
 ]
 
 libs += [
@@ -225,5 +226,6 @@ libs += [
   mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
   zstd: "com.github.luben:zstd-jni:$versions.zstd",
   httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
-  hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram"
+  hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
+  hash4j: "com.dynatrace.hash4j:hash4j:$versions.hash4j",
 ]
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 1736aab9d88..f7441fce0b1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -25,10 +25,15 @@ import 
org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.ApiMessage;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+import com.dynatrace.hash4j.hashing.HashStream64;
+import com.dynatrace.hash4j.hashing.Hashing;
 import com.google.re2j.Pattern;
 import com.google.re2j.PatternSyntaxException;
 
@@ -324,4 +329,99 @@ public class Utils {
                     regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as the metadata hash in the 
*GroupMetadataValue.
+     * <p>
+     * If there is no topic, the hash value is set to 0.
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new 
ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by streaming XXH3.
+     * <p>
+     * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * For non-existent topics, the hash value is set to 0.
+     * For existent topics, the hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID with mostSignificantBits and 
leastSignificantBits.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+     * - Rack identifiers are formatted as 
"<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicName     The topic image.
+     * @param metadataImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(String topicName, MetadataImage 
metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE)
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
+            .putString(topicImage.name())
+            .putInt(topicImage.partitions().size());
+
+        ClusterImage clusterImage = metadataImage.cluster();
+        List<String> racks = new ArrayList<>();
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            hasher = hasher.putInt(i);
+            racks.clear(); // Clear the list for reuse
+            for (int replicaId : topicImage.partitions().get(i).replicas) {
+                BrokerRegistration broker = clusterImage.broker(replicaId);
+                if (broker != null) {
+                    broker.rack().ifPresent(racks::add);
+                }
+            }
+
+            Collections.sort(racks);
+            for (String rack : racks) {
+                // Format: "<length><value>"
+                // The rack string combination cannot use simple separator 
like ",", because there is no limitation for rack character.
+                // If using simple separator like "," it may hit edge case 
like ",," and ",,," / ",,," and ",,".
+                // Add length before the rack string to avoid the edge case.
+                hasher = hasher.putInt(rack.length()).putString(rack);
+            }
+        }
+
+        return hasher.getAsLong();
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
new file mode 100644
index 00000000000..20766b623b5
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+
+    @Test
+    void testNonExistingTopicName() {
+        assertEquals(0, Utils.computeTopicHash("unknown", FOO_METADATA_IMAGE));
+    }
+
+    @Test
+    void testComputeTopicHash() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // The first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // The second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack0
+            .putString("rack1") // The first rack in partition 1
+            .putInt(5) // length of rack1
+            .putString("rack2") // The second rack in partition 1
+            .getAsLong();
+        assertEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentMagicByte() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 1) // different magic byte
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // The first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // The second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack0
+            .putString("rack1") // The first rack in partition 1
+            .putInt(5) // length of rack1
+            .putString("rack2") // The second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithLeastSignificantBitsFirst() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // different order
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // The first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // The second rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack0
+            .putString("rack1") // The first rack in partition 1
+            .putInt(5) // length of rack1
+            .putString("rack2") // The second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentPartitionOrder() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 1)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack0
+            .putString("rack1") // The first rack in partition 1
+            .putInt(5) // length of rack1
+            .putString("rack2") // The second rack in partition 1
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack0") // The first rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack1") // The second rack in partition 0
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @Test
+    void testComputeTopicHashWithDifferentRackOrder() {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        long expected = Hashing.xxh3_64().hashStream()
+            .putByte((byte) 0)
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+            .putString(FOO_TOPIC_NAME)
+            .putInt(FOO_NUM_PARTITIONS)
+            .putInt(0) // partition 0
+            .putInt(5) // length of rack0
+            .putString("rack1") // The second rack in partition 0
+            .putInt(5) // length of rack1
+            .putString("rack0") // The first rack in partition 0
+            .putInt(1) // partition 1
+            .putInt(5) // length of rack0
+            .putString("rack1") // The first rack in partition 1
+            .putInt(5) // length of rack1
+            .putString("rack2") // The second rack in partition 1
+            .getAsLong();
+        assertNotEquals(expected, result);
+    }
+
+    @ParameterizedTest
+    @MethodSource("differentFieldGenerator")
+    void testComputeTopicHashWithDifferentField(MetadataImage differentImage) {
+        long result = Utils.computeTopicHash(FOO_TOPIC_NAME, 
FOO_METADATA_IMAGE);
+
+        assertNotEquals(
+            Utils.computeTopicHash(FOO_TOPIC_NAME, differentImage),
+            result
+        );
+    }
+
+    private static Stream<Arguments> differentFieldGenerator() {
+        return Stream.of(
+            Arguments.of(
+                new MetadataImageBuilder() // different topic id
+                    .addTopic(Uuid.randomUuid(), FOO_TOPIC_NAME, 
FOO_NUM_PARTITIONS)
+                    .addRacks()
+                    .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // different topic name
+                    .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
+                    .addRacks()
+                    .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // different partitions
+                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
+                    .addRacks()
+                    .build()
+            ),
+            Arguments.of(new MetadataImageBuilder() // different racks
+                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+                    .build()
+            )
+        );
+    }
+
+    @Test
+    void testComputeGroupHashWithEmptyMap() {
+        assertEquals(0, Utils.computeGroupHash(Map.of()));
+    }
+
+    @Test
+    void testComputeGroupHashWithDifferentOrder() {
+        Map<String, Long> ascendTopicHashes = new LinkedHashMap<>();
+        ascendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+        ascendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+
+        Map<String, Long> descendTopicHashes = new LinkedHashMap<>();
+        descendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+        descendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+        assertEquals(Utils.computeGroupHash(ascendTopicHashes), 
Utils.computeGroupHash(descendTopicHashes));
+    }
+
+    @Test
+    void testComputeGroupHashWithSameKeyButDifferentValue() {
+        Map<String, Long> map1 = Map.of(
+            BAR_TOPIC_NAME, 123L,
+            FOO_TOPIC_NAME, 456L
+        );
+
+        Map<String, Long> map2 = Map.of(
+            BAR_TOPIC_NAME, 456L,
+            FOO_TOPIC_NAME, 123L
+        );
+        assertNotEquals(Utils.computeGroupHash(map1), 
Utils.computeGroupHash(map2));
+    }
+}

Reply via email to