Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
mimaison merged PR #15457: URL: https://github.com/apache/kafka/pull/15457 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
wernerdv commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1514997085 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -322,6 +324,20 @@ private void invalidOffset(String offset) { "'earliest', 'latest', or a non-negative long."); } +private long parseTimeoutMs() { +long timeout; +if (options.has(timeoutMsOpt)) { +timeout = options.valueOf(timeoutMsOpt); +if (timeout < 0) { +CommandLineUtils.printUsageAndExit(parser, "The provided timeout-ms value '" + timeout + Review Comment: Reverted to the previous logic, which does not fail on a negative value.
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
mimaison commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1514959532 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -322,6 +324,20 @@ private void invalidOffset(String offset) { "'earliest', 'latest', or a non-negative long."); } +private long parseTimeoutMs() { +long timeout; +if (options.has(timeoutMsOpt)) { +timeout = options.valueOf(timeoutMsOpt); +if (timeout < 0) { +CommandLineUtils.printUsageAndExit(parser, "The provided timeout-ms value '" + timeout + Review Comment: It seems previously we wouldn't fail if a negative value was provided. ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -385,10 +404,14 @@ String bootstrapServer() { return options.valueOf(bootstrapServerOpt); } -String includedTopicsArg() { -return options.has(includeOpt) -? options.valueOf(includeOpt) -: options.valueOf(whitelistOpt); +Optional<String> includedTopicsArg() { +if (options.has(includeOpt)) { +return Optional.of(options.valueOf(includeOpt)); +} else if (options.has(whitelistOpt)) { +return Optional.of(options.valueOf(whitelistOpt)); +} else { +return Optional.empty(); +} Review Comment: Could this be simplified into: ``` return options.has(includeOpt) ? Optional.of(options.valueOf(includeOpt)) : Optional.ofNullable(options.valueOf(whitelistOpt)); ```
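The suggested simplification works only if `valueOf` yields `null` for an option that was not supplied and has no default. Below is a minimal sketch of that equivalence. It uses a plain `Map` as a hypothetical stand-in for the parsed option set; the class name, method names, and map keys are illustrative assumptions and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the parsed options: a missing key behaves like
// an option that was not supplied, so get() returns null for it.
class IncludedTopicsSketch {

    // Explicit three-branch form, as in the diff under review.
    static Optional<String> verbose(Map<String, String> options) {
        if (options.containsKey("include")) {
            return Optional.of(options.get("include"));
        } else if (options.containsKey("whitelist")) {
            return Optional.of(options.get("whitelist"));
        } else {
            return Optional.empty();
        }
    }

    // Ternary form, as in the review suggestion: ofNullable folds the
    // "whitelist present" and "neither present" cases into one expression.
    static Optional<String> simplified(Map<String, String> options) {
        return options.containsKey("include")
            ? Optional.of(options.get("include"))
            : Optional.ofNullable(options.get("whitelist"));
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(simplified(options)); // neither option supplied
        options.put("whitelist", "legacy-.*");
        System.out.println(simplified(options)); // falls back to whitelist
        options.put("include", "orders-.*");
        System.out.println(simplified(options)); // include takes precedence
    }
}
```

Both forms agree on all three cases in this sketch; the ternary is shorter but depends on the null-when-absent behaviour, which the explicit form does not.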
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
wernerdv commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1514949575 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); +Optional includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; -this.timeoutMs = timeoutMs; - -if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { -seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); -} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -// default to latest if no offset is provided -seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); -} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -consumer.subscribe(Collections.singletonList(topic.get())); -} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { -consumer.subscribe(Pattern.compile(includedTopics.get())); +timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; Review Comment: Done. 
## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); Review Comment: Done.
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
mimaison commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1514821883 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); +Optional includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; -this.timeoutMs = timeoutMs; - -if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { -seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); -} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -// default to latest if no offset is provided -seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); -} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -consumer.subscribe(Collections.singletonList(topic.get())); -} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { -consumer.subscribe(Pattern.compile(includedTopics.get())); +timeoutMs = opts.timeoutMs() >= 0 ? 
opts.timeoutMs() : Long.MAX_VALUE; Review Comment: Could we merge this with the `timeoutMs()` method from `ConsoleConsumerOptions`? ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); Review Comment: Could `topicArg()` directly return `Optional`? Same below for `includedTopicsArg()`.
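One way to read the first request is to let the options class own the normalization, so that callers never see a negative timeout. Below is a minimal sketch under that assumption. The class name, the nullable raw field, and the treatment of an absent flag as "wait indefinitely" are hypothetical; only the `>= 0 ? value : Long.MAX_VALUE` rule comes from the quoted diff.

```java
// Hypothetical sketch: names and the handling of an absent flag are assumptions.
class TimeoutSketch {

    // Raw value as parsed from --timeout-ms; null models "flag not supplied".
    private final Long rawTimeoutMs;

    TimeoutSketch(Long rawTimeoutMs) {
        this.rawTimeoutMs = rawTimeoutMs;
    }

    // Single place where the rule from the diff is applied:
    // a negative (or, by assumption, absent) value means no timeout.
    long timeoutMs() {
        if (rawTimeoutMs == null || rawTimeoutMs < 0) {
            return Long.MAX_VALUE;
        }
        return rawTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(new TimeoutSketch(null).timeoutMs());
        System.out.println(new TimeoutSketch(-1L).timeoutMs());
        System.out.println(new TimeoutSketch(5000L).timeoutMs());
    }
}
```

With the rule in one accessor, a wrapper class could assign `timeoutMs = opts.timeoutMs()` without repeating the ternary.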
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
wernerdv commented on PR #15457: URL: https://github.com/apache/kafka/pull/15457#issuecomment-1980452029 Hello @mimaison, please take a look.
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
wernerdv commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1511460897 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); +Optional includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; -this.timeoutMs = timeoutMs; - -if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { -seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); -} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -// default to latest if no offset is provided -seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); -} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -consumer.subscribe(Collections.singletonList(topic.get())); -} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { -consumer.subscribe(Pattern.compile(includedTopics.get())); +timeoutMs = opts.timeoutMs() >= 0 ? 
opts.timeoutMs() : Long.MAX_VALUE; + +if (topic.isPresent()) { +if (opts.partitionArg().isPresent()) { +seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); +} else { +consumer.subscribe(Collections.singletonList(topic.get())); +} } else { -throw new IllegalArgumentException("An invalid combination of arguments is provided. " + Review Comment: Thank you for the review. It seems to me that all the checks that were previously here are already implemented in `ConsoleConsumerOptions#checkRequiredArgs()`, namely: - Exactly one of --include/--topic is required. - The topic is required when partition is specified. - The partition is required when offset is specified. These checks protect us from initialising `ConsumerWrapper` with an invalid combination. I couldn't find a case that triggers the IllegalArgumentException with the current message. Please correct me if I'm wrong.
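The argument above can be sketched as follows: once the three listed rules hold, the two-level branch in the new constructor covers every remaining case, so the old fall-through exception is unreachable. This is an illustrative sketch only. The class, enum, and method signatures are hypothetical, and it throws `IllegalArgumentException` where the real tool would report a usage error; the message strings paraphrase the rules in the comment.

```java
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical sketch of the validation-then-dispatch reasoning.
class ArgCheckSketch {

    enum Mode { SEEK, SUBSCRIBE_TOPIC, SUBSCRIBE_PATTERN }

    // The three rules listed in the review comment.
    static void checkRequiredArgs(Optional<String> topic, Optional<String> include,
                                  OptionalInt partition, OptionalLong offset) {
        if (topic.isPresent() == include.isPresent()) {
            throw new IllegalArgumentException("Exactly one of --include/--topic is required.");
        }
        if (partition.isPresent() && !topic.isPresent()) {
            throw new IllegalArgumentException("The topic is required when partition is specified.");
        }
        if (offset.isPresent() && !partition.isPresent()) {
            throw new IllegalArgumentException("The partition is required when offset is specified.");
        }
    }

    // After validation, only three shapes remain, matching the simplified branch:
    // topic + partition, topic alone, or include pattern alone.
    static Mode mode(Optional<String> topic, Optional<String> include,
                     OptionalInt partition, OptionalLong offset) {
        checkRequiredArgs(topic, include, partition, offset);
        if (topic.isPresent()) {
            return partition.isPresent() ? Mode.SEEK : Mode.SUBSCRIBE_TOPIC;
        }
        return Mode.SUBSCRIBE_PATTERN;
    }

    public static void main(String[] args) {
        System.out.println(mode(Optional.of("orders"), Optional.empty(),
                                OptionalInt.of(0), OptionalLong.of(42L)));
        System.out.println(mode(Optional.of("orders"), Optional.empty(),
                                OptionalInt.empty(), OptionalLong.empty()));
        System.out.println(mode(Optional.empty(), Optional.of("orders-.*"),
                                OptionalInt.empty(), OptionalLong.empty()));
    }
}
```

In this sketch every input that passes `checkRequiredArgs` maps to exactly one mode, which is the property the comment relies on when removing the final `else` branch.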
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
OmniaGM commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1510981252 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); +Optional includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; -this.timeoutMs = timeoutMs; - -if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { -seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); -} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -// default to latest if no offset is provided -seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); -} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -consumer.subscribe(Collections.singletonList(topic.get())); -} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { -consumer.subscribe(Pattern.compile(includedTopics.get())); +timeoutMs = opts.timeoutMs() >= 0 ? 
opts.timeoutMs() : Long.MAX_VALUE; + +if (topic.isPresent()) { +if (opts.partitionArg().isPresent()) { +seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); +} else { +consumer.subscribe(Collections.singletonList(topic.get())); +} } else { -throw new IllegalArgumentException("An invalid combination of arguments is provided. " + Review Comment: I just noticed your note in the PR description. We don't have the exact same error message in `ConsoleConsumerOptions#checkRequiredArgs()`. I think it's okay to delete this error message from here if `ConsoleConsumerOptions#checkRequiredArgs()` protects us from initialising this `ConsumerWrapper` with an invalid combination.
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
OmniaGM commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1510969600 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { -final Optional topic; -final OptionalInt partitionId; -final OptionalLong offset; -final Optional includedTopics; -final Consumer consumer; -final long timeoutMs; final Time time = Time.SYSTEM; +final long timeoutMs; +final Consumer consumer; Iterator> recordIter = Collections.emptyIterator(); -public ConsumerWrapper(Optional topic, - OptionalInt partitionId, - OptionalLong offset, - Optional includedTopics, - Consumer consumer, - long timeoutMs) { -this.topic = topic; -this.partitionId = partitionId; -this.offset = offset; -this.includedTopics = includedTopics; +public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) { +Optional topic = Optional.ofNullable(opts.topicArg()); +Optional includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; -this.timeoutMs = timeoutMs; - -if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { -seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); -} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -// default to latest if no offset is provided -seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); -} else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { -consumer.subscribe(Collections.singletonList(topic.get())); -} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { -consumer.subscribe(Pattern.compile(includedTopics.get())); +timeoutMs = opts.timeoutMs() >= 0 ? 
opts.timeoutMs() : Long.MAX_VALUE; + +if (topic.isPresent()) { +if (opts.partitionArg().isPresent()) { +seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); +} else { +consumer.subscribe(Collections.singletonList(topic.get())); +} } else { -throw new IllegalArgumentException("An invalid combination of arguments is provided. " + Review Comment: We shouldn't delete this; instead, it needs to be moved to the validation in `ConsoleConsumerOptions`, as the args are not a valid combination. Do we have an alternative exception for this, or are we okay with changing the error for this case?
Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]
wernerdv commented on PR #15457: URL: https://github.com/apache/kafka/pull/15457#issuecomment-1975051194 Hello @mimaison @OmniaGM, please take a look.