Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-07 Thread via GitHub


mimaison merged PR #15457:
URL: https://github.com/apache/kafka/pull/15457


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514997085


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -322,6 +324,20 @@ private void invalidOffset(String offset) {
 "'earliest', 'latest', or a non-negative long.");
 }
 
+private long parseTimeoutMs() {
+long timeout;
+if (options.has(timeoutMsOpt)) {
+timeout = options.valueOf(timeoutMsOpt);
+if (timeout < 0) {
+CommandLineUtils.printUsageAndExit(parser, "The provided 
timeout-ms value '" + timeout +

Review Comment:
   Returned to the previous logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


mimaison commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514959532


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -322,6 +324,20 @@ private void invalidOffset(String offset) {
 "'earliest', 'latest', or a non-negative long.");
 }
 
+private long parseTimeoutMs() {
+long timeout;
+if (options.has(timeoutMsOpt)) {
+timeout = options.valueOf(timeoutMsOpt);
+if (timeout < 0) {
+CommandLineUtils.printUsageAndExit(parser, "The provided 
timeout-ms value '" + timeout +

Review Comment:
   It seems previously we wouldn't fail if a negative value was provided.



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -385,10 +404,14 @@ String bootstrapServer() {
 return options.valueOf(bootstrapServerOpt);
 }
 
-String includedTopicsArg() {
-return options.has(includeOpt)
-? options.valueOf(includeOpt)
-: options.valueOf(whitelistOpt);
+Optional includedTopicsArg() {
+if (options.has(includeOpt)) {
+return Optional.of(options.valueOf(includeOpt));
+} else if (options.has(whitelistOpt)) {
+return Optional.of(options.valueOf(whitelistOpt));
+} else {
+return Optional.empty();
+}

Review Comment:
   Could this be simplified into:
   ```
   return options.has(includeOpt)
   ? Optional.of(options.valueOf(includeOpt))
   : Optional.ofNullable(options.valueOf(whitelistOpt));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514949575


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());
+Optional includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;

Review Comment:
   Done.



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


mimaison commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514821883


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());
+Optional includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;

Review Comment:
   Could we merge this with the `timeoutMs()` method from 
`ConsoleConsumerOptions`?



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());

Review Comment:
   Could `topicArg()` directly return `Optional`?
   Same below for `includedTopicsArg()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-06 Thread via GitHub


wernerdv commented on PR #15457:
URL: https://github.com/apache/kafka/pull/15457#issuecomment-1980452029

   Hello, @mimaison
   Please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-04 Thread via GitHub


wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1511460897


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());
+Optional includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
+
+if (topic.isPresent()) {
+if (opts.partitionArg().isPresent()) {
+seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
+} else {
+consumer.subscribe(Collections.singletonList(topic.get()));
+}
 } else {
-throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +

Review Comment:
   Thank you for the review.
   It seems to me that all the checks that were previously here are already 
implemented in `ConsoleConsumerOptions#checkRequiredArgs()`, namely:
   - Exactly one of --include/--topic is required.
   - The topic is required when partition is specified.
   - The partition is required when offset is specified.
   
   This is protect us from initialise this ConsumerWrapper with invalid 
combination. 
   I couldn't find a case where I would catch an IllegalArgumentException with 
the current message.
   Please correct me if I'm wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-04 Thread via GitHub


OmniaGM commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1510981252


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());
+Optional includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
+
+if (topic.isPresent()) {
+if (opts.partitionArg().isPresent()) {
+seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
+} else {
+consumer.subscribe(Collections.singletonList(topic.get()));
+}
 } else {
-throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +

Review Comment:
   I just noticed your note in the pr description. We don't have the exact same 
error message in `ConsoleConsumerOptions#checkRequiredArgs()`. I think it's 
okay to delete this error message from here if 
`ConsoleConsumerOptions#checkRequiredArgs()` protect us from initialise this 
`ConsumerWrapper` with invalid combination 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-04 Thread via GitHub


OmniaGM commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1510969600


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
 }
 
 public static class ConsumerWrapper {
-final Optional topic;
-final OptionalInt partitionId;
-final OptionalLong offset;
-final Optional includedTopics;
-final Consumer consumer;
-final long timeoutMs;
 final Time time = Time.SYSTEM;
+final long timeoutMs;
+final Consumer consumer;
 
 Iterator> recordIter = 
Collections.emptyIterator();
 
-public ConsumerWrapper(Optional topic,
-   OptionalInt partitionId,
-   OptionalLong offset,
-   Optional includedTopics,
-   Consumer consumer,
-   long timeoutMs) {
-this.topic = topic;
-this.partitionId = partitionId;
-this.offset = offset;
-this.includedTopics = includedTopics;
+public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer consumer) {
+Optional topic = Optional.ofNullable(opts.topicArg());
+Optional includedTopics = 
Optional.ofNullable(opts.includedTopicsArg());
 this.consumer = consumer;
-this.timeoutMs = timeoutMs;
-
-if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-} else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-// default to latest if no offset is provided
-seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-} else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-consumer.subscribe(Collections.singletonList(topic.get()));
-} else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-consumer.subscribe(Pattern.compile(includedTopics.get()));
+timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
+
+if (topic.isPresent()) {
+if (opts.partitionArg().isPresent()) {
+seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
+} else {
+consumer.subscribe(Collections.singletonList(topic.get()));
+}
 } else {
-throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +

Review Comment:
   We shouldn't delete this instead it needs to be moved to the validation of 
`ConsoleConsumerOptions` as the args are not a valid combination. Do we have an 
alternative exception to this or are we okay with changing the error of this 
case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16246: Cleanups in ConsoleConsumer [kafka]

2024-03-02 Thread via GitHub


wernerdv commented on PR #15457:
URL: https://github.com/apache/kafka/pull/15457#issuecomment-1975051194

   Hello, @mimaison @OmniaGM
   Please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org