This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 0832c2ceb13 KAFKA-19195: Only send the right group ID subset to each
GC shard (#19555)
0832c2ceb13 is described below
commit 0832c2ceb132850d93220154d2afb496963eebc6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Apr 28 15:33:20 2025 +0200
KAFKA-19195: Only send the right group ID subset to each GC shard (#19555)
Cherry-picked from
[e79f5f0](https://github.com/apache/kafka/commit/e79f5f0f651aba4a797da8234814bf40458beb19)
If a share or consumer group is described, all group IDs are sent to all
shards of the group coordinator. This change fixes that, so each shard
only receives the subset of group IDs that belongs to it. It is tested in
the unit tests, since it's somewhat inconvenient to test the passed read
operation lambda.
---
.../coordinator/group/GroupCoordinatorService.java | 4 +--
.../group/GroupCoordinatorServiceTest.java | 33 ++++++++++++++++++----
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index de830076bfb..893b82fa7a5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -647,7 +647,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
runtime.scheduleReadOperation(
"consumer-group-describe",
topicPartition,
- (coordinator, lastCommittedOffset) ->
coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
+ (coordinator, lastCommittedOffset) ->
coordinator.consumerGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"consumer-group-describe",
groupList,
@@ -698,7 +698,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
runtime.scheduleReadOperation(
"share-group-describe",
topicPartition,
- (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDescribe(groupIds, lastCommittedOffset)
+ (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"share-group-describe",
groupList,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1974a796d4a..6d1120c5c93 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -82,6 +82,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
@@ -1415,6 +1416,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2;
service.startup(() -> partitionCount);
+ @SuppressWarnings("unchecked")
+
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard,
List<ConsumerGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
+
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
+
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new
ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1");
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new
ConsumerGroupDescribeResponseData.DescribedGroup()
@@ -1427,14 +1432,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
- ArgumentMatchers.any()
+ readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
- CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
- ArgumentMatchers.any()
+ readOperationCaptor.capture()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
future =
@@ -1443,6 +1448,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone());
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
+
+ // Validate that the captured read operations, on the first and the
second partition
+ GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
+ readOperationCaptor.getAllValues().forEach(x ->
x.generateResponse(shard, 100));
+ verify(shard).consumerGroupDescribe(List.of("group-id-2"), 100);
+ verify(shard).consumerGroupDescribe(List.of("group-id-1"), 100);
}
@Test
@@ -2282,6 +2293,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2;
service.startup(() -> partitionCount);
+ @SuppressWarnings("unchecked")
+
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard,
List<ShareGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
+
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
+
ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new
ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new
ShareGroupDescribeResponseData.DescribedGroup()
@@ -2294,14 +2309,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
- ArgumentMatchers.any()
+ readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
- CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
- ArgumentMatchers.any()
+ readOperationCaptor.capture()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
future =
@@ -2310,6 +2325,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone());
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
+
+ // Validate that the captured read operations, on the first and the
second partition
+ GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
+ readOperationCaptor.getAllValues().forEach(x ->
x.generateResponse(shard, 100));
+ verify(shard).shareGroupDescribe(List.of("share-group-id-2"), 100);
+ verify(shard).shareGroupDescribe(List.of("share-group-id-1"), 100);
}
@Test