vamossagar12 commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1266400659


##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -703,8 +544,216 @@ public static boolean matchesInternalTopicFormat(final 
String topicName) {
                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
     }
 
-    public static void main(final String[] args) {
-        Exit.exit(new StreamsResetter().run(args));
-    }
+    private static class StreamsResetterOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> bootstrapServersOption;
+        private final OptionSpec<String> bootstrapServerOption;
+        private final OptionSpec<String> applicationIdOption;
+        private final OptionSpec<String> inputTopicsOption;
+        private final OptionSpec<String> intermediateTopicsOption;
+        private final OptionSpec<String> internalTopicsOption;
+        private final OptionSpec<Long> toOffsetOption;
+        private final OptionSpec<String> toDatetimeOption;
+        private final OptionSpec<String> byDurationOption;
+        private final OptionSpecBuilder toEarliestOption;
+        private final OptionSpecBuilder toLatestOption;
+        private final OptionSpec<String> fromFileOption;
+        private final OptionSpec<Long> shiftByOption;
+        private final OptionSpecBuilder dryRunOption;
+        private final OptionSpec<String> commandConfigOption;
+        private final OptionSpecBuilder forceOption;
+
+        public StreamsResetterOptions(String[] args) {
+            super(args);
+            applicationIdOption = parser.accepts("application-id", "The Kafka 
Streams application ID (application.id).")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("id")
+                .required();
+            bootstrapServersOption = parser.accepts("bootstrap-servers", 
"DEPRECATED: Comma-separated list of broker urls with format: 
HOST1:PORT1,HOST2:PORT2")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("urls");
+            bootstrapServerOption = parser.accepts("bootstrap-server", 
"REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to 
connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. 
(default: localhost:9092)")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("server to connect to");
+            inputTopicsOption = parser.accepts("input-topics", 
"Comma-separated list of user input topics. For these topics, the tool by 
default will reset the offset to the earliest available offset. "
+                    + "Reset to other offset position by appending other reset 
offset option, ex: --input-topics foo --shift-by 5")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            intermediateTopicsOption = parser.accepts("intermediate-topics", 
"Comma-separated list of intermediate user topics (topics that are input and 
output topics, "
+                    + "e.g., used in the deprecated through() method). For 
these topics, the tool will skip to the end.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            internalTopicsOption = parser.accepts("internal-topics", 
"Comma-separated list of "
+                    + "internal topics to delete. Must be a subset of the 
internal topics marked for deletion by the "
+                    + "default behaviour (do a dry-run without this option to 
view these topics).")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            toOffsetOption = parser.accepts("to-offset", "Reset offsets to a 
specific offset.")
+                .withRequiredArg()
+                .ofType(Long.class);
+            toDatetimeOption = parser.accepts("to-datetime", "Reset offsets to 
offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
+                .withRequiredArg()
+                .ofType(String.class);
+            byDurationOption = parser.accepts("by-duration", "Reset offsets to 
offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
+                .withRequiredArg()
+                .ofType(String.class);
+            toEarliestOption = parser.accepts("to-earliest", "Reset offsets to 
earliest offset.");
+            toLatestOption = parser.accepts("to-latest", "Reset offsets to 
latest offset.");
+            fromFileOption = parser.accepts("from-file", "Reset offsets to 
values defined in CSV file.")
+                .withRequiredArg()
+                .ofType(String.class);
+            shiftByOption = parser.accepts("shift-by", "Reset offsets shifting 
current offset by 'n', where 'n' can be positive or negative")
+                .withRequiredArg()
+                .describedAs("number-of-offsets")
+                .ofType(Long.class);
+            commandConfigOption = parser.accepts("config-file", "Property file 
containing configs to be passed to admin clients and embedded consumer.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("file name");
+            forceOption = parser.accepts("force", "Force the removal of 
members of the consumer group (intended to remove stopped members if a long 
session timeout was used). " +
+                "Make sure to shut down all stream applications when this 
option is specified to avoid unexpected rebalances.");
+
+            dryRunOption = parser.accepts("dry-run", "Display the actions that 
would be performed without executing the reset commands.");
+
+            try {
+                options = parser.parse(args);
+                if (CommandLineUtils.isPrintHelpNeeded(this)) {
+                    CommandLineUtils.printUsageAndExit(parser, USAGE);
+                }
+                if (CommandLineUtils.isPrintVersionNeeded(this)) {
+                    CommandLineUtils.printVersionAndExit();
+                }
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toDatetimeOption, toOffsetOption, byDurationOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
byDurationOption, toOffsetOption, toDatetimeOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toEarliestOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, fromFileOption);
+            } catch (final OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasDryRun() {

Review Comment:
   Yeah, I did notice that. It's just that `hasDryRun` sounded slightly verbose. 
   Anyway, I don't have a strong preference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to