lucasbru commented on code in PR #19646: URL: https://github.com/apache/kafka/pull/19646#discussion_r2157280851
########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -678,29 +692,7 @@ Map<String, Throwable> deleteGroups() { }); // delete internal topics - if (!success.isEmpty()) { - for (String groupId : success.keySet()) { - List<String> internalTopicsToDelete = internalTopics.get(groupId); - if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { - DeleteTopicsResult deleteTopicsResult = null; - try { - deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); - deleteTopicsResult.all().get(); - } catch (InterruptedException | ExecutionException e) { - if (deleteTopicsResult != null) { - deleteTopicsResult.topicNameValues().forEach((topic, future) -> { - try { - future.get(); - } catch (Exception topicException) { - System.out.println("Failed to delete internal topic: " + topic); - } - }); - } - internalTopicsDeletionFailures.put(groupId, e.getCause()); - } - } - } - } + internalTopicsDeletionFailures = maybeDeleteInternalTopics(deleteInternalTopics, success, internalTopics); Review Comment: If you pass "internalTopicsToBeDeleted", you don't need to pass "deleteInternalTopics" ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -483,8 +483,8 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() { result.put(groupId, resetOffsetsForInactiveGroup(groupId, dryRun)); // delete internal topics if (!dryRun) { - List<String> internalTopics = retrieveInternalTopics(List.of(groupId)).get(groupId); - if (internalTopics != null && !internalTopics.isEmpty()) { + List<String> internalTopics = getInternalTopicsForGroup(groupId); Review Comment: `getInternalTopicsToBeDeleted` maybe? ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java: ########## @@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."; + private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified internal topic of the streams group. Supported operations: reset-offsets." + Review Comment: also with --delete ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -723,6 +715,36 @@ Map<String, Throwable> deleteGroups() { return failed; } + private Map<String, Throwable> maybeDeleteInternalTopics(boolean deleteInternalTopics, + Map<String, Throwable> success, Review Comment: formatting is off ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java: ########## @@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."; + private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified internal topic of the streams group. Supported operations: reset-offsets." + + "This option is applicable only when --execute is used."; + private static final String DELETE_ALL_INTERNAL_TOPICS_DOC = "Delete all internal topics linked to the streams group. Supported operations: reset-offsets, delete." + Review Comment: also with --delete ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -659,7 +671,9 @@ Map<String, Throwable> deleteGroups() { Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); if (!groupIds.isEmpty()) { // retrieve internal topics before deleting groups - internalTopics = retrieveInternalTopics(groupIds); + if (deleteInternalTopics) { + internalTopics = retrieveInternalTopics(groupIds); Review Comment: `internalTopicsToBeDeleted`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org