This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f309299f3cf KAFKA-14503: Implement ListGroups (#14271)
f309299f3cf is described below

commit f309299f3cf92c3ed6fe545c628117b9028c2917
Author: zhaohaidao <[email protected]>
AuthorDate: Fri Sep 15 14:45:03 2023 +0800

    KAFKA-14503: Implement ListGroups (#14271)
    
    This patch implements the ListGroups API in the new group coordinator.
    
    Reviewers: David Jacot <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/coordinator/group/Group.java  |  11 ++
 .../coordinator/group/GroupCoordinatorService.java |  34 ++++-
 .../coordinator/group/GroupCoordinatorShard.java   |  17 +++
 .../coordinator/group/GroupMetadataManager.java    |  20 +++
 .../coordinator/group/consumer/ConsumerGroup.java  |  26 ++++
 .../coordinator/group/generic/GenericGroup.java    |  14 +-
 .../group/GroupCoordinatorServiceTest.java         | 148 +++++++++++++++++++++
 .../group/GroupMetadataManagerTest.java            |  87 +++++++++++-
 .../group/consumer/ConsumerGroupTest.java          |  14 ++
 10 files changed, 365 insertions(+), 8 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6e946b1bead..9862a237fa9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -326,7 +326,7 @@
     <suppress checks="(NPathComplexity|MethodLength)"
               
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
-              files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
+              
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
     <suppress checks="ParameterNumber"
               files="(ConsumerGroupMember|GroupMetadataManager).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 44290ae7fe7..cbd25cb0700 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 
 /**
  * Interface common for all groups.
@@ -48,6 +49,16 @@ public interface Group {
      */
     String stateAsString();
 
+    /**
+     * @return The group's current state as a String, based on the committed offset.
+     */
+    String stateAsString(long committedOffset);
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset);
+
     /**
      * @return The group id.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ac3aa9d45b4..9e11cfa63e7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -73,12 +74,15 @@ import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.Timer;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntSupplier;
 import java.util.stream.Collectors;
 
@@ -460,9 +464,33 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final CompletableFuture<ListGroupsResponseData> future = new 
CompletableFuture<>();
+        final List<ListGroupsResponseData.ListedGroup> results = new 
ArrayList<>();
+        final Set<TopicPartition> existingPartitionSet = runtime.partitions();
+        final AtomicInteger cnt = new 
AtomicInteger(existingPartitionSet.size());
+
+        for (TopicPartition tp : existingPartitionSet) {
+            runtime.scheduleReadOperation(
+                "list-groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+            ).handle((groups, exception) -> {
+                if (exception == null) {
+                    synchronized (results) {
+                        results.addAll(groups);
+                    }
+                } else {
+                    if (!(exception instanceof NotCoordinatorException)) {
+                        future.complete(new 
ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
+                    }
+                }
+                if (cnt.decrementAndGet() == 0) {
+                    future.complete(new 
ListGroupsResponseData().setGroups(results));
+                }
+                return null;
+            });
+        }
+        return future;
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index ed68bb22c5a..97327c9722f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
@@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -308,6 +310,21 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param statesFilter    The states of the groups we want to list.
+     *                        If empty, all groups are returned with their state.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     * @return A list containing the ListGroupsResponseData.ListedGroup
+     */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        List<String> statesFilter,
+        long committedOffset
+    ) throws ApiException {
+        return groupMetadataManager.listGroups(statesFilter, committedOffset);
+    }
+
     /**
      * Handles a LeaveGroup request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a174a0fb1ff..002b7956857 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -95,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
@@ -424,6 +426,24 @@ public class GroupMetadataManager {
         return group;
     }
 
+    /**
+     * Get the Group List.
+     *
+     * @param statesFilter The states of the groups we want to list.
+     *                     If empty, all groups are returned with their state.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing the ListGroupsResponseData.ListedGroup
+     */
+
+    public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> 
statesFilter, long committedOffset) {
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+        if (!statesFilter.isEmpty()) {
+            groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString(committedOffset)));
+        }
+        return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
+    }
+
     /**
      * Gets or maybe creates a consumer group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 5328020baac..10222c3a3cc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.coordinator.group.consumer;
 
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
@@ -179,6 +181,23 @@ public class ConsumerGroup implements Group {
         return state.get().toString();
     }
 
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString());
+    }
+
     /**
      * @return The group id.
      */
@@ -194,6 +213,13 @@ public class ConsumerGroup implements Group {
         return state.get();
     }
 
+    /**
+     * @return The current state based on committed offset.
+     */
+    public ConsumerGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
     /**
      * @return The group epoch.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index d163f385fe0..18f96f2f78e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -242,6 +242,16 @@ public class GenericGroup implements Group {
         return this.state.toString();
     }
 
+    /**
+     * The state of this group. Generic groups are not snapshotted, so the
+     * committed offset is ignored and the current state is returned.
+     *
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString(long committedOffset) {
+        return this.state.toString();
+    }
+
     /**
      * @return the group id.
      */
@@ -1167,9 +1177,9 @@ public class GenericGroup implements Group {
     }
 
     /**
-     * @return the group formatted as a list group response.
+     * @return the group formatted as a list group response based on the 
committed offset.
      */
