This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 1b424d2f3da KAFKA-18916; Resolved regular expressions must update the group by topics data structure (#19088)
1b424d2f3da is described below

commit 1b424d2f3dad79ffa33135a34d872d64a35d88fc
Author: David Jacot <[email protected]>
AuthorDate: Tue Mar 4 15:31:08 2025 +0100

    KAFKA-18916; Resolved regular expressions must update the group by topics data structure (#19088)
    
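    When regular expressions are resolved, they do not update the group by
    topics data structure. Hence, topic changes (e.g. deletion) do not
    trigger a rebalance of the group. This patch updates the group by
    topics data structure whenever a resolved regular expression record
    (or its tombstone) is replayed.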
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 32 ++++++++++++++++++++--
 .../coordinator/group/GroupMetadataManager.java    | 22 +++++++++------
 .../group/GroupMetadataManagerTest.java            | 15 ++++++++++
 3 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 6b42c4f566a..e94bcbc56a3 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -20,7 +20,7 @@ import 
org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{TopicCollection, Uuid}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.Errors
@@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       )
 
       // Heartbeat request to join the group. Note that the member subscribes
-      // to an nonexistent topic.
+      // to a nonexistent topic.
       var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
         new ConsumerGroupHeartbeatRequestData()
           .setGroupId("grp")
@@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       ).build()
 
       // This is the expected assignment.
-      val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      var expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
         .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
           .setTopicId(topicId)
           .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
@@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response.
       assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+
+      // Delete the topic.
+      
admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get
+
+      // Prepare the next heartbeat.
+      consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+        new ConsumerGroupHeartbeatRequestData()
+          .setGroupId("grp")
+          .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+          .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+      ).build()
+
+      // This is the expected assignment.
+      expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+
+      // Heartbeats until the partitions are revoked.
+      consumerGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+        consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+          consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+      }, msg = s"Could not get partitions revoked. Last response 
$consumerGroupHeartbeatResponse.")
+
+      // Verify the response.
+      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
     } finally {
       admin.close()
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 8c3f2dd4447..d53586efe4d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3944,9 +3944,18 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String regex = key.regularExpression();
 
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, 
value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can 
ignore it.
+            return;
+        }
+
+        Set<String> oldSubscribedTopicNames = new 
HashSet<>(consumerGroup.subscribedTopicNames().keySet());
+
         if (value != null) {
-            ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
-            group.updateResolvedRegularExpression(
+            consumerGroup.updateResolvedRegularExpression(
                 regex,
                 new ResolvedRegularExpression(
                     new HashSet<>(value.topics()),
@@ -3955,13 +3964,10 @@ public class GroupMetadataManager {
                 )
             );
         } else {
-            try {
-                ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
-                group.removeResolvedRegularExpression(regex);
-            } catch (GroupIdNotFoundException ex) {
-                // If the group does not exist, we can ignore the tombstone.
-            }
+            consumerGroup.removeResolvedRegularExpression(regex);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames().keySet());
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f40f51f7739..6375038c66c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15398,6 +15398,11 @@ public class GroupMetadataManagerTest {
             Optional.of(resolvedRegularExpression),
             
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
         );
+
+        assertEquals(
+            Set.of("foo"),
+            context.groupMetadataManager.groupsSubscribedToTopic("abc")
+        );
     }
 
     @Test
@@ -15423,6 +15428,11 @@ public class GroupMetadataManagerTest {
             resolvedRegularExpression
         ));
 
+        assertEquals(
+            Set.of("foo"),
+            context.groupMetadataManager.groupsSubscribedToTopic("abc")
+        );
+
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
             "foo",
             "abc*"
@@ -15432,6 +15442,11 @@ public class GroupMetadataManagerTest {
             Optional.empty(),
             
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
         );
+
+        assertEquals(
+            Set.of(),
+            context.groupMetadataManager.groupsSubscribedToTopic("abc")
+        );
     }
 
     @Test

Reply via email to