vamossagar12 commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1266400659
########## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ########## @@ -703,8 +544,216 @@ public static boolean matchesInternalTopicFormat(final String topicName) { || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic"); } - public static void main(final String[] args) { - Exit.exit(new StreamsResetter().run(args)); - } + private static class StreamsResetterOptions extends CommandDefaultOptions { + private final OptionSpec<String> bootstrapServersOption; + private final OptionSpec<String> bootstrapServerOption; + private final OptionSpec<String> applicationIdOption; + private final OptionSpec<String> inputTopicsOption; + private final OptionSpec<String> intermediateTopicsOption; + private final OptionSpec<String> internalTopicsOption; + private final OptionSpec<Long> toOffsetOption; + private final OptionSpec<String> toDatetimeOption; + private final OptionSpec<String> byDurationOption; + private final OptionSpecBuilder toEarliestOption; + private final OptionSpecBuilder toLatestOption; + private final OptionSpec<String> fromFileOption; + private final OptionSpec<Long> shiftByOption; + private final OptionSpecBuilder dryRunOption; + private final OptionSpec<String> commandConfigOption; + private final OptionSpecBuilder forceOption; + + public StreamsResetterOptions(String[] args) { + super(args); + applicationIdOption = parser.accepts("application-id", "The Kafka Streams application ID (application.id).") + .withRequiredArg() + .ofType(String.class) + .describedAs("id") + .required(); + bootstrapServersOption = parser.accepts("bootstrap-servers", "DEPRECATED: Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2") + .withRequiredArg() + .ofType(String.class) + .describedAs("urls"); + bootstrapServerOption = parser.accepts("bootstrap-server", "REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)") + .withRequiredArg() + .ofType(String.class) + .describedAs("server to connect to"); + inputTopicsOption = parser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. " + + "Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + intermediateTopicsOption = parser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, " + + "e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + internalTopicsOption = parser.accepts("internal-topics", "Comma-separated list of " + + "internal topics to delete. Must be a subset of the internal topics marked for deletion by the " + + "default behaviour (do a dry-run without this option to view these topics).") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + toOffsetOption = parser.accepts("to-offset", "Reset offsets to a specific offset.") + .withRequiredArg() + .ofType(Long.class); + toDatetimeOption = parser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'") + .withRequiredArg() + .ofType(String.class); + byDurationOption = parser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'") + .withRequiredArg() + .ofType(String.class); + toEarliestOption = parser.accepts("to-earliest", "Reset offsets to earliest offset."); + toLatestOption = parser.accepts("to-latest", "Reset offsets to latest offset."); + fromFileOption = parser.accepts("from-file", "Reset offsets to values defined in CSV file.") + .withRequiredArg() + .ofType(String.class); + shiftByOption = parser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative") + .withRequiredArg() + .describedAs("number-of-offsets") + .ofType(Long.class); + commandConfigOption = parser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.") + .withRequiredArg() + .ofType(String.class) + .describedAs("file name"); + forceOption = parser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " + + "Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances."); + + dryRunOption = parser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands."); + + try { + options = parser.parse(args); + if (CommandLineUtils.isPrintHelpNeeded(this)) { + CommandLineUtils.printUsageAndExit(parser, USAGE); + } + if (CommandLineUtils.isPrintVersionNeeded(this)) { + CommandLineUtils.printVersionAndExit(); + } + CommandLineUtils.checkInvalidArgs(parser, options, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, toDatetimeOption, toOffsetOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, byDurationOption, toOffsetOption, toDatetimeOption, toEarliestOption, toLatestOption, fromFileOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, toEarliestOption, toOffsetOption, toDatetimeOption, byDurationOption, toLatestOption, fromFileOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, fromFileOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, shiftByOption); + CommandLineUtils.checkInvalidArgs(parser, options, shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption, toEarliestOption, toLatestOption, fromFileOption); + } catch (final OptionException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + } + + public boolean hasDryRun() { Review Comment: yeah i did notice that. Just that `hasDryRun` sounded slightly verbose. Anyways I don't have a strong preference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org