This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b589a451a9 KAFKA-19244: Add support for kafka-streams-groups.sh options (delete offsets) [4/N] (#19895)
2b589a451a9 is described below

commit 2b589a451a976cca7204b49e5fd5c9bdcd08b3ea
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Jun 10 14:55:48 2025 +0200

    KAFKA-19244: Add support for kafka-streams-groups.sh options (delete offsets) [4/N] (#19895)
    
    This PR implements
    `--delete-offsets --all-input-topics`
    `--delete-offsets --input-topic String: topicName`
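    
    Example invocations (illustrative only; the bootstrap server address,
    `my-streams-app`, and `my-topic` are placeholders; partitions may be
    appended to a topic as `my-topic:0,1,2`):
    
        bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 \
            --delete-offsets --group my-streams-app --all-input-topics
        bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 \
            --delete-offsets --group my-streams-app --input-topic my-topic:0,1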
    
    Testing: integration test
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/tools/streams/StreamsGroupCommand.java   | 574 +++++++++++++--------
 .../tools/streams/StreamsGroupCommandOptions.java  |  57 +-
 .../streams/DeleteStreamsGroupOffsetTest.java      | 440 ++++++++++++++++
 .../tools/streams/DeleteStreamsGroupTest.java      |   2 -
 .../tools/streams/StreamsGroupCommandTest.java     |  12 +-
 5 files changed, 867 insertions(+), 218 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0c7b79d5685..f85f68d72c6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -19,14 +19,18 @@ package org.apache.kafka.tools.streams;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AbstractOptions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.GroupListing;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -39,15 +43,19 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +63,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -62,15 +71,17 @@ import joptsimple.OptionException;
 
 public class StreamsGroupCommand {
 
+    static final String MISSING_COLUMN_VALUE = "-";
+
     public static void main(String[] args) {
         StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
         try {
             opts.checkArgs();
 
             // should have exactly one action
-            long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt, 
opts.deleteOpt).filter(opts.options::has).count();
+            long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt, 
opts.deleteOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count();
             if (numberOfActions != 1)
-                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list, --describe, or --delete.");
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list, --describe, --delete, or 
--delete-offsets.");
 
             run(opts);
         } catch (OptionException e) {
@@ -86,6 +97,8 @@ public class StreamsGroupCommand {
                 streamsGroupService.describeGroups();
             } else if (opts.options.has(opts.deleteOpt)) {
                 streamsGroupService.deleteGroups();
+            } else if (opts.options.has(opts.deleteOffsetsOpt)) {
+                streamsGroupService.deleteOffsets();
             } else {
                 throw new IllegalArgumentException("Unknown action!");
             }
@@ -192,12 +205,328 @@ public class StreamsGroupCommand {
             }
         }
 
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            DeleteTopicsResult deleteTopicsResult = null;
+                            try {
+                                deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                if (deleteTopicsResult != null) {
+                                    
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
+                                        try {
+                                            future.get();
+                                        } catch (Exception topicException) {
+                                            System.out.println("Failed to 
delete internal topic: " + topic);
+                                        }
+                                    });
+                                }
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> sourceTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> 
subtopology.sourceTopics().stream()).toList();
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !sourceTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    internalTopics.removeIf(topic -> {
+                        if (!isInferredInternalTopic(topic, 
description.groupId())) {
+                            printError("The internal topic '" + topic + "' is 
not inferred as internal " +
+                                "and thus will not be deleted with the group 
'" + description.groupId() + "'.", Optional.empty());
+                            return true;
+                        }
+                        return false;
+                    });
+                    if (!internalTopics.isEmpty()) {
+                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                if (e.getCause() instanceof UnsupportedVersionException) {
+                    printError("Retrieving internal topics is not supported by 
the broker version. " +
+                        "Use 'kafka-topics.sh' to list and delete the group's 
internal topics.", Optional.of(e.getCause()));
+                } else {
+                    printError("Retrieving internal topics failed due to " + 
e.getMessage(), Optional.of(e));
+                }
+            }
+            return groupToInternalTopics;
+        }
+
+        private boolean isInferredInternalTopic(final String topicName, final 
String applicationId) {
+            return topicName.startsWith(applicationId + "-") && 
matchesInternalTopicFormat(topicName);
+        }
+
+        public static boolean matchesInternalTopicFormat(final String 
topicName) {
+            return topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
+                || topicName.endsWith("-subscription-registration-topic")
+                || topicName.endsWith("-subscription-response-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+        }
+
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private Map.Entry<Errors, Map<TopicPartition, Throwable>> 
deleteOffsets(String groupId, List<String> topics) {
+            Map<TopicPartition, Throwable> partitionLevelResult = new 
HashMap<>();
+            Set<String> topicWithPartitions = new HashSet<>();
+            Set<String> topicWithoutPartitions = new HashSet<>();
+
+            for (String topic : topics) {
+                if (topic.contains(":"))
+                    topicWithPartitions.add(topic);
+                else
+                    topicWithoutPartitions.add(topic);
+            }
+
+            List<TopicPartition> specifiedPartitions = 
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList();
+
+            // Get the partitions of topics for which the user did not explicitly specify partitions
+            DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(
+                topicWithoutPartitions,
+                withTimeoutMs(new DescribeTopicsOptions()));
+
+            Iterator<TopicPartition> unspecifiedPartitions = 
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
+                String topic = e.getKey();
+                try {
+                    return 
e.getValue().get().partitions().stream().map(partition ->
+                        new TopicPartition(topic, partition.partition()));
+                } catch (ExecutionException | InterruptedException err) {
+                    partitionLevelResult.put(new TopicPartition(topic, -1), 
err);
+                    return Stream.empty();
+                }
+            }).iterator();
+
+            Set<TopicPartition> partitions = new 
HashSet<>(specifiedPartitions);
+
+            unspecifiedPartitions.forEachRemaining(partitions::add);
+
+            return deleteOffsets(groupId, partitions, partitionLevelResult);
+        }
+
+        private Map.Entry<Errors, Map<TopicPartition, Throwable>> 
deleteOffsets(String groupId, Set<TopicPartition> partitions, 
Map<TopicPartition, Throwable> partitionLevelResult) {
+
+            DeleteStreamsGroupOffsetsResult deleteResult = 
adminClient.deleteStreamsGroupOffsets(
+                groupId,
+                partitions,
+                withTimeoutMs(new DeleteStreamsGroupOffsetsOptions())
+            );
+
+            Errors topLevelException = Errors.NONE;
+
+            try {
+                deleteResult.all().get();
+            } catch (ExecutionException | InterruptedException e) {
+                topLevelException = Errors.forException(e.getCause());
+            }
+
+            partitions.forEach(partition -> {
+                try {
+                    deleteResult.partitionResult(partition).get();
+                    partitionLevelResult.put(partition, null);
+                } catch (ExecutionException | InterruptedException e) {
+                    partitionLevelResult.put(partition, e);
+                }
+            });
+
+            return new AbstractMap.SimpleImmutableEntry<>(topLevelException, 
partitionLevelResult);
+        }
+
+        Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
+            if (opts.options.has(opts.allInputTopicsOpt)) {
+                Set<TopicPartition> partitions = 
getCommittedOffsets(groupId).keySet();
+                res = deleteOffsets(groupId, partitions, new HashMap<>());
+            } else if (opts.options.has(opts.inputTopicOpt)) {
+                List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
+                res = deleteOffsets(groupId, topics);
+            } else {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Option " + opts.deleteOffsetsOpt +
+                    " requires either " + opts.allInputTopicsOpt + " or " + opts.inputTopicOpt + " to be specified.");
+                return null;
+            }
+
+
+            Errors topLevelResult = res.getKey();
+            Map<TopicPartition, Throwable> partitionLevelResult = 
res.getValue();
+
+            switch (topLevelResult) {
+                case NONE:
+                    System.out.println("Request succeeded for deleting offsets 
from group " + groupId + ".");
+                    break;
+                case INVALID_GROUP_ID:
+                case GROUP_ID_NOT_FOUND:
+                case GROUP_AUTHORIZATION_FAILED:
+                case NON_EMPTY_GROUP:
+                    printError(topLevelResult.message(), Optional.empty());
+                    break;
+                case GROUP_SUBSCRIBED_TO_TOPIC:
+                case TOPIC_AUTHORIZATION_FAILED:
+                case UNKNOWN_TOPIC_OR_PARTITION:
+                    printError("Encountered some partition-level error, see 
the follow-up details.", Optional.empty());
+                    break;
+                default:
+                    printError("Encountered some unknown error: " + 
topLevelResult, Optional.empty());
+            }
+
+            int maxTopicLen = 15;
+            for (TopicPartition tp : partitionLevelResult.keySet()) {
+                maxTopicLen = Math.max(maxTopicLen, tp.topic().length());
+            }
+
+            String format = "%n%" + (-maxTopicLen) + "s %-10s %-15s";
+
+            System.out.printf(format, "TOPIC", "PARTITION", "STATUS");
+            partitionLevelResult.entrySet().stream()
+                .sorted(Comparator.comparing(e -> e.getKey().topic() + 
e.getKey().partition()))
+                .forEach(e -> {
+                    TopicPartition tp = e.getKey();
+                    Throwable error = e.getValue();
+                    System.out.printf(format,
+                        tp.topic(),
+                        tp.partition() >= 0 ? tp.partition() : 
MISSING_COLUMN_VALUE,
+                        error != null ? "Error: " + error.getMessage() : 
"Successful"
+                    );
+                });
+            System.out.println();
+            // testing purpose: return the result of the delete operation
+            return res;
+        }
+
         StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
             DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
             Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
             return descriptionMap.get(group);
         }
 
+        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+            int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+            return options.timeoutMs(t);
+        }
+
         private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
             final int groupLen = Math.max(15, description.groupId().length());
             int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
@@ -230,6 +559,14 @@ public class StreamsGroupCommand {
             }
         }
 
+        GroupState collectGroupState(String groupId) throws Exception {
+            return getDescribeGroup(groupId).groupState();
+        }
+
+        Collection<StreamsGroupMemberDescription> collectGroupMembers(String 
groupId) throws Exception {
+            return getDescribeGroup(groupId).members();
+        }
+
         private String 
prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String 
taskType) {
             if (tasks.isEmpty()) {
                 return "";
@@ -335,10 +672,40 @@ public class StreamsGroupCommand {
             return output;
         }
 
+        private Stream<TopicPartition> parseTopicsWithPartitions(String 
topicArg) {
+            ToIntFunction<String> partitionNum = partition -> {
+                try {
+                    return Integer.parseInt(partition);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid partition '" + 
partition + "' specified in topic arg '" + topicArg + "'");
+                }
+            };
+
+            String[] arr = topicArg.split(":");
+
+            if (arr.length != 2)
+                throw new IllegalArgumentException("Invalid topic arg '" + 
topicArg + "', expected topic name and partitions");
+
+            String topic = arr[0];
+            String partitions = arr[1];
+
+            return Arrays.stream(partitions.split(","))
+                .map(partition -> new TopicPartition(topic, partitionNum.applyAsInt(partition)));
+        }
+
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String 
groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new 
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                var sourceTopics = 
adminClient.describeStreamsGroups(List.of(groupId))
+                    .all().get().get(groupId)
+                    .subtopologies().stream()
+                    .flatMap(subtopology -> 
subtopology.sourceTopics().stream())
+                    .collect(Collectors.toSet());
+
+                var allTopicPartitions = 
adminClient.listStreamsGroupOffsets(Map.of(groupId, new 
ListStreamsGroupOffsetsSpec()))
+                    .partitionsToOffsetAndMetadata(groupId).get();
+
+                allTopicPartitions.keySet().removeIf(tp -> 
!sourceTopics.contains(tp.topic()));
+                return allTopicPartitions;
             } catch (InterruptedException | ExecutionException e) {
                 throw new RuntimeException(e);
             }
@@ -405,203 +772,6 @@ public class StreamsGroupCommand {
             props.putAll(configOverrides);
             return Admin.create(props);
         }
-
-        Map<String, Throwable> deleteGroups() {
-            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
-                ? new ArrayList<>(listStreamsGroups())
-                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
-
-            // pre admin call checks
-            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
-
-            groupIds.removeAll(failed.keySet());
-            Map<String, Throwable> success = new HashMap<>();
-            Map<String, List<String>> internalTopics = new HashMap<>();
-            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
-            if (!groupIds.isEmpty()) {
-                // retrieve internal topics before deleting groups
-                internalTopics = retrieveInternalTopics(groupIds);
-
-                // delete streams groups
-                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
-                    groupIds,
-                    withTimeoutMs(new DeleteStreamsGroupsOptions())
-                ).deletedGroups();
-
-                groupsToDelete.forEach((g, f) -> {
-                    try {
-                        f.get();
-                        success.put(g, null);
-                    } catch (InterruptedException ie) {
-                        failed.put(g, ie);
-                    } catch (ExecutionException e) {
-                        failed.put(g, e.getCause());
-                    }
-                });
-
-                // delete internal topics
-                if (!success.isEmpty()) {
-                    for (String groupId : success.keySet()) {
-                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
-                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
-                            DeleteTopicsResult deleteTopicsResult = null;
-                            try {
-                                deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
-                                deleteTopicsResult.all().get();
-                            } catch (InterruptedException | ExecutionException 
e) {
-                                if (deleteTopicsResult != null) {
-                                    
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
-                                        try {
-                                            future.get();
-                                        } catch (Exception topicException) {
-                                            System.out.println("Failed to 
delete internal topic: " + topic);
-                                        }
-                                    });
-                                }
-                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
-                            }
-                        }
-                    }
-                }
-            }
-
-            // display outcome messages based on the results
-            if (failed.isEmpty()) {
-                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
-            } else {
-                printError("Deletion of some streams groups failed:", 
Optional.empty());
-                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
-
-                if (!success.isEmpty()) {
-                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
-                }
-            }
-            if (!internalTopics.keySet().isEmpty()) {
-                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
-            }
-            // for testing purpose: return all failures, including internal 
topics deletion failures
-            failed.putAll(success);
-            failed.putAll(internalTopicsDeletionFailures);
-            return failed;
-        }
-
-        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
-            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
-            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
-
-            Map<String, Throwable> failed = new HashMap<>();
-
-            for (String groupId : groupIdSet) {
-                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
-                if (listing.isEmpty()) {
-                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
-                } else {
-                    Optional<GroupState> groupState = 
listing.get().groupState();
-                    groupState.ifPresent(state -> {
-                        if (state == GroupState.DEAD) {
-                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
-                        } else if (state != GroupState.EMPTY) {
-                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
-                        }
-                    });
-                }
-            }
-            return failed;
-        }
-
-        List<GroupListing> listDetailedStreamsGroups() {
-            try {
-                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
-                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
-                    .withTypes(Set.of(GroupType.STREAMS)));
-                Collection<GroupListing> listings = result.all().get();
-                return listings.stream().toList();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
-                                              Set<String> deletedGroupIds,
-                                              Set<String> 
groupIdsWithInternalTopics) {
-            if (!deletedGroupIds.isEmpty()) {
-                if (internalTopicsDeletionFailures.isEmpty()) {
-                    List<String> successfulGroups = deletedGroupIds.stream()
-                        .filter(groupIdsWithInternalTopics::contains)
-                        .collect(Collectors.toList());
-                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
-                        String.join("', '", successfulGroups) + "') was 
successful.");
-                } else {
-                    System.out.println("Deletion of some associated internal 
topics failed:");
-                    internalTopicsDeletionFailures.forEach((group, error) ->
-                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
-                }
-            }
-        }
-
-        // Visibility for testing
-        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
-            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
-            try {
-                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
-                for (StreamsGroupDescription description : 
descriptionMap.values()) {
-
-                    List<String> sourceTopics = 
description.subtopologies().stream()
-                        .flatMap(subtopology -> 
subtopology.sourceTopics().stream()).toList();
-
-                    List<String> internalTopics = 
description.subtopologies().stream()
-                        .flatMap(subtopology -> Stream.concat(
-                            
subtopology.repartitionSourceTopics().keySet().stream(),
-                            
subtopology.stateChangelogTopics().keySet().stream()))
-                        .filter(topic -> !sourceTopics.contains(topic))
-                        .collect(Collectors.toList());
-                    internalTopics.removeIf(topic -> {
-                        if (!isInferredInternalTopic(topic, 
description.groupId())) {
-                            printError("The internal topic '" + topic + "' is 
not inferred as internal " +
-                                "and thus will not be deleted with the group 
'" + description.groupId() + "'.", Optional.empty());
-                            return true;
-                        }
-                        return false;
-                    });
-                    if (!internalTopics.isEmpty()) {
-                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
-                    }
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                if (e.getCause() instanceof UnsupportedVersionException) {
-                    printError("Retrieving internal topics is not supported by 
the broker version. " +
-                        "Use 'kafka-topics.sh' to list and delete the group's 
internal topics.", Optional.of(e.getCause()));
-                } else {
-                    printError("Retrieving internal topics failed due to " + 
e.getMessage(), Optional.of(e));
-                }
-            }
-            return groupToInternalTopics;
-        }
-
-        private boolean isInferredInternalTopic(final String topicName, final 
String applicationId) {
-            return topicName.startsWith(applicationId + "-") && 
matchesInternalTopicFormat(topicName);
-        }
-
-        public static boolean matchesInternalTopicFormat(final String 
topicName) {
-            return topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
-                || topicName.endsWith("-subscription-registration-topic")
-                || topicName.endsWith("-subscription-response-topic")
-                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
-                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
-        }
-
-        Collection<StreamsGroupMemberDescription> collectGroupMembers(String 
groupId) throws Exception {
-            return getDescribeGroup(groupId).members();
-        }
-
-        GroupState collectGroupState(String groupId) throws Exception {
-            return getDescribeGroup(groupId).groupState();
-        }
-
-        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
-            int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
-            return options.timeoutMs(t);
-        }
     }
 
     public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> 
leaderEpoch, Long logEndOffset, Long lag) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index fab41806b15..d211ae32110 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -37,6 +37,11 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
 
     public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
     public static final String GROUP_DOC = "The streams group we wish to act 
on.";
+    private static final String INPUT_TOPIC_DOC = "The topic whose streams group information should be deleted, or the topic which should be included in the reset-offsets process. " +
+        "In the `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included in the process. " +
+        "Reset-offsets also supports multiple topic inputs. All types of topics are supported.";
+    private static final String ALL_INPUT_TOPICS_DOC = "Consider all input 
topics assigned to a group in the `reset-offsets` and `delete-offsets` 
process." +
+        " Only input topics are supported.";
     public static final String LIST_DOC = "List all streams groups.";
     public static final String DESCRIBE_DOC = "Describe streams group and list 
offset lag related to given group.";
     private static final String ALL_GROUPS_DOC = "Apply to all streams 
groups.";
@@ -54,13 +59,17 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
         Use with --describe --state  to show group epoch and target assignment 
epoch.
         Use with --describe --members to show for each member the member 
epoch, target assignment epoch, current assignment, target assignment, and 
whether member is still using the classic rebalance protocol.
         Use with --describe --offsets  and --describe  to show leader epochs 
for each partition.""";
+    private static final String DELETE_OFFSETS_DOC = "Delete offsets of a streams group. Supports one streams group at a time, and multiple topics.";
 
     public final OptionSpec<String> bootstrapServerOpt;
     public final OptionSpec<String> groupOpt;
+    public final OptionSpec<String> inputTopicOpt;
+    public final OptionSpec<Void> allInputTopicsOpt;
     public final OptionSpec<Void> listOpt;
     public final OptionSpec<Void> describeOpt;
-    final OptionSpec<Void> allGroupsOpt;
-    final OptionSpec<Void> deleteOpt;
+    public final OptionSpec<Void> allGroupsOpt;
+    public final OptionSpec<Void> deleteOpt;
+    public final OptionSpec<Void> deleteOffsetsOpt;
     public final OptionSpec<Long> timeoutMsOpt;
     public final OptionSpec<String> commandConfigOpt;
     public final OptionSpec<String> stateOpt;
@@ -68,6 +77,8 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
     public final OptionSpec<Void> offsetsOpt;
     public final OptionSpec<Void> verboseOpt;
 
+
+    final Set<OptionSpec<?>> allDeleteOffsetsOpts;
     final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
     final Set<OptionSpec<?>> allStreamsGroupLevelOpts;
 
@@ -88,10 +99,16 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
             .withRequiredArg()
             .describedAs("streams group")
             .ofType(String.class);
+        inputTopicOpt = parser.accepts("input-topic", INPUT_TOPIC_DOC)
+            .withRequiredArg()
+            .describedAs("topic")
+            .ofType(String.class);
+        allInputTopicsOpt = parser.accepts("all-input-topics", 
ALL_INPUT_TOPICS_DOC);
         listOpt = parser.accepts("list", LIST_DOC);
         describeOpt = parser.accepts("describe", DESCRIBE_DOC);
         allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC);
         deleteOpt = parser.accepts("delete", DELETE_DOC);
+        deleteOffsetsOpt = parser.accepts("delete-offsets", 
DELETE_OFFSETS_DOC);
         timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
             .availableIf(describeOpt)
             .withRequiredArg()
@@ -113,9 +130,10 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
         verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
             .availableIf(describeOpt);
 
+        options = parser.parse(args);
+        allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt, 
allInputTopicsOpt));
         allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt));
         allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
-        options = parser.parse(args);
     }
 
     public void checkArgs() {
@@ -124,19 +142,16 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
         CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
 
         if (options.has(describeOpt)) {
-            List<OptionSpec<?>> mutuallyExclusiveOpts = 
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
-            if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) {
-                CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " takes at most one of these 
options: " + 
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
-            }
-            if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
-                CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " does not take a value for " + 
stateOpt);
+            checkDescribeArgs();
         } else {
             if (options.has(timeoutMsOpt))
                 LOGGER.debug("Option " + timeoutMsOpt + " is applicable only 
when " + describeOpt + " is used.");
         }
 
+        if (options.has(deleteOffsetsOpt)) {
+            checkDeleteOffsetsArgs();
+        }
+
         if (options.has(deleteOpt)) {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                 CommandLineUtils.printUsageAndExit(parser,
@@ -146,4 +161,24 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
         CommandLineUtils.checkInvalidArgs(parser, options, listOpt, 
membersOpt, offsetsOpt);
         CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
minus(allStreamsGroupLevelOpts, describeOpt, deleteOpt));
     }
+
+    private void checkDescribeArgs() {
+        List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, 
offsetsOpt, stateOpt);
+        if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 
0).sum() > 1) {
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + describeOpt + " takes at most one of these 
options: " + 
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+        }
+        if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + describeOpt + " does not take a value for " + 
stateOpt);
+    }
+
+    private void checkDeleteOffsetsArgs() {
+        if ((!options.has(inputTopicOpt) && !options.has(allInputTopicsOpt)) 
|| !options.has(groupOpt))
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + deleteOffsetsOpt + " takes the " + groupOpt + " 
and one of these options: " + 
allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+        if (options.valuesOf(groupOpt).size() > 1)
+            CommandLineUtils.printUsageAndExit(parser,
+                "Option " + deleteOffsetsOpt + " supports only one " + 
groupOpt + " at a time, but found: " + options.valuesOf(groupOpt));
+    }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
new file mode 100644
index 00000000000..74079548675
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.GroupProtocol;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.GroupState.EMPTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class DeleteStreamsGroupOffsetTest {
+    private static final String TOPIC_PREFIX = "foo-";
+    private static final String APP_ID_PREFIX = "streams-group-command-test";
+
+    private static final int RECORD_TOTAL = 5;
+    public static EmbeddedKafkaCluster cluster;
+    private static String bootstrapServers;
+    private static final String OUTPUT_TOPIC_PREFIX = "output-topic-";
+
+    @BeforeAll
+    public static void startCluster() {
+        final Properties props = new Properties();
+        cluster = new EmbeddedKafkaCluster(2, props);
+        cluster.start();
+
+        bootstrapServers = cluster.bootstrapServers();
+    }
+
+    @AfterEach
+    public void deleteTopics() {
+        try (final Admin adminClient = cluster.createAdminClient()) {
+            // delete all topics
+            final Set<String> topics = adminClient.listTopics().names().get();
+            adminClient.deleteTopics(topics).all().get();
+            // delete all groups
+            List<String> groupIds =
+                
adminClient.listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000)).all().get()
+                    .stream().map(GroupListing::groupId).toList();
+            adminClient.deleteStreamsGroups(groupIds).all().get();
+        } catch (final UnknownTopicOrPartitionException ignored) {
+        } catch (final ExecutionException | InterruptedException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private Properties createStreamsConfig(String bootstrapServers, String 
appId) {
+        final Properties configs = new Properties();
+        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        configs.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
+        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        return configs;
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        cluster.stop();
+    }
+
+    @Test
+    public void testDeleteOffsetsNonExistingGroup() {
+        String group = "not-existing";
+        String topic = "foo:1";
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", topic};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args)) {
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
+        }
+    }
+
+    @Test
+    public void testDeleteStreamsGroupOffsetsMultipleGroups() {
+        final String group1 = generateRandomAppId();
+        final String group2 = generateRandomAppId();
+        final String topic1 = generateRandomTopic();
+        final String topic2 = generateRandomTopic();
+
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group1, "--group", group2, "--input-topic", 
topic1, "--input-topic", topic2};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] supports only 
one [group] at a time, but found:") &&
+                message.contains(group1) && message.contains(group2));
+            exited.set(true);
+        }));
+        try {
+            getStreamsGroupService(args);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        String topicPartition = topic + ":0";
+        String[] args;
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", topicPartition};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, 
service)) {
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertError(res, topic, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        String[] args;
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", topic};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, 
service)) {
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertError(res, topic, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void 
testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        final String unknownTopic = "unknown-topic";
+        final String unknownTopicPartition = unknownTopic + ":0";
+        String[] args;
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, 
service)) {
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertError(res, unknownTopic, 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        final String unknownTopic = "unknown-topic";
+        String[] args;
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", unknownTopic};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, 
service)) {
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertError(res, unknownTopic, -1, -1, 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        final String topicPartition = topic + ":0";
+        String[] args;
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--delete-offsets", "--group", group, "--input-topic", topicPartition};
+        try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, 
service)) {
+            stopKSApp(group, topic, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets();
+            assertError(res, topic, 0, 0, Errors.NONE);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic};
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) {
+            stopKSApp(group, topic, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
+            assertError(res, topic, -1, 0, Errors.NONE);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics() {
+        final String group = generateRandomAppId();
+        final String topic1 = generateRandomTopic();
+        final String unknownTopic = "unknown-topic";
+
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic1, "--input-topic", unknownTopic};
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic1, service)) {
+            stopKSApp(group, topic1, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
+            assertError(res, topic1, -1, 0, Errors.NONE);
+            assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        final String unknownTopic = "unknown-topic";
+        final String unknownTopicPartition = unknownTopic + ":0";
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition};
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) {
+            stopKSApp(group, topic, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
+            assertError(res, unknownTopic, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        final String unknownTopic = "unknown-topic";
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopic};
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) {
+            stopKSApp(group, topic, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
+            assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics() {
+        final String group = generateRandomAppId();
+        final String topic = generateRandomTopic();
+        String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--all-input-topics"};
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) {
+            stopKSApp(group, topic, streams, service);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
+            assertError(res, topic, -1, 0, Errors.NONE);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
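+    // Asserts both the top-level error and the per-partition result returned by deleteOffsets().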
+    private void assertError(Map.Entry<Errors, Map<TopicPartition, Throwable>> res,
+                             String inputTopic,
+                             int inputPartition,
+                             int expectedPartition,
+                             Errors expectedError) {
+        Errors topLevelError = res.getKey();
+        Map<TopicPartition, Throwable> partitions = res.getValue();
+        TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
+        // A partition-level error should propagate to the top level, unless no explicit partition was requested.
+        if (inputPartition >= 0) {
+            assertEquals(expectedError, topLevelError);
+        }
+        if (expectedError == Errors.NONE)
+            assertNull(partitions.get(tp));
+        else
+            assertEquals(expectedError.exception(), partitions.get(tp).getCause());
+    }
+
+    private String generateRandomTopic() {
+        return generateRandomTopicId(TOPIC_PREFIX);
+    }
+
+    private String generateRandomAppId() {
+        return APP_ID_PREFIX + TestUtils.randomString(10);
+    }
+
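+    // Closes the Streams app, leaving the group, and waits until the broker reports the group as EMPTY with no members.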
+    private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
+        if (streams != null) {
+            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+            closeOptions.timeout(Duration.ofSeconds(30));
+            closeOptions.leaveGroup(true);
+            streams.close(closeOptions);
+            streams.cleanUp();
+
+            TestUtils.waitForCondition(
+                () -> checkGroupState(service, appId, EMPTY),
+                "The group did not become empty as expected."
+            );
+            TestUtils.waitForCondition(
+                () -> service.collectGroupMembers(appId).isEmpty(),
+                "The group member count did not drop to zero as expected."
+            );
+        }
+    }
+
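+    // Starts a Streams app consuming inputTopic and waits until its group is STABLE with at least one member.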
+    private KafkaStreams startKSApp(String appId, String inputTopic, StreamsGroupCommand.StreamsGroupService service) throws Exception {
+        String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX);
+        StreamsBuilder builder = builder(inputTopic, outputTopic);
+        produceMessages(inputTopic);
+
+        final KStream<String, String> inputStream = builder.stream(inputTopic);
+
+        final AtomicInteger recordCount = new AtomicInteger(0);
+        final KTable<String, String> valueCounts = inputStream
+            .groupByKey()
+            .aggregate(
+                () -> "()",
+                (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
+                Materialized.as("aggregated_value"));
+
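+        // Fail fast if more records than expected flow through the topology.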
+        valueCounts.toStream().peek((key, value) -> {
+            if (recordCount.incrementAndGet() > RECORD_TOTAL) {
+                throw new IllegalStateException("Processed more than " + RECORD_TOTAL + " records.");
+            }
+        });
+
+        KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), builder, true);
+
+        TestUtils.waitForCondition(
+            () -> !service.collectGroupMembers(appId).isEmpty(),
+            "The group did not initialize as expected."
+        );
+        TestUtils.waitForCondition(
+            () -> checkGroupState(service, appId, GroupState.STABLE),
+            "The group did not become stable as expected."
+        );
+
+        return streams;
+    }
+
+    private String generateRandomTopicId(String prefix) {
+        return prefix + TestUtils.randomString(10);
+    }
+
+    private String generateGroupAppId() {
+        return generateRandomAppId();
+    }
+
+    private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, String groupId, GroupState state) throws Exception {
+        return Objects.equals(service.collectGroupState(groupId), state);
+    }
+
+    private static StreamsBuilder builder(String inputTopic, String outputTopic) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
+            .groupBy((key, value) -> value)
+            .count()
+            .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+        return builder;
+    }
+
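+    // Synchronously produces RECORD_TOTAL timestamped string records to the given topic.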
+    private static void produceMessages(final String topic) {
+        List<KeyValueTimestamp<String, String>> data = new ArrayList<>(RECORD_TOTAL);
+        for (long v = 0; v < RECORD_TOTAL; ++v) {
+            data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds()));
+        }
+
+        IntegrationTestUtils.produceSynchronously(
+            TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class),
+            false,
+            topic,
+            Optional.empty(),
+            data
+        );
+    }
+
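+    // Builds a StreamsGroupService from the given command-line args, backed by an admin client for the embedded cluster.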
+    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
+        StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
+        return new StreamsGroupCommand.StreamsGroupService(opts, cluster.createAdminClient());
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
index bb7fbe58677..394e4cf63d0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -89,7 +88,6 @@ public class DeleteStreamsGroupTest {
     @BeforeAll
     public static void startCluster() {
         final Properties props = new Properties();
-        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
         cluster = new EmbeddedKafkaCluster(2, props);
         cluster.start();
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 7a2b28d8e15..14c9bedfb88 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -178,6 +178,7 @@ public class StreamsGroupCommandTest {
 
     @Test
     public void testDescribeStreamsGroupsGetOffsets() throws Exception {
+        String groupId = "group1";
         Admin adminClient = mock(KafkaAdminClient.class);
 
         ListOffsetsResult startOffset = mock(ListOffsetsResult.class);
@@ -193,11 +194,11 @@
 
         when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
 
-        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
+        ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>();
         committedOffsetsMap.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(12, Optional.of(0), ""));
 
-        when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+        when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
         when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
 
         StreamsGroupMemberDescription description = new StreamsGroupMemberDescription("foo", 0, Optional.empty(),
@@ -215,8 +216,13 @@ public class StreamsGroupCommandTest {
             GroupState.STABLE,
             new Node(0, "host", 0),
             null);
+        DescribeStreamsGroupsResult describeStreamsGroupsResult = mock(DescribeStreamsGroupsResult.class);
+        when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId, x)));
+        when(adminClient.describeStreamsGroups(List.of(groupId))).thenReturn(describeStreamsGroupsResult);
+
         StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
         Map<TopicPartition, StreamsGroupCommand.OffsetsInfo> lags = service.getOffsets(x);
+
         assertEquals(1, lags.size());
         assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0)));
 
