This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1008dc85dc KAFKA-17747: [2/N] Add compute topic and group hash (#19523)
a1008dc85dc is described below
commit a1008dc85dcf1490a0e5ef3339f601705f194e4f
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 15 03:48:45 2025 -0500
KAFKA-17747: [2/N] Add compute topic and group hash (#19523)
* Add `com.dynatrace.hash4j:hash4j:0.22.0` to dependencies.
* Add `computeTopicHash` to `org.apache.kafka.coordinator.group.Utils`
  (see the usage sketch after this list).
  * If the topic name does not exist, return 0.
  * If the topic name exists, use streaming XXH3 to compute the topic hash
    from the magic byte, topic id, topic name, number of partitions, and each
    partition id with its sorted racks.
* Add `computeGroupHash` to `org.apache.kafka.coordinator.group.Utils`.
  * If the topic map is empty, return 0.
  * If the topic map is not empty, use streaming XXH3 to compute the group
    metadata hash from the topic hashes sorted by topic name.
* Add related unit tests.
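For context, a minimal usage sketch of how the two helpers compose (illustrative
only, not part of this patch; the enclosing method and the topic names "foo" and
"bar" are hypothetical, and the caller must sit in the same package because both
helpers are package-private):

    import java.util.Map;
    import org.apache.kafka.image.MetadataImage;

    // Hash each subscribed topic by name against the current metadata image,
    // then fold the per-topic hashes into a single group metadata hash.
    static long groupMetadataHash(MetadataImage metadataImage) {
        Map<String, Long> topicHashes = Map.of(
            "foo", Utils.computeTopicHash("foo", metadataImage),
            "bar", Utils.computeTopicHash("bar", metadataImage)
        );
        return Utils.computeGroupHash(topicHashes);
    }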
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>, Sean Quah <[email protected]>, David Jacot <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
LICENSE-binary | 1 +
build.gradle | 1 +
checkstyle/import-control-group-coordinator.xml | 2 +
gradle/dependencies.gradle | 6 +-
.../org/apache/kafka/coordinator/group/Utils.java | 100 +++++++++
.../apache/kafka/coordinator/group/UtilsTest.java | 237 +++++++++++++++++++++
6 files changed, 345 insertions(+), 2 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 7a35e39889e..09e226835e6 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -212,6 +212,7 @@ License Version 2.0:
- commons-lang3-3.12.0
- commons-logging-1.3.2
- commons-validator-1.9.0
+- hash4j-0.22.0
- jackson-annotations-2.16.2
- jackson-core-2.16.2
- jackson-databind-2.16.2
diff --git a/build.gradle b/build.gradle
index d2ac9aa1dc4..36ced29d0bd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1418,6 +1418,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
+ implementation libs.hash4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 8b6a8d99f5e..1f0e91de144 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -51,6 +51,7 @@
<subpackage name="coordinator">
<subpackage name="group">
+ <allow pkg="net.jpountz.xxhash" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
@@ -76,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" />
+ <allow pkg="com.dynatrace.hash4j.hashing" />
<allow pkg="org.apache.kafka.metadata" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index fba1023fe48..df2e7221731 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -127,7 +127,8 @@ versions += [
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-10",
junitPlatform: "1.10.2",
- hdrHistogram: "2.2.2"
+ hdrHistogram: "2.2.2",
+ hash4j: "0.22.0"
]
libs += [
@@ -225,5 +226,6 @@ libs += [
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
zstd: "com.github.luben:zstd-jni:$versions.zstd",
httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
- hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram"
+ hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
+ hash4j: "com.dynatrace.hash4j:hash4j:$versions.hash4j",
]
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 1736aab9d88..f7441fce0b1 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -25,10 +25,15 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import com.dynatrace.hash4j.hashing.HashStream64;
+import com.dynatrace.hash4j.hashing.Hashing;
import com.google.re2j.Pattern;
import com.google.re2j.PatternSyntaxException;
@@ -324,4 +329,99 @@ public class Utils {
regex, ex.getDescription()));
}
}
+
+ /**
+ * The magic byte used to identify the version of topic hash function.
+ */
+ static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+ /**
+ * Computes the hash of the topics in a group.
+ * <p>
+ * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
+ * <p>
+ * If there is no topic, the hash value is set to 0.
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is the topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+ static long computeGroupHash(Map<String, Long> topicHashes) {
+ if (topicHashes.isEmpty()) {
+ return 0;
+ }
+
+ // Sort entries by topic name
+ List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+ sortedEntries.sort(Map.Entry.comparingByKey());
+
+ HashStream64 hasher = Hashing.xxh3_64().hashStream();
+ for (Map.Entry<String, Long> entry : sortedEntries) {
+ hasher.putLong(entry.getValue());
+ }
+
+ return hasher.getAsLong();
+ }
+
+ /**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+ * <p>
+ * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+ * new hash version.
+ * <p>
+ * For non-existent topics, the hash value is set to 0.
+ * For existing topics, the hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID with mostSignificantBits and leastSignificantBits.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+ * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+ *
+ * @param topicName The topic name.
+ * @param metadataImage The metadata image.
+ * @return The hash of the topic.
+ */
+ static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+ TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+ if (topicImage == null) {
+ return 0;
+ }
+
+ HashStream64 hasher = Hashing.xxh3_64().hashStream();
+ hasher = hasher
+ .putByte(TOPIC_HASH_MAGIC_BYTE)
+ .putLong(topicImage.id().getMostSignificantBits())
+ .putLong(topicImage.id().getLeastSignificantBits())
+ .putString(topicImage.name())
+ .putInt(topicImage.partitions().size());
+
+ ClusterImage clusterImage = metadataImage.cluster();
+ List<String> racks = new ArrayList<>();
+ for (int i = 0; i < topicImage.partitions().size(); i++) {
+ hasher = hasher.putInt(i);
+ racks.clear(); // Clear the list for reuse
+ for (int replicaId : topicImage.partitions().get(i).replicas) {
+ BrokerRegistration broker = clusterImage.broker(replicaId);
+ if (broker != null) {
+ broker.rack().ifPresent(racks::add);
+ }
+ }
+
+ Collections.sort(racks);
+ for (String rack : racks) {
+ // Format: "<length><value>"
+ // The rack string combination cannot use a simple separator like ",", because there is no limitation on rack characters.
+ // Using a simple separator like "," could hit edge cases such as ",," and ",,," vs ",,," and ",,".
+ // Add the length before each rack string to avoid these edge cases.
+ hasher = hasher.putInt(rack.length()).putString(rack);
+ }
+ }
+
+ return hasher.getAsLong();
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
new file mode 100644
index 00000000000..20766b623b5
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+ private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+ private static final String FOO_TOPIC_NAME = "foo";
+ private static final String BAR_TOPIC_NAME = "bar";
+ private static final int FOO_NUM_PARTITIONS = 2;
+ private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build();
+
+ @Test
+ void testNonExistingTopicName() {
+ assertEquals(0, Utils.computeTopicHash("unknown", FOO_METADATA_IMAGE));
+ }
+
+ @Test
+ void testComputeTopicHash() {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ long expected = Hashing.xxh3_64().hashStream()
+ .putByte((byte) 0)
+ .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+ .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+ .putString(FOO_TOPIC_NAME)
+ .putInt(FOO_NUM_PARTITIONS)
+ .putInt(0) // partition 0
+ .putInt(5) // length of rack0
+ .putString("rack0") // The first rack in partition 0
+ .putInt(5) // length of rack1
+ .putString("rack1") // The second rack in partition 0
+ .putInt(1) // partition 1
+ .putInt(5) // length of rack1
+ .putString("rack1") // The first rack in partition 1
+ .putInt(5) // length of rack2
+ .putString("rack2") // The second rack in partition 1
+ .getAsLong();
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentMagicByte() {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ long expected = Hashing.xxh3_64().hashStream()
+ .putByte((byte) 1) // different magic byte
+ .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+ .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+ .putString(FOO_TOPIC_NAME)
+ .putInt(FOO_NUM_PARTITIONS)
+ .putInt(0) // partition 0
+ .putInt(5) // length of rack0
+ .putString("rack0") // The first rack in partition 0
+ .putInt(5) // length of rack1
+ .putString("rack1") // The second rack in partition 0
+ .putInt(1) // partition 1
+ .putInt(5) // length of rack1
+ .putString("rack1") // The first rack in partition 1
+ .putInt(5) // length of rack2
+ .putString("rack2") // The second rack in partition 1
+ .getAsLong();
+ assertNotEquals(expected, result);
+ }
+
+ @Test
+ void testComputeTopicHashWithLeastSignificantBitsFirst() {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ long expected = Hashing.xxh3_64().hashStream()
+ .putByte((byte) 0)
+ .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // different order
+ .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+ .putString(FOO_TOPIC_NAME)
+ .putInt(FOO_NUM_PARTITIONS)
+ .putInt(0) // partition 0
+ .putInt(5) // length of rack0
+ .putString("rack0") // The first rack in partition 0
+ .putInt(5) // length of rack1
+ .putString("rack1") // The second rack in partition 0
+ .putInt(1) // partition 1
+ .putInt(5) // length of rack1
+ .putString("rack1") // The first rack in partition 1
+ .putInt(5) // length of rack2
+ .putString("rack2") // The second rack in partition 1
+ .getAsLong();
+ assertNotEquals(expected, result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentPartitionOrder() {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ long expected = Hashing.xxh3_64().hashStream()
+ .putByte((byte) 0)
+ .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+ .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+ .putString(FOO_TOPIC_NAME)
+ .putInt(FOO_NUM_PARTITIONS)
+ .putInt(1) // partition 1
+ .putInt(5) // length of rack1
+ .putString("rack1") // The first rack in partition 1
+ .putInt(5) // length of rack2
+ .putString("rack2") // The second rack in partition 1
+ .putInt(0) // partition 0
+ .putInt(5) // length of rack0
+ .putString("rack0") // The first rack in partition 0
+ .putInt(5) // length of rack1
+ .putString("rack1") // The second rack in partition 0
+ .getAsLong();
+ assertNotEquals(expected, result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentRackOrder() {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ long expected = Hashing.xxh3_64().hashStream()
+ .putByte((byte) 0)
+ .putLong(FOO_TOPIC_ID.getMostSignificantBits())
+ .putLong(FOO_TOPIC_ID.getLeastSignificantBits())
+ .putString(FOO_TOPIC_NAME)
+ .putInt(FOO_NUM_PARTITIONS)
+ .putInt(0) // partition 0
+ .putInt(5) // length of rack1
+ .putString("rack1") // The second rack in partition 0
+ .putInt(5) // length of rack0
+ .putString("rack0") // The first rack in partition 0
+ .putInt(1) // partition 1
+ .putInt(5) // length of rack1
+ .putString("rack1") // The first rack in partition 1
+ .putInt(5) // length of rack2
+ .putString("rack2") // The second rack in partition 1
+ .getAsLong();
+ assertNotEquals(expected, result);
+ }
+
+ @ParameterizedTest
+ @MethodSource("differentFieldGenerator")
+ void testComputeTopicHashWithDifferentField(MetadataImage differentImage) {
+ long result = Utils.computeTopicHash(FOO_TOPIC_NAME, FOO_METADATA_IMAGE);
+
+ assertNotEquals(
+ Utils.computeTopicHash(FOO_TOPIC_NAME, differentImage),
+ result
+ );
+ }
+
+ private static Stream<Arguments> differentFieldGenerator() {
+ return Stream.of(
+ Arguments.of(
+ new MetadataImageBuilder() // different topic id
+ .addTopic(Uuid.randomUuid(), FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build()
+ ),
+ Arguments.of(new MetadataImageBuilder() // different topic name
+ .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build()
+ ),
+ Arguments.of(new MetadataImageBuilder() // different partitions
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
+ .addRacks()
+ .build()
+ ),
+ Arguments.of(new MetadataImageBuilder() // different racks
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .build()
+ )
+ );
+ }
+
+ @Test
+ void testComputeGroupHashWithEmptyMap() {
+ assertEquals(0, Utils.computeGroupHash(Map.of()));
+ }
+
+ @Test
+ void testComputeGroupHashWithDifferentOrder() {
+ Map<String, Long> ascendTopicHashes = new LinkedHashMap<>();
+ ascendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+ ascendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+
+ Map<String, Long> descendTopicHashes = new LinkedHashMap<>();
+ descendTopicHashes.put(FOO_TOPIC_NAME, 456L);
+ descendTopicHashes.put(BAR_TOPIC_NAME, 123L);
+ assertEquals(Utils.computeGroupHash(ascendTopicHashes), Utils.computeGroupHash(descendTopicHashes));
+ }
+
+ @Test
+ void testComputeGroupHashWithSameKeyButDifferentValue() {
+ Map<String, Long> map1 = Map.of(
+ BAR_TOPIC_NAME, 123L,
+ FOO_TOPIC_NAME, 456L
+ );
+
+ Map<String, Long> map2 = Map.of(
+ BAR_TOPIC_NAME, 456L,
+ FOO_TOPIC_NAME, 123L
+ );
+ assertNotEquals(Utils.computeGroupHash(map1), Utils.computeGroupHash(map2));
+ }
+}