This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 0832c2ceb13 KAFKA-19195: Only send the right group ID subset to each 
GC shard (#19555)
0832c2ceb13 is described below

commit 0832c2ceb132850d93220154d2afb496963eebc6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Apr 28 15:33:20 2025 +0200

    KAFKA-19195: Only send the right group ID subset to each GC shard (#19555)
    
    Cherry-picked from
    
[e79f5f0](https://github.com/apache/kafka/commit/e79f5f0f651aba4a797da8234814bf40458beb19)
    
    If a share or consumer group is described, all group IDs are sent to all
    shards of the group coordinator. This change fixes it. It is tested in the
    unit tests, since it's somewhat inconvenient to test the passed read
    operation lambda.
---
 .../coordinator/group/GroupCoordinatorService.java |  4 +--
 .../group/GroupCoordinatorServiceTest.java         | 33 ++++++++++++++++++----
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index de830076bfb..893b82fa7a5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -647,7 +647,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 runtime.scheduleReadOperation(
                     "consumer-group-describe",
                     topicPartition,
-                    (coordinator, lastCommittedOffset) -> 
coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
+                    (coordinator, lastCommittedOffset) -> 
coordinator.consumerGroupDescribe(groupList, lastCommittedOffset)
                 ).exceptionally(exception -> handleOperationException(
                     "consumer-group-describe",
                     groupList,
@@ -698,7 +698,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 runtime.scheduleReadOperation(
                     "share-group-describe",
                     topicPartition,
-                    (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDescribe(groupIds, lastCommittedOffset)
+                    (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
                 ).exceptionally(exception -> handleOperationException(
                     "share-group-describe",
                     groupList,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1974a796d4a..6d1120c5c93 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -82,6 +82,7 @@ import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 
 import java.net.InetAddress;
@@ -1415,6 +1416,10 @@ public class GroupCoordinatorServiceTest {
         int partitionCount = 2;
         service.startup(() -> partitionCount);
 
+        @SuppressWarnings("unchecked")
+        
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard,
 List<ConsumerGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
+            
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
+
         ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1");
         ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
@@ -1427,14 +1432,14 @@ public class GroupCoordinatorServiceTest {
         when(runtime.scheduleReadOperation(
             ArgumentMatchers.eq("consumer-group-describe"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
-            ArgumentMatchers.any()
+            readOperationCaptor.capture()
         
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
 
-        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
describedGroupFuture = new CompletableFuture<>();
         when(runtime.scheduleReadOperation(
             ArgumentMatchers.eq("consumer-group-describe"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
-            ArgumentMatchers.any()
+            readOperationCaptor.capture()
         )).thenReturn(describedGroupFuture);
 
         
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
@@ -1443,6 +1448,12 @@ public class GroupCoordinatorServiceTest {
         assertFalse(future.isDone());
         
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
         assertEquals(expectedDescribedGroups, future.get());
+
+        // Validate that the captured read operations, on the first and the 
second partition
+        GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
+        readOperationCaptor.getAllValues().forEach(x -> 
x.generateResponse(shard, 100));
+        verify(shard).consumerGroupDescribe(List.of("group-id-2"), 100);
+        verify(shard).consumerGroupDescribe(List.of("group-id-1"), 100);
     }
 
     @Test
@@ -2282,6 +2293,10 @@ public class GroupCoordinatorServiceTest {
         int partitionCount = 2;
         service.startup(() -> partitionCount);
 
+        @SuppressWarnings("unchecked")
+        
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard,
 List<ShareGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
+            
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
+
         ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new 
ShareGroupDescribeResponseData.DescribedGroup()
             .setGroupId("share-group-id-1");
         ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new 
ShareGroupDescribeResponseData.DescribedGroup()
@@ -2294,14 +2309,14 @@ public class GroupCoordinatorServiceTest {
         when(runtime.scheduleReadOperation(
             ArgumentMatchers.eq("share-group-describe"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
-            ArgumentMatchers.any()
+            readOperationCaptor.capture()
         
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
 
-        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
describedGroupFuture = new CompletableFuture<>();
         when(runtime.scheduleReadOperation(
             ArgumentMatchers.eq("share-group-describe"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
-            ArgumentMatchers.any()
+            readOperationCaptor.capture()
         )).thenReturn(describedGroupFuture);
 
         CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
future =
@@ -2310,6 +2325,12 @@ public class GroupCoordinatorServiceTest {
         assertFalse(future.isDone());
         
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
         assertEquals(expectedDescribedGroups, future.get());
+
+        // Validate that the captured read operations, on the first and the 
second partition
+        GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
+        readOperationCaptor.getAllValues().forEach(x -> 
x.generateResponse(shard, 100));
+        verify(shard).shareGroupDescribe(List.of("share-group-id-2"), 100);
+        verify(shard).shareGroupDescribe(List.of("share-group-id-1"), 100);
     }
 
     @Test

Reply via email to