Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison merged PR #15274: URL: https://github.com/apache/kafka/pull/15274 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on PR #15274: URL: https://github.com/apache/kafka/pull/15274#issuecomment-1942090513 None of the test failures seem related, merging to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on PR #15274: URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941396381 Thanks @OmniaGM ! I created https://issues.apache.org/jira/browse/KAFKA-16246 to track the follow up refactoring work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on PR #15274: URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941348640 @OmniaGM do you want to take another look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1487675184 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -238,7 +237,8 @@ private Set checkConsumerGroup(Properties consumerPropsFromFile, Propert } private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set groupIdsProvided) { -Properties consumerProps = new Properties(consumerPropsFromFile); +Properties consumerProps = new Properties(); +consumerProps.putAll(consumerPropsFromFile); Review Comment: Yep! Default values are only retrieved when using `getProperty()` and not when using `get()`. This is why the unit test was passing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
jlprat commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1487661551 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -238,7 +237,8 @@ private Set checkConsumerGroup(Properties consumerPropsFromFile, Propert } private Properties buildConsumerProps(Properties consumerPropsFromFile, Properties extraConsumerProps, Set groupIdsProvided) { -Properties consumerProps = new Properties(consumerPropsFromFile); +Properties consumerProps = new Properties(); +consumerProps.putAll(consumerPropsFromFile); Review Comment: Strange that this change is not doing the exact same thing. But reading the API, constructor with `Property` says that it creates an empty list with the passed properties as defaults. Tricky API design from Java's part... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on PR #15274: URL: https://github.com/apache/kafka/pull/15274#issuecomment-1941235786 I rebased on trunk to resolve conflicts. It seems my last fix has resolved the issues with the system tests so it is ready to review again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1484652868 ## tests/kafkatest/services/console_consumer.py: ## @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 Review Comment: That seems to work. However while running the system tests I found an issue loading configs via `--consumer.config`. I pushed https://github.com/apache/kafka/pull/15274/commits/c6355fe630a7eae8ea55c8806c66b04cf389bfa2 to fix it. I've kicked another run on our system tests CI, hopefully it will be clean now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1482987163 ## tests/kafkatest/services/console_consumer.py: ## @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 Review Comment: Actually since `DEV_VERSION` already points to 3.8, I should be able to introduce `LATEST_3_7` even if it's not released yet. I'll try that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
jlprat commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1481430316 ## tests/kafkatest/services/console_consumer.py: ## @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 Review Comment: Do I understand it right that you want to merge this only after 3.7.0 is released? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478422740 ## tests/kafkatest/services/console_consumer.py: ## @@ -210,7 +210,10 @@ def start_cmd(self, node): # LoggingMessageFormatter was introduced after 0.9 if node.version > LATEST_0_9: -cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" +if node.version > LATEST_3_7: +cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter" +else: +cmd += " --formatter kafka.tools.LoggingMessageFormatter" Review Comment: `LoggingMessageFormatter`, `DefaultMessageFormatter`, `NoOpMessageFormatter` are not part of the public API and as far as I can tell the class names are not visible anywhere so this shouldn't be considered an API change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478417021 ## tests/kafkatest/services/console_consumer.py: ## @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 Review Comment: `LATEST_3_7` does not exist yet, we need to wait for 3.7.0 to be out before adding it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478369049 ## tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.record.TimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +class LoggingMessageFormatter implements MessageFormatter { + +private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class); +private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter(); + +@Override +public void configure(Map configs) { +defaultWriter.configure(configs); +} + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +defaultWriter.writeTo(consumerRecord, output); +String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE +? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + " " +: ""; +String key = consumerRecord.key() == null ? "null" : new String(consumerRecord.key(), StandardCharsets.UTF_8); +String value = consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8); Review Comment: Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
jlprat commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478335911 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java: ## @@ -1139,8 +1140,7 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial "--property", "key.deserializer.window.size.ms=500", }; -ConsoleConsumer.messageCount_$eq(0); //reset the message count Review Comment: Ah right! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478333292 ## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java: ## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.test.MockDeserializer; +import org.apache.kafka.tools.ToolsTestUtils; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsoleConsumerOptionsTest { + +@Test +public void shouldParseValidConsumerValidConfig() throws IOException { +String[] args = new String[]{ +"--bootstrap-server", "localhost:9092", +"--topic", "test", +"--from-beginning" +}; + +ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + +assertEquals("localhost:9092", config.bootstrapServer()); +assertEquals("test", config.topicArg()); +assertTrue(config.fromBeginning()); +assertFalse(config.enableSystestEventsLogging()); +assertFalse(config.skipMessageOnError()); +assertEquals(-1, config.maxMessages()); +assertEquals(-1, config.timeoutMs()); Review Comment: Yes, I added a few extra tests to improve the coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478332515 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java: ## @@ -1139,8 +1140,7 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial "--property", "key.deserializer.window.size.ms=500", }; -ConsoleConsumer.messageCount_$eq(0); //reset the message count Review Comment: I made `ConsoleConsumer.run()` reset to count to 0 in https://github.com/apache/kafka/pull/15274/files#diff-84e7d94a76d5ecef71ab8b1b978972bcfbe869ace82a87086bdf0067ebf6c2fbR70 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
jlprat commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1478135674 ## tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.record.TimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +class LoggingMessageFormatter implements MessageFormatter { + +private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class); +private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter(); + +@Override +public void configure(Map configs) { +defaultWriter.configure(configs); +} + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +defaultWriter.writeTo(consumerRecord, output); +String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE +? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + " " Review Comment: Trailing comma in case timestamps are present is missing ```suggestion ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + ", " ``` ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java: ## @@ -1139,8 +1140,7 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial "--property", "key.deserializer.window.size.ms=500", }; -ConsoleConsumer.messageCount_$eq(0); //reset the message count Review Comment: Why this is not needed anymore with the new code? ## tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java: ## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.test.MockDeserializer; +import org.apache.kafka.tools.ToolsTestUtils; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsoleConsumerOptionsTest { + +@Test +public void shouldParseValidConsumerValidConfig() throws IOException { +String[] args = new String[]{ +"--bootstrap-server", "localhost:9092", +"--topic", "test", +"--from-beginning" +}; + +ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); + +assertEquals("localhost:9092", config.bootstrapServer()); +assertEquals("test", config.topicArg()); +assertTrue(config.fromBeginning());
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1474737738 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { +long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE; +Consumer consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); +ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent() +? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs) +: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs); + +addShutdownHook(consumerWrapper, conf); + +try { +process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError()); +} finally { +consumerWrapper.cleanup(); +conf.formatter().close(); +reportRecordCount(); + +SHUTDOWN_LATCH.countDown(); +} +} + +static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { +Exit.addShutdownHook("consumer-shutdown-hook", () -> { +try { +consumer.wakeup(); +SHUTDOWN_LATCH.await(); +} catch (Throwable t) { +LOG.error("Exception while running shutdown hook " + t.getMessage()); +} +if (conf.enableSystestEventsLogging()) { +System.out.println("shutdown_complete"); +} +}); +} + +static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { +while (messageCount < maxMessages || maxMessages == -1) { +ConsumerRecord msg; +try { +msg = consumer.receive(); +} catch (WakeupException we) { +LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); +
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1474734641 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { +long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE; +Consumer consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); +ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent() +? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs) +: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs); + +addShutdownHook(consumerWrapper, conf); + +try { +process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError()); +} finally { +consumerWrapper.cleanup(); +conf.formatter().close(); +reportRecordCount(); + +SHUTDOWN_LATCH.countDown(); +} +} + +static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { +Exit.addShutdownHook("consumer-shutdown-hook", () -> { +try { +consumer.wakeup(); +SHUTDOWN_LATCH.await(); +} catch (Throwable t) { +LOG.error("Exception while running shutdown hook " + t.getMessage()); +} +if (conf.enableSystestEventsLogging()) { +System.out.println("shutdown_complete"); +} +}); +} + +static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { +while (messageCount < maxMessages || maxMessages == -1) { +ConsumerRecord msg; +try { +msg = consumer.receive(); +} catch (WakeupException we) { +LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); +
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1474695808 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java: ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +public class ConsoleConsumerOptions extends CommandDefaultOptions { + +private static final Random RANDOM = new Random(); + +private final OptionSpec topicOpt; +private final OptionSpec whitelistOpt; +private final OptionSpec includeOpt; +private final OptionSpec partitionIdOpt; +private final OptionSpec offsetOpt; +private final OptionSpec messageFormatterOpt; +private final OptionSpec messageFormatterArgOpt; +private final OptionSpec messageFormatterConfigOpt; +private final OptionSpec resetBeginningOpt; +private final OptionSpec maxMessagesOpt; +private final OptionSpec timeoutMsOpt; +private final OptionSpec skipMessageOnErrorOpt; +private final OptionSpec bootstrapServerOpt; +private final OptionSpec keyDeserializerOpt; +private final OptionSpec valueDeserializerOpt; +private final OptionSpec enableSystestEventsLoggingOpt; +private final OptionSpec isolationLevelOpt; +private final OptionSpec groupIdOpt; + +private final Properties consumerProps; +private final long offset; +private final MessageFormatter formatter; + +public ConsoleConsumerOptions(String[] args) throws IOException { +super(args); +topicOpt = parser.accepts("topic", "The topic to consume on.") +.withRequiredArg() +.describedAs("topic") +.ofType(String.class); +whitelistOpt = parser.accepts("whitelist", +"DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.") +.withRequiredArg() +.describedAs("Java regex (String)") +.ofType(String.class); +includeOpt = parser.accepts("include", +"Regular expression specifying list of topics to include for consumption.") +.withRequiredArg() +.describedAs("Java regex (String)") +.ofType(String.class); +partitionIdOpt = parser.accepts("partition", +"The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.") +.withRequiredArg() +.describedAs("partition") +.ofType(Integer.class); +offsetOpt = parser.accepts("offset", "The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end") +.withRequiredArg() +.describedAs("consume offset") +.ofType(String.class) +.defaultsTo("latest"); +OptionSpec consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") +.withRequiredArg() +.describedAs("consumer_prop") +.ofType(String.class); +OptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Consumer config
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
OmniaGM commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1472771173 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { +long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE; +Consumer consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); +ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent() +? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs) +: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs); + +addShutdownHook(consumerWrapper, conf); + +try { +process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError()); +} finally { +consumerWrapper.cleanup(); +conf.formatter().close(); +reportRecordCount(); + +SHUTDOWN_LATCH.countDown(); +} +} + +static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { +Exit.addShutdownHook("consumer-shutdown-hook", () -> { +try { +consumer.wakeup(); +SHUTDOWN_LATCH.await(); +} catch (Throwable t) { +LOG.error("Exception while running shutdown hook " + t.getMessage()); +} +if (conf.enableSystestEventsLogging()) { +System.out.println("shutdown_complete"); +} +}); +} + +static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { +while (messageCount < maxMessages || maxMessages == -1) { +ConsumerRecord msg; +try { +msg = consumer.receive(); +} catch (WakeupException we) { +LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); +
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
OmniaGM commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1472764137 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { +long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE; +Consumer consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); +ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent() +? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs) +: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs); + +addShutdownHook(consumerWrapper, conf); + +try { +process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError()); +} finally { +consumerWrapper.cleanup(); +conf.formatter().close(); +reportRecordCount(); + +SHUTDOWN_LATCH.countDown(); +} +} + +static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { +Exit.addShutdownHook("consumer-shutdown-hook", () -> { +try { +consumer.wakeup(); +SHUTDOWN_LATCH.await(); +} catch (Throwable t) { +LOG.error("Exception while running shutdown hook " + t.getMessage()); +} +if (conf.enableSystestEventsLogging()) { +System.out.println("shutdown_complete"); +} +}); +} + +static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { +while (messageCount < maxMessages || maxMessages == -1) { +ConsumerRecord msg; +try { +msg = consumer.receive(); +} catch (WakeupException we) { +LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); +
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
OmniaGM commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1472734283 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { +long timeoutMs = conf.timeoutMs() >= 0 ? conf.timeoutMs() : Long.MAX_VALUE; +Consumer consumer = new KafkaConsumer<>(conf.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); +ConsumerWrapper consumerWrapper = conf.partitionArg().isPresent() +? new ConsumerWrapper(Optional.of(conf.topicArg()), conf.partitionArg(), OptionalLong.of(conf.offsetArg()), Optional.empty(), consumer, timeoutMs) +: new ConsumerWrapper(Optional.of(conf.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(conf.includedTopicsArg()), consumer, timeoutMs); + +addShutdownHook(consumerWrapper, conf); + +try { +process(conf.maxMessages(), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError()); +} finally { +consumerWrapper.cleanup(); +conf.formatter().close(); +reportRecordCount(); + +SHUTDOWN_LATCH.countDown(); +} +} + +static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) { +Exit.addShutdownHook("consumer-shutdown-hook", () -> { +try { +consumer.wakeup(); +SHUTDOWN_LATCH.await(); +} catch (Throwable t) { +LOG.error("Exception while running shutdown hook " + t.getMessage()); +} +if (conf.enableSystestEventsLogging()) { +System.out.println("shutdown_complete"); +} +}); +} + +static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) { +while (messageCount < maxMessages || maxMessages == -1) { +ConsumerRecord msg; +try { +msg = consumer.receive(); +} catch (WakeupException we) { +LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate."); +
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
OmniaGM commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1472710835 ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Consumer that dumps messages to standard out. + */ +class ConsoleConsumer { + +private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class); +private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1); + +static int messageCount = 0; + +public static void main(String[] args) throws Exception { +ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args); +try { +run(opts); +} catch (AuthenticationException ae) { +LOG.error("Authentication failed: terminating consumer process", ae); +Exit.exit(1); +} catch (Throwable t) { +LOG.error("Unknown error when running consumer: ", t); +Exit.exit(1); +} +} + +static void run(ConsoleConsumerOptions conf) { Review Comment: Just for consistency can we rename `conf` to `opts`like L.57? ## tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apac
[PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison opened a new pull request, #15274: URL: https://github.com/apache/kafka/pull/15274 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org