This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 1b028735119 KAFKA-17125 Add integration test for StreamsGroup in Admin API (#18911)
1b028735119 is described below
commit 1b0287351191eb50f9b4a1fb7193bea6e9965b96
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Feb 21 16:27:00 2025 +0100
KAFKA-17125 Add integration test for StreamsGroup in Admin API (#18911)
Integration tests for both the `--list` and `--describe` commands.
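For reference, the commands exercised by these tests look like this from the command line (the kafka-streams-groups.sh wrapper name is illustrative; the tests below drive StreamsGroupCommand.main() directly, and all flags shown appear in the diff):

    bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 --list
    bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 --list --state stable
    bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 --describe --members
    bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 --describe --offsets --verbose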
---
.../kafka/tools/streams/StreamsGroupCommand.java | 123 ++++---
.../tools/streams/StreamsGroupCommandOptions.java | 8 +-
.../tools/streams/StreamsGroupCommandTest.java | 366 +++++++++++++++++++++
.../tools/streams/StreamsGroupCommandUnitTest.java | 13 +-
4 files changed, 462 insertions(+), 48 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0543ee270e6..c8333b03bae 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
@@ -38,6 +40,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -169,17 +172,19 @@ public class StreamsGroupCommand {
}
    public void describeGroups() throws ExecutionException, InterruptedException {
- String group = opts.options.valueOf(opts.groupOpt);
- StreamsGroupDescription description = getDescribeGroup(group);
- if (description == null)
- return;
- boolean verbose = opts.options.has(opts.verboseOpt);
- if (opts.options.has(opts.membersOpt)) {
- printMembers(description, verbose);
- } else if (opts.options.has(opts.stateOpt)) {
- printStates(description, verbose);
- } else {
- printOffsets(description, verbose);
+ List<String> groups = listStreamsGroups();
+ if (!groups.isEmpty()) {
+            StreamsGroupDescription description = getDescribeGroup(groups.get(0));
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
}
}
@@ -201,43 +206,52 @@ public class StreamsGroupCommand {
}
if (!verbose) {
-            String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+            String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
+            System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
             for (StreamsGroupMemberDescription member : members) {
-                System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID");
-                System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId());
-                printTasks(member.assignment(), false);
-                System.out.println();
+                System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId(), getTasksForPrinting(member.assignment(), Optional.empty()));
}
} else {
-            String fmt = "%" + -groupLen + "s %s %-15s%" + -maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+            String fmt = "%" + -groupLen + "s %-25s %-15s%" + -maxMemberIdLen + "s %-15s %-15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
+            System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+
             for (StreamsGroupMemberDescription member : members) {
-                System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
                 System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(),
-                    member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId());
-                printTasks(member.assignment(), false);
-                printTasks(member.targetAssignment(), true);
-                System.out.println();
+                    member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId(), getTasksForPrinting(member.assignment(), Optional.of(member.targetAssignment())));
}
}
}
}
-    private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
-        System.out.printf("%s%n", taskType + ": " + tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + taskId.partitions()).collect(Collectors.joining(",")) + "] ");
+    private String prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
+        if (tasks.isEmpty()) {
+            return "";
+        }
+        StringBuilder builder = new StringBuilder(taskType).append(": ");
+        for (StreamsGroupMemberAssignment.TaskIds taskIds : tasks) {
+            builder.append(taskIds.subtopologyId()).append(":[");
+            builder.append(taskIds.partitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
+            builder.append("]; ");
+        }
+        return builder.toString();
    }
-    private void printTasks(StreamsGroupMemberAssignment assignment, boolean isTarget) {
-        String typePrefix = isTarget ? "TARGET-" : "";
-        printTaskType(assignment.activeTasks(), typePrefix + "ACTIVE-TASKS:");
-        printTaskType(assignment.standbyTasks(), typePrefix + "STANDBY-TASKS:");
-        printTaskType(assignment.warmupTasks(), typePrefix + "WARMUP-TASKS:");
+    private String getTasksForPrinting(StreamsGroupMemberAssignment assignment, Optional<StreamsGroupMemberAssignment> targetAssignment) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(prepareTaskType(assignment.activeTasks(), "ACTIVE"))
+            .append(prepareTaskType(assignment.standbyTasks(), "STANDBY"))
+            .append(prepareTaskType(assignment.warmupTasks(), "WARMUP"));
+        targetAssignment.ifPresent(target -> builder.append(prepareTaskType(target.activeTasks(), "TARGET-ACTIVE"))
+            .append(prepareTaskType(target.standbyTasks(), "TARGET-STANDBY"))
+            .append(prepareTaskType(target.warmupTasks(), "TARGET-WARMUP")));
+        return builder.toString();
    }
    private void printStates(StreamsGroupDescription description, boolean verbose) {
        maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1);
        int groupLen = Math.max(15, description.groupId().length());
-        String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
+        String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
int coordinatorLen = Math.max(25, coordinator.length());
if (!verbose) {
@@ -245,14 +259,14 @@ public class StreamsGroupCommand {
            System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
            System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
        } else {
-            String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-15s %s%n";
+            String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-15s %-25s %s\n";
            System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
            System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size());
}
}
    private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException {
-        Map<TopicPartition, Long> offsets = getOffsets(description.members(), description);
+        Map<TopicPartition, OffsetsInfo> offsets = getOffsets(description);
        if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) {
int groupLen = Math.max(15, description.groupId().length());
int maxTopicLen = 15;
@@ -261,22 +275,25 @@ public class StreamsGroupCommand {
}
if (!verbose) {
-            String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s%n";
+            String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s%n";
             System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
-            for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
-                System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+            for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
+                System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue().lag);
             }
         } else {
-            String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %s%n";
-            System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "OFFSET-LAG");
-            for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
-                System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+            String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %-15s%n";
+            System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
+            for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
+                System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(),
+                    offset.getValue().currentOffset.map(Object::toString).orElse("-"), offset.getValue().leaderEpoch.map(Object::toString).orElse("-"),
+                    offset.getValue().logEndOffset, offset.getValue().lag);
             }
}
}
}
}
-    Map<TopicPartition, Long> getOffsets(Collection<StreamsGroupMemberDescription> members, StreamsGroupDescription description) throws ExecutionException, InterruptedException {
+    Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description) throws ExecutionException, InterruptedException {
+        final Collection<StreamsGroupMemberDescription> members = description.members();
        Set<TopicPartition> allTp = new HashSet<>();
        for (StreamsGroupMemberDescription memberDescription : members) {
            allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), description));
@@ -291,14 +308,31 @@ public class StreamsGroupCommand {
}
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(description.groupId());
-        Map<TopicPartition, Long> lag = new HashMap<>();
+        Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
-            lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
+            final Optional<Long> currentOffset = committedOffsets.containsKey(tp.getKey()) ? Optional.of(committedOffsets.get(tp.getKey()).offset()) : Optional.empty();
+            final Optional<Integer> leaderEpoch = committedOffsets.containsKey(tp.getKey()) ? committedOffsets.get(tp.getKey()).leaderEpoch() : Optional.empty();
+            final long lag = currentOffset.map(current -> latestResult.get(tp.getKey()).offset() - current).orElseGet(() -> latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
+            output.put(tp.getKey(),
+                new OffsetsInfo(
+                    currentOffset,
+                    leaderEpoch,
+                    latestResult.get(tp.getKey()).offset(),
+                    lag));
}
-        return lag;
+        return output;
    }
+    Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
+        try {
+            return adminClient.listConsumerGroupOffsets(
+                Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
    /**
     * Prints a summary of the state for situations where the group is empty or dead.
@@ -347,4 +381,7 @@ public class StreamsGroupCommand {
return Admin.create(props);
}
}
+
+    record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
+ }
}
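For illustration, with the new OffsetsInfo record the verbose offsets view prints one row per topic partition along these lines (values hypothetical; "-" is the orElse("-") rendering for partitions without a committed offset):

    GROUP            TOPIC             PARTITION  CURRENT-OFFSET  LEADER-EPOCH    LOG-END-OFFSET  OFFSET-LAG
    my-streams-app   customInputTopic  0          12              0               30              18
    my-streams-app   customInputTopic  1          -               -               0               0

When no offset has been committed, getOffsets() falls back to log-end-offset minus earliest offset for the lag column.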
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index c97e99e65a8..7982f96972d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -60,6 +60,11 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
public final OptionSpec<Void> verboseOpt;
+ public static StreamsGroupCommandOptions fromArgs(String[] args) {
+ StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+ opts.checkArgs();
+ return opts;
+ }
public StreamsGroupCommandOptions(String[] args) {
super(args);
@@ -104,9 +109,6 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
        if (options.has(describeOpt)) {
-            if (!options.has(groupOpt))
-                CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " takes the option: " + groupOpt);
            List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
            if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
                CommandLineUtils.printUsageAndExit(parser,
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
new file mode 100644
index 00000000000..e6f240f6350
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.GroupProtocol;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.ToolsTestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionException;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+
+@Timeout(600)
+@Tag("integration")
+public class StreamsGroupCommandTest {
+
+ public static EmbeddedKafkaCluster cluster = null;
+ static KafkaStreams streams;
+ private static final String APP_ID = "streams-group-command-test";
+ private static final String INPUT_TOPIC = "customInputTopic";
+ private static final String OUTPUT_TOPIC = "customOutputTopic";
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ // start the cluster and create the input topic
+ final Properties props = new Properties();
+        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
+ cluster = new EmbeddedKafkaCluster(1, props);
+ cluster.start();
+ cluster.createTopic(INPUT_TOPIC, 2, 1);
+
+
+ // start kafka streams
+ Properties streamsProp = new Properties();
+ streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+
+ streams = new KafkaStreams(topology(), streamsProp);
+ startApplicationAndWaitUntilRunning(streams);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ streams.close();
+ cluster.stop();
+ cluster = null;
+ }
+
+ @Test
+ public void testListStreamsGroupWithoutFilters() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) {
+            Set<String> expectedGroups = new HashSet<>(Collections.singleton(APP_ID));
+
+            final AtomicReference<Set> foundGroups = new AtomicReference<>();
+            TestUtils.waitForCondition(() -> {
+                foundGroups.set(new HashSet<>(service.listStreamsGroups()));
+                return Objects.equals(expectedGroups, foundGroups.get());
+            }, "Expected --list to show streams groups " + expectedGroups + ", but found " + foundGroups.get() + ".");
+
+ }
+ }
+
+ @Test
+ public void testListWithUnrecognizedNewOption() throws Exception {
+        String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", cluster.bootstrapServers(), "--list"};
+        Assertions.assertThrows(OptionException.class, () -> getStreamsGroupService(cgcArgs));
+ }
+
+ @Test
+ public void testListStreamsGroupWithStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) {
+ Set<GroupListing> expectedListing = Set.of(
+ new GroupListing(
+ APP_ID,
+ Optional.of(GroupType.STREAMS),
+ "streams",
+ Optional.of(GroupState.STABLE))
+ );
+
+            final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
+ }
+ }
+
+ @Test
+ public void testListStreamsGroupWithSpecifiedStates() throws Exception {
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) {
+ Set<GroupListing> expectedListing = Set.of(
+ new GroupListing(
+ APP_ID,
+ Optional.of(GroupType.STREAMS),
+ "streams",
+ Optional.of(GroupState.STABLE))
+ );
+
+            final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.emptySet())));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
+ }
+
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
+ Set<GroupListing> expectedListing = Collections.emptySet();
+
+            final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Collections.singleton(GroupState.PREPARING_REBALANCE))));
+                return Objects.equals(expectedListing, foundListing.get());
+            }, "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
+ }
+ }
+
+ @Test
+ public void testListStreamsGroupOutput() throws Exception {
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list"),
+            Collections.emptyList(),
+            Set.of(Collections.singletonList(APP_ID))
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+
+        // Check case-insensitivity in state filter.
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"),
+            Arrays.asList("GROUP", "STATE"),
+            Set.of(Arrays.asList(APP_ID, "Stable"))
+        );
+ }
+
+ @Test
+ public void testDescribeStreamsGroup() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, INPUT_TOPIC, "0", "0"),
+            List.of(APP_ID, INPUT_TOPIC, "1", "0"),
+            List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "0"),
+            List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0"));
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"), expectedHeader, expectedRows, List.of());
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets"), expectedHeader, expectedRows, List.of());
+ }
+
+ @Test
+ public void testDescribeStreamsGroupWithVerboseOption() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
+            List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
+            List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "-", "-", "0", "0"),
+            List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0"));
+        // The state-store-topic name is not deterministic, so we don't care about topic names.
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose"), expectedHeader, expectedRows, List.of());
+        // --describe --offsets has the same output as --describe
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--verbose"), expectedHeader, expectedRows, List.of());
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--offsets"), expectedHeader, expectedRows, List.of());
+ }
+
+ @Test
+ public void testDescribeStreamsGroupWithStateOption() throws Exception {
+        final List<String> expectedHeader = Arrays.asList("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS");
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, "", "", "Stable", "2"));
+        // The coordinator is not deterministic, so we don't care about it.
+        final List<Integer> dontCares = List.of(1, 2);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state"), expectedHeader, expectedRows, dontCares);
+ }
+
+ @Test
+    public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exception {
+        final List<String> expectedHeader = Arrays.asList("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+        final Set<List<String>> expectedRows = Set.of(Arrays.asList(APP_ID, "", "", "Stable", "3", "3", "2"));
+        // The coordinator is not deterministic, so we don't care about it.
+        final List<Integer> dontCares = List.of(1, 2);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--verbose"), expectedHeader, expectedRows, dontCares);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--state"), expectedHeader, expectedRows, dontCares);
+ }
+
+ @Test
+ public void testDescribeStreamsGroupWithMembersOption() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
+            List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
+        // The member and process names as well as client-id are not deterministic, so we don't care about them.
+        final List<Integer> dontCares = List.of(1, 2, 3);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members"), expectedHeader, expectedRows, dontCares);
+ }
+
+ @Test
+    public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception {
+        final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        final Set<List<String>> expectedRows = Set.of(
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+        // The member and process names as well as client-id are not deterministic, so we don't care about them.
+        final List<Integer> dontCares = List.of(3, 6, 7);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose"), expectedHeader, expectedRows, dontCares);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members"), expectedHeader, expectedRows, dontCares);
+ }
+
+ private static Topology topology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+            .groupBy((key, value) -> value)
+            .count()
+            .toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+ return builder.build();
+ }
+
+    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
+        StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
+        return new StreamsGroupCommand.StreamsGroupService(
+            opts,
+            Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+ );
+ }
+
+ private static void validateListOutput(
+ List<String> args,
+ List<String> expectedHeader,
+ Set<List<String>> expectedRows
+ ) throws InterruptedException {
+ final AtomicReference<String> out = new AtomicReference<>("");
+ TestUtils.waitForCondition(() -> {
+            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
+            out.set(output);
+
+            String[] lines = output.split("\n");
+            if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
+
+            if (!expectedHeader.isEmpty() && lines.length > 0) {
+                List<String> header = Arrays.asList(lines[0].split("\\s+"));
+                if (!expectedHeader.equals(header)) return false;
+            }
+
+            Set<List<String>> groups = Arrays.stream(lines, expectedHeader.isEmpty() ? 0 : 1, lines.length)
+                .map(line -> Arrays.asList(line.split("\\s+")))
+                .collect(Collectors.toSet());
+            return expectedRows.equals(groups);
+        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
+ }
+
+ private static void validateDescribeOutput(
+ List<String> args,
+ List<String> expectedHeader,
+ Set<List<String>> expectedRows,
+ List<Integer> dontCareIndices
+ ) throws InterruptedException {
+ final AtomicReference<String> out = new AtomicReference<>("");
+ TestUtils.waitForCondition(() -> {
+            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
+            out.set(output);
+
+            String[] lines = output.split("\n");
+            if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{};
+
+            if (lines.length == 0) return false;
+            List<String> header = Arrays.asList(lines[0].split("\\s+"));
+            if (!expectedHeader.equals(header)) return false;
+
+            Set<List<String>> groupDesc = Arrays.stream(Arrays.copyOfRange(lines, 1, lines.length))
+                .map(line -> Arrays.asList(line.split("\\s+")))
+                .collect(Collectors.toSet());
+            if (groupDesc.size() != expectedRows.size()) return false;
+            // clear the dontCare fields and then compare two sets
+            return expectedRows
+                .equals(
+                    groupDesc.stream()
+                        .map(list -> {
+                            List<String> listCloned = new ArrayList<>(list);
+                            dontCareIndices.forEach(index -> listCloned.set(index, ""));
+                            return listCloned;
+                        }).collect(Collectors.toSet())
+                );
+        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
+ }
+}
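The new integration test can be run on its own with something like the following (the exact Gradle invocation is an assumption; adjust to your checkout):

    ./gradlew :tools:test --tests org.apache.kafka.tools.streams.StreamsGroupCommandTest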
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
index 2a1c3047d85..86ee20b41c6 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@@ -177,6 +179,13 @@ public class StreamsGroupCommandUnitTest {
        when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
+        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>();
+        committedOffsetsMap.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(12, Optional.of(0), ""));
+
+        when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+        when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+
        StreamsGroupMemberDescription description = new StreamsGroupMemberDescription("foo", 0, Optional.empty(), Optional.empty(), "bar", "baz", 0, "qux",
            Optional.empty(), Map.of(), List.of(), List.of(),
@@ -193,9 +202,9 @@ public class StreamsGroupCommandUnitTest {
new Node(0, "host", 0),
null);
        StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
-        Map<TopicPartition, Long> lags = service.getOffsets(List.of(description), x);
+        Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = service.getOffsets(x);
        assertEquals(1, lags.size());
-        assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
+        assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
service.close();
}