This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a9dff623cc7 KAFKA-19955: Fix performance regression in server-side
assignors (#21058)
a9dff623cc7 is described below
commit a9dff623cc75a7a7cfb958051e24768d624c2370
Author: Sean Quah <[email protected]>
AuthorDate: Wed Dec 3 14:07:39 2025 +0000
KAFKA-19955: Fix performance regression in server-side assignors (#21058)
The server-side assignors are designed to re-use the previous assignment
maps where possible and clone them for modification lazily. PR #20097
introduced a bug where the assignments were always re-wrapped in an
unmodifiable map, so the assignors ended up cloning them repeatedly
whenever multiple changes had to be made to a member assignment.
Fix the bug by removing the unmodifiable map wrapping.
Reviewers: David Jacot <[email protected]>
---
.../group/modern/MemberAssignmentImpl.java | 5 ++-
.../group/modern/MemberAssignmentImplTest.java | 49 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
index 1cbad4b783b..251912036f5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -28,10 +27,12 @@ import java.util.Set;
* The partition assignment for a modern group member.
*
* @param partitions The partitions assigned to this member keyed by topicId.
+ * The map will not be made immutable, since the server-side
assignors rely on
+ * being able to mutate the map while building new
assignments.
*/
public record MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions)
implements MemberAssignment {
public MemberAssignmentImpl {
- partitions =
Collections.unmodifiableMap(Objects.requireNonNull(partitions));
+ Objects.requireNonNull(partitions);
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
new file mode 100644
index 00000000000..fb0d3042ed4
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class MemberAssignmentImplTest {
+
+ @Test
+ public void testPartitionsMutable() {
+ // We depend on the map inside MemberAssignmentImpl remaining
mutable in the server-side
+ // assignors, otherwise we end up deep copying the map unnecessarily.
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ HashMap<Uuid, Set<Integer>> partitions = new HashMap<>();
+ partitions.put(topicId1, new HashSet<>());
+
+ MemberAssignmentImpl memberAssignment = new
MemberAssignmentImpl(partitions);
+
+ // The map should remain mutable.
+ assertDoesNotThrow(() -> memberAssignment.partitions().put(topicId2,
Set.of(3, 4, 5)));
+
+ // The sets inside the map should remain mutable.
+ assertDoesNotThrow(() ->
memberAssignment.partitions().get(topicId1).add(3));
+ }
+}