This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6551e87815c KAFKA-18925: Add streams groups support to Admin.listGroups (#19155)
6551e87815c is described below

commit 6551e87815caddee02658534efccba5dc95d2dab
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 11 15:48:07 2025 +0100

    KAFKA-18925: Add streams groups support to Admin.listGroups (#19155)
    
    Add support so that Admin.listGroups can represent
    streams groups and their states.
    
    Reviewers: Bill Bejeck <[email protected]>
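
    For illustration, a minimal sketch of how a client could use the new type
    filter (the bootstrap address, class name and output handling are
    hypothetical; the Admin calls mirror the tests added below):

        import java.util.Properties;
        import java.util.Set;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.GroupListing;
        import org.apache.kafka.clients.admin.ListGroupsOptions;
        import org.apache.kafka.clients.admin.ListGroupsResult;
        import org.apache.kafka.common.GroupType;

        public class ListStreamsGroupsExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // Restrict the listing to streams groups only.
                    ListGroupsResult result = admin.listGroups(
                        new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
                    for (GroupListing listing : result.valid().get()) {
                        // groupState() is now populated for streams groups, e.g. Stable or NotReady.
                        System.out.println(listing.groupId() + ": " + listing.groupState());
                    }
                }
            }
        }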
---
 .../java/org/apache/kafka/common/GroupState.java   |  24 +--
 .../java/org/apache/kafka/common/GroupType.java    |   3 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 184 +++++++++++++++++++++
 3 files changed, 200 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/GroupState.java b/clients/src/main/java/org/apache/kafka/common/GroupState.java
index c0bcfb999b0..aa2565abf24 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupState.java
@@ -32,17 +32,18 @@ import java.util.stream.Collectors;
  * The following table shows the correspondence between the group states and types.
  * <table>
  *     <thead>
- *         <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th></tr>
+ *         <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th><th>Streams group</th></tr>
  *     </thead>
  *     <tbody>
- *         <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- *         <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
- *         <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
- *         <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- *         <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- *         <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- *         <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td></tr>
- *         <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td></tr>
+ *         <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *         <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
+ *         <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
+ *         <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *         <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *         <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *         <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
+ *         <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
+ *         <tr><td>NOT_READY</td><td></td><td></td><td></td><td>Yes</td></tr>
  *     </tbody>
  * </table>
  */
@@ -55,7 +56,8 @@ public enum GroupState {
     DEAD("Dead"),
     EMPTY("Empty"),
     ASSIGNING("Assigning"),
-    RECONCILING("Reconciling");
+    RECONCILING("Reconciling"),
+    NOT_READY("NotReady");
 
     private static final Map<String, GroupState> NAME_TO_ENUM = Arrays.stream(values())
             .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
@@ -79,6 +81,8 @@ public enum GroupState {
             return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY);
         } else if (type == GroupType.CONSUMER) {
             return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING);
+        } else if (type == GroupType.STREAMS) {
+            return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING, NOT_READY);
         } else if (type == GroupType.SHARE) {
             return Set.of(STABLE, DEAD, EMPTY);
         } else {
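
For illustration, a minimal sketch of what the GroupState change above enables. The method names are assumptions: the hunk does not show the enclosing signature, so groupStatesForType and parse are the accessors this sketch presumes GroupState exposes for the type-to-states lookup and for the NAME_TO_ENUM mapping:

    import java.util.Set;
    import org.apache.kafka.common.GroupState;
    import org.apache.kafka.common.GroupType;

    public class StreamsGroupStatesSketch {
        public static void main(String[] args) {
            // Streams groups report the consumer-group states minus the two rebalance
            // states, plus the streams-only NOT_READY state (assumed accessor name).
            Set<GroupState> states = GroupState.groupStatesForType(GroupType.STREAMS);
            System.out.println(states.contains(GroupState.NOT_READY)); // true
            // The display name "NotReady" maps back to the new constant (assumed parse helper).
            System.out.println(GroupState.parse("NotReady") == GroupState.NOT_READY); // true
        }
    }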
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupType.java b/clients/src/main/java/org/apache/kafka/common/GroupType.java
index eeb79ea2825..4c3aeac93fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupType.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupType.java
@@ -26,7 +26,8 @@ public enum GroupType {
     UNKNOWN("Unknown"),
     CONSUMER("Consumer"),
     CLASSIC("Classic"),
-    SHARE("Share");
+    SHARE("Share"),
+    STREAMS("Streams");
 
     private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
         .collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 17569ed3956..5e3a884742d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -140,6 +140,7 @@ import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
 import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
@@ -5991,6 +5992,189 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListStreamsGroups() throws Exception {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata response should be retried
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    Collections.emptyList()));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(singletonList(
+                            new ListedGroup()
+                                .setGroupId("streams-group-1")
+                                .setGroupType(GroupType.STREAMS.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(0));
+
+            // handle retriable errors
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("streams-group-2")
+                                .setGroupType(GroupType.STREAMS.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("streams-group-3")
+                                .setGroupType(GroupType.STREAMS.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(1));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(singletonList(
+                            new ListedGroup()
+                                .setGroupId("streams-group-4")
+                                .setGroupType(GroupType.STREAMS.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(2));
+
+            // fatal error
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setGroups(Collections.emptyList())),
+                env.cluster().nodeById(3));
+
+            final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+            TestUtils.assertFutureThrows(UnknownServerException.class, result.all());
+
+            Collection<GroupListing> listings = result.valid().get();
+            assertEquals(4, listings.size());
+
+            Set<String> groupIds = new HashSet<>();
+            for (GroupListing listing : listings) {
+                groupIds.add(listing.groupId());
+                assertTrue(listing.groupState().isPresent());
+            }
+
+            assertEquals(Set.of("streams-group-1", "streams-group-2", "streams-group-3", "streams-group-4"), groupIds);
+            assertEquals(1, result.errors().get().size());
+        }
+    }
+
+    @Test
+    public void testListStreamsGroupsMetadataFailure() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+            AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata causes the request to fail since we have no list of brokers
+            // to send the ListGroups requests to
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+            TestUtils.assertFutureThrows(KafkaException.class, result.all());
+        }
+    }
+
+    @Test
+    public void testListStreamsGroupsWithStates() throws Exception {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("streams-group-1")
+                            .setGroupType(GroupType.STREAMS.toString())
+                            .setProtocolType("streams")
+                            .setGroupState("Stable"),
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("streams-group-2")
+                            .setGroupType(GroupType.STREAMS.toString())
+                            .setProtocolType("streams")
+                            .setGroupState("NotReady")))),
+                env.cluster().nodeById(0));
+
+            final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+            Collection<GroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            List<GroupListing> expected = new ArrayList<>();
+            expected.add(new GroupListing("streams-group-1", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
+            expected.add(new GroupListing("streams-group-2", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    @Test
+    public void testListStreamsGroupsWithStatesOlderBrokerVersion() {
+        ApiVersion listGroupV4 = new ApiVersion()
+            .setApiKey(ApiKeys.LIST_GROUPS.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 4);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+            // Check we should not be able to list streams groups with broker having version < 5
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Collections.singletonList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("streams-group-1")))),
+                env.cluster().nodeById(0));
+            ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+            TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
+        }
+    }
+    
     @Test
     public void testDescribeShareGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