-    public ListGroupsResponseData.ListedGroup asListedGroup() {
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
         return new ListGroupsResponseData.ListedGroup()
             .setGroupId(groupId)
             .setProtocolType(protocolType.orElse(""))
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 61af5582824..8f39f524484 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
@@ -23,6 +24,7 @@ import 
org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -36,6 +38,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -63,10 +67,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentMatchers;
+import org.mockito.internal.util.collections.Sets;
 
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -603,6 +609,148 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 3;
+        service.startup(() -> partitionCount);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group0")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Dead")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+        );
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+        for (int i = 0; i < partitionCount; i++) {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("list-groups"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+                ArgumentMatchers.any()
+            
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+        }
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        List<ListGroupsResponseData.ListedGroup> actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+        assertEquals(expectedResults, actualResults);
+    }
+
+    @Test
+    public void testListGroupsFailedWithNotCoordinatorException()
+        throws InterruptedException, ExecutionException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 3;
+        service.startup(() -> partitionCount);
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group0")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+        );
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+        for (int i = 0; i < 2; i++) {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("list-groups"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+                ArgumentMatchers.any()
+            
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+        }
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(new 
NotCoordinatorException("")));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+        List<ListGroupsResponseData.ListedGroup> actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+        assertEquals(expectedResults, actualResults);
+    }
+
+    @Test
+    public void testListGroupsFailedImmediately()
+        throws InterruptedException, ExecutionException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 3;
+        service.startup(() -> partitionCount);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+        for (int i = 0; i < 2; i++) {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("list-groups"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+                ArgumentMatchers.any()
+            )).thenReturn(new CompletableFuture<>());
+        }
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(new 
CoordinatorLoadInProgressException("")));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+        ListGroupsResponseData listGroupsResponseData = responseFuture.get(5, 
TimeUnit.SECONDS);
+
+        assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), 
listGroupsResponseData.errorCode());
+        assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testFetchOffsets(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3a735db37e3..a1b25575e8e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMe
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -115,8 +116,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -455,8 +458,6 @@ public class GroupMetadataManagerTest {
         public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupHeartbeat(
             ConsumerGroupHeartbeatRequestData request
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
             RequestContext context = new RequestContext(
                 new RequestHeader(
                     ApiKeys.CONSUMER_GROUP_HEARTBEAT,
@@ -1023,6 +1024,10 @@ public class GroupMetadataManagerTest {
             );
         }
 
+        public List<ListGroupsResponseData.ListedGroup> 
sendListGroups(List<String> statesFilter) {
+            return groupMetadataManager.listGroups(statesFilter, 
lastCommittedOffset);
+        }
+
         public void verifyHeartbeat(
             String groupId,
             JoinGroupResponseData joinResponse,
@@ -1202,6 +1207,7 @@ public class GroupMetadataManagerTest {
             }
 
             lastWrittenOffset++;
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
         }
     }
 
@@ -8603,6 +8609,83 @@ public class GroupMetadataManagerTest {
         assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
     }
 
+    @Test
+    public void testListGroups() {
+        String consumerGroupId = "consumer-group-id";
+        String genericGroupId = "generic-group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String genericGroupType = "generic";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+            .build();
+        context.replay(newGroupMetadataRecord(
+            genericGroupId,
+            new GroupMetadataValue()
+                .setMembers(Collections.emptyList())
+                .setGeneration(2)
+                .setLeader(null)
+                .setProtocolType(genericGroupType)
+                .setProtocol("range")
+                .setCurrentStateTimestamp(context.time.milliseconds()),
+            MetadataVersion.latest()));
+        context.commit();
+        GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+        
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new 
ConsumerGroupMember.Builder(memberId1)
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .build()));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
+
+        Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
+            context.sendListGroups(Collections.emptyList())
+                
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+        Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
+            Stream.of(
+                new ListGroupsResponseData.ListedGroup()
+                    .setGroupId(genericGroup.groupId())
+                    .setProtocolType(genericGroupType)
+                    .setGroupState(EMPTY.toString()),
+                new ListGroupsResponseData.ListedGroup()
+                    .setGroupId(consumerGroupId)
+                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                    
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+            
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+        assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+        context.commit();
+        actualAllGroupMap = 
context.sendListGroups(Collections.emptyList()).stream()
+            
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+        expectAllGroupMap =
+            Stream.of(
+                new ListGroupsResponseData.ListedGroup()
+                    .setGroupId(genericGroup.groupId())
+                    .setProtocolType(genericGroupType)
+                    .setGroupState(EMPTY.toString()),
+                new ListGroupsResponseData.ListedGroup()
+                    .setGroupId(consumerGroupId)
+                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                    
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
+            
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+        assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+        actualAllGroupMap = 
context.sendListGroups(Collections.singletonList("Empty")).stream()
+            
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+        expectAllGroupMap = Stream.of(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId(genericGroup.groupId())
+                .setProtocolType(genericGroupType)
+                .setGroupState(EMPTY.toString())
+        
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+        assertEquals(expectAllGroupMap, actualAllGroupMap);
+    }
+
     public static <T> void assertUnorderedListEquals(
         List<T> expected,
         List<T> actual
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index aa848aa3f5d..9c9421958c5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -632,6 +632,20 @@ public class ConsumerGroupTest {
         group.validateOffsetCommit("member-id", "", 0);
     }
 
+    @Test
+    public void testAsListedGroup() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+        snapshotRegistry.getOrCreateSnapshot(0);
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
+        group.updateMember(new ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Collections.singletonList("foo"))
+            .build());
+        snapshotRegistry.getOrCreateSnapshot(1);
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), 
group.stateAsString(1));
+    }
+
     @Test
     public void testValidateOffsetFetch() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());

Reply via email to