lucasbru commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2157280851


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -678,29 +692,7 @@ Map<String, Throwable> deleteGroups() {
                 });
 
                 // delete internal topics
-                if (!success.isEmpty()) {
-                    for (String groupId : success.keySet()) {
-                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
-                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
-                            DeleteTopicsResult deleteTopicsResult = null;
-                            try {
-                                deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
-                                deleteTopicsResult.all().get();
-                            } catch (InterruptedException | ExecutionException 
e) {
-                                if (deleteTopicsResult != null) {
-                                    
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
-                                        try {
-                                            future.get();
-                                        } catch (Exception topicException) {
-                                            System.out.println("Failed to 
delete internal topic: " + topic);
-                                        }
-                                    });
-                                }
-                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
-                            }
-                        }
-                    }
-                }
+                internalTopicsDeletionFailures = 
maybeDeleteInternalTopics(deleteInternalTopics, success, internalTopics);

Review Comment:
   If you pass "internalTopicsToBeDeleted", you don't need to pass 
"deleteInternalTopics" 



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -483,8 +483,8 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> 
resetOffsets() {
                                 result.put(groupId, 
resetOffsetsForInactiveGroup(groupId, dryRun));
                                 // delete internal topics
                                 if (!dryRun) {
-                                    List<String> internalTopics = 
retrieveInternalTopics(List.of(groupId)).get(groupId);
-                                    if (internalTopics != null && 
!internalTopics.isEmpty()) {
+                                    List<String> internalTopics = 
getInternalTopicsForGroup(groupId);

Review Comment:
   `getInternalTopicsToBeDeleted` maybe?



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
     private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest 
offset.";
     private static final String RESET_TO_CURRENT_DOC = "Reset offsets to 
current offset.";
     private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting 
current offset by 'n', where 'n' can be positive or negative.";
+    private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified 
internal topic of the streams group. Supported operations: reset-offsets." +

Review Comment:
   also with --delete



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -723,6 +715,36 @@ Map<String, Throwable> deleteGroups() {
             return failed;
         }
 
+        private Map<String, Throwable> maybeDeleteInternalTopics(boolean 
deleteInternalTopics,
+                                               Map<String, Throwable> success,

Review Comment:
   formatting is off



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
     private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest 
offset.";
     private static final String RESET_TO_CURRENT_DOC = "Reset offsets to 
current offset.";
     private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting 
current offset by 'n', where 'n' can be positive or negative.";
+    private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified 
internal topic of the streams group. Supported operations: reset-offsets." +
+        "This option is applicable only when --execute is used.";
+    private static final String DELETE_ALL_INTERNAL_TOPICS_DOC = "Delete all 
internal topics linked to the streams group. Supported operations: 
reset-offsets, delete." +

Review Comment:
   also with --delete



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -659,7 +671,9 @@ Map<String, Throwable> deleteGroups() {
             Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
             if (!groupIds.isEmpty()) {
                 // retrieve internal topics before deleting groups
-                internalTopics = retrieveInternalTopics(groupIds);
+                if (deleteInternalTopics) {
+                    internalTopics = retrieveInternalTopics(groupIds);

Review Comment:
   `internalTopicsToBeDeleted`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to