vamossagar12 commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1266363807


##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -513,8 +373,8 @@ private void resetByDuration(final Consumer<byte[], byte[]> 
client,
 
     // visible for testing
     public void resetToDatetime(final Consumer<byte[], byte[]> client,
-                                final Set<TopicPartition> inputTopicPartitions,
-                                final Long timestamp) {
+                                Set<TopicPartition> inputTopicPartitions,

Review Comment:
   nit: the `inputTopicPartitions` and `timestamp` arguments could be made final.



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -133,77 +112,69 @@ public class StreamsResetter {
             + "*** Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that "
             + "you run this once with \"--dry-run\" to preview your changes 
before making them.\n\n";
 
-    private OptionSet options = null;
     private final List<String> allTopics = new LinkedList<>();
 
-
-    public int run(final String[] args) {
-        return run(args, new Properties());
+    public static void main(final String[] args) {
+        Exit.exit(new StreamsResetter().execute(args));
     }
 
-    public int run(final String[] args,
-                   final Properties config) {
-        int exitCode;
+    public int execute(final String[] args) {
+        return execute(args, new Properties());
+    }
 
-        Admin adminClient = null;
+    public int execute(final String[] args, final Properties config) {
         try {
-            parseArguments(args);
-
-            final boolean dryRun = options.has(dryRunOption);
+            StreamsResetterOptions options = new StreamsResetterOptions(args);
 
-            final String groupId = options.valueOf(applicationIdOption);
-            final Properties properties = new Properties();
-            if (options.has(commandConfigOption)) {
-                
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+            String groupId = options.applicationId();

Review Comment:
   Not sure if it matters here, but IIRC Streams used to have a checkstyle rule 
requiring all variables to be final. Do we want to continue that practice here?



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -703,8 +544,216 @@ public static boolean matchesInternalTopicFormat(final 
String topicName) {
                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
     }
 
-    public static void main(final String[] args) {
-        Exit.exit(new StreamsResetter().run(args));
-    }
+    private static class StreamsResetterOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> bootstrapServersOption;
+        private final OptionSpec<String> bootstrapServerOption;
+        private final OptionSpec<String> applicationIdOption;
+        private final OptionSpec<String> inputTopicsOption;
+        private final OptionSpec<String> intermediateTopicsOption;
+        private final OptionSpec<String> internalTopicsOption;
+        private final OptionSpec<Long> toOffsetOption;
+        private final OptionSpec<String> toDatetimeOption;
+        private final OptionSpec<String> byDurationOption;
+        private final OptionSpecBuilder toEarliestOption;
+        private final OptionSpecBuilder toLatestOption;
+        private final OptionSpec<String> fromFileOption;
+        private final OptionSpec<Long> shiftByOption;
+        private final OptionSpecBuilder dryRunOption;
+        private final OptionSpec<String> commandConfigOption;
+        private final OptionSpecBuilder forceOption;
+
+        public StreamsResetterOptions(String[] args) {
+            super(args);
+            applicationIdOption = parser.accepts("application-id", "The Kafka 
Streams application ID (application.id).")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("id")
+                .required();
+            bootstrapServersOption = parser.accepts("bootstrap-servers", 
"DEPRECATED: Comma-separated list of broker urls with format: 
HOST1:PORT1,HOST2:PORT2")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("urls");
+            bootstrapServerOption = parser.accepts("bootstrap-server", 
"REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to 
connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. 
(default: localhost:9092)")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("server to connect to");
+            inputTopicsOption = parser.accepts("input-topics", 
"Comma-separated list of user input topics. For these topics, the tool by 
default will reset the offset to the earliest available offset. "
+                    + "Reset to other offset position by appending other reset 
offset option, ex: --input-topics foo --shift-by 5")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            intermediateTopicsOption = parser.accepts("intermediate-topics", 
"Comma-separated list of intermediate user topics (topics that are input and 
output topics, "
+                    + "e.g., used in the deprecated through() method). For 
these topics, the tool will skip to the end.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            internalTopicsOption = parser.accepts("internal-topics", 
"Comma-separated list of "
+                    + "internal topics to delete. Must be a subset of the 
internal topics marked for deletion by the "
+                    + "default behaviour (do a dry-run without this option to 
view these topics).")
+                .withRequiredArg()
+                .ofType(String.class)
+                .withValuesSeparatedBy(',')
+                .describedAs("list");
+            toOffsetOption = parser.accepts("to-offset", "Reset offsets to a 
specific offset.")
+                .withRequiredArg()
+                .ofType(Long.class);
+            toDatetimeOption = parser.accepts("to-datetime", "Reset offsets to 
offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
+                .withRequiredArg()
+                .ofType(String.class);
+            byDurationOption = parser.accepts("by-duration", "Reset offsets to 
offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
+                .withRequiredArg()
+                .ofType(String.class);
+            toEarliestOption = parser.accepts("to-earliest", "Reset offsets to 
earliest offset.");
+            toLatestOption = parser.accepts("to-latest", "Reset offsets to 
latest offset.");
+            fromFileOption = parser.accepts("from-file", "Reset offsets to 
values defined in CSV file.")
+                .withRequiredArg()
+                .ofType(String.class);
+            shiftByOption = parser.accepts("shift-by", "Reset offsets shifting 
current offset by 'n', where 'n' can be positive or negative")
+                .withRequiredArg()
+                .describedAs("number-of-offsets")
+                .ofType(Long.class);
+            commandConfigOption = parser.accepts("config-file", "Property file 
containing configs to be passed to admin clients and embedded consumer.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("file name");
+            forceOption = parser.accepts("force", "Force the removal of 
members of the consumer group (intended to remove stopped members if a long 
session timeout was used). " +
+                "Make sure to shut down all stream applications when this 
option is specified to avoid unexpected rebalances.");
+
+            dryRunOption = parser.accepts("dry-run", "Display the actions that 
would be performed without executing the reset commands.");
+
+            try {
+                options = parser.parse(args);
+                if (CommandLineUtils.isPrintHelpNeeded(this)) {
+                    CommandLineUtils.printUsageAndExit(parser, USAGE);
+                }
+                if (CommandLineUtils.isPrintVersionNeeded(this)) {
+                    CommandLineUtils.printVersionAndExit();
+                }
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toDatetimeOption, toOffsetOption, byDurationOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
byDurationOption, toOffsetOption, toDatetimeOption, toEarliestOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toEarliestOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toLatestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, fromFileOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, shiftByOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, fromFileOption);
+            } catch (final OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        public boolean hasDryRun() {

Review Comment:
   nit: Would `dryRun` be a better method name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to