This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c2a0965b5e KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] (#21557)
9c2a0965b5e is described below

commit 9c2a0965b5e38f4ee7eb5c4d4906d5fe0bd92356
Author: Lucy Liu <[email protected]>
AuthorDate: Fri Feb 27 11:02:08 2026 -0600

    KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] (#21557)
    
    # Summary
    This PR moves `assignedPartitions` out of the `ModernGroupMember` base
    class and adds it as an independent property of `ShareGroupMember` and
    `ConsumerGroupMember`.
    
    ## Reason for the change
    In an upcoming PR, the structure of
    `ConsumerGroupMember#assignedPartitions` and
    `ConsumerGroupMember#partitionsPendingRevocation` will be changed to
    include epoch information as
    ```
    Map<Uuid, Map<Integer, Integer>>
    ```
    This differs from the `ShareGroupMember#assignedPartitions` structure,
    which remains `Map<Uuid, Set<Integer>>`. Therefore, it is no longer
    appropriate to have this as a shared field in the base class.
    
    Reviewers: Sean Quah <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  5 +--
 .../group/modern/ModernGroupMember.java            | 29 +------------
 .../group/modern/consumer/ConsumerGroupMember.java | 26 +++++++++++-
 .../group/modern/share/ShareGroupMember.java       | 26 +++++++++++-
 .../kafka/jmh/assignor/AssignorBenchmarkUtils.java | 48 ++++++++++++++++++----
 .../jmh/assignor/ServerSideAssignorBenchmark.java  |  2 +-
 .../jmh/assignor/ShareGroupAssignorBenchmark.java  |  2 +-
 .../assignor/TargetAssignmentBuilderBenchmark.java |  2 +-
 8 files changed, 94 insertions(+), 46 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e21304178c3..4494a666aef 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -239,7 +239,6 @@ import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
-import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
@@ -2436,7 +2435,7 @@ public class GroupMetadataManager {
         //    to detect a full request as those must be set in a full request.
         // 2. The member's assignment has been updated.
         boolean isFullRequest = rebalanceTimeoutMs != -1 && 
(subscribedTopicNames != null || subscribedTopicRegex != null) && 
ownedTopicPartitions != null;
-        if (memberEpoch == 0 || isFullRequest || 
hasAssignedPartitionsChanged(member, updatedMember)) {
+        if (memberEpoch == 0 || isFullRequest || 
ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
             
response.setAssignment(ConsumerGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
         }
 
@@ -2808,7 +2807,7 @@ public class GroupMetadataManager {
         //    (subscribedTopicNames) to detect a full request as those must be 
set in a full request.
         // 2. The member's assignment has been updated.
         boolean isFullRequest = subscribedTopicNames != null;
-        if (memberEpoch == 0 || isFullRequest || 
hasAssignedPartitionsChanged(member, updatedMember)) {
+        if (memberEpoch == 0 || isFullRequest || 
ShareGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
             
response.setAssignment(ShareGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
         }
         return new CoordinatorResult<>(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
index 1b844d24ccd..12e652c8136 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.coordinator.group.modern;
 
-import org.apache.kafka.common.Uuid;
-
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -71,11 +68,6 @@ public abstract class ModernGroupMember {
      */
     protected Set<String> subscribedTopicNames;
 
-    /**
-     * The partitions assigned to this member.
-     */
-    protected Map<Uuid, Set<Integer>> assignedPartitions;
-
     protected ModernGroupMember(
         String memberId,
         int memberEpoch,
@@ -85,8 +77,7 @@ public abstract class ModernGroupMember {
         String clientId,
         String clientHost,
         Set<String> subscribedTopicNames,
-        MemberState state,
-        Map<Uuid, Set<Integer>> assignedPartitions
+        MemberState state
     ) {
         this.memberId = memberId;
         this.memberEpoch = memberEpoch;
@@ -97,7 +88,6 @@ public abstract class ModernGroupMember {
         this.clientId = clientId;
         this.clientHost = clientHost;
         this.subscribedTopicNames = subscribedTopicNames;
-        this.assignedPartitions = assignedPartitions;
     }
 
     /**
@@ -169,21 +159,4 @@ public abstract class ModernGroupMember {
     public boolean isReconciledTo(int targetAssignmentEpoch) {
         return state == MemberState.STABLE && memberEpoch == 
targetAssignmentEpoch;
     }
-
-    /**
-     * @return The set of assigned partitions.
-     */
-    public Map<Uuid, Set<Integer>> assignedPartitions() {
-        return assignedPartitions;
-    }
-
-    /**
-     * @return True of the two provided members have different assigned 
partitions.
-     */
-    public static boolean hasAssignedPartitionsChanged(
-        ModernGroupMember member1,
-        ModernGroupMember member2
-    ) {
-        return 
!member1.assignedPartitions().equals(member2.assignedPartitions());
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 9b5f0c1f6c8..b2d9d0ea492 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -263,6 +263,11 @@ public class ConsumerGroupMember extends ModernGroupMember 
{
      */
     private final String serverAssignorName;
 
+    /**
+     * The partitions assigned to this member.
+     */
+    private final Map<Uuid, Set<Integer>> assignedPartitions;
+
     /**
      * The partitions being revoked by this member.
      */
@@ -299,12 +304,12 @@ public class ConsumerGroupMember extends 
ModernGroupMember {
             clientId,
             clientHost,
             subscribedTopicNames,
-            state,
-            assignedPartitions
+            state
         );
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.subscribedTopicRegex = subscribedTopicRegex;
         this.serverAssignorName = serverAssignorName;
+        this.assignedPartitions = assignedPartitions;
         this.partitionsPendingRevocation = partitionsPendingRevocation;
         this.classicMemberMetadata = classicMemberMetadata;
     }
@@ -330,6 +335,13 @@ public class ConsumerGroupMember extends ModernGroupMember 
{
         return Optional.ofNullable(serverAssignorName);
     }
 
+    /**
+     * @return The set of assigned partitions.
+     */
+    public Map<Uuid, Set<Integer>> assignedPartitions() {
+        return assignedPartitions;
+    }
+
     /**
      * @return The set of partitions awaiting revocation from the member.
      */
@@ -337,6 +349,16 @@ public class ConsumerGroupMember extends ModernGroupMember 
{
         return partitionsPendingRevocation;
     }
 
+    /**
+     * @return True if the two provided members have different assigned 
partitions.
+     */
+    public static boolean hasAssignedPartitionsChanged(
+        ConsumerGroupMember member1,
+        ConsumerGroupMember member2
+    ) {
+        return 
!member1.assignedPartitions().equals(member2.assignedPartitions());
+    }
+
     /**
      * @return The supported classic protocols converted to 
JoinGroupRequestProtocolCollection.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
index 2bb75578c7b..bc40d5025fc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java
@@ -170,6 +170,11 @@ public class ShareGroupMember extends ModernGroupMember {
         }
     }
 
+    /**
+     * The partitions assigned to this member.
+     */
+    private final Map<Uuid, Set<Integer>> assignedPartitions;
+
     private ShareGroupMember(
           String memberId,
           int memberEpoch,
@@ -190,9 +195,26 @@ public class ShareGroupMember extends ModernGroupMember {
             clientId,
             clientHost,
             subscribedTopicNames,
-            state,
-            assignedPartitions
+            state
         );
+        this.assignedPartitions = assignedPartitions;
+    }
+
+    /**
+     * @return The partitions assigned to this member.
+     */
+    public Map<Uuid, Set<Integer>> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    /**
+     * @return True if the two provided members have different assigned 
partitions.
+     */
+    public static boolean hasAssignedPartitionsChanged(
+        ShareGroupMember member1,
+        ShareGroupMember member2
+    ) {
+        return 
!member1.assignedPartitions().equals(member2.assignedPartitions());
     }
 
     /**
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index 555c92457f8..83e9ba23da2 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -28,7 +28,6 @@ import 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
 import 
org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
-import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
 import org.apache.kafka.coordinator.group.modern.TopicIds;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
@@ -115,24 +114,23 @@ public class AssignorBenchmarkUtils {
     }
 
     /**
-     * Creates a GroupSpec from the given ModernGroupMembers.
+     * Creates a GroupSpec from the given ConsumerGroupMembers.
      *
-     * @param members               The ModernGroupMembers.
+     * @param members               The ConsumerGroupMembers.
      * @param subscriptionType      The group's subscription type.
      * @param topicResolver         The TopicResolver to use.
      * @return The new GroupSpec.
      */
-    public static GroupSpec createGroupSpec(
-        Map<String, ? extends ModernGroupMember> members,
+    public static GroupSpec createConsumerGroupSpec(
+        Map<String, ConsumerGroupMember> members,
         SubscriptionType subscriptionType,
         TopicIds.TopicResolver topicResolver
     ) {
         Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new 
HashMap<>();
 
-        // Prepare the member spec for all members.
-        for (Map.Entry<String, ? extends ModernGroupMember> memberEntry : 
members.entrySet()) {
+        for (Map.Entry<String, ConsumerGroupMember> memberEntry : 
members.entrySet()) {
             String memberId = memberEntry.getKey();
-            ModernGroupMember member = memberEntry.getValue();
+            ConsumerGroupMember member = memberEntry.getValue();
 
             memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
                 Optional.ofNullable(member.rackId()),
@@ -149,6 +147,40 @@ public class AssignorBenchmarkUtils {
         );
     }
 
+    /**
+     * Creates a GroupSpec from the given ShareGroupMembers.
+     *
+     * @param members               The ShareGroupMembers.
+     * @param subscriptionType      The group's subscription type.
+     * @param topicResolver         The TopicResolver to use.
+     * @return The new GroupSpec.
+     */
+    public static GroupSpec createShareGroupSpec(
+        Map<String, ShareGroupMember> members,
+        SubscriptionType subscriptionType,
+        TopicIds.TopicResolver topicResolver
+    ) {
+        Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new 
HashMap<>();
+
+        for (Map.Entry<String, ShareGroupMember> memberEntry : 
members.entrySet()) {
+            String memberId = memberEntry.getKey();
+            ShareGroupMember member = memberEntry.getValue();
+
+            memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.empty(),
+                new TopicIds(member.subscribedTopicNames(), topicResolver),
+                new Assignment(member.assignedPartitions())
+            ));
+        }
+
+        return new GroupSpecImpl(
+            memberSpecs,
+            subscriptionType,
+            Map.of()
+        );
+    }
+
     /**
      * Creates a ConsumerGroupMembers map where all members have the same 
topic subscriptions.
      *
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index b96d718c654..38e19447444 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -135,7 +135,7 @@ public class ServerSideAssignorBenchmark {
         setupTopics();
 
         Map<String, ConsumerGroupMember> members = createMembers();
-        this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, 
subscriptionType, topicResolver);
+        this.groupSpec = 
AssignorBenchmarkUtils.createConsumerGroupSpec(members, subscriptionType, 
topicResolver);
 
         if (assignmentType == AssignmentType.INCREMENTAL) {
             simulateIncrementalRebalance();
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
index 84a59560e0a..95148a2c140 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
@@ -126,7 +126,7 @@ public class ShareGroupAssignorBenchmark {
         setupTopics();
 
         Map<String, ShareGroupMember> members = createMembers();
-        this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, 
subscriptionType, topicResolver);
+        this.groupSpec = AssignorBenchmarkUtils.createShareGroupSpec(members, 
subscriptionType, topicResolver);
 
         if (assignmentType == AssignmentType.INCREMENTAL) {
             simulateIncrementalRebalance();
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index c669737de36..8b73cf11555 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -135,7 +135,7 @@ public class TargetAssignmentBuilderBenchmark {
     private Map<String, Assignment> 
generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(
         Map<String, ConsumerGroupMember> members
     ) {
-        this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
+        this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(
             members,
             subscriptionType,
             topicResolver

Reply via email to