This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new 1b028735119 KAFKA-17125 Add integration test for StreamsGroup in Admin 
API (#18911)
1b028735119 is described below

commit 1b0287351191eb50f9b4a1fb7193bea6e9965b96
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Feb 21 16:27:00 2025 +0100

    KAFKA-17125 Add integration test for StreamsGroup in Admin API (#18911)
    
    Integration test for both `--list` and `--describe` commands.
---
 .../kafka/tools/streams/StreamsGroupCommand.java   | 123 ++++---
 .../tools/streams/StreamsGroupCommandOptions.java  |   8 +-
 .../tools/streams/StreamsGroupCommandTest.java     | 366 +++++++++++++++++++++
 .../tools/streams/StreamsGroupCommandUnitTest.java |  13 +-
 4 files changed, 462 insertions(+), 48 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0543ee270e6..c8333b03bae 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
 import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
 import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.TopicPartition;
@@ -38,6 +40,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -169,17 +172,19 @@ public class StreamsGroupCommand {
         }
 
         public void describeGroups() throws ExecutionException, 
InterruptedException {
-            String group = opts.options.valueOf(opts.groupOpt);
-            StreamsGroupDescription description = getDescribeGroup(group);
-            if (description == null)
-                return;
-            boolean verbose =  opts.options.has(opts.verboseOpt);
-            if (opts.options.has(opts.membersOpt)) {
-                printMembers(description, verbose);
-            } else if (opts.options.has(opts.stateOpt)) {
-                printStates(description, verbose);
-            } else {
-                printOffsets(description, verbose);
+            List<String> groups = listStreamsGroups();
+            if (!groups.isEmpty()) {
+                StreamsGroupDescription description = 
getDescribeGroup(groups.get(0));
+                if (description == null)
+                    return;
+                boolean verbose = opts.options.has(opts.verboseOpt);
+                if (opts.options.has(opts.membersOpt)) {
+                    printMembers(description, verbose);
+                } else if (opts.options.has(opts.stateOpt)) {
+                    printStates(description, verbose);
+                } else {
+                    printOffsets(description, verbose);
+                }
             }
         }
 
@@ -201,43 +206,52 @@ public class StreamsGroupCommand {
                 }
 
                 if (!verbose) {
-                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
+                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID", "ASSIGNMENTS");
                     for (StreamsGroupMemberDescription member : members) {
-                        System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID");
-                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId());
-                        printTasks(member.assignment(), false);
-                        System.out.println();
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId(), 
getTasksForPrinting(member.assignment(), Optional.empty()));
                     }
                 } else {
-                    String fmt = "%" + -groupLen + "s %s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    String fmt = "%" + -groupLen + "s %-25s %-15s%" + 
-maxMemberIdLen + "s %-15s %-15s %" + -maxHostLen + "s %" + -maxClientIdLen + 
"s %s\n";
+                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", 
"TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", 
"CLIENT-ID", "ASSIGNMENTS");
+
                     for (StreamsGroupMemberDescription member : members) {
-                        System.out.printf(fmt, "GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
                         System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), description.topologyEpoch(), 
member.memberId(),
-                            member.isClassic() ? "classic" : "streams", 
member.memberEpoch(), member.processId(), member.clientId());
-                        printTasks(member.assignment(), false);
-                        printTasks(member.targetAssignment(), true);
-                        System.out.println();
+                            member.isClassic() ? "classic" : "streams", 
member.memberEpoch(), member.processId(), member.clientId(), 
getTasksForPrinting(member.assignment(), 
Optional.of(member.targetAssignment())));
                     }
                 }
             }
         }
 
