AndrewJSchofield commented on code in PR #19820:
URL: https://github.com/apache/kafka/pull/19820#discussion_r2200620277


##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -366,6 +378,67 @@ Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(Stri
             return new SimpleImmutableEntry<>(topLevelException, topicLevelResult);
         }
 
+        void resetOffsets() throws ExecutionException, InterruptedException {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
+            if (offsetsToReset == null) {
+                return;
+            }
+            boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+            if (!dryRun) {
+                adminClient.alterShareGroupOffsets(groupId,
+                    offsetsToReset.entrySet().stream()
+                        .collect(Collectors.toMap(
+                            Entry::getKey,
+                            entry -> entry.getValue().offset()
+                        ))
+                ).all().get();
+            }
+            Set<SharePartitionOffsetInformation> partitionOffsets = mapOffsetsToSharePartitionInformation(groupId, offsetsToReset);
+            Map<String, ShareGroupDescription> shareGroups = describeShareGroups(Collections.singleton(groupId));
+            TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
+            shareGroups.forEach((id, description) -> groupOffsets.put(id, new SimpleImmutableEntry<>(description, partitionOffsets)));
+            printOffsets(groupOffsets, opts.options.has(opts.verboseOpt));
+        }
+
+        protected Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId) throws ExecutionException, InterruptedException {
+            Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
+            Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+
+            if (opts.options.has(opts.topicOpt)) {
+                Set<String> topics = Set.copyOf(opts.options.valuesOf(opts.topicOpt));
+                Set<String> existsTopics = offsetsByTopicPartitions.keySet().stream()
+                    .map(TopicPartition::topic)
+                    .collect(Collectors.toSet());
+                if (!existsTopics.containsAll(topics)) {
+                    CommandLineUtils
+                        .printUsageAndExit(opts.parser, String.format("Share group '%s' is not subscribed to topic '%s'",

Review Comment:
   This doesn't distinguish between topics that don't exist and topics that 
the group is simply not subscribed to. This is probably a debatable point, so I 
wonder what you think.
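
One way the distinction could be drawn, as a minimal sketch rather than the PR's actual code: classify each requested topic against two sets, the topics the group has offsets for and the topics that exist in the cluster. The class `TopicArgumentCheck`, its methods, and the "does not exist" message are hypothetical names introduced for illustration. In the real tool the cluster-wide set would presumably come from something like `adminClient.listTopics().names().get()`, which is an assumption about how this command would be wired up.

```java
import java.util.Set;

// Hypothetical helper, not part of ShareGroupCommand.
class TopicArgumentCheck {
    /** Possible outcomes for a topic named with --topic. */
    enum Status { SUBSCRIBED, NOT_SUBSCRIBED, NON_EXISTENT }

    /**
     * groupTopics: topics the share group currently has offsets for.
     * clusterTopics: all topics known to the cluster (assumed to be fetched separately).
     */
    static Status classify(String topic, Set<String> groupTopics, Set<String> clusterTopics) {
        if (groupTopics.contains(topic)) {
            return Status.SUBSCRIBED;
        }
        return clusterTopics.contains(topic) ? Status.NOT_SUBSCRIBED : Status.NON_EXISTENT;
    }

    /** Builds a distinct error message per failure case; empty when there is no error. */
    static String describe(String groupId, String topic, Status status) {
        switch (status) {
            case NOT_SUBSCRIBED:
                return String.format("Share group '%s' is not subscribed to topic '%s'", groupId, topic);
            case NON_EXISTENT:
                // Illustrative wording only; the PR does not define this message.
                return String.format("Topic '%s' does not exist", topic);
            default:
                return "";
        }
    }

    public static void main(String[] args) {
        Set<String> groupTopics = Set.of("orders");
        Set<String> clusterTopics = Set.of("orders", "payments");
        for (String topic : new String[] {"orders", "payments", "missing"}) {
            Status status = classify(topic, groupTopics, clusterTopics);
            System.out.println(topic + " -> " + status + " " + describe("G1", topic, status));
        }
    }
}
```

The trade-off is an extra metadata lookup per invocation in exchange for a more precise error, which is part of why the point is debatable.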



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -366,6 +378,67 @@ Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(Stri
             return new SimpleImmutableEntry<>(topLevelException, topicLevelResult);
         }
 
+        void resetOffsets() throws ExecutionException, InterruptedException {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
+            if (offsetsToReset == null) {
+                return;
+            }
+            boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+            if (!dryRun) {
+                adminClient.alterShareGroupOffsets(groupId,
+                    offsetsToReset.entrySet().stream()
+                        .collect(Collectors.toMap(
+                            Entry::getKey,
+                            entry -> entry.getValue().offset()
+                        ))
+                ).all().get();
+            }
+            Set<SharePartitionOffsetInformation> partitionOffsets = mapOffsetsToSharePartitionInformation(groupId, offsetsToReset);
+            Map<String, ShareGroupDescription> shareGroups = describeShareGroups(Collections.singleton(groupId));
+            TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
+            shareGroups.forEach((id, description) -> groupOffsets.put(id, new SimpleImmutableEntry<>(description, partitionOffsets)));
+            printOffsets(groupOffsets, opts.options.has(opts.verboseOpt));

Review Comment:
   The `--verbose` option is not valid with `--reset-offsets`, so this is a bit 
weird. But actually, I think it would be better if there were something like the 
consumer group tool's `printOffsetsToReset`. This is just because I think a 
column heading of `NEW-START-OFFSET` would make it clear that it's the new start 
offset.
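
A dedicated printer along those lines might look roughly like the sketch below. It is an illustration only: `ResetOffsetsPrinter`, `Row`, and the column widths are hypothetical, and plain strings and numbers stand in for `TopicPartition` and `OffsetAndMetadata` so the example has no Kafka dependency.

```java
import java.util.List;

// Hypothetical sketch of a reset-specific printer; not the PR's implementation.
class ResetOffsetsPrinter {
    /** One output row: a partition and the start offset it will be reset to. */
    static final class Row {
        final String topic;
        final int partition;
        final long newStartOffset;

        Row(String topic, int partition, long newStartOffset) {
            this.topic = topic;
            this.partition = partition;
            this.newStartOffset = newStartOffset;
        }
    }

    /** Renders a header line followed by one line per partition. */
    static String format(String groupId, List<Row> rows) {
        // Column widths are arbitrary choices for this sketch.
        String fmt = "%-30s %-30s %-10s %-16s%n";
        StringBuilder sb = new StringBuilder();
        sb.append(String.format(fmt, "GROUP", "TOPIC", "PARTITION", "NEW-START-OFFSET"));
        for (Row row : rows) {
            sb.append(String.format(fmt, groupId, row.topic, row.partition, row.newStartOffset));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("orders", 0, 42L), new Row("orders", 1, 7L));
        System.out.print(format("G1", rows));
    }
}
```

Because this path never consults `--verbose`, it would also sidestep the question of an option that is not valid with `--reset-offsets`.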



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.