aliehsaeedii commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2131061495
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +405,549 @@ Map<TopicPartition, OffsetsInfo>
getOffsets(StreamsGroupDescription description)
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String
groupId) {
try {
- return adminClient.listConsumerGroupOffsets(
- Map.of(groupId, new
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+ var sourceTopics =
adminClient.describeStreamsGroups(List.of(groupId))
+ .all().get().get(groupId)
+ .subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream())
+ .collect(Collectors.toSet());
+
+ var allTopicPartitions =
adminClient.listStreamsGroupOffsets(Map.of(groupId, new
ListStreamsGroupOffsetsSpec()))
+ .partitionsToOffsetAndMetadata(groupId).get();
+
+ allTopicPartitions.keySet().removeIf(tp ->
!sourceTopics.contains(tp.topic()));
+ return allTopicPartitions;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<TopicPartition> filterExistingGroupTopics(String groupId,
List<TopicPartition> topicPartitions) {
+ try {
+ var allTopicPartitions =
adminClient.listStreamsGroupOffsets(Map.of(groupId, new
ListStreamsGroupOffsetsSpec()))
+ .partitionsToOffsetAndMetadata(groupId).get();
+ boolean allPresent =
topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
+ if (!allPresent) {
+ printError("One or more topics are not part of the group
'" + groupId + "'.", Optional.empty());
+ return Collections.emptyList();
+ }
+ return topicPartitions;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+ // Dry-run is the default behavior if --execute is not specified
+ boolean dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt);
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new
HashMap<>();
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+ if (!groupIds.isEmpty()) {
+ Map<String, KafkaFuture<StreamsGroupDescription>>
streamsGroups = adminClient.describeStreamsGroups(
+ groupIds
+ ).describedGroups();
+
+ streamsGroups.forEach((groupId, groupDescription) -> {
+ try {
+ String state =
groupDescription.get().groupState().toString();
+ switch (state) {
+ case "Empty":
+ case "Dead":
+ // reset offsets in source topics
+ result.put(groupId,
resetOffsetsForInactiveGroup(groupId, dryRun));
+ // delete internal topics
+ if (!dryRun) {
+ List<String> internalTopics =
retrieveInternalTopics(List.of(groupId)).get(groupId);
+ if (internalTopics != null &&
!internalTopics.isEmpty()) {
+ try {
+
adminClient.deleteTopics(internalTopics).all().get();
Review Comment:
As of https://github.com/apache/kafka/pull/19646#discussion_r2108859067
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]