This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 1b424d2f3da KAFKA-18916; Resolved regular expressions must update the
group by topics data structure (#19088)
1b424d2f3da is described below
commit 1b424d2f3dad79ffa33135a34d872d64a35d88fc
Author: David Jacot <[email protected]>
AuthorDate: Tue Mar 4 15:31:08 2025 +0100
KAFKA-18916; Resolved regular expressions must update the group by topics
data structure (#19088)
When regular expressions are resolved, the "groups by topics" data structure
is not updated with the resolved topics. Hence, changes to those topics (e.g.
deletion) do not trigger a rebalance of the group.
Reviewers: Lucas Brutschy <[email protected]>
---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 32 ++++++++++++++++++++--
.../coordinator/group/GroupMetadataManager.java | 22 +++++++++------
.../group/GroupMetadataManagerTest.java | 15 ++++++++++
3 files changed, 58 insertions(+), 11 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 6b42c4f566a..e94bcbc56a3 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -20,7 +20,7 @@ import
org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{TopicCollection, Uuid}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
@@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
)
// Heartbeat request to join the group. Note that the member subscribes
- // to an nonexistent topic.
+ // to a nonexistent topic.
var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
@@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
).build()
// This is the expected assignment.
- val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ var expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
@@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Delete the topic.
+
admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+
+ // Heartbeats until the partitions are revoked.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions revoked. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 8c3f2dd4447..d53586efe4d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3944,9 +3944,18 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String regex = key.regularExpression();
+ ConsumerGroup consumerGroup;
+ try {
+ consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId,
value != null);
+ } catch (GroupIdNotFoundException ex) {
+ // If the group does not exist and a tombstone is replayed, we can
ignore it.
+ return;
+ }
+
+ Set<String> oldSubscribedTopicNames = new
HashSet<>(consumerGroup.subscribedTopicNames().keySet());
+
if (value != null) {
- ConsumerGroup group =
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
- group.updateResolvedRegularExpression(
+ consumerGroup.updateResolvedRegularExpression(
regex,
new ResolvedRegularExpression(
new HashSet<>(value.topics()),
@@ -3955,13 +3964,10 @@ public class GroupMetadataManager {
)
);
} else {
- try {
- ConsumerGroup group =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
- group.removeResolvedRegularExpression(regex);
- } catch (GroupIdNotFoundException ex) {
- // If the group does not exist, we can ignore the tombstone.
- }
+ consumerGroup.removeResolvedRegularExpression(regex);
}
+
+ updateGroupsByTopics(groupId, oldSubscribedTopicNames,
consumerGroup.subscribedTopicNames().keySet());
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f40f51f7739..6375038c66c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15398,6 +15398,11 @@ public class GroupMetadataManagerTest {
Optional.of(resolvedRegularExpression),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);
+
+ assertEquals(
+ Set.of("foo"),
+ context.groupMetadataManager.groupsSubscribedToTopic("abc")
+ );
}
@Test
@@ -15423,6 +15428,11 @@ public class GroupMetadataManagerTest {
resolvedRegularExpression
));
+ assertEquals(
+ Set.of("foo"),
+ context.groupMetadataManager.groupsSubscribedToTopic("abc")
+ );
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
"foo",
"abc*"
@@ -15432,6 +15442,11 @@ public class GroupMetadataManagerTest {
Optional.empty(),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);
+
+ assertEquals(
+ Set.of(),
+ context.groupMetadataManager.groupsSubscribedToTopic("abc")
+ );
}
@Test