-        private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> 
tasks, String taskType) {
-            System.out.printf("%s%n", taskType + ": " + 
tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + 
taskId.partitions()).collect(Collectors.joining(",")) + "] ");
+        private String 
prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String 
taskType) {
+            if (tasks.isEmpty()) {
+                return "";
+            }
+            StringBuilder builder = new StringBuilder(taskType).append(": ");
+            for (StreamsGroupMemberAssignment.TaskIds taskIds : tasks) {
+                builder.append(taskIds.subtopologyId()).append(":[");
+                
builder.append(taskIds.partitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
+                builder.append("]; ");
+            }
+            return builder.toString();
         }
 
-        private void printTasks(StreamsGroupMemberAssignment assignment, 
boolean isTarget) {
-            String typePrefix = isTarget ? "TARGET-" : "";
-            printTaskType(assignment.activeTasks(), typePrefix + 
"ACTIVE-TASKS:");
-            printTaskType(assignment.standbyTasks(), typePrefix + 
"STANDBY-TASKS:");
-            printTaskType(assignment.warmupTasks(), typePrefix + 
"WARMUP-TASKS:");
+        private String getTasksForPrinting(StreamsGroupMemberAssignment 
assignment, Optional<StreamsGroupMemberAssignment> targetAssignment) {
+            StringBuilder builder = new StringBuilder();
+            builder.append(prepareTaskType(assignment.activeTasks(), "ACTIVE"))
+                .append(prepareTaskType(assignment.standbyTasks(), "STANDBY"))
+                .append(prepareTaskType(assignment.warmupTasks(), "WARMUP"));
+            targetAssignment.ifPresent(target -> 
builder.append(prepareTaskType(target.activeTasks(), "TARGET-ACTIVE"))
+                .append(prepareTaskType(target.standbyTasks(), 
"TARGET-STANDBY"))
+                .append(prepareTaskType(target.warmupTasks(), 
"TARGET-WARMUP")));
+            return builder.toString();
         }
 
         private void printStates(StreamsGroupDescription description, boolean 
verbose) {
             maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), 1);
 
             int groupLen = Math.max(15, description.groupId().length());
-            String coordinator = description.coordinator().host() + ":" + 
description.coordinator().port() + "  (" + description.coordinator().idString() 
+ ")";
+            String coordinator = description.coordinator().host() + ":" + 
description.coordinator().port() + " (" + description.coordinator().idString() 
+ ")";
             int coordinatorLen = Math.max(25, coordinator.length());
 
             if (!verbose) {
@@ -245,14 +259,14 @@ public class StreamsGroupCommand {
                 System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"#MEMBERS");
                 System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.members().size());
             } else {
-                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %-15s %-15s %s%n";
+                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %-15s %-25s %s\n";
                 System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
                 System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.groupEpoch(), 
description.targetAssignmentEpoch(), description.members().size());
             }
         }
 
         private void printOffsets(StreamsGroupDescription description, boolean 
verbose) throws ExecutionException, InterruptedException {
-            Map<TopicPartition, Long> offsets = 
getOffsets(description.members(), description);
+            Map<TopicPartition, OffsetsInfo> offsets = getOffsets(description);
             if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), offsets.size())) {
                 int groupLen = Math.max(15, description.groupId().length());
                 int maxTopicLen = 15;
@@ -261,22 +275,25 @@ public class StreamsGroupCommand {
                 }
 
                 if (!verbose) {
-                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %s%n";
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s%n";
                     System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"OFFSET-LAG");
-                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
-                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+                    for (Map.Entry<TopicPartition, OffsetsInfo> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), offset.getValue().lag);
                     }
                 } else {
-                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s %s%n";
-                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "OFFSET-LAG");
-                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
-                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s %-15s %-15s %-15s%n";
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
+                    for (Map.Entry<TopicPartition, OffsetsInfo> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(),
+                            
offset.getValue().currentOffset.map(Object::toString).orElse("-"), 
offset.getValue().leaderEpoch.map(Object::toString).orElse("-"),
+                            offset.getValue().logEndOffset, 
offset.getValue().lag);
                     }
                 }
             }
         }
 
-        Map<TopicPartition, Long> 
getOffsets(Collection<StreamsGroupMemberDescription> members, 
StreamsGroupDescription description) throws ExecutionException, 
InterruptedException {
+        Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription 
description) throws ExecutionException, InterruptedException {
+            final Collection<StreamsGroupMemberDescription> members = 
description.members();
             Set<TopicPartition> allTp = new HashSet<>();
             for (StreamsGroupMemberDescription memberDescription : members) {
                 
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), 
description));
@@ -291,14 +308,31 @@ public class StreamsGroupCommand {
             }
             Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
earliestResult = adminClient.listOffsets(earliest).all().get();
             Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
latestResult = adminClient.listOffsets(latest).all().get();
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
getCommittedOffsets(description.groupId());
 
