OmniaGM commented on code in PR #15457: URL: https://github.com/apache/kafka/pull/15457#discussion_r1510969600
########## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ########## @@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) { } public static class ConsumerWrapper { - final Optional<String> topic; - final OptionalInt partitionId; - final OptionalLong offset; - final Optional<String> includedTopics; - final Consumer<byte[], byte[]> consumer; - final long timeoutMs; final Time time = Time.SYSTEM; + final long timeoutMs; + final Consumer<byte[], byte[]> consumer; Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator(); - public ConsumerWrapper(Optional<String> topic, - OptionalInt partitionId, - OptionalLong offset, - Optional<String> includedTopics, - Consumer<byte[], byte[]> consumer, - long timeoutMs) { - this.topic = topic; - this.partitionId = partitionId; - this.offset = offset; - this.includedTopics = includedTopics; + public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) { + Optional<String> topic = Optional.ofNullable(opts.topicArg()); + Optional<String> includedTopics = Optional.ofNullable(opts.includedTopicsArg()); this.consumer = consumer; - this.timeoutMs = timeoutMs; - - if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { - seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); - } else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { - // default to latest if no offset is provided - seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); - } else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { - consumer.subscribe(Collections.singletonList(topic.get())); - } else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { - consumer.subscribe(Pattern.compile(includedTopics.get())); + timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; + + if (topic.isPresent()) { + if (opts.partitionArg().isPresent()) { + seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); + } else { + consumer.subscribe(Collections.singletonList(topic.get())); + } } else { - throw new IllegalArgumentException("An invalid combination of arguments is provided. " + Review Comment: We shouldn't delete this instead it needs to be moved to the validation of `ConsoleConsumerOptions` as the args are not a valid combination. Do we have an alternative exception to this or are we okay with changing the error of this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org