chia7712 commented on code in PR #19820:
URL: https://github.com/apache/kafka/pull/19820#discussion_r2221914472


##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -366,6 +378,75 @@ Entry<Throwable, Map<String, Throwable>> 
sendDeleteShareGroupOffsetsRequest(Stri
             return new SimpleImmutableEntry<>(topLevelException, 
topicLevelResult);
         }
 
+        void resetOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            try {
+                ShareGroupDescription shareGroupDescription = 
describeShareGroups(List.of(groupId)).get(groupId);
+                if 
(!GroupState.EMPTY.equals(shareGroupDescription.groupState())) {
+                    CommandLineUtils.printErrorAndExit(String.format("Share 
group '%s' is not empty.", groupId));
+                }
+                Map<TopicPartition, OffsetAndMetadata> offsetsToReset = 
prepareOffsetsToReset(groupId);
+                if (offsetsToReset == null) {
+                    return;
+                }
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterShareGroupOffsets(groupId,
+                        offsetsToReset.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                Entry::getKey, entry -> 
entry.getValue().offset()
+                            ))
+                    ).all().get();
+                }
+                OffsetsUtils.printOffsetsToReset(Map.of(groupId, 
offsetsToReset));
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    CommandLineUtils.printErrorAndExit(cause.getMessage());
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        protected Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId) throws ExecutionException, 
InterruptedException {
+            Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
+            Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+            Set<String> existsTopics = adminClient.listTopics().names().get();
+
+            if (opts.options.has(opts.topicOpt)) {

Review Comment:
   @JimmyWang6 yes, that was a issue in `kafka-consumer-groups.sh` and it was 
already fixed by @xijiu see #20168 #20064
   
   you could refactor the code to ensure the fixes could be applied on 
`ShareGroupCommand` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to