Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison merged PR #15274:
URL: https://github.com/apache/kafka/pull/15274


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison commented on PR #15274:
URL: https://github.com/apache/kafka/pull/15274#issuecomment-1942090513

   None of the test failures seem related; merging to trunk.





Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison commented on PR #15274:
URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941396381

   Thanks @OmniaGM !
   I created https://issues.apache.org/jira/browse/KAFKA-16246 to track the 
follow-up refactoring work.





Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison commented on PR #15274:
URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941348640

   @OmniaGM do you want to take another look?





Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1487675184


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -238,7 +237,8 @@ private Set checkConsumerGroup(Properties 
consumerPropsFromFile, Propert
 }
 
 private Properties buildConsumerProps(Properties consumerPropsFromFile, 
Properties extraConsumerProps, Set groupIdsProvided) {
-Properties consumerProps = new Properties(consumerPropsFromFile);
+Properties consumerProps = new Properties();
+consumerProps.putAll(consumerPropsFromFile);

Review Comment:
   Yep! Default values are only retrieved when using `getProperty()` and not 
when using `get()`. This is why the unit test was passing.
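
The lookup difference described in this comment can be reproduced with a minimal sketch using only `java.util.Properties`. The class name `PropertiesLookupDemo`, the helper `wrapAsDefaults`, and the sample key and value are made up for illustration and are not taken from the PR.

```java
import java.util.Properties;

public class PropertiesLookupDemo {

    // Builds a Properties object whose only entry lives in the defaults list,
    // mirroring the removed line `new Properties(consumerPropsFromFile)`.
    // Hypothetical helper, not part of the PR.
    static Properties wrapAsDefaults(String key, String value) {
        Properties fromFile = new Properties();
        fromFile.setProperty(key, value);
        return new Properties(fromFile);
    }

    public static void main(String[] args) {
        Properties props = wrapAsDefaults("group.id", "my-group");

        // getProperty() falls back to the defaults list when the key is absent.
        System.out.println(props.getProperty("group.id"));
        // get() and containsKey() only see entries stored directly in the object.
        System.out.println(props.get("group.id"));
        System.out.println(props.containsKey("group.id"));
    }
}
```

A test that asserts through `getProperty()` would therefore pass with either construction, which is consistent with the unit test not catching the problem.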






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


jlprat commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1487661551


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -238,7 +237,8 @@ private Set checkConsumerGroup(Properties 
consumerPropsFromFile, Propert
 }
 
 private Properties buildConsumerProps(Properties consumerPropsFromFile, 
Properties extraConsumerProps, Set groupIdsProvided) {
-Properties consumerProps = new Properties(consumerPropsFromFile);
+Properties consumerProps = new Properties();
+consumerProps.putAll(consumerPropsFromFile);

Review Comment:
   Strange that this change does not do exactly the same thing. But reading the 
API docs, the constructor that takes a `Properties` argument creates an empty 
property list with the passed properties as defaults. Tricky API design on 
Java's part...
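
A small sketch of why the two lines in the diff are not equivalent for code that treats a `Properties` object as a plain `Map`. It assumes the downstream consumer of the properties iterates over the entries rather than calling `getProperty()`; that is an assumption about the caller, and the class and method names below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertiesCopyDemo {

    // Old approach from the diff: the argument becomes the defaults list.
    static Properties copyViaConstructor(Properties source) {
        return new Properties(source);
    }

    // New approach from the diff: entries are copied into the object itself.
    static Properties copyViaPutAll(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        return copy;
    }

    // Number of entries visible to code that copies Properties as a Map.
    static int visibleAsMap(Properties props) {
        Map<Object, Object> asMap = new HashMap<>(props);
        return asMap.size();
    }

    // Sample input standing in for a file loaded via --consumer.config.
    static Properties sample() {
        Properties fromFile = new Properties();
        fromFile.setProperty("group.id", "my-group");
        return fromFile;
    }

    public static void main(String[] args) {
        // Defaults are not part of the entry set, so nothing is copied.
        System.out.println(visibleAsMap(copyViaConstructor(sample())));
        // putAll() stores the entry directly, so it survives the copy.
        System.out.println(visibleAsMap(copyViaPutAll(sample())));
    }
}
```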






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-13 Thread via GitHub


mimaison commented on PR #15274:
URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941235786

   I rebased on trunk to resolve conflicts. It seems my last fix has resolved 
the issues with the system tests so it is ready to review again.





Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-09 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1484652868


##
tests/kafkatest/services/console_consumer.py:
##
@@ -21,7 +21,7 @@
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7

Review Comment:
   That seems to work. However, while running the system tests I found an issue 
loading configs via `--consumer.config`. I pushed 
https://github.com/apache/kafka/pull/15274/commits/c6355fe630a7eae8ea55c8806c66b04cf389bfa2
 to fix it.
   
   I've kicked off another run on our system tests CI; hopefully it will be 
clean now.






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-08 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1482987163


##
tests/kafkatest/services/console_consumer.py:
##
@@ -21,7 +21,7 @@
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7

Review Comment:
   Actually, since `DEV_VERSION` already points to 3.8, I should be able to 
introduce `LATEST_3_7` even if it's not released yet. I'll try that.






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-07 Thread via GitHub


jlprat commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1481430316


##
tests/kafkatest/services/console_consumer.py:
##
@@ -21,7 +21,7 @@
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7

Review Comment:
   Do I understand it right that you want to merge this only after 3.7.0 is 
released?






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478422740


##
tests/kafkatest/services/console_consumer.py:
##
@@ -210,7 +210,10 @@ def start_cmd(self, node):
 
         # LoggingMessageFormatter was introduced after 0.9
         if node.version > LATEST_0_9:
-            cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
+            if node.version > LATEST_3_7:
+                cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
+            else:
+                cmd += " --formatter kafka.tools.LoggingMessageFormatter"

Review Comment:
   `LoggingMessageFormatter`, `DefaultMessageFormatter`, and 
`NoOpMessageFormatter` are not part of the public API and, as far as I can 
tell, the class names are not visible anywhere, so this shouldn't be considered 
an API change.






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478417021


##
tests/kafkatest/services/console_consumer.py:
##
@@ -21,7 +21,7 @@
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, 
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7

Review Comment:
   `LATEST_3_7` does not exist yet; we need to wait for 3.7.0 to be out before 
adding it.






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478369049


##
tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+class LoggingMessageFormatter implements MessageFormatter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(LoggingMessageFormatter.class);
+private final DefaultMessageFormatter defaultWriter = new 
DefaultMessageFormatter();
+
+@Override
+public void configure(Map configs) {
+defaultWriter.configure(configs);
+}
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+defaultWriter.writeTo(consumerRecord, output);
+String timestamp = consumerRecord.timestampType() != 
TimestampType.NO_TIMESTAMP_TYPE
+? consumerRecord.timestampType() + ":" + 
consumerRecord.timestamp() + " "
+: "";
+String key = consumerRecord.key() == null ? "null" : new 
String(consumerRecord.key(), StandardCharsets.UTF_8);
+String value = consumerRecord.value() == null ? "null" : new 
String(consumerRecord.value(), StandardCharsets.UTF_8);

Review Comment:
   Good catch!






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


jlprat commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478335911


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java:
##
@@ -1139,8 +1140,7 @@ private  String 
readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
 "--property", "key.deserializer.window.size.ms=500",
 };
 
-ConsoleConsumer.messageCount_$eq(0); //reset the message count

Review Comment:
   Ah right!






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478333292


##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java:
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsoleConsumerOptionsTest {
+
+@Test
+public void shouldParseValidConsumerValidConfig() throws IOException {
+String[] args = new String[]{
+"--bootstrap-server", "localhost:9092",
+"--topic", "test",
+"--from-beginning"
+};
+
+ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+assertEquals("localhost:9092", config.bootstrapServer());
+assertEquals("test", config.topicArg());
+assertTrue(config.fromBeginning());
+assertFalse(config.enableSystestEventsLogging());
+assertFalse(config.skipMessageOnError());
+assertEquals(-1, config.maxMessages());
+assertEquals(-1, config.timeoutMs());

Review Comment:
   Yes, I added a few extra tests to improve the coverage.






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478332515


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java:
##
@@ -1139,8 +1140,7 @@ private  String 
readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
 "--property", "key.deserializer.window.size.ms=500",
 };
 
-ConsoleConsumer.messageCount_$eq(0); //reset the message count

Review Comment:
   I made `ConsoleConsumer.run()` reset the count to 0 in 
https://github.com/apache/kafka/pull/15274/files#diff-84e7d94a76d5ecef71ab8b1b978972bcfbe869ace82a87086bdf0067ebf6c2fbR70
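
To illustrate why resetting inside `run()` removes the need for callers to reset the counter themselves, here is a simplified sketch. It is not the PR's code: `CounterResetDemo`, the `recordsAvailable` parameter, and the loop body are stand-ins, and only the static `messageCount` field and the `maxMessages == -1` convention are borrowed from the quoted `ConsoleConsumer`.

```java
public class CounterResetDemo {

    // Static state survives across calls within one JVM, e.g. several tests.
    static int messageCount = 0;

    // Hypothetical stand-in for ConsoleConsumer.run(): reset first, then count
    // up to maxMessages (or without a limit when maxMessages is -1).
    static int run(int recordsAvailable, int maxMessages) {
        messageCount = 0;
        while (messageCount < recordsAvailable
                && (messageCount < maxMessages || maxMessages == -1)) {
            messageCount++;
        }
        return messageCount;
    }

    public static void main(String[] args) {
        // Both invocations process three records; without the reset, the
        // second call would start at 3 and process nothing.
        System.out.println(run(5, 3));
        System.out.println(run(5, 3));
    }
}
```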






Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-05 Thread via GitHub


jlprat commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478135674


##
tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+class LoggingMessageFormatter implements MessageFormatter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(LoggingMessageFormatter.class);
+private final DefaultMessageFormatter defaultWriter = new 
DefaultMessageFormatter();
+
+@Override
+public void configure(Map configs) {
+defaultWriter.configure(configs);
+}
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+defaultWriter.writeTo(consumerRecord, output);
+String timestamp = consumerRecord.timestampType() != 
TimestampType.NO_TIMESTAMP_TYPE
+? consumerRecord.timestampType() + ":" + 
consumerRecord.timestamp() + " "

Review Comment:
   The trailing comma is missing when timestamps are present.
   ```suggestion
   ? consumerRecord.timestampType() + ":" + 
consumerRecord.timestamp() + ", "
   ```
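
The effect of the suggested separator can be seen in a standalone sketch. The helper `describe` and the `key:..., value:...` layout of the rest of the line are assumptions made for illustration; the quoted diff is cut off before the actual log statement, so the real format may differ.

```java
public class TimestampPrefixDemo {

    // Hypothetical helper: timestampType is null when the record carries no
    // timestamp, standing in for TimestampType.NO_TIMESTAMP_TYPE.
    static String describe(String timestampType, long timestamp, String key, String value) {
        // With ", " the prefix is separated from the key field by a comma,
        // matching the separator used between key and value.
        String prefix = timestampType != null
                ? timestampType + ":" + timestamp + ", "
                : "";
        return prefix + "key:" + key + ", value:" + value;
    }

    public static void main(String[] args) {
        System.out.println(describe("CreateTime", 42L, "k", "v"));
        System.out.println(describe(null, 0L, "k", "v"));
    }
}
```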



##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java:
##
@@ -1139,8 +1140,7 @@ private  String 
readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
 "--property", "key.deserializer.window.size.ms=500",
 };
 
-ConsoleConsumer.messageCount_$eq(0); //reset the message count

Review Comment:
   Why is this no longer needed with the new code?



##
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java:
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsoleConsumerOptionsTest {
+
+@Test
+public void shouldParseValidConsumerValidConfig() throws IOException {
+String[] args = new String[]{
+"--bootstrap-server", "localhost:9092",
+"--topic", "test",
+"--from-beginning"
+};
+
+ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+assertEquals("localhost:9092", config.bootstrapServer());
+assertEquals("test", config.topicArg());
+assertTrue(config.fromBeginning());

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-01 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1474737738


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", 
ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {
+long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : 
Long.MAX_VALUE;
+Consumer consumer = new 
KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent()
+? new ConsumerWrapper(Optional.of(conf.topicArg()), 
conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), 
consumer, timeoutMs)
+: new ConsumerWrapper(Optional.of(conf.topicArg()), 
OptionalInt.empty(), OptionalLong.empty(), 
Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs);
+
+addShutdownHook(consumerWrapper, conf);
+
+try {
+process(conf.maxMessages(), conf.formatter(), consumerWrapper, 
System.out, conf.skipMessageOnError());
+} finally {
+consumerWrapper.cleanup();
+conf.formatter().close();
+reportRecordCount();
+
+SHUTDOWN_LATCH.countDown();
+}
+}
+
+static void addShutdownHook(ConsumerWrapper consumer, 
ConsoleConsumerOptions conf) {
+Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+try {
+consumer.wakeup();
+SHUTDOWN_LATCH.await();
+} catch (Throwable t) {
+LOG.error("Exception while running shutdown hook " + 
t.getMessage());
+}
+if (conf.enableSystestEventsLogging()) {
+System.out.println("shutdown_complete");
+}
+});
+}
+
+static void process(int maxMessages, MessageFormatter formatter, 
ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+while (messageCount < maxMessages || maxMessages == -1) {
+ConsumerRecord msg;
+try {
+msg = consumer.receive();
+} catch (WakeupException we) {
+LOG.trace("Caught WakeupException because consumer is 
shutdown, ignore and terminate.");
+   

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-01 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1474734641


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", 
ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {
+long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : 
Long.MAX_VALUE;
+Consumer consumer = new 
KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent()
+? new ConsumerWrapper(Optional.of(conf.topicArg()), 
conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), 
consumer, timeoutMs)
+: new ConsumerWrapper(Optional.of(conf.topicArg()), 
OptionalInt.empty(), OptionalLong.empty(), 
Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs);
+
+addShutdownHook(consumerWrapper, conf);
+
+try {
+process(conf.maxMessages(), conf.formatter(), consumerWrapper, 
System.out, conf.skipMessageOnError());
+} finally {
+consumerWrapper.cleanup();
+conf.formatter().close();
+reportRecordCount();
+
+SHUTDOWN_LATCH.countDown();
+}
+}
+
+static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+try {
+consumer.wakeup();
+SHUTDOWN_LATCH.await();
+} catch (Throwable t) {
+LOG.error("Exception while running shutdown hook " + t.getMessage());
+}
+if (conf.enableSystestEventsLogging()) {
+System.out.println("shutdown_complete");
+}
+});
+}
+
+static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+while (messageCount < maxMessages || maxMessages == -1) {
+ConsumerRecord<byte[], byte[]> msg;
+try {
+msg = consumer.receive();
+} catch (WakeupException we) {
+LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
+   
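
For readers following the shutdown path in the hunk above: the hook wakes the consumer and then blocks on `SHUTDOWN_LATCH` until the `finally` block in `run` has finished its cleanup. Below is a minimal stand-alone sketch of that handshake. It uses no Kafka classes; the class name `ShutdownHandshakeSketch`, the `AtomicBoolean` standing in for `consumer.wakeup()`, and the event strings are all assumptions made for illustration, not code from this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a wakeup-then-await shutdown handshake. */
final class ShutdownHandshakeSketch {

    /** Runs one worker, performs the hook's two steps, and returns the ordered events. */
    static List<String> runOnce() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean wakeup = new AtomicBoolean(false); // stands in for consumer.wakeup()

        Thread worker = new Thread(() -> {
            try {
                started.countDown();
                while (!wakeup.get()) {
                    Thread.yield(); // stands in for a blocking receive()
                }
            } finally {
                events.add("cleanup");     // cleanup happens first...
                shutdownLatch.countDown(); // ...then the hook is released
            }
        });
        worker.start();

        try {
            started.await();
            // The two steps a shutdown hook would perform:
            wakeup.set(true);
            shutdownLatch.await();
            events.add("shutdown_complete");
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [cleanup, shutdown_complete]
    }
}
```

Counting the latch down as the last step of the `finally` block is what guarantees that the hook cannot report completion before cleanup has run.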

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-01 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1474695808


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ConsoleConsumerOptions extends CommandDefaultOptions {
+
+private static final Random RANDOM = new Random();
+
+private final OptionSpec topicOpt;
+private final OptionSpec whitelistOpt;
+private final OptionSpec includeOpt;
+private final OptionSpec partitionIdOpt;
+private final OptionSpec offsetOpt;
+private final OptionSpec messageFormatterOpt;
+private final OptionSpec messageFormatterArgOpt;
+private final OptionSpec messageFormatterConfigOpt;
+private final OptionSpec resetBeginningOpt;
+private final OptionSpec maxMessagesOpt;
+private final OptionSpec timeoutMsOpt;
+private final OptionSpec skipMessageOnErrorOpt;
+private final OptionSpec bootstrapServerOpt;
+private final OptionSpec keyDeserializerOpt;
+private final OptionSpec valueDeserializerOpt;
+private final OptionSpec enableSystestEventsLoggingOpt;
+private final OptionSpec isolationLevelOpt;
+private final OptionSpec groupIdOpt;
+
+private final Properties consumerProps;
+private final long offset;
+private final MessageFormatter formatter;
+
+public ConsoleConsumerOptions(String[] args) throws IOException {
+super(args);
+topicOpt = parser.accepts("topic", "The topic to consume on.")
+.withRequiredArg()
+.describedAs("topic")
+.ofType(String.class);
+whitelistOpt = parser.accepts("whitelist",
+"DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.")
+.withRequiredArg()
+.describedAs("Java regex (String)")
+.ofType(String.class);
+includeOpt = parser.accepts("include",
+"Regular expression specifying list of topics to include for consumption.")
+.withRequiredArg()
+.describedAs("Java regex (String)")
+.ofType(String.class);
+partitionIdOpt = parser.accepts("partition",
+"The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.")
+.withRequiredArg()
+.describedAs("partition")
+.ofType(Integer.class);
+offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end")
+.withRequiredArg()
+.describedAs("consume offset")
+.ofType(String.class)
+.defaultsTo("latest");
+OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
+.withRequiredArg()
+.describedAs("consumer_prop")
+.ofType(String.class);
+.withRequiredArg()
+.describedAs("consumer_prop")
+.ofType(String.class);
+OptionSpec consumerConfigOpt = 
parser.accepts("consumer.config", "Consumer config
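
The `--offset` help text in the hunk above accepts a non-negative number, `earliest`, or `latest`. Since the quoted diff is cut off before the parsing code, here is a rough sketch of how such a value could be mapped to a `long`. The class `OffsetArgSketch`, the method `parseOffset`, the case-insensitive matching, and the sentinel values -2 and -1 are assumptions for illustration (the sentinels are chosen to resemble the earliest/latest timestamp constants in `ListOffsetsRequest`); this is not the parsing code from the PR.

```java
import java.util.Locale;

/** Hypothetical helper; a sketch only, not taken from ConsoleConsumerOptions. */
final class OffsetArgSketch {
    // Assumed sentinel values for the two symbolic offsets.
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    /** Maps "earliest", "latest", or a non-negative number to a long offset. */
    static long parseOffset(String raw) {
        String value = raw.trim().toLowerCase(Locale.ROOT);
        if (value.equals("earliest")) {
            return EARLIEST;
        }
        if (value.equals("latest")) {
            return LATEST;
        }
        long offset;
        try {
            offset = Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid offset '" + raw
                + "': expected 'earliest', 'latest', or a non-negative number", e);
        }
        if (offset < 0) {
            // Negative numbers would collide with the sentinels, so reject them.
            throw new IllegalArgumentException("Invalid offset '" + raw
                + "': the offset must be non-negative");
        }
        return offset;
    }

    public static void main(String[] args) {
        System.out.println(parseOffset("earliest"));
        System.out.println(parseOffset("42"));
    }
}
```

Rejecting negative numeric input keeps user-supplied values from being mistaken for the symbolic sentinels.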

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-31 Thread via GitHub


OmniaGM commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1472771173


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {
+long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE;
+Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent()
+? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs)
+: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs);
+
+addShutdownHook(consumerWrapper, conf);
+
+try {
+process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError());
+} finally {
+consumerWrapper.cleanup();
+conf.formatter().close();
+reportRecordCount();
+
+SHUTDOWN_LATCH.countDown();
+}
+}
+
+static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+try {
+consumer.wakeup();
+SHUTDOWN_LATCH.await();
+} catch (Throwable t) {
+LOG.error("Exception while running shutdown hook " + t.getMessage());
+}
+if (conf.enableSystestEventsLogging()) {
+System.out.println("shutdown_complete");
+}
+});
+}
+
+static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+while (messageCount < maxMessages || maxMessages == -1) {
+ConsumerRecord<byte[], byte[]> msg;
+try {
+msg = consumer.receive();
+} catch (WakeupException we) {
+LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
+
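
The loop condition in `process` above, `messageCount < maxMessages || maxMessages == -1`, treats -1 as "no limit". The sketch below isolates just that condition. The class `BoundedLoopSketch`, the `Iterator<String>` source, and the `StringBuilder` sink are stand-ins invented for illustration. Unlike the quoted code, which keeps the count in a static field, the sketch counts in a local variable.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical sketch of a message loop where -1 means "unbounded". */
final class BoundedLoopSketch {

    /** Copies up to maxMessages items from source to out; returns how many were copied. */
    static int consume(Iterator<String> source, int maxMessages, StringBuilder out) {
        int messageCount = 0;
        while (messageCount < maxMessages || maxMessages == -1) {
            if (!source.hasNext()) {
                break; // stands in for a timeout or wakeup ending the loop
            }
            out.append(source.next()).append('\n');
            messageCount++;
        }
        return messageCount;
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        int n = consume(Arrays.asList("a", "b", "c").iterator(), 2, out);
        System.out.println(n + " message(s) copied");
    }
}
```

With this condition a limit of 0 copies nothing, while -1 keeps going until the source itself ends.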

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-31 Thread via GitHub


OmniaGM commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1472764137


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {
+long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE;
+Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent()
+? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs)
+: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs);
+
+addShutdownHook(consumerWrapper, conf);
+
+try {
+process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError());
+} finally {
+consumerWrapper.cleanup();
+conf.formatter().close();
+reportRecordCount();
+
+SHUTDOWN_LATCH.countDown();
+}
+}
+
+static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+try {
+consumer.wakeup();
+SHUTDOWN_LATCH.await();
+} catch (Throwable t) {
+LOG.error("Exception while running shutdown hook " + t.getMessage());
+}
+if (conf.enableSystestEventsLogging()) {
+System.out.println("shutdown_complete");
+}
+});
+}
+
+static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+while (messageCount < maxMessages || maxMessages == -1) {
+ConsumerRecord<byte[], byte[]> msg;
+try {
+msg = consumer.receive();
+} catch (WakeupException we) {
+LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
+

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-31 Thread via GitHub


OmniaGM commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1472734283


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {
+long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE;
+Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent()
+? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs)
+: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs);
+
+addShutdownHook(consumerWrapper, conf);
+
+try {
+process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError());
+} finally {
+consumerWrapper.cleanup();
+conf.formatter().close();
+reportRecordCount();
+
+SHUTDOWN_LATCH.countDown();
+}
+}
+
+static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+try {
+consumer.wakeup();
+SHUTDOWN_LATCH.await();
+} catch (Throwable t) {
+LOG.error("Exception while running shutdown hook " + t.getMessage());
+}
+if (conf.enableSystestEventsLogging()) {
+System.out.println("shutdown_complete");
+}
+});
+}
+
+static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
+while (messageCount < maxMessages || maxMessages == -1) {
+ConsumerRecord<byte[], byte[]> msg;
+try {
+msg = consumer.receive();
+} catch (WakeupException we) {
+LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
+

Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-31 Thread via GitHub


OmniaGM commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1472710835


##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+class ConsoleConsumer {
+
+private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+static int messageCount = 0;
+
+public static void main(String[] args) throws Exception {
+ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+try {
+run(opts);
+} catch (AuthenticationException ae) {
+LOG.error("Authentication failed: terminating consumer process", ae);
+Exit.exit(1);
+} catch (Throwable t) {
+LOG.error("Unknown error when running consumer: ", t);
+Exit.exit(1);
+}
+}
+
+static void run(ConsoleConsumerOptions conf) {

Review Comment:
   Just for consistency, can we rename `conf` to `opts` like L.57?



##
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apac

[PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-01-26 Thread via GitHub


mimaison opened a new pull request, #15274:
URL: https://github.com/apache/kafka/pull/15274

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

