JimmyWang6 commented on code in PR #19820:
URL: https://github.com/apache/kafka/pull/19820#discussion_r2220838720


##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -366,6 +378,75 @@ Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(Stri
             return new SimpleImmutableEntry<>(topLevelException, topicLevelResult);
         }
 
+        void resetOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            try {
+                ShareGroupDescription shareGroupDescription = describeShareGroups(List.of(groupId)).get(groupId);
+                if (!GroupState.EMPTY.equals(shareGroupDescription.groupState())) {
+                    CommandLineUtils.printErrorAndExit(String.format("Share group '%s' is not empty.", groupId));
+                }
+                Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
+                if (offsetsToReset == null) {
+                    return;
+                }
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterShareGroupOffsets(groupId,
+                        offsetsToReset.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                Entry::getKey, entry -> entry.getValue().offset()
+                            ))
+                    ).all().get();
+                }
+                OffsetsUtils.printOffsetsToReset(Map.of(groupId, offsetsToReset));
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    CommandLineUtils.printErrorAndExit(cause.getMessage());
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        protected Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId) throws ExecutionException, InterruptedException {
+            Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
+            Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+            Set<String> existsTopics = adminClient.listTopics().names().get();
+
+            if (opts.options.has(opts.topicOpt)) {

Review Comment:
   @chia7712 @AndrewJSchofield 
   I have reworked the implementation to reuse
   offsetsUtils.parseTopicPartitionsToReset(). However, passing a non-existent
   topic-partition (e.g., topicA:999) results in a TimeoutException:
   `Caused by: java.util.concurrent.ExecutionException: 
   org.apache.kafka.common.errors.TimeoutException: 
   Call(callName=listOffsets(api=METADATA), deadlineMs=1753150292432, tries=73308, nextAllowedTryMs=1753150293436) 
   timed out at 1753150292436 after 73308 attempt(s)`
   Resetting offsets for a non-existent topic-partition of a consumer group
   behaves identically, so this is not specific to share groups. Since the
   current implementation does not handle invalid topic-partitions gracefully,
   I suggest opening a separate issue to address this scenario. What do you
   think?
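
   To make the follow-up idea concrete, here is a minimal sketch of a
   fail-fast check that could run before any offsets are listed. It is not
   code from this PR: `TopicPartitionValidator`, `Tp`, and `findMissing` are
   hypothetical names, `Tp` is only a stand-in for
   org.apache.kafka.common.TopicPartition, and the partition counts are
   assumed to have been fetched beforehand (for example via a topic
   description call on the admin client).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TopicPartitionValidator {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    public static final class Tp {
        public final String topic;
        public final int partition;

        public Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + ":" + partition;
        }
    }

    /**
     * Returns the requested partitions that do not exist, given a map of
     * topic name to partition count (assumed to be obtained elsewhere).
     */
    public static List<Tp> findMissing(Map<String, Integer> partitionCounts, List<Tp> requested) {
        List<Tp> missing = new ArrayList<>();
        for (Tp tp : requested) {
            Integer count = partitionCounts.get(tp.topic);
            // Unknown topic, or a partition index outside [0, count).
            if (count == null || tp.partition < 0 || tp.partition >= count) {
                missing.add(tp);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionCounts = Map.of("topicA", 3);
        List<Tp> requested = List.of(new Tp("topicA", 0), new Tp("topicA", 999));
        List<Tp> missing = findMissing(partitionCounts, requested);
        if (!missing.isEmpty()) {
            // The tool could report this and exit instead of retrying until the deadline.
            System.err.println("Unknown topic-partitions: " + missing);
        }
    }
}
```

   With a check along these lines, the command could report the invalid
   topic-partitions immediately rather than retrying the metadata lookup
   until the request deadline expires.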


