This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 63a777a162f KAFKA-19891: Bump group epoch when member regex subscription transitions from non-empty to empty (#21013)
63a777a162f is described below

commit 63a777a162fc1e914f7206b1b180b5ba6f05653b
Author: Anton Vasanth <[email protected]>
AuthorDate: Wed Dec 3 23:04:46 2025 +0530

    KAFKA-19891: Bump group epoch when member regex subscription transitions from non-empty to empty (#21013)
    
    This PR fixes an issue in
    GroupMetadataManager#maybeUpdateRegularExpressions where a member’s
    regex subscription transition from non-empty → empty did not trigger a
    group epoch bump.  The method previously returned REGEX_UPDATED, which
    does not cause consumerGroupHeartbeat to increment the group epoch.
    
    Fix
    
    The patch updates the logic to return REGEX_UPDATED_AND_RESOLVED when:
    
    - the updated regex subscription text is empty, and
    - the previous subscription was non-empty.
    
    This ensures that consumerGroupHeartbeat correctly bumps the group
    epoch, keeping the group metadata consistent.
    
    Tests
    
    Several tests were updated to align with the corrected behavior.  Tests
    that previously expected no epoch bump were failing, and have now been
    adjusted to expect the new, correct logic.
    
    JIRA: https://issues.apache.org/jira/browse/KAFKA-19891
    
    Impact
    
    - Fixes coordinator state correctness for regex-subscribing consumer
      groups
    - Ensures group epoch bumps happen for all relevant subscription
      transitions
    - Backward compatible
    - No public API changes
    
    Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  2 +
 .../group/GroupMetadataManagerTest.java            | 81 ++++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 35a5d562034..129d6794d69 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3314,6 +3314,8 @@ public class GroupMetadataManager {
                         updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
                     }
                 }
+            } else if (isNotEmpty(oldSubscribedTopicRegex)) {
+                updateRegularExpressionsResult = 
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
             }
         }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef81022dcee..3f1ca4770b0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21486,6 +21486,87 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberClearsRegex() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage(12345L);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 1 updates its new regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("")
+                .setServerAssignor("range")
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of())
+                ),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("")
+            .setServerAssignorName("range")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+            // previous expression is deleted
+            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11, 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, Map.of()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
     @Test
     public void 
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
         String groupId = "fooup";

Reply via email to