This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 63a777a162f KAFKA-19891: Bump group epoch when member regex
subscription transitions from non-empty to empty (#21013)
63a777a162f is described below
commit 63a777a162fc1e914f7206b1b180b5ba6f05653b
Author: Anton Vasanth <[email protected]>
AuthorDate: Wed Dec 3 23:04:46 2025 +0530
KAFKA-19891: Bump group epoch when member regex subscription transitions
from non-empty to empty (#21013)
This PR fixes an issue in
GroupMetadataManager#maybeUpdateRegularExpressions where a member’s
regex subscription transition from non-empty → empty did not trigger a
group epoch bump. The method previously returned REGEX_UPDATED, which
does not cause consumerGroupHeartbeat to increment the group epoch.
Fix
The patch updates the logic to return REGEX_UPDATED_AND_RESOLVED when
the updated regex subscription is empty and the previous subscription
was non-empty. This ensures that consumerGroupHeartbeat correctly bumps
the group epoch, keeping the group metadata consistent.
Tests
Several tests were updated to align with the corrected behavior. Tests
that previously expected no epoch bump were failing, and have now been
adjusted to expect the new, correct logic.
JIRA: https://issues.apache.org/jira/browse/KAFKA-19891
Impact
- Fixes coordinator state correctness for regex-subscribing consumer
  groups.
- Ensures group epoch bumps happen for all relevant subscription
  transitions.
- Backward compatible.
- No public API changes.
Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 2 +
.../group/GroupMetadataManagerTest.java | 81 ++++++++++++++++++++++
2 files changed, 83 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 35a5d562034..129d6794d69 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3314,6 +3314,8 @@ public class GroupMetadataManager {
updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
}
}
+ } else if (isNotEmpty(oldSubscribedTopicRegex)) {
+ updateRegularExpressionsResult =
UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef81022dcee..3f1ca4770b0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21486,6 +21486,87 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testConsumerGroupMemberClearsRegex() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage(12345L);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 1 clears its regular expression subscription.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of())
+ ),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("")
+ .setServerAssignorName("range")
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+ // previous expression is deleted
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, Map.of()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
@Test
public void
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
String groupId = "fooup";