This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 474bd6f7f18 KAFKA-19899: Bumping group epoch when member regex 
subscription changes from non empty to empty (#21075)
474bd6f7f18 is described below

commit 474bd6f7f182fde1fce583f560f74c3d56b3da1b
Author: Anton Vasanth <[email protected]>
AuthorDate: Thu Dec 4 18:09:55 2025 +0530

    KAFKA-19899: Bumping group epoch when member regex subscription changes 
from non empty to empty (#21075)
    
    This patch fixes a case in GroupMetadataManager where a member’s
    regular expression subscription transitions from a non-empty regex to
    an empty regex. In this scenario, maybeUpdateRegularExpressions should
    return true so that the group epoch is bumped.
    
    
    
    JIRA: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-19899?filter=allissues
    
    Target branch: 4.1
    
    Reviewers: Sean Quah <[email protected]>, David Jacot
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  2 +
 .../group/GroupMetadataManagerTest.java            | 81 ++++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 53b5f4ba1d5..5b20c4ba4ba 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3149,6 +3149,8 @@ public class GroupMetadataManager {
                     // by bumping the group epoch.
                     bumpGroupEpoch = 
group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
                 }
+            } else if (isNotEmpty(oldSubscribedTopicRegex)) {
+                bumpGroupEpoch = true;
             }
         }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 135752fec61..3c876284515 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20790,6 +20790,87 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberClearsRegex() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build(12345L);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 1 clears its regular expression by sending an empty regex.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("")
+                .setServerAssignor("range")
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of())
+                ),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("")
+            .setServerAssignorName("range")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+            // previous expression is deleted
+            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11, 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, Map.of()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+    
     @Test
     public void 
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
         String groupId = "fooup";

Reply via email to