This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b589a451a9 KAFKA-19244: Add support for kafka-streams-groups.sh
options (delete offsets) [4/N] (#19895)
2b589a451a9 is described below
commit 2b589a451a976cca7204b49e5fd5c9bdcd08b3ea
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Jun 10 14:55:48 2025 +0200
KAFKA-19244: Add support for kafka-streams-groups.sh options (delete
offsets) [4/N] (#19895)
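This PR implements the following `kafka-streams-groups.sh` options:
- `--delete-offsets --all-input-topics`
- `--delete-offsets --input-topic <String: topicName>`
Testing: covered by a new integration test (DeleteStreamsGroupOffsetTest).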
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/tools/streams/StreamsGroupCommand.java | 574 +++++++++++++--------
.../tools/streams/StreamsGroupCommandOptions.java | 57 +-
.../streams/DeleteStreamsGroupOffsetTest.java | 440 ++++++++++++++++
.../tools/streams/DeleteStreamsGroupTest.java | 2 -
.../tools/streams/StreamsGroupCommandTest.java | 12 +-
5 files changed, 867 insertions(+), 218 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0c7b79d5685..f85f68d72c6 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -19,14 +19,18 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -39,15 +43,19 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -55,6 +63,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -62,15 +71,17 @@ import joptsimple.OptionException;
public class StreamsGroupCommand {
+ static final String MISSING_COLUMN_VALUE = "-";
+
public static void main(String[] args) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
try {
opts.checkArgs();
// should have exactly one action
- long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt,
opts.deleteOpt).filter(opts.options::has).count();
+ long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt,
opts.deleteOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count();
if (numberOfActions != 1)
- CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list, --describe, or --delete.");
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list, --describe, --delete, or
--delete-offsets.");
run(opts);
} catch (OptionException e) {
@@ -86,6 +97,8 @@ public class StreamsGroupCommand {
streamsGroupService.describeGroups();
} else if (opts.options.has(opts.deleteOpt)) {
streamsGroupService.deleteGroups();
+ } else if (opts.options.has(opts.deleteOffsetsOpt)) {
+ streamsGroupService.deleteOffsets();
} else {
throw new IllegalArgumentException("Unknown action!");
}
@@ -192,12 +205,328 @@ public class StreamsGroupCommand {
}
}
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ DeleteTopicsResult deleteTopicsResult = null;
+ try {
+ deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ if (deleteTopicsResult != null) {
+
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
+ try {
+ future.get();
+ } catch (Exception topicException) {
+ System.out.println("Failed to
delete internal topic: " + topic);
+ }
+ });
+ }
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> sourceTopics =
description.subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream()).toList();
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !sourceTopics.contains(topic))
+ .collect(Collectors.toList());
+ internalTopics.removeIf(topic -> {
+ if (!isInferredInternalTopic(topic,
description.groupId())) {
+ printError("The internal topic '" + topic + "' is
not inferred as internal " +
+ "and thus will not be deleted with the group
'" + description.groupId() + "'.", Optional.empty());
+ return true;
+ }
+ return false;
+ });
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(),
internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by
the broker version. " +
+ "Use 'kafka-topics.sh' to list and delete the group's
internal topics.", Optional.of(e.getCause()));
+ } else {
+ printError("Retrieving internal topics failed due to " +
e.getMessage(), Optional.of(e));
+ }
+ }
+ return groupToInternalTopics;
+ }
+
+ private boolean isInferredInternalTopic(final String topicName, final
String applicationId) {
+ return topicName.startsWith(applicationId + "-") &&
matchesInternalTopicFormat(topicName);
+ }
+
+ public static boolean matchesInternalTopicFormat(final String
topicName) {
+ return topicName.endsWith("-changelog") ||
topicName.endsWith("-repartition")
+ || topicName.endsWith("-subscription-registration-topic")
+ || topicName.endsWith("-subscription-response-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+ }
+
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Map.Entry<Errors, Map<TopicPartition, Throwable>>
deleteOffsets(String groupId, List<String> topics) {
+ Map<TopicPartition, Throwable> partitionLevelResult = new
HashMap<>();
+ Set<String> topicWithPartitions = new HashSet<>();
+ Set<String> topicWithoutPartitions = new HashSet<>();
+
+ for (String topic : topics) {
+ if (topic.contains(":"))
+ topicWithPartitions.add(topic);
+ else
+ topicWithoutPartitions.add(topic);
+ }
+
+ List<TopicPartition> specifiedPartitions =
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList();
+
+ // Get the partitions of topics that the user did not explicitly
specify the partitions
+ DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(
+ topicWithoutPartitions,
+ withTimeoutMs(new DescribeTopicsOptions()));
+
+ Iterator<TopicPartition> unspecifiedPartitions =
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
+ String topic = e.getKey();
+ try {
+ return
e.getValue().get().partitions().stream().map(partition ->
+ new TopicPartition(topic, partition.partition()));
+ } catch (ExecutionException | InterruptedException err) {
+ partitionLevelResult.put(new TopicPartition(topic, -1),
err);
+ return Stream.empty();
+ }
+ }).iterator();
+
+ Set<TopicPartition> partitions = new
HashSet<>(specifiedPartitions);
+
+ unspecifiedPartitions.forEachRemaining(partitions::add);
+
+ return deleteOffsets(groupId, partitions, partitionLevelResult);
+ }
+
+ private Map.Entry<Errors, Map<TopicPartition, Throwable>>
deleteOffsets(String groupId, Set<TopicPartition> partitions,
Map<TopicPartition, Throwable> partitionLevelResult) {
+
+ DeleteStreamsGroupOffsetsResult deleteResult =
adminClient.deleteStreamsGroupOffsets(
+ groupId,
+ partitions,
+ withTimeoutMs(new DeleteStreamsGroupOffsetsOptions())
+ );
+
+ Errors topLevelException = Errors.NONE;
+
+ try {
+ deleteResult.all().get();
+ } catch (ExecutionException | InterruptedException e) {
+ topLevelException = Errors.forException(e.getCause());
+ }
+
+ partitions.forEach(partition -> {
+ try {
+ deleteResult.partitionResult(partition).get();
+ partitionLevelResult.put(partition, null);
+ } catch (ExecutionException | InterruptedException e) {
+ partitionLevelResult.put(partition, e);
+ }
+ });
+
+ return new AbstractMap.SimpleImmutableEntry<>(topLevelException,
partitionLevelResult);
+ }
+
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() {
+ String groupId = opts.options.valueOf(opts.groupOpt);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
+ if (opts.options.has(opts.allInputTopicsOpt)) {
+ Set<TopicPartition> partitions =
getCommittedOffsets(groupId).keySet();
+ res = deleteOffsets(groupId, partitions, new HashMap<>());
+ } else if (opts.options.has(opts.inputTopicOpt)) {
+ List<String> topics =
opts.options.valuesOf(opts.inputTopicOpt);
+ res = deleteOffsets(groupId, topics);
+ } else {
+ CommandLineUtils.printUsageAndExit(opts.parser, "Option " +
opts.deleteOffsetsOpt +
+ " requires either" + opts.allInputTopicsOpt + " or " +
opts.inputTopicOpt + " to be specified.");
+ return null;
+ }
+
+
+ Errors topLevelResult = res.getKey();
+ Map<TopicPartition, Throwable> partitionLevelResult =
res.getValue();
+
+ switch (topLevelResult) {
+ case NONE:
+ System.out.println("Request succeeded for deleting offsets
from group " + groupId + ".");
+ break;
+ case INVALID_GROUP_ID:
+ case GROUP_ID_NOT_FOUND:
+ case GROUP_AUTHORIZATION_FAILED:
+ case NON_EMPTY_GROUP:
+ printError(topLevelResult.message(), Optional.empty());
+ break;
+ case GROUP_SUBSCRIBED_TO_TOPIC:
+ case TOPIC_AUTHORIZATION_FAILED:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ printError("Encountered some partition-level error, see
the follow-up details.", Optional.empty());
+ break;
+ default:
+ printError("Encountered some unknown error: " +
topLevelResult, Optional.empty());
+ }
+
+ int maxTopicLen = 15;
+ for (TopicPartition tp : partitionLevelResult.keySet()) {
+ maxTopicLen = Math.max(maxTopicLen, tp.topic().length());
+ }
+
+ String format = "%n%" + (-maxTopicLen) + "s %-10s %-15s";
+
+ System.out.printf(format, "TOPIC", "PARTITION", "STATUS");
+ partitionLevelResult.entrySet().stream()
+ .sorted(Comparator.comparing(e -> e.getKey().topic() +
e.getKey().partition()))
+ .forEach(e -> {
+ TopicPartition tp = e.getKey();
+ Throwable error = e.getValue();
+ System.out.printf(format,
+ tp.topic(),
+ tp.partition() >= 0 ? tp.partition() :
MISSING_COLUMN_VALUE,
+ error != null ? "Error: " + error.getMessage() :
"Successful"
+ );
+ });
+ System.out.println();
+ // testing purpose: return the result of the delete operation
+ return res;
+ }
+
StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
return descriptionMap.get(group);
}
+ private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+ int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+ return options.timeoutMs(t);
+ }
+
private void printMembers(StreamsGroupDescription description, boolean
verbose) {
final int groupLen = Math.max(15, description.groupId().length());
int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
@@ -230,6 +559,14 @@ public class StreamsGroupCommand {
}
}
+ GroupState collectGroupState(String groupId) throws Exception {
+ return getDescribeGroup(groupId).groupState();
+ }
+
+ Collection<StreamsGroupMemberDescription> collectGroupMembers(String
groupId) throws Exception {
+ return getDescribeGroup(groupId).members();
+ }
+
private String
prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String
taskType) {
if (tasks.isEmpty()) {
return "";
@@ -335,10 +672,40 @@ public class StreamsGroupCommand {
return output;
}
+ private Stream<TopicPartition> parseTopicsWithPartitions(String
topicArg) {
+ ToIntFunction<String> partitionNum = partition -> {
+ try {
+ return Integer.parseInt(partition);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid partition '" +
partition + "' specified in topic arg '" + topicArg + "'");
+ }
+ };
+
+ String[] arr = topicArg.split(":");
+
+ if (arr.length != 2)
+ throw new IllegalArgumentException("Invalid topic arg '" +
topicArg + "', expected topic name and partitions");
+
+ String topic = arr[0];
+ String partitions = arr[1];
+
+ return Arrays.stream(partitions.split(",")).
+ map(partition -> new TopicPartition(topic,
partitionNum.applyAsInt(partition)));
+ }
+
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String
groupId) {
try {
- return adminClient.listConsumerGroupOffsets(
- Map.of(groupId, new
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+ var sourceTopics =
adminClient.describeStreamsGroups(List.of(groupId))
+ .all().get().get(groupId)
+ .subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream())
+ .collect(Collectors.toSet());
+
+ var allTopicPartitions =
adminClient.listStreamsGroupOffsets(Map.of(groupId, new
ListStreamsGroupOffsetsSpec()))
+ .partitionsToOffsetAndMetadata(groupId).get();
+
+ allTopicPartitions.keySet().removeIf(tp ->
!sourceTopics.contains(tp.topic()));
+ return allTopicPartitions;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -405,203 +772,6 @@ public class StreamsGroupCommand {
props.putAll(configOverrides);
return Admin.create(props);
}
-
- Map<String, Throwable> deleteGroups() {
- List<String> groupIds = opts.options.has(opts.allGroupsOpt)
- ? new ArrayList<>(listStreamsGroups())
- : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
-
- // pre admin call checks
- Map<String, Throwable> failed = preAdminCallChecks(groupIds);
-
- groupIds.removeAll(failed.keySet());
- Map<String, Throwable> success = new HashMap<>();
- Map<String, List<String>> internalTopics = new HashMap<>();
- Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
- if (!groupIds.isEmpty()) {
- // retrieve internal topics before deleting groups
- internalTopics = retrieveInternalTopics(groupIds);
-
- // delete streams groups
- Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
- groupIds,
- withTimeoutMs(new DeleteStreamsGroupsOptions())
- ).deletedGroups();
-
- groupsToDelete.forEach((g, f) -> {
- try {
- f.get();
- success.put(g, null);
- } catch (InterruptedException ie) {
- failed.put(g, ie);
- } catch (ExecutionException e) {
- failed.put(g, e.getCause());
- }
- });
-
- // delete internal topics
- if (!success.isEmpty()) {
- for (String groupId : success.keySet()) {
- List<String> internalTopicsToDelete =
internalTopics.get(groupId);
- if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
- DeleteTopicsResult deleteTopicsResult = null;
- try {
- deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
- deleteTopicsResult.all().get();
- } catch (InterruptedException | ExecutionException
e) {
- if (deleteTopicsResult != null) {
-
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
- try {
- future.get();
- } catch (Exception topicException) {
- System.out.println("Failed to
delete internal topic: " + topic);
- }
- });
- }
- internalTopicsDeletionFailures.put(groupId,
e.getCause());
- }
- }
- }
- }
- }
-
- // display outcome messages based on the results
- if (failed.isEmpty()) {
- System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
- } else {
- printError("Deletion of some streams groups failed:",
Optional.empty());
- failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
-
- if (!success.isEmpty()) {
- System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
- }
- }
- if (!internalTopics.keySet().isEmpty()) {
- printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
- }
- // for testing purpose: return all failures, including internal
topics deletion failures
- failed.putAll(success);
- failed.putAll(internalTopicsDeletionFailures);
- return failed;
- }
-
- private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
- List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
- LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
-
- Map<String, Throwable> failed = new HashMap<>();
-
- for (String groupId : groupIdSet) {
- Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
- if (listing.isEmpty()) {
- failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
- } else {
- Optional<GroupState> groupState =
listing.get().groupState();
- groupState.ifPresent(state -> {
- if (state == GroupState.DEAD) {
- failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
- } else if (state != GroupState.EMPTY) {
- failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
- }
- });
- }
- }
- return failed;
- }
-
- List<GroupListing> listDetailedStreamsGroups() {
- try {
- ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
-
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
- .withTypes(Set.of(GroupType.STREAMS)));
- Collection<GroupListing> listings = result.all().get();
- return listings.stream().toList();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
- Set<String> deletedGroupIds,
- Set<String>
groupIdsWithInternalTopics) {
- if (!deletedGroupIds.isEmpty()) {
- if (internalTopicsDeletionFailures.isEmpty()) {
- List<String> successfulGroups = deletedGroupIds.stream()
- .filter(groupIdsWithInternalTopics::contains)
- .collect(Collectors.toList());
- System.out.println("Deletion of associated internal topics
of the streams groups ('" +
- String.join("', '", successfulGroups) + "') was
successful.");
- } else {
- System.out.println("Deletion of some associated internal
topics failed:");
- internalTopicsDeletionFailures.forEach((group, error) ->
- System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
- }
- }
- }
-
- // Visibility for testing
- Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
- Map<String, List<String>> groupToInternalTopics = new HashMap<>();
- try {
- Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
- for (StreamsGroupDescription description :
descriptionMap.values()) {
-
- List<String> sourceTopics =
description.subtopologies().stream()
- .flatMap(subtopology ->
subtopology.sourceTopics().stream()).toList();
-
- List<String> internalTopics =
description.subtopologies().stream()
- .flatMap(subtopology -> Stream.concat(
-
subtopology.repartitionSourceTopics().keySet().stream(),
-
subtopology.stateChangelogTopics().keySet().stream()))
- .filter(topic -> !sourceTopics.contains(topic))
- .collect(Collectors.toList());
- internalTopics.removeIf(topic -> {
- if (!isInferredInternalTopic(topic,
description.groupId())) {
- printError("The internal topic '" + topic + "' is
not inferred as internal " +
- "and thus will not be deleted with the group
'" + description.groupId() + "'.", Optional.empty());
- return true;
- }
- return false;
- });
- if (!internalTopics.isEmpty()) {
- groupToInternalTopics.put(description.groupId(),
internalTopics);
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- if (e.getCause() instanceof UnsupportedVersionException) {
- printError("Retrieving internal topics is not supported by
the broker version. " +
- "Use 'kafka-topics.sh' to list and delete the group's
internal topics.", Optional.of(e.getCause()));
- } else {
- printError("Retrieving internal topics failed due to " +
e.getMessage(), Optional.of(e));
- }
- }
- return groupToInternalTopics;
- }
-
- private boolean isInferredInternalTopic(final String topicName, final
String applicationId) {
- return topicName.startsWith(applicationId + "-") &&
matchesInternalTopicFormat(topicName);
- }
-
- public static boolean matchesInternalTopicFormat(final String
topicName) {
- return topicName.endsWith("-changelog") ||
topicName.endsWith("-repartition")
- || topicName.endsWith("-subscription-registration-topic")
- || topicName.endsWith("-subscription-response-topic")
- ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
- ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
- }
-
- Collection<StreamsGroupMemberDescription> collectGroupMembers(String
groupId) throws Exception {
- return getDescribeGroup(groupId).members();
- }
-
- GroupState collectGroupState(String groupId) throws Exception {
- return getDescribeGroup(groupId).groupState();
- }
-
- private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
- int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
- return options.timeoutMs(t);
- }
}
public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer>
leaderEpoch, Long logEndOffset, Long lag) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index fab41806b15..d211ae32110 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -37,6 +37,11 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s)
to connect to.";
public static final String GROUP_DOC = "The streams group we wish to act
on.";
+ private static final String INPUT_TOPIC_DOC = "The topic whose streams
group information should be deleted or topic whose should be included in the
reset offset process. " +
+ "In `reset-offsets` case, partitions can be specified using this
format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the
process. " +
+ "Reset-offsets also supports multiple topic inputs. All types of
topics are supported.";
+ private static final String ALL_INPUT_TOPICS_DOC = "Consider all input
topics assigned to a group in the `reset-offsets` and `delete-offsets`
process." +
+ " Only input topics are supported.";
public static final String LIST_DOC = "List all streams groups.";
public static final String DESCRIBE_DOC = "Describe streams group and list
offset lag related to given group.";
private static final String ALL_GROUPS_DOC = "Apply to all streams
groups.";
@@ -54,13 +59,17 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
Use with --describe --state to show group epoch and target assignment
epoch.
Use with --describe --members to show for each member the member
epoch, target assignment epoch, current assignment, target assignment, and
whether member is still using the classic rebalance protocol.
Use with --describe --offsets and --describe to show leader epochs
for each partition.""";
+ private static final String DELETE_OFFSETS_DOC = "Delete offsets of
streams group. Supports one streams group at the time, and multiple topics.";
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> groupOpt;
+ public final OptionSpec<String> inputTopicOpt;
+ public final OptionSpec<Void> allInputTopicsOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
- final OptionSpec<Void> allGroupsOpt;
- final OptionSpec<Void> deleteOpt;
+ public final OptionSpec<Void> allGroupsOpt;
+ public final OptionSpec<Void> deleteOpt;
+ public final OptionSpec<Void> deleteOffsetsOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<String> stateOpt;
@@ -68,6 +77,8 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<Void> verboseOpt;
+
+ final Set<OptionSpec<?>> allDeleteOffsetsOpts;
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
final Set<OptionSpec<?>> allStreamsGroupLevelOpts;
@@ -88,10 +99,16 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
.withRequiredArg()
.describedAs("streams group")
.ofType(String.class);
+ inputTopicOpt = parser.accepts("input-topic", INPUT_TOPIC_DOC)
+ .withRequiredArg()
+ .describedAs("topic")
+ .ofType(String.class);
+ allInputTopicsOpt = parser.accepts("all-input-topics",
ALL_INPUT_TOPICS_DOC);
listOpt = parser.accepts("list", LIST_DOC);
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC);
deleteOpt = parser.accepts("delete", DELETE_DOC);
+ deleteOffsetsOpt = parser.accepts("delete-offsets",
DELETE_OFFSETS_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
.availableIf(describeOpt)
.withRequiredArg()
@@ -113,9 +130,10 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
.availableIf(describeOpt);
+ options = parser.parse(args);
+ allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt,
allInputTopicsOpt));
allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt));
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt,
allGroupsOpt));
- options = parser.parse(args);
}
public void checkArgs() {
@@ -124,19 +142,16 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt);
if (options.has(describeOpt)) {
- List<OptionSpec<?>> mutuallyExclusiveOpts =
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
- if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ?
1 : 0).sum() > 1) {
- CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
- }
- if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
- CommandLineUtils.printUsageAndExit(parser,
- "Option " + describeOpt + " does not take a value for " +
stateOpt);
+ checkDescribeArgs();
} else {
if (options.has(timeoutMsOpt))
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only
when " + describeOpt + " is used.");
}
+ if (options.has(deleteOffsetsOpt)) {
+ checkDeleteOffsetsArgs();
+ }
+
if (options.has(deleteOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
@@ -146,4 +161,24 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
CommandLineUtils.checkInvalidArgs(parser, options, listOpt,
membersOpt, offsetsOpt);
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
minus(allStreamsGroupLevelOpts, describeOpt, deleteOpt));
}
+
+    /**
+     * Checks the options that can accompany {@code describeOpt}.
+     * Reports a usage error through {@link CommandLineUtils#printUsageAndExit} when more than one of
+     * {@code membersOpt}, {@code offsetsOpt} and {@code stateOpt} is present, or when
+     * {@code stateOpt} is present with a non-null value.
+     */
+    private void checkDescribeArgs() {
+        List<OptionSpec<?>> exclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+        long presentCount = exclusiveOpts.stream().filter(opt -> options.has(opt)).count();
+        if (presentCount > 1) {
+            String optNames = exclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "));
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + describeOpt + " takes at most one of these options: " + optNames);
+        }
+        boolean stateHasValue = options.has(stateOpt) && options.valueOf(stateOpt) != null;
+        if (stateHasValue) {
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + describeOpt + " does not take a value for " + stateOpt);
+        }
+    }
+
+    /**
+     * Checks the options that must accompany {@code deleteOffsetsOpt}.
+     * Reports a usage error through {@link CommandLineUtils#printUsageAndExit} when {@code groupOpt} is
+     * missing, when neither {@code inputTopicOpt} nor {@code allInputTopicsOpt} is present, or when
+     * more than one value was supplied for {@code groupOpt}.
+     */
+    private void checkDeleteOffsetsArgs() {
+        boolean topicScopeGiven = options.has(inputTopicOpt) || options.has(allInputTopicsOpt);
+        if (!topicScopeGiven || !options.has(groupOpt)) {
+            String scopeNames = allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", "));
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + deleteOffsetsOpt + " takes the " + groupOpt + " and one of these options: " + scopeNames);
+        }
+        List<String> groups = options.valuesOf(groupOpt);
+        if (groups.size() > 1) {
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + deleteOffsetsOpt + " supports only one " + groupOpt + " at a time, but found: " + groups);
+        }
+    }
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
new file mode 100644
index 00000000000..74079548675
--- /dev/null
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.GroupProtocol;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.GroupState.EMPTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class DeleteStreamsGroupOffsetTest {
+ private static final String TOPIC_PREFIX = "foo-";
+ private static final String APP_ID_PREFIX = "streams-group-command-test";
+
+ private static final int RECORD_TOTAL = 5;
+ public static EmbeddedKafkaCluster cluster;
+ private static String bootstrapServers;
+ private static final String OUTPUT_TOPIC_PREFIX = "output-topic-";
+
+    /**
+     * Creates and starts the shared {@link EmbeddedKafkaCluster} (constructed with {@code 2} and an
+     * empty {@link Properties}) and caches its bootstrap servers for the tests.
+     */
+    @BeforeAll
+    public static void startCluster() {
+        cluster = new EmbeddedKafkaCluster(2, new Properties());
+        cluster.start();
+        bootstrapServers = cluster.bootstrapServers();
+    }
+
+    /**
+     * Cleans up after each test: deletes every topic returned by {@code listTopics()} and every
+     * streams group returned by {@code listGroups(...)}.
+     * An {@link UnknownTopicOrPartitionException}, thrown directly or as the cause of an
+     * {@link ExecutionException}, is ignored; any other failure is rethrown as a
+     * {@link RuntimeException}.
+     */
+    @AfterEach
+    public void deleteTopics() {
+        try (final Admin adminClient = cluster.createAdminClient()) {
+            // delete all topics
+            final Set<String> topics = adminClient.listTopics().names().get();
+            adminClient.deleteTopics(topics).all().get();
+            // delete all groups
+            List<String> groupIds =
+                adminClient.listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000)).all().get()
+                    .stream().map(GroupListing::groupId).toList();
+            adminClient.deleteStreamsGroups(groupIds).all().get();
+        } catch (final UnknownTopicOrPartitionException ignored) {
+            // nothing to clean up for a topic that is already gone
+        } catch (final InterruptedException e) {
+            // restore the interrupt flag so callers further up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Builds the Kafka Streams configuration used by the test applications.
+     *
+     * @param bootstrapServers value stored under {@code StreamsConfig.BOOTSTRAP_SERVERS_CONFIG}
+     * @param appId            value stored under {@code StreamsConfig.APPLICATION_ID_CONFIG}
+     * @return properties selecting the streams group protocol, exactly-once-v2 processing,
+     *         String serdes and {@code earliest} offset reset
+     */
+    private Properties createStreamsConfig(String bootstrapServers, String appId) {
+        final Properties configs = new Properties();
+        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        // Locale.ROOT keeps the config value independent of the JVM's default locale.
+        configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT));
+        configs.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        return configs;
+    }
+
+    /** Stops the shared embedded cluster once all tests of this class have run. */
+    @AfterAll
+    public static void closeCluster() {
+        cluster.stop();
+    }
+
+    /** Deleting offsets for a group that was never created must yield GROUP_ID_NOT_FOUND at the top level. */
+    @Test
+    public void testDeleteOffsetsNonExistingGroup() {
+        final String[] args = {
+            "--bootstrap-server", bootstrapServers,
+            "--delete-offsets",
+            "--group", "not-existing",
+            "--input-topic", "foo:1"
+        };
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) {
+            final Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
+            final Errors topLevelError = result.getKey();
+            assertEquals(Errors.GROUP_ID_NOT_FOUND, topLevelError);
+        }
+    }
+
+    /**
+     * Passing two {@code --group} values together with {@code --delete-offsets} must trigger the
+     * exit procedure with a non-zero status and a message naming both groups.
+     */
+    @Test
+    public void testDeleteStreamsGroupOffsetsMultipleGroups() {
+        final String group1 = generateRandomAppId();
+        final String group2 = generateRandomAppId();
+        final String topic1 = generateRandomTopic();
+        final String topic2 = generateRandomTopic();
+
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets",
+            "--group", group1, "--group", group2, "--input-topic", topic1, "--input-topic", topic2};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] supports only one [group] at a time, but found:") &&
+                message.contains(group1) && message.contains(group2));
+            exited.set(true);
+        }));
+        // The exit procedure above does not throw, so a service is still created after the usage
+        // error; try-with-resources closes it.
+        try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) {
+            assertTrue(exited.get());
+        } finally {
+            // Restore the default exit behaviour so later tests are not affected by this override.
+            Exit.resetExitProcedure();
+        }
+    }
+
+ @Test
+ public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ String topicPartition = topic + ":0";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", topicPartition};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", topic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void
testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ final String unknownTopic = "unknown-topic";
+ final String unknownTopicPartition = unknownTopic + ":0";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, unknownTopic, 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ final String unknownTopic = "unknown-topic";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", unknownTopic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, unknownTopic, -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ final String topicPartition = topic + ":0";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", topicPartition};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ stopKSApp(group, topic, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic, 0, 0, Errors.NONE);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", topic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ stopKSApp(group, topic, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic, -1, 0, Errors.NONE);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics() {
+ final String group = generateRandomAppId();
+ final String topic1 = generateRandomTopic();
+ final String unknownTopic = "unknown-topic";
+
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", topic1, "--input-topic",
unknownTopic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic1,
service)) {
+ stopKSApp(group, topic1, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic1, -1, 0, Errors.NONE);
+ assertError(res, unknownTopic, -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void
testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ final String unknownTopic = "unknown-topic";
+ final String unknownTopicPartition = unknownTopic + ":0";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ stopKSApp(group, topic, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, unknownTopic, 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ final String unknownTopic = "unknown-topic";
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--input-topic", unknownTopic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ stopKSApp(group, topic, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, unknownTopic, -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics() {
+ final String group = generateRandomAppId();
+ final String topic = generateRandomTopic();
+ String[] args;
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--delete-offsets", "--group", group, "--all-input-topics", topic};
+ try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic,
service)) {
+ stopKSApp(group, topic, streams, service);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets();
+ assertError(res, topic, -1, 0, Errors.NONE);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void assertError(Map.Entry<Errors, Map<TopicPartition, Throwable>>
res,
+ String inputTopic,
+ int inputPartition,
+ int expectedPartition,
+ Errors expectedError) {
+ Errors topLevelError = res.getKey();
+ Map<TopicPartition, Throwable> partitions = res.getValue();
+ TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
+ // Partition level error should propagate to top level, unless this is
due to a missed partition attempt.
+ if (inputPartition >= 0) {
+ assertEquals(expectedError, topLevelError);
+ }
+ if (expectedError == Errors.NONE)
+ assertNull(partitions.get(tp));
+ else
+ assertEquals(expectedError.exception(),
partitions.get(tp).getCause());
+ }
+
+ private String generateRandomTopic() {
+ return TOPIC_PREFIX + TestUtils.randomString(10);
+ }
+
+ private String generateRandomAppId() {
+ return APP_ID_PREFIX + TestUtils.randomString(10);
+ }
+
+ private void stopKSApp(String appId, String topic, KafkaStreams streams,
StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
+ if (streams != null) {
+ KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
+ closeOptions.timeout(Duration.ofSeconds(30));
+ closeOptions.leaveGroup(true);
+ streams.close(closeOptions);
+ streams.cleanUp();
+
+ TestUtils.waitForCondition(
+ () -> checkGroupState(service, appId, EMPTY),
+ "The group did not become empty as expected."
+ );
+ TestUtils.waitForCondition(
+ () -> service.collectGroupMembers(appId).isEmpty(),
+ "The group size is not zero as expected."
+ );
+ }
+ }
+
+    /**
+     * Produces {@code RECORD_TOTAL} records to {@code inputTopic}, starts a streams application
+     * with the given application id, and waits until the group has members and is STABLE.
+     * The topology contains a {@code peek} step that throws {@link IllegalStateException} once
+     * more than {@code RECORD_TOTAL} records have passed through it.
+     *
+     * @return the started {@link KafkaStreams} instance
+     */
+    private KafkaStreams startKSApp(String appId, String inputTopic,
+                                    StreamsGroupCommand.StreamsGroupService service) throws Exception {
+        final String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX);
+        final StreamsBuilder streamsBuilder = builder(inputTopic, outputTopic);
+        produceMessages(inputTopic);
+
+        final KStream<String, String> source = streamsBuilder.stream(inputTopic);
+
+        final AtomicInteger seenRecords = new AtomicInteger(0);
+        final KTable<String, String> aggregated = source
+            .groupByKey()
+            .aggregate(
+                () -> "()",
+                (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
+                Materialized.as("aggregated_value"));
+
+        aggregated.toStream().peek((key, value) -> {
+            final int seen = seenRecords.incrementAndGet();
+            if (seen > RECORD_TOTAL) {
+                throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record");
+            }
+        });
+
+        final Properties streamsConfig = createStreamsConfig(bootstrapServers, appId);
+        final KafkaStreams started = IntegrationTestUtils.getStartedStreams(streamsConfig, streamsBuilder, true);
+
+        TestUtils.waitForCondition(
+            () -> !service.collectGroupMembers(appId).isEmpty(),
+            "The group did not initialize as expected."
+        );
+        TestUtils.waitForCondition(
+            () -> checkGroupState(service, appId, GroupState.STABLE),
+            "The group did not become stable as expected."
+        );
+
+        return started;
+    }
+
+ private String generateRandomTopicId(String prefix) {
+ return prefix + TestUtils.randomString(10);
+ }
+
+    /**
+     * Returns a random application id. Delegates to {@link #generateRandomAppId()} so the id
+     * format is defined in a single place.
+     */
+    private String generateGroupAppId() {
+        return generateRandomAppId();
+    }
+
+ private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService
service, String groupId, GroupState state) throws Exception {
+ return Objects.equals(service.collectGroupState(groupId), state);
+ }
+
+    /**
+     * Creates a {@link StreamsBuilder} whose topology reads {@code inputTopic}, splits each value
+     * into lower-cased tokens on non-word characters, counts the tokens and writes the counts to
+     * {@code outputTopic}.
+     */
+    private static StreamsBuilder builder(String inputTopic, String outputTopic) {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> lines =
+            streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> words = lines.flatMapValues(
+            text -> Arrays.asList(text.toLowerCase(Locale.getDefault()).split("\\W+")));
+        words.groupBy((key, word) -> word)
+            .count()
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+        return streamsBuilder;
+    }
+
+ private static void produceMessages(final String topic) {
+ List<KeyValueTimestamp<String, String>> data = new
ArrayList<>(RECORD_TOTAL);
+ for (long v = 0; v < RECORD_TOTAL; ++v) {
+ data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0",
cluster.time.milliseconds()));
+ }
+
+ IntegrationTestUtils.produceSynchronously(
+ TestUtils.producerConfig(bootstrapServers, StringSerializer.class,
StringSerializer.class),
+ false,
+ topic,
+ Optional.empty(),
+ data
+ );
+ }
+
+ private StreamsGroupCommand.StreamsGroupService
getStreamsGroupService(String[] args) {
+ StreamsGroupCommandOptions opts =
StreamsGroupCommandOptions.fromArgs(args);
+ return new StreamsGroupCommand.StreamsGroupService(
+ opts, cluster.createAdminClient());
+
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
index bb7fbe58677..394e4cf63d0 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@@ -89,7 +88,6 @@ public class DeleteStreamsGroupTest {
@BeforeAll
public static void startCluster() {
final Properties props = new Properties();
-
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,streams");
cluster = new EmbeddedKafkaCluster(2, props);
cluster.start();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 7a2b28d8e15..14c9bedfb88 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -178,6 +178,7 @@ public class StreamsGroupCommandTest {
@Test
public void testDescribeStreamsGroupsGetOffsets() throws Exception {
+ String groupId = "group1";
Admin adminClient = mock(KafkaAdminClient.class);
ListOffsetsResult startOffset = mock(ListOffsetsResult.class);
@@ -193,11 +194,11 @@ public class StreamsGroupCommandTest {
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
endOffset);
- ListConsumerGroupOffsetsResult result =
mock(ListConsumerGroupOffsetsResult.class);
+ ListStreamsGroupOffsetsResult result =
mock(ListStreamsGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new
HashMap<>();
committedOffsetsMap.put(new TopicPartition("topic1", 0), new
OffsetAndMetadata(12, Optional.of(0), ""));
-
when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
StreamsGroupMemberDescription description = new
StreamsGroupMemberDescription("foo", 0, Optional.empty(),
@@ -215,8 +216,13 @@ public class StreamsGroupCommandTest {
GroupState.STABLE,
new Node(0, "host", 0),
null);
+ DescribeStreamsGroupsResult describeStreamsGroupsResult =
mock(DescribeStreamsGroupsResult.class);
+
when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
x)));
+
when(adminClient.describeStreamsGroups(List.of(groupId))).thenReturn(describeStreamsGroupsResult);
+
StreamsGroupCommand.StreamsGroupService service = new
StreamsGroupCommand.StreamsGroupService(null, adminClient);
Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags =
service.getOffsets(x);
+
assertEquals(1, lags.size());
assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L),
Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));