KAFKA-3597; Query ConsoleConsumer and VerifiableProducer if they shut down cleanly
Even if a test calls stop() on console_consumer or verifiable_producer, it is still possible that the producer/consumer will not shut down cleanly and will be killed forcefully after a timeout. It will be useful for some tests to know whether a clean shutdown happened or not. This PR adds methods to console_consumer and verifiable_producer to query whether a clean shutdown happened or not. hachikuji and/or granders Please review. Author: Anna Povzner <[email protected]> Reviewers: Jason Gustafson, Geoff Anderson, Gwen Shapira Closes #1278 from apovzner/kafka-3597 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e29eac4b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e29eac4b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e29eac4b Branch: refs/heads/0.10.0 Commit: e29eac4bbb678aa3d5a29a75f413a7b10cc2f0b1 Parents: eb50d2f Author: Anna Povzner <[email protected]> Authored: Fri Apr 29 10:51:29 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Fri Apr 29 10:51:29 2016 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/tools/ConsoleConsumer.scala | 8 ++++++++ tests/kafkatest/services/console_consumer.py | 15 +++++++++++---- tests/kafkatest/services/verifiable_producer.py | 6 ++++++ .../org/apache/kafka/tools/VerifiableProducer.java | 8 ++++++++ 4 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index e9a43f2..8953640 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -103,6 
+103,10 @@ object ConsoleConsumer extends Logging { consumer.stop() shutdownLatch.await() + + if (conf.enableSystestEventsLogging) { + System.out.println("shutdown_complete") + } } }) } @@ -253,6 +257,9 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) + val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", + "Log lifecycle events of the consumer in addition to logging consumed " + + "messages. (This is specific for system tests.)") if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -260,6 +267,7 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) + val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) // If using old consumer, exactly one of whitelist/blacklist/topic is required. // If using new consumer, topic must be specified. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index e5f2196..37638e2 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -123,6 +123,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + self.clean_shutdown_nodes = set() self.client_id = client_id self.print_key = print_key self.log_level = "TRACE" @@ -185,6 +186,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): if node.version > LATEST_0_9: cmd+=" --formatter kafka.tools.LoggingMessageFormatter" + cmd += " --enable-systest-events" cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd @@ -226,10 +228,15 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): for line in itertools.chain([first_line], consumer_output): msg = line.strip() - if self.message_validator is not None: - msg = self.message_validator(msg) - if msg is not None: - self.messages_consumed[idx].append(msg) + if msg == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from consumer, already shutdown. 
Consumer index: %d" % idx) + self.clean_shutdown_nodes.add(node) + else: + if self.message_validator is not None: + msg = self.message_validator(msg) + if msg is not None: + self.messages_consumed[idx].append(msg) self.read_jmx_output(idx, node) http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 500410f..4fec776 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -71,6 +71,7 @@ class VerifiableProducer(BackgroundThreadService): self.acked_values = [] self.not_acked_values = [] self.produced_count = {} + self.clean_shutdown_nodes = set() self.acks = acks @@ -139,6 +140,11 @@ class VerifiableProducer(BackgroundThreadService): last_produced_time = t prev_msg = data + elif data["name"] == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx) + self.clean_shutdown_nodes.add(node) + def start_cmd(self, node, idx): cmd = "" http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 9b10a9f..b511fb9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -247,6 +247,14 @@ public class VerifiableProducer { /** Close the producer to flush any remaining messages. 
*/ public void close() { producer.close(); + System.out.println(shutdownString()); + } + + String shutdownString() { + Map<String, Object> data = new HashMap<>(); + data.put("class", this.getClass().toString()); + data.put("name", "shutdown_complete"); + return toJsonString(data); } /**
