OmniaGM commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1510969600


##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##########
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
     }
 
     public static class ConsumerWrapper {
-        final Optional<String> topic;
-        final OptionalInt partitionId;
-        final OptionalLong offset;
-        final Optional<String> includedTopics;
-        final Consumer<byte[], byte[]> consumer;
-        final long timeoutMs;
         final Time time = Time.SYSTEM;
+        final long timeoutMs;
+        final Consumer<byte[], byte[]> consumer;
 
         Iterator<ConsumerRecord<byte[], byte[]>> recordIter = 
Collections.emptyIterator();
 
-        public ConsumerWrapper(Optional<String> topic,
-                               OptionalInt partitionId,
-                               OptionalLong offset,
-                               Optional<String> includedTopics,
-                               Consumer<byte[], byte[]> consumer,
-                               long timeoutMs) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.offset = offset;
-            this.includedTopics = includedTopics;
+        public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], 
byte[]> consumer) {
+            Optional<String> topic = Optional.ofNullable(opts.topicArg());
+            Optional<String> includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
             this.consumer = consumer;
-            this.timeoutMs = timeoutMs;
-
-            if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-                seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-            } else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-                // default to latest if no offset is provided
-                seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-            } else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-                consumer.subscribe(Collections.singletonList(topic.get()));
-            } else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-                consumer.subscribe(Pattern.compile(includedTopics.get()));
+            timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
+
+            if (topic.isPresent()) {
+                if (opts.partitionArg().isPresent()) {
+                    seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
+                } else {
+                    consumer.subscribe(Collections.singletonList(topic.get()));
+                }
             } else {
-                throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +

Review Comment:
   We shouldn't delete this instead it needs to be moved to the validation of 
`ConsoleConsumerOptions` as the args are not a valid combination. Do we have an 
alternative exception to this or are we okay with changing the error of this 
case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to