This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd784e7d7a1 KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets 
(#19820)
dd784e7d7a1 is described below

commit dd784e7d7a1d5e915c43344d4659da06b02a0c98
Author: jimmy <[email protected]>
AuthorDate: Tue Jul 29 18:47:24 2025 +0800

    KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets (#19820)
    
    [KAFKA-16717](https://issues.apache.org/jira/browse/KAFKA-16717) aims to
    complete the AlterShareGroupOffsets support for the ShareGroupCommand part.
    
    Reviewers: Lan Ding <[email protected]>, Chia-Ping Tsai
     <[email protected]>, TaiJuWu <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../requests/AlterShareGroupOffsetsResponse.java   |   4 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |  17 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../coordinator/group/GroupCoordinatorService.java |  44 ++-
 .../persister/InitializeShareGroupStateResult.java |  13 +
 .../apache/kafka/server/util/CommandLineUtils.java |   5 +
 .../java/org/apache/kafka/tools/OffsetsUtils.java  |  84 +++++
 .../tools/consumer/group/ConsumerGroupCommand.java |  71 +---
 .../tools/consumer/group/ShareGroupCommand.java    | 135 ++++++--
 .../consumer/group/ShareGroupCommandOptions.java   |  12 +
 .../consumer/group/ShareGroupCommandTest.java      | 374 +++++++++++++++++++++
 11 files changed, 660 insertions(+), 101 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
index fd6feaded32..5da47d0d326 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
@@ -83,11 +83,11 @@ public class AlterShareGroupOffsetsResponse extends 
AbstractResponse {
             return topicData;
         }
 
-        public Builder addPartition(String topic, int partition, Map<String, 
Uuid> topicIdsToNames,  Errors error) {
+        public Builder addPartition(String topic, int partition, Map<String, 
Uuid> topicIdsToNames, ApiError error) {
             AlterShareGroupOffsetsResponseTopic topicData = 
getOrCreateTopic(topic, topicIdsToNames.get(topic));
             topicData.partitions().add(new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
                 .setPartitionIndex(partition)
-                .setErrorCode(error.code())
+                .setErrorCode(error.error().code())
                 .setErrorMessage(error.message()));
             return this;
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index dc43a9f2bd4..02c09443370 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -194,6 +195,22 @@ public class AdminClientTestUtils {
         return new ListShareGroupOffsetsResult(coordinatorFutures);
     }
 
+    public static ListOffsetsResult 
createListOffsetsResult(Map<TopicPartition, OffsetAndMetadata> 
partitionOffsets) {
+        Map<TopicPartition, 
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures =
+            partitionOffsets.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    entry -> KafkaFuture.completedFuture(
+                        new ListOffsetsResult.ListOffsetsResultInfo(
+                            entry.getValue().offset(),
+                            System.currentTimeMillis(),
+                            Optional.of(1)
+                        )
+                    )
+                ));
+        return new ListOffsetsResult(futures);
+    }
+
     /**
      * Helper to create a KafkaAdminClient with a custom HostResolver 
accessible to tests outside this package.
      */
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c6aca487404..45bd8b7e7fb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3852,7 +3852,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         topicError match {
           case Some(error) =>
-            topic.partitions.forEach(partition => 
responseBuilder.addPartition(topic.topicName, partition.partitionIndex, 
metadataCache.topicNamesToIds, error.error))
+            topic.partitions.forEach(partition => 
responseBuilder.addPartition(topic.topicName, partition.partitionIndex, 
metadataCache.topicNamesToIds, error))
           case None =>
             authorizedTopicPartitions.add(topic.duplicate)
         }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index be2636b2bad..ee16f1d3767 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -709,8 +709,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                         
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
result, new ShareGroupHeartbeatResponseData());
                         return response;
                     } else {
-                        //TODO build new AlterShareGroupOffsetsResponseData 
for error response
-                        return response;
+                        return buildErrorResponse(response, result);
                     }
                 } else {
                     return buildErrorResponse(request, response, exp);
@@ -718,6 +717,42 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
 
             });
     }