-            Map<TopicPartition, Long> lag = new HashMap<>();
+            Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
             for (Map.Entry<TopicPartition, 
ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
-                lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - 
earliestResult.get(tp.getKey()).offset());
+                final Optional<Long> currentOffset = 
committedOffsets.containsKey(tp.getKey()) ? 
Optional.of(committedOffsets.get(tp.getKey()).offset()) : Optional.empty();
+                final Optional<Integer> leaderEpoch = 
committedOffsets.containsKey(tp.getKey()) ? 
committedOffsets.get(tp.getKey()).leaderEpoch() : Optional.empty();
+                final long lag = currentOffset.map(current -> 
latestResult.get(tp.getKey()).offset() - current).orElseGet(() -> 
latestResult.get(tp.getKey()).offset() - 
earliestResult.get(tp.getKey()).offset());
+                output.put(tp.getKey(),
+                    new OffsetsInfo(
+                        currentOffset,
+                        leaderEpoch,
+                        latestResult.get(tp.getKey()).offset(),
+                        lag));
             }
-            return lag;
+            return output;
         }
 
+        Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String 
groupId) {
+            try {
+                return adminClient.listConsumerGroupOffsets(
+                    Collections.singletonMap(groupId, new 
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
 
         /**
          * Prints a summary of the state for situations where the group is 
empty or dead.
@@ -347,4 +381,7 @@ public class StreamsGroupCommand {
             return Admin.create(props);
         }
     }
+
+    record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> 
leaderEpoch, Long logEndOffset, Long lag) {
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index c97e99e65a8..7982f96972d 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -60,6 +60,11 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
     public final OptionSpec<Void> verboseOpt;
 
 
+    public static StreamsGroupCommandOptions fromArgs(String[] args) {
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        opts.checkArgs();
+        return opts;
+    }
 
     public StreamsGroupCommandOptions(String[] args) {
         super(args);
@@ -104,9 +109,6 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
         CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
 
         if (options.has(describeOpt)) {
-            if (!options.has(groupOpt))
-                CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " takes the option: " + 
groupOpt);
             List<OptionSpec<?>> mutuallyExclusiveOpts = 
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
             if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) {
                 CommandLineUtils.printUsageAndExit(parser,
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
new file mode 100644
index 00000000000..e6f240f6350
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.GroupProtocol;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionException;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+
+@Timeout(600)
+@Tag("integration")
+public class StreamsGroupCommandTest {
+
+    public static EmbeddedKafkaCluster cluster = null;
+    static KafkaStreams streams;
+    private static final String APP_ID = "streams-group-command-test";
+    private static final String INPUT_TOPIC = "customInputTopic";
+    private static final String OUTPUT_TOPIC = "customOutputTopic";
+
+    @BeforeAll
+    public static void setup() throws Exception {
+        // start the cluster and create the input topic
+        final Properties props = new Properties();
+        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
+        cluster = new EmbeddedKafkaCluster(1, props);
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC, 2, 1);
+
+
+        // start kafka streams
+        Properties streamsProp = new Properties();
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+
+        streams = new KafkaStreams(topology(), streamsProp);
+        startApplicationAndWaitUntilRunning(streams);
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        streams.close();
+        cluster.stop();
+        cluster = null;
+    }
+
+    @Test
+    public void testListStreamsGroupWithoutFilters() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list"})) {
+            Set<String> expectedGroups = new 
HashSet<>(Collections.singleton(APP_ID));
+
+            final AtomicReference<Set> foundGroups = new AtomicReference<>();
+            TestUtils.waitForCondition(() -> {
+                foundGroups.set(new HashSet<>(service.listStreamsGroups()));
+                return Objects.equals(expectedGroups, foundGroups.get());
+            }, "Expected --list to show streams groups " + expectedGroups + ", 
but found " + foundGroups.get() + ".");
+
+        }
+    }
+
+    @Test
+    public void testListWithUnrecognizedNewOption() throws Exception {
+        String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", 
cluster.bootstrapServers(), "--list"};
+        Assertions.assertThrows(OptionException.class, () -> 
getStreamsGroupService(cgcArgs));
+    }
+
+    @Test
+    public void testListStreamsGroupWithStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state"})) {
+            Set<GroupListing> expectedListing = Set.of(
+                new GroupListing(
+                    APP_ID,
+                    Optional.of(GroupType.STREAMS),
+                    "streams",
+                    Optional.of(GroupState.STABLE))
+            );
+
+            final AtomicReference<Set<GroupListing>> foundListing = new 
AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+    }
+
+    @Test
+    public void testListStreamsGroupWithSpecifiedStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state", "stable"})) {
+            Set<GroupListing> expectedListing = Set.of(
+                new GroupListing(
+                    APP_ID,
+                    Optional.of(GroupType.STREAMS),
+                    "streams",
+                    Optional.of(GroupState.STABLE))
+            );
+
+            final AtomicReference<Set<GroupListing>> foundListing = new 
AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
+            Set<GroupListing> expectedListing = Collections.emptySet();
+
+            final AtomicReference<Set<GroupListing>> foundListing = new 
AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new 
HashSet<>(service.listStreamsGroupsInStates(Collections.singleton(GroupState.PREPARING_REBALANCE))));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + 
", but found " + foundListing.get() + ".");
+        }
+    }
+
+    @Test
+    public void testListStreamsGroupOutput() throws Exception {
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list"),
+            Collections.emptyList(),
+            Set.of(Collections.singletonList(APP_ID))
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state", "Stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+
+        // Check case-insensitivity in state filter.
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--list", "--state", "stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+    }
+
+    @Test
+    public void testDescribeStreamsGroup() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "TOPIC", 
"PARTITION", "OFFSET-LAG");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, INPUT_TOPIC, "0", "0"),
+            List.of(APP_ID, INPUT_TOPIC, "1", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "1", "0"));
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe"), expectedHeader, expectedRows, List.of());
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--offsets"), expectedHeader, expectedRows, List.of());
+    }
+
+    @Test
+    public void testDescribeStreamsGroupWithVerboseOption() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "TOPIC", 
"PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
+            List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "0", "-", "-", "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "1", "-", "-", "0", "0"));
+        // The state-store-topic name is not deterministic, so we don't care 
about topic names.
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose"), expectedHeader, expectedRows, List.of());
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--offsets", "--verbose"), expectedHeader, expectedRows, 
List.of());
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--offsets"), expectedHeader, expectedRows, 
List.of());
+    }
+
+    @Test
+    public void testDescribeStreamsGroupWithStateOption() throws Exception {
+        final List<String> expectedHeader = Arrays.asList("GROUP", 
"COORDINATOR", "(ID)", "STATE", "#MEMBERS");
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, 
"", "", "Stable", "2"));
+        // The coordinator is not deterministic, so we don't care about it.
+        final List<Integer> dontCares = List.of(1, 2);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--state"), expectedHeader, expectedRows, dontCares);
+    }
+
+    @Test
+    public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws 
Exception {
+        final List<String> expectedHeader = Arrays.asList("GROUP", 
"COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", 
"#MEMBERS");
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, 
"", "", "Stable", "3", "3", "2"));
+        // The coordinator is not deterministic, so we don't care about it.
+        final List<Integer> dontCares = List.of(1, 2);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--state", "--verbose"), expectedHeader, expectedRows, dontCares);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--state"), expectedHeader, expectedRows, dontCares);
+    }
+
+    /**
+     * Runs {@code --describe --members} and expects two rows for {@code APP_ID}, one ending in
+     * the tokens {@code ACTIVE:} {@code 0:[0,1];} and one ending in {@code ACTIVE:} {@code 1:[0,1];}.
+     */
+    @Test
+    public void testDescribeStreamsGroupWithMembersOption() throws Exception {
+        final List<String> header = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        // The member and process names as well as the client-id are not deterministic,
+        // so tokens 1, 2 and 3 are masked out before comparing.
+        final List<Integer> ignoredColumns = List.of(1, 2, 3);
+
+        // Rows are compared token by token after splitting on whitespace, which is why the
+        // expected rows carry one more element than the header: the assignment is two tokens.
+        final List<String> firstRow = List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];");
+        final List<String> secondRow = List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];");
+
+        final List<String> commandLine =
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members");
+        validateDescribeOutput(commandLine, header, Set.of(firstRow, secondRow), ignoredColumns);
+    }
+
+    /**
+     * Runs {@code --describe} with {@code --members} and {@code --verbose} given in both
+     * orders and expects the same verbose header and the same two member rows each time.
+     */
+    @Test
+    public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception {
+        final List<String> header = List.of(
+            "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
+            "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        // The member and process names as well as the client-id are not deterministic,
+        // so tokens 3, 6 and 7 are masked out before comparing.
+        final List<Integer> ignoredColumns = List.of(3, 6, 7);
+
+        final List<String> firstRow = List.of(
+            APP_ID, "3", "0", "", "streams", "3", "", "",
+            "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];");
+        final List<String> secondRow = List.of(
+            APP_ID, "3", "0", "", "streams", "3", "", "",
+            "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];");
+        final Set<List<String>> rows = Set.of(firstRow, secondRow);
+
+        // The output must not depend on the order in which the two flags are passed.
+        final List<List<String>> flagOrders = List.of(
+            List.of("--members", "--verbose"),
+            List.of("--verbose", "--members"));
+        for (final List<String> flags : flagOrders) {
+            final List<String> commandLine = new ArrayList<>(
+                Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"));
+            commandLine.addAll(flags);
+            validateDescribeOutput(commandLine, header, rows, ignoredColumns);
+        }
+    }
+
+    /**
+     * Builds the topology used by these tests: values read from {@code INPUT_TOPIC} are
+     * lower-cased and split into words, the words are grouped and counted, and the counts
+     * are written to {@code OUTPUT_TOPIC}.
+     */
+    private static Topology topology() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder
+            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
+            .flatMapValues(line -> lowerCaseWords(line))
+            .groupBy((ignoredKey, word) -> word)
+            .count()
+            .toStream()
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+        return streamsBuilder.build();
+    }
+
+    // Lower-cases the line with the default locale and splits it on runs of non-word characters.
+    private static List<String> lowerCaseWords(final String line) {
+        final String lowerCased = line.toLowerCase(Locale.getDefault());
+        return Arrays.asList(lowerCased.split("\\W+"));
+    }
+
+    /**
+     * Creates a {@code StreamsGroupService} from the parsed command-line arguments, passing
+     * {@code AdminClientConfig.RETRIES_CONFIG} set to {@code Integer.MAX_VALUE} as the only
+     * extra configuration entry.
+     *
+     * @param args command-line arguments, parsed with {@code StreamsGroupCommandOptions.fromArgs}
+     * @return the newly constructed service
+     */
+    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
+        final String maxRetries = String.valueOf(Integer.MAX_VALUE);
+        return new StreamsGroupCommand.StreamsGroupService(
+            StreamsGroupCommandOptions.fromArgs(args),
+            Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, maxRetries)
+        );
+    }
+
+    /**
+     * Waits, via {@code TestUtils.waitForCondition}, for {@code StreamsGroupCommand.main(args)}
+     * to print the expected listing.
+     *
+     * <p>The captured console output is split into lines and every line into whitespace-separated
+     * tokens. If {@code expectedHeader} is non-empty, the first line must equal it and the
+     * remaining lines are the rows; otherwise every line is a row. The rows are compared to
+     * {@code expectedRows} as a set, so their order does not matter.
+     *
+     * @param args           command-line arguments handed to {@code StreamsGroupCommand.main}
+     * @param expectedHeader expected tokens of the first line, or an empty list if no header is printed
+     * @param expectedRows   expected rows, each as a list of tokens
+     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
+     */
+    private static void validateListOutput(
+        List<String> args,
+        List<String> expectedHeader,
+        Set<List<String>> expectedRows
+    ) throws InterruptedException {
+        // Keeps the most recent output so the failure message can show what was actually printed.
+        final AtomicReference<String> out = new AtomicReference<>("");
+        TestUtils.waitForCondition(() -> {
+            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
+            out.set(output);
+
+            // "".split("\n") yields a single empty string; treat that as "no lines at all".
+            String[] lines = output.split("\n");
+            if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
+
+            int firstRow = 0;
+            if (!expectedHeader.isEmpty()) {
+                // A header is expected but nothing was printed: report "not matched" instead of
+                // letting Arrays.stream(lines, 1, 0) below throw ArrayIndexOutOfBoundsException.
+                if (lines.length == 0) return false;
+                List<String> header = Arrays.asList(lines[0].split("\\s+"));
+                if (!expectedHeader.equals(header)) return false;
+                firstRow = 1;
+            }
+
+            Set<List<String>> groups = Arrays.stream(lines, firstRow, lines.length)
+                .map(line -> Arrays.asList(line.split("\\s+")))
+                .collect(Collectors.toSet());
+            return expectedRows.equals(groups);
+        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
+    }
+
+    /**
+     * Waits, via {@code TestUtils.waitForCondition}, for {@code StreamsGroupCommand.main(args)}
+     * to print the expected description.
+     *
+     * <p>The captured console output is split into lines and every line into whitespace-separated
+     * tokens. The first line must equal {@code expectedHeader}. In each remaining line the tokens
+     * at {@code dontCareIndices} are replaced by {@code ""}, and the resulting rows are compared
+     * to {@code expectedRows} as a set, so their order does not matter.
+     *
+     * @param args            command-line arguments handed to {@code StreamsGroupCommand.main}
+     * @param expectedHeader  expected tokens of the first output line
+     * @param expectedRows    expected rows, with {@code ""} at every don't-care position
+     * @param dontCareIndices token positions whose values are ignored
+     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
+     */
+    private static void validateDescribeOutput(
+        List<String> args,
+        List<String> expectedHeader,
+        Set<List<String>> expectedRows,
+        List<Integer> dontCareIndices
+    ) throws InterruptedException {
+        // Keeps the most recent output so the failure message can show what was actually printed.
+        final AtomicReference<String> out = new AtomicReference<>("");
+        // Highest position that gets masked; a row must be longer than this to be maskable.
+        final int highestDontCare = dontCareIndices.stream().mapToInt(Integer::intValue).max().orElse(-1);
+        TestUtils.waitForCondition(() -> {
+            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
+            out.set(output);
+
+            // "".split("\n") yields a single empty string; treat that as "no lines at all".
+            String[] lines = output.split("\n");
+            if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
+
+            if (lines.length == 0) return false;
+            List<String> header = Arrays.asList(lines[0].split("\\s+"));
+            if (!expectedHeader.equals(header)) return false;
+
+            Set<List<String>> groupDesc = Arrays.stream(lines, 1, lines.length)
+                .map(line -> Arrays.asList(line.split("\\s+")))
+                .collect(Collectors.toSet());
+            if (groupDesc.size() != expectedRows.size()) return false;
+            // A row too short to be masked (e.g. a blank or partial line) cannot match; report
+            // "not matched" instead of letting List.set throw IndexOutOfBoundsException.
+            if (groupDesc.stream().anyMatch(row -> row.size() <= highestDontCare)) return false;
+            // clear the dontCare fields and then compare two sets
+            return expectedRows
+                .equals(
+                    groupDesc.stream()
+                        .map(list -> {
+                            List<String> listCloned = new ArrayList<>(list);
+                            dontCareIndices.forEach(index -> listCloned.set(index, ""));
+                            return listCloned;
+                        }).collect(Collectors.toSet())
+                );
+        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
index 2a1c3047d85..86ee20b41c6 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
 import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
 import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
@@ -177,6 +179,13 @@ public class StreamsGroupCommandUnitTest {
 
         
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
 endOffset);
 
+        ListConsumerGroupOffsetsResult result = 
mock(ListConsumerGroupOffsetsResult.class);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new 
HashMap<>();
+        committedOffsetsMap.put(new TopicPartition("topic1", 0), new 
OffsetAndMetadata(12, Optional.of(0), ""));
+
+        
when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+        
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+
         StreamsGroupMemberDescription description = new 
StreamsGroupMemberDescription("foo", 0, Optional.empty(),
             Optional.empty(), "bar", "baz", 0, "qux",
             Optional.empty(), Map.of(), List.of(), List.of(),
@@ -193,9 +202,9 @@ public class StreamsGroupCommandUnitTest {
             new Node(0, "host", 0),
             null);
         StreamsGroupCommand.StreamsGroupService service = new 
StreamsGroupCommand.StreamsGroupService(null, adminClient);
-        Map<TopicPartition, Long> lags = 
service.getOffsets(List.of(description), x);
+        Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = 
service.getOffsets(x);
         assertEquals(1, lags.size());
-        assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
+        assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), 
Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
         service.close();
     }
 


Reply via email to