This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new a05156fa961 KAFKA-19955: Fix performance regression in server-side 
assignors (#21058)
a05156fa961 is described below

commit a05156fa96117f92afc46338ca6bc1accaab7839
Author: Sean Quah <[email protected]>
AuthorDate: Wed Dec 3 14:07:39 2025 +0000

    KAFKA-19955: Fix performance regression in server-side assignors (#21058)
    
    The server-side assignors are designed to re-use the previous assignment
    maps where possible and clone them for modification lazily. #20097
    introduced a bug where the assignments would always be re-wrapped in an
    unmodifiable map and so we would end up cloning them repeatedly in the
    assignors when multiple changes need to be made to a member assignment.
    Fix the bug by removing the unmodifiable map wrapping.
    
    Reviewers: David Jacot <[email protected]>
---
 .../group/modern/MemberAssignmentImpl.java         |  5 ++-
 .../group/modern/MemberAssignmentImplTest.java     | 49 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
index 1cbad4b783b..251912036f5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.modern;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -28,10 +27,12 @@ import java.util.Set;
  * The partition assignment for a modern group member.
  *
  * @param partitions The partitions assigned to this member keyed by topicId.
+ *                   The map will not be made immutable, since the server-side 
assignors rely on
+ *                   being able to mutate the map while building new 
assignments.
  */
 public record MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions) 
implements MemberAssignment {
     public MemberAssignmentImpl {
-        partitions = 
Collections.unmodifiableMap(Objects.requireNonNull(partitions));
+        Objects.requireNonNull(partitions);
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
new file mode 100644
index 00000000000..fb0d3042ed4
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImplTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class MemberAssignmentImplTest {
+
+    @Test
+    public void testPartitionsMutable() {
+        // We depend on the map inside MemberAssignmentImpl remaining 
mutable in the server-side
+        // assignors, otherwise we end up deep copying the map unnecessarily.
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        HashMap<Uuid, Set<Integer>> partitions = new HashMap<>();
+        partitions.put(topicId1, new HashSet<>());
+
+        MemberAssignmentImpl memberAssignment = new 
MemberAssignmentImpl(partitions);
+
+        // The map should remain mutable.
+        assertDoesNotThrow(() -> memberAssignment.partitions().put(topicId2, 
Set.of(3, 4, 5)));
+
+        // The sets inside the map should remain mutable.
+        assertDoesNotThrow(() -> 
memberAssignment.partitions().get(topicId1).add(3));
+    }
+}

Reply via email to