+    
+    private AlterShareGroupOffsetsResponseData 
buildErrorResponse(AlterShareGroupOffsetsResponseData response, 
InitializeShareGroupStateResult result) {
+        AlterShareGroupOffsetsResponseData data = new 
AlterShareGroupOffsetsResponseData();
+        Map<Uuid, Map<Integer, PartitionErrorData>> topicPartitionErrorsMap = 
result.getErrors();
+        data.setResponses(
+            new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
+                .map(topic -> {
+                    
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic 
topicData = new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
+                        .setTopicName(topic.topicName())
+                        .setTopicId(topic.topicId());
+                    topic.partitions().forEach(partition -> {
+                        if (partition.errorCode() != Errors.NONE.code()) {
+                            topicData.partitions().add(partition);
+                            return;
+                        }
+                        
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition 
partitionData;
+                        Map<Integer, PartitionErrorData> partitionErrors =
+                            Optional.ofNullable(topicPartitionErrorsMap)
+                                .map(map -> map.get(topic.topicId()))
+                                .orElse(Collections.emptyMap());
+                        PartitionErrorData error = 
partitionErrors.get(partition.partitionIndex());
+                        if (error == null) {
+                            partitionData = partition.duplicate();
+                        } else {
+                            partitionData = new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partition.partitionIndex())
+                                .setErrorCode(error.errorCode())
+                                .setErrorMessage(error.errorMessage());
+                        }
+                        topicData.partitions().add(partitionData);
+                    });
+                    return topicData;
+                })
+                .iterator()));
+        return data;
+    }
 
     private AlterShareGroupOffsetsResponseData 
buildErrorResponse(InitializeShareGroupStateParameters request, 
AlterShareGroupOffsetsResponseData response, Throwable exp) {
         // build new AlterShareGroupOffsetsResponseData for error response
@@ -726,13 +761,14 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         log.error("Unable to initialize share group state for {}, {} while 
altering share group offsets", gtp.groupId(), gtp.topicsData(), exp);
         Errors error = Errors.forException(exp);
         data.setErrorCode(error.code())
-            .setErrorMessage(error.message())
+            .setErrorMessage(exp.getMessage())
             .setResponses(response.responses());
         data.setResponses(
             new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
                 .map(topic -> {
                     
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic 
topicData = new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
-                        .setTopicName(topic.topicName());
+                        .setTopicName(topic.topicName())
+                        .setTopicId(topic.topicId());
                     topic.partitions().forEach(partition -> {
                         
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition 
partitionData = new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
                             .setPartitionIndex(partition.partitionIndex())
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
index 722cf8da1da..becb552de21 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.server.share.persister;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.Errors;
 
@@ -59,6 +60,18 @@ public class InitializeShareGroupStateResult implements 
PersisterResult {
             ));
     }
 
+    public Map<Uuid, Map<Integer, PartitionErrorData>> getErrors() {
+        return topicsData.stream()
+            .collect(Collectors.toMap(
+                TopicData::topicId,
+                topicData -> topicData.partitions().stream()
+                    .collect(Collectors.toMap(
+                        PartitionIdData::partition,
+                        partitionErrorData -> partitionErrorData
+                    ))
+            ));
+    }
+
     public static class Builder {
         private List<TopicData<PartitionErrorData>> topicsData;
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 82a02482db8..6146daeb45c 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -130,6 +130,11 @@ public class CommandLineUtils {
         }
     }
 
+    public static void printErrorAndExit(String message) {
+        System.err.println(message);
+        Exit.exit(1, message);
+    }
+
     public static void printUsageAndExit(OptionParser parser, String message) {
         System.err.println(message);
         try {
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index c529e4b6820..e37bf2804d7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
@@ -46,6 +48,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.function.ToIntFunction;
@@ -68,6 +71,28 @@ public class OffsetsUtils {
         this.parser = parser;
     }
 
+    public static void printOffsetsToReset(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> groupAssignmentsToReset) {
+        int maxGroupLen = Math.max(15, 
groupAssignmentsToReset.keySet().stream().mapToInt(String::length).max().orElse(0));
+        int maxTopicLen = Math.max(15, 
groupAssignmentsToReset.values().stream()
+            .flatMap(assignments -> assignments.keySet().stream())
+            .mapToInt(tp -> tp.topic().length())
+            .max()
+            .orElse(0));
+
+        String format = "%n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s 
%-10s %s";
+        if (!groupAssignmentsToReset.isEmpty())
+            System.out.printf(format, "GROUP", "TOPIC", "PARTITION", 
"NEW-OFFSET");
+
+        groupAssignmentsToReset.forEach((groupId, assignment) ->
+            assignment.forEach((consumerAssignment, offsetAndMetadata) ->
+                System.out.printf(format,
+                    groupId,
+                    consumerAssignment.topic(),
+                    consumerAssignment.partition(),
+                    offsetAndMetadata.offset())));
+        System.out.println();
+    }
+
     public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> 
resetPlanFromFile() {
         if (opts.resetFromFileOpt != null && !opts.resetFromFileOpt.isEmpty()) 
{
             try {
@@ -414,6 +439,55 @@ public class OffsetsUtils {
         return preparedOffsetsForPartitionsWithCommittedOffset;
     }
 
+    public void checkAllTopicPartitionsValid(Collection<TopicPartition> 
partitionsToReset) {
+        // check the partitions exist
+        List<TopicPartition> partitionsNotExistList = 
filterNonExistentPartitions(partitionsToReset);
+        if (!partitionsNotExistList.isEmpty()) {
+            String partitionStr = 
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+            throw new UnknownTopicOrPartitionException("The partitions \"" + 
partitionStr + "\" do not exist");
+        }
+
+        // check the partitions have leader
+        List<TopicPartition> partitionsWithoutLeader = 
filterNoneLeaderPartitions(partitionsToReset);
+        if (!partitionsWithoutLeader.isEmpty()) {
+            String partitionStr = 
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+            throw new LeaderNotAvailableException("The partitions \"" + 
partitionStr + "\" have no leader");
+        }
+    }
+
+    public List<TopicPartition> 
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
+        // collect all topics
+        Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+
+        try {
+            return 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+                .stream()
+                .flatMap(entry -> entry.getValue().partitions().stream()
+                    .filter(partitionInfo -> partitionInfo.leader() == null)
+                    .map(partitionInfo -> new TopicPartition(entry.getKey(), 
partitionInfo.partition())))
+                    .filter(topicPartitions::contains)
+                .toList();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<TopicPartition> 
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
+        // collect all topics
+        Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+        try {
+            List<TopicPartition> existPartitions = 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+                .stream()
+                .flatMap(entry -> entry.getValue().partitions().stream()
+                    .map(partitionInfo -> new TopicPartition(entry.getKey(), 
partitionInfo.partition())))
+                .toList();
+
+            return topicPartitions.stream().filter(tp -> 
!existPartitions.contains(tp)).toList();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
         int t = (int) opts.timeoutMsOpt;
         return options.timeoutMs(t);
@@ -469,5 +543,15 @@ public class OffsetsUtils {
             this.resetShiftByOpt = resetShiftByOpt;
             this.timeoutMsOpt = timeoutMsOpt;
         }
+
+        public OffsetsUtilsOptions(
+            List<String> groupOpt,
+            List<String> resetToDatetimeOpt,
+            long timeoutMsOpt) {
+
+            this.groupOpt = groupOpt;
+            this.resetToDatetimeOpt = resetToDatetimeOpt;
+            this.timeoutMsOpt = timeoutMsOpt;
+        }
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 2cfffb9fe78..1f29cdd8156 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -41,8 +41,6 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
@@ -133,7 +131,7 @@ public class ConsumerGroupCommand {
                     String exported = 
consumerGroupService.exportOffsetsToCsv(offsetsToReset);
                     System.out.println(exported);
                 } else
-                    printOffsetsToReset(offsetsToReset);
+                    OffsetsUtils.printOffsetsToReset(offsetsToReset);
             } else if (opts.options.has(opts.deleteOffsetsOpt)) {
                 consumerGroupService.deleteOffsets();
             }
@@ -179,21 +177,6 @@ public class ConsumerGroupCommand {
         e.ifPresent(Throwable::printStackTrace);
     }
 
-    static void printOffsetsToReset(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> groupAssignmentsToReset) {
-        String format = "%n%-30s %-30s %-10s %-15s";
-        if (!groupAssignmentsToReset.isEmpty())
-            System.out.printf(format, "GROUP", "TOPIC", "PARTITION", 
"NEW-OFFSET");
-
-        groupAssignmentsToReset.forEach((groupId, assignment) ->
-            assignment.forEach((consumerAssignment, offsetAndMetadata) ->
-                System.out.printf(format,
-                    groupId,
-                    consumerAssignment.topic(),
-                    consumerAssignment.partition(),
-                    offsetAndMetadata.offset())));
-        System.out.println();
-    }
-
     @SuppressWarnings("ClassFanOutComplexity")
     static class ConsumerGroupService implements AutoCloseable {
         final ConsumerGroupCommandOptions opts;
@@ -615,7 +598,7 @@ public class ConsumerGroupCommand {
                     consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt, 
leaderEpoch);
             };
 
-            List<TopicPartition> topicPartitionsWithoutLeader = 
filterNoneLeaderPartitions(topicPartitions);
+            List<TopicPartition> topicPartitionsWithoutLeader = 
offsetsUtils.filterNoneLeaderPartitions(topicPartitions);
             List<TopicPartition> topicPartitionsWithLeader = 
topicPartitions.stream().filter(tp -> 
!topicPartitionsWithoutLeader.contains(tp)).toList();
 
             // prepare data for partitions with leaders
@@ -645,22 +628,6 @@ public class ConsumerGroupCommand {
                     .collect(Collectors.toList());
         }
 
-        private List<TopicPartition> 
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
-            // collect all topics
-            Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
-
-            try {
-                return 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
-                        .stream()
-                        .flatMap(entry -> 
entry.getValue().partitions().stream()
-                                .filter(partitionInfo -> 
partitionInfo.leader() == null)
-                                .map(partitionInfo -> new 
TopicPartition(entry.getKey(), partitionInfo.partition())))
-                        .toList();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
         Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
             List<String> groupIds = opts.options.has(opts.allGroupsOpt)
                 ? listConsumerGroups()
@@ -1002,7 +969,7 @@ public class ConsumerGroupCommand {
 
         private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId, Collection<TopicPartition> 
partitionsToReset) {
             // ensure all partitions are valid, otherwise throw a runtime 
exception
-            checkAllTopicPartitionsValid(partitionsToReset);
+            offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
 
             if (opts.options.has(opts.resetToOffsetOpt)) {
                 return offsetsUtils.resetToOffset(partitionsToReset);
@@ -1028,38 +995,6 @@ public class ConsumerGroupCommand {
             return null;
         }
 
-        private void checkAllTopicPartitionsValid(Collection<TopicPartition> 
partitionsToReset) {
-            // check the partitions exist
-            List<TopicPartition> partitionsNotExistList = 
filterNonExistentPartitions(partitionsToReset);
-            if (!partitionsNotExistList.isEmpty()) {
-                String partitionStr = 
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
-                throw new UnknownTopicOrPartitionException("The partitions \"" 
+ partitionStr + "\" do not exist");
-            }
-
-            // check the partitions have leader
-            List<TopicPartition> partitionsWithoutLeader = 
filterNoneLeaderPartitions(partitionsToReset);
-            if (!partitionsWithoutLeader.isEmpty()) {
-                String partitionStr = 
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
-                throw new LeaderNotAvailableException("The partitions \"" + 
partitionStr + "\" have no leader");
-            }
-        }
-
-        private List<TopicPartition> 
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
-            // collect all topics
-            Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
-            try {
-                List<TopicPartition> existPartitions = 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
-                        .stream()
-                        .flatMap(entry -> 
entry.getValue().partitions().stream()
-                                .map(partitionInfo -> new 
TopicPartition(entry.getKey(), partitionInfo.partition())))
-                        .toList();
-
-                return topicPartitions.stream().filter(element -> 
!existPartitions.contains(element)).toList();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
         String exportOffsetsToCsv(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> assignments) {
             boolean isSingleGroupQuery = 
opts.options.valuesOf(opts.groupOpt).size() == 1;
             ObjectWriter csvWriter = isSingleGroupQuery
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 8996f92b84e..95723665032 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -33,12 +33,14 @@ import 
org.apache.kafka.clients.admin.ShareMemberDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.OffsetsUtils;
 
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
@@ -92,7 +94,7 @@ public class ShareGroupCommand {
             } else if (opts.options.has(opts.deleteOpt)) {
                 shareGroupService.deleteShareGroups();
             } else if (opts.options.has(opts.resetOffsetsOpt)) {
-                throw new UnsupportedOperationException("--reset-offsets 
option is not yet implemented");
+                shareGroupService.resetOffsets();
             } else if (opts.options.has(opts.deleteOffsetsOpt)) {
                 shareGroupService.deleteOffsets();
             }
@@ -123,6 +125,7 @@ public class ShareGroupCommand {
     static class ShareGroupService implements AutoCloseable {
         final ShareGroupCommandOptions opts;
         private final Admin adminClient;
+        private final OffsetsUtils offsetsUtils;
 
         public ShareGroupService(ShareGroupCommandOptions opts, Map<String, 
String> configOverrides) {
             this.opts = opts;
@@ -131,11 +134,20 @@ public class ShareGroupCommand {
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
+            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
         }
 
         public ShareGroupService(ShareGroupCommandOptions opts, Admin 
adminClient) {
             this.opts = opts;
             this.adminClient = adminClient;
+            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+        }
+
+        private OffsetsUtils.OffsetsUtilsOptions 
getOffsetsUtilsOptions(ShareGroupCommandOptions opts) {
+            return
+                new 
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+                    opts.options.valuesOf(opts.resetToDatetimeOpt),
+                    opts.options.valueOf(opts.timeoutMsOpt));
         }
 
         public void listGroups() throws ExecutionException, 
InterruptedException {
@@ -366,6 +378,75 @@ public class ShareGroupCommand {
             return new SimpleImmutableEntry<>(topLevelException, 
topicLevelResult);
         }
 
+        void resetOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            try {
+                ShareGroupDescription shareGroupDescription = 
describeShareGroups(List.of(groupId)).get(groupId);
+                if 
(!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || 
GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
+                    CommandLineUtils.printErrorAndExit(String.format("Share 
group '%s' is not empty.", groupId));
+                }
+                Map<TopicPartition, OffsetAndMetadata> offsetsToReset = 
prepareOffsetsToReset(groupId);
+                if (offsetsToReset == null) {
+                    return;
+                }
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterShareGroupOffsets(groupId,
+                        offsetsToReset.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                Entry::getKey, entry -> 
entry.getValue().offset()
+                            ))
+                    ).all().get();
+                }
+                OffsetsUtils.printOffsetsToReset(Map.of(groupId, 
offsetsToReset));
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    CommandLineUtils.printErrorAndExit(cause.getMessage());
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        protected Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId) throws ExecutionException, 
InterruptedException {
+            Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
+            Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+            Collection<TopicPartition> partitionsToReset;
+
+            if (opts.options.has(opts.topicOpt)) {
+                partitionsToReset = 
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
+                Set<String> subscribedTopics = 
offsetsByTopicPartitions.keySet().stream()
+                    .map(TopicPartition::topic)
+                    .collect(Collectors.toSet());
+                Set<String> resetTopics = partitionsToReset.stream()
+                    .map(TopicPartition::topic)
+                    .collect(Collectors.toSet());
+                if (!subscribedTopics.containsAll(resetTopics)) {
+                    CommandLineUtils
+                        .printErrorAndExit(String.format("Share group '%s' is 
not subscribed to topic '%s'.",
+                            groupId, resetTopics.stream().filter(topic -> 
!subscribedTopics.contains(topic)).collect(Collectors.joining(", "))));
+                    return null;
+                }
+            } else {
+                partitionsToReset = offsetsByTopicPartitions.keySet();
+            }
+
+            offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+            if (opts.options.has(opts.resetToEarliestOpt)) {
+                return offsetsUtils.resetToEarliest(partitionsToReset);
+            } else if (opts.options.has(opts.resetToLatestOpt)) {
+                return offsetsUtils.resetToLatest(partitionsToReset);
+            } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+                return offsetsUtils.resetToDateTime(partitionsToReset);
+            }
+            CommandLineUtils
+                .printUsageAndExit(opts.parser, String.format("Option '%s' 
requires one of the following scenarios: %s", opts.resetOffsetsOpt, 
opts.allResetOffsetScenarioOpts));
+            return null;
+        }
+
         private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
             int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
             return options.timeoutMs(t);
@@ -396,34 +477,11 @@ public class ShareGroupCommand {
             TreeMap<String, Entry<ShareGroupDescription, 
Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
 
             shareGroups.forEach((groupId, shareGroup) -> {
-                ListShareGroupOffsetsSpec offsetsSpec = new 
ListShareGroupOffsetsSpec();
-                Map<String, ListShareGroupOffsetsSpec> groupSpecs = new 
HashMap<>();
-                groupSpecs.put(groupId, offsetsSpec);
+                Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
 
                 try {
                     Map<TopicPartition, OffsetAndMetadata> startOffsets = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
-
-                    Set<SharePartitionOffsetInformation> partitionOffsets = 
new HashSet<>();
-
-                    startOffsets.forEach((tp, offsetAndMetadata) -> {
-                        if (offsetAndMetadata != null) {
-                            partitionOffsets.add(new 
SharePartitionOffsetInformation(
-                                groupId,
-                                tp.topic(),
-                                tp.partition(),
-                                Optional.of(offsetAndMetadata.offset()),
-                                offsetAndMetadata.leaderEpoch()
-                            ));
-                        } else {
-                            partitionOffsets.add(new 
SharePartitionOffsetInformation(
-                                groupId,
-                                tp.topic(),
-                                tp.partition(),
-                                Optional.empty(),
-                                Optional.empty()
-                            ));
-                        }
-                    });
+                    Set<SharePartitionOffsetInformation> partitionOffsets = 
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
 
                     groupOffsets.put(groupId, new 
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
                 } catch (InterruptedException | ExecutionException e) {
@@ -434,6 +492,31 @@ public class ShareGroupCommand {
             return groupOffsets;
         }
 
+        private static Set<SharePartitionOffsetInformation> 
mapOffsetsToSharePartitionInformation(String groupId, Map<TopicPartition, 
OffsetAndMetadata> startOffsets) {
+            Set<SharePartitionOffsetInformation> partitionOffsets = new 
HashSet<>();
+
+            startOffsets.forEach((tp, offsetAndMetadata) -> {
+                if (offsetAndMetadata != null) {
+                    partitionOffsets.add(new SharePartitionOffsetInformation(
+                        groupId,
+                        tp.topic(),
+                        tp.partition(),
+                        Optional.of(offsetAndMetadata.offset()),
+                        offsetAndMetadata.leaderEpoch()
+                    ));
+                } else {
+                    partitionOffsets.add(new SharePartitionOffsetInformation(
+                        groupId,
+                        tp.topic(),
+                        tp.partition(),
+                        Optional.empty(),
+                        Optional.empty()
+                    ));
+                }
+            });
+            return partitionOffsets;
+        }
+
         private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, 
Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
             offsets.forEach((groupId, tuple) -> {
                 Collection<SharePartitionOffsetInformation> offsetsInfo = 
tuple.getValue().stream()
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index f155a9c4b5e..e85822d4971 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -88,6 +88,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
     final OptionSpec<Void> verboseOpt;
 
     final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
+    final Set<OptionSpec<?>> allTopicSelectionScopeOpts;
     final Set<OptionSpec<?>> allShareGroupLevelOpts;
     final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
     final Set<OptionSpec<?>> allDeleteOffsetsOpts;
@@ -143,6 +144,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
             .availableIf(describeOpt);
 
         allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+        allTopicSelectionScopeOpts = Set.of(topicOpt, allTopicsOpt);
         allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, 
resetOffsetsOpt);
         allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt, 
resetToEarliestOpt, resetToLatestOpt);
         allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
@@ -200,6 +202,16 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
                 CommandLineUtils.printUsageAndExit(parser,
                     "Option " + resetOffsetsOpt + " takes the option: " + 
groupOpt);
 
+            if (!options.has(topicOpt) && !options.has(allTopicsOpt)) {
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + resetOffsetsOpt + " takes one of these 
options: " + 
allTopicSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
 ")));
+            }
+            
+            if (!options.has(resetToEarliestOpt) && 
!options.has(resetToLatestOpt) && !options.has(resetToDatetimeOpt)) {
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + resetOffsetsOpt + " takes one of these 
options: " + 
allResetOffsetScenarioOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
 ")));
+            }
+
             CommandLineUtils.checkInvalidArgs(parser, options, 
resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
             CommandLineUtils.checkInvalidArgs(parser, options, 
resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt));
             CommandLineUtils.checkInvalidArgs(parser, options, 
resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt));
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 19a11ce0f90..e80071907e7 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -19,25 +19,34 @@ package org.apache.kafka.tools.consumer.group;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.protocol.Errors;
@@ -50,6 +59,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
 import org.mockito.ArgumentMatchers;
 
 import java.util.ArrayList;
@@ -67,10 +77,12 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import joptsimple.OptionException;
 
+import static org.apache.kafka.common.KafkaFuture.completedFuture;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -79,7 +91,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -1052,6 +1066,366 @@ public class ShareGroupCommandTest {
             assertEquals(expectedResults, service.deleteShareGroups());
         }
     }
+    
+    @Test
+    public void testAlterShareGroupMultipleTopicsSuccess() {
+        String group = "share-group";
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic1, "--topic", 
topic2, "--group", group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 
0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new 
OffsetAndMetadata(10L), 
+                    new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
+            )
+        );
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
+        TopicPartition tp0 = new TopicPartition(topic1, 0);
+        TopicPartition tp1 = new TopicPartition(topic1, 1);
+        TopicPartition tp2 = new TopicPartition(topic2, 0);
+        Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0, 
new OffsetAndMetadata(0L), tp1, new OffsetAndMetadata(0L),
+            tp2, new OffsetAndMetadata(0L));
+        ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+        when(adminClient.listOffsets(any(), 
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(),
+            GroupState.EMPTY,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+        Map<String, TopicDescription> d1 = Map.of(
+            topic1, new TopicDescription(topic1, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+                new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
+        ));
+        Map<String, TopicDescription> d2 = Map.of(
+            topic2, new TopicDescription(topic2, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+        )));
+        DescribeTopicsResult topicsResult1 = mock(DescribeTopicsResult.class);
+        DescribeTopicsResult topicsResult2 = mock(DescribeTopicsResult.class);
+        when(topicsResult1.allTopicNames()).thenReturn(completedFuture(d1));
+        when(topicsResult2.allTopicNames()).thenReturn(completedFuture(d2));
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(topicsResult1, topicsResult2);
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult1, 
topicsResult2);
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.resetOffsets();
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
+            verify(alterShareGroupOffsetsResult, times(1)).all();
+            
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
+        }
+    }
+
+    @Test
+    public void testAlterShareGroupToLatestSuccess() {
+        String group = "share-group";
+        String topic = "topic";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-latest", "--execute", "--topic", topic, "--group", 
group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        TopicPartition t1 = new TopicPartition(topic, 0);
+        TopicPartition t2 = new TopicPartition(topic, 1);
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(t1, new 
OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L)))
+            )
+        );
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+                new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
+        ));
+        DescribeTopicsResult describeTopicResult = 
mock(DescribeTopicsResult.class);
+        
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
+        Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(t1, 
new OffsetAndMetadata(40L), t2, new OffsetAndMetadata(40L));
+        ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+        when(adminClient.listOffsets(any(), 
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(),
+            GroupState.EMPTY,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+        Function<Collection<TopicPartition>, 
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher = 
expectedPartitions ->
+            topicPartitionOffsets -> topicPartitionOffsets != null && 
topicPartitionOffsets.keySet().equals(expectedPartitions) &&
+                topicPartitionOffsets.values().stream().allMatch(offsetSpec -> 
offsetSpec instanceof OffsetSpec.LatestSpec);
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.resetOffsets();
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient, 
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(t1,
 t2))), any());
+            verify(alterShareGroupOffsetsResult, times(1)).all();
+            
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
+        }
+    }
+
+    @Test
+    public void testAlterShareGroupAllTopicsToDatetimeSuccess() {
+        String group = "share-group";
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String topic3 = "topic3";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-datetime", "2025-07-20T01:20:38.198", "--execute", 
"--all-topics", "--group", group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 
0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new 
OffsetAndMetadata(10L),
+                    new TopicPartition(topic2, 0), new OffsetAndMetadata(10L), 
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
+            )
+        );
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
+        Set<String> topics = Set.of(topic1, topic2, topic3);
+        when(listTopicsResult.names()).thenReturn(completedFuture(topics));
+        when(adminClient.listTopics()).thenReturn(listTopicsResult);
+
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
+        TopicPartition tp1 = new TopicPartition(topic1, 0);
+        TopicPartition tp2 = new TopicPartition(topic1, 1);
+        TopicPartition tp3 = new TopicPartition(topic2, 0);
+        TopicPartition tp4 = new TopicPartition(topic3, 0);
+        Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp1, 
new OffsetAndMetadata(10L), tp2, new OffsetAndMetadata(15L),
+            tp3, new OffsetAndMetadata(15L), tp4, new OffsetAndMetadata(15L));
+        ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+        when(adminClient.listOffsets(any(), 
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic1, new TopicDescription(topic1, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+                new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of())
+            )),
+            topic2, new TopicDescription(topic2, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+            )),
+            topic3, new TopicDescription(topic3, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()
+            ))
+        ));
+        DescribeTopicsResult describeTopicResult = 
mock(DescribeTopicsResult.class);
+        
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(),
+            GroupState.EMPTY,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+        Function<Collection<TopicPartition>, 
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher = 
expectedPartitions ->
+            topicPartitionOffsets -> topicPartitionOffsets != null && 
topicPartitionOffsets.keySet().equals(expectedPartitions) &&
+                topicPartitionOffsets.values().stream().allMatch(offsetSpec -> 
offsetSpec instanceof OffsetSpec.TimestampSpec);
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.resetOffsets();
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient, 
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(tp1,
 tp2, tp3, tp4))), any());
+            verify(alterShareGroupOffsetsResult, times(1)).all();
+            
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
+        }
+    }
+
+    @Test
+    public void testResetOffsetsDryRunSuccess() {
+        String group = "share-group";
+        String topic = "topic";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--dry-run", "--topic", topic, "--group", 
group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 
0), new OffsetAndMetadata(10L)))
+            )
+        );
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+        Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(new 
TopicPartition(topic, 0), new OffsetAndMetadata(0L));
+        ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+        when(adminClient.listOffsets(any(), 
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mock(AlterShareGroupOffsetsResult.class);
+        
when(alterShareGroupOffsetsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
+        when(adminClient.alterShareGroupOffsets(any(), 
any())).thenReturn(alterShareGroupOffsetsResult);
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+        )));
+        DescribeTopicsResult describeTopicResult = 
mock(DescribeTopicsResult.class);
+        
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(),
+            GroupState.EMPTY,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.resetOffsets();
+            verify(adminClient, times(0)).alterShareGroupOffsets(any(), any());
+        }
+    }
+
+    @Test
+    public void testAlterShareGroupOffsetsFailureWithoutTopic() {
+        String bootstrapServer = "localhost:9092";
+        String group = "share-group";
+        Admin adminClient = mock(KafkaAdminClient.class);
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--execute", "--group", group};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [reset-offsets] takes one of 
these options: [all-topics], [topic]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() {
+        String group = "share-group";
+        String topic = "topic";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group", 
group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+            )
+        );
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
+        Set<String> topics = Set.of("topic");
+        when(listTopicsResult.names()).thenReturn(completedFuture(topics));
+        when(adminClient.listTopics()).thenReturn(listTopicsResult);
+
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(new ShareMemberDescription("memid1", "clId1", "host1", new 
ShareMemberAssignment(
+                Set.of(new TopicPartition("topic", 0))
+            ), 0)),
+            GroupState.STABLE,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+
+        Exit.setExitProcedure((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Share group 'share-group' is not 
empty."));
+            throw new IllegalArgumentException(message);
+        });
+        assertThrows(IllegalArgumentException.class, () -> 
getShareGroupService(cgcArgs, adminClient).resetOffsets());
+    }
+
+    @Test
+    public void testAlterShareGroupOffsetsArgsFailureWithoutResetOffsetsArgs() 
{
+        String bootstrapServer = "localhost:9092";
+        String group = "share-group";
+        Admin adminClient = mock(KafkaAdminClient.class);
+        // no reset-offsets spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--execute", "--reset-offsets", "--group", group, "--topic", "topic"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [reset-offsets] takes one of 
these options: [to-datetime], [to-earliest], [to-latest]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+    
+    @Test
+    public void testAlterShareGroupFailureWithNonExistentTopic() {
+        String group = "share-group";
+        String topic = "none";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group", 
group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+            )
+        );
+        ShareGroupDescription exp = new ShareGroupDescription(
+            group,
+            List.of(new ShareMemberDescription("memid1", "clId1", "host1", new 
ShareMemberAssignment(
+                Set.of(new TopicPartition(topic, 0))
+            ), 0)),
+            GroupState.EMPTY,
+            new Node(0, "host1", 9090), 0, 0);
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
+        when(adminClient.describeShareGroups(any(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+        AtomicBoolean exited = new AtomicBoolean(false);
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+        )));
+        DescribeTopicsResult describeTopicResult = 
mock(DescribeTopicsResult.class);
+        
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Share group 'share-group' is not 
subscribed to topic 'none'"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient).resetOffsets();
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    private AlterShareGroupOffsetsResult mockAlterShareGroupOffsets(Admin 
client, String groupId) {
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mock(AlterShareGroupOffsetsResult.class);
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        resultFuture.complete(null);
+        when(alterShareGroupOffsetsResult.all()).thenReturn(resultFuture);
+        when(client.alterShareGroupOffsets(eq(groupId), 
any())).thenReturn(alterShareGroupOffsetsResult);
+        return alterShareGroupOffsetsResult;
+    }
 
     private void mockListShareGroups(Admin client, LinkedHashMap<String, 
GroupState> groupIds) {
         ListGroupsResult listResult = mock(ListGroupsResult.class);

Reply via email to