Repository: kafka Updated Branches: refs/heads/trunk 263c10ab7 -> b62f8ea43
KAFKA-2531: Add Ducktape based tests for KafkaLog4jAppender Author: Ashish Singh <[email protected]> Reviewers: Geoff Anderson, Edward Ribeiro, Ewen Cheslack-Postava, Gwen Shapira Closes #235 from SinghAsDev/KAFKA-2531 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b62f8ea4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b62f8ea4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b62f8ea4 Branch: refs/heads/trunk Commit: b62f8ea43b6d5307f7274fbe8b7984dd5ee22239 Parents: 263c10a Author: Ashish Singh <[email protected]> Authored: Sat Sep 26 18:32:49 2015 -0700 Committer: Gwen Shapira <[email protected]> Committed: Sat Sep 26 18:32:49 2015 -0700 ---------------------------------------------------------------------- .../sanity_checks/test_console_consumer.py | 20 +-- .../kafkatest/services/kafka_log4j_appender.py | 61 +++++++ tests/kafkatest/tests/log4j_appender_test.py | 62 +++++++ tests/kafkatest/utils/__init__.py | 15 ++ tests/kafkatest/utils/remote_account.py | 32 ++++ .../clients/tools/VerifiableLog4jAppender.java | 162 +++++++++++++++++++ .../kafka/clients/tools/VerifiableProducer.java | 3 +- 7 files changed, 335 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/sanity_checks/test_console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 5b061aa..4544c00 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -19,28 +19,10 @@ from ducktape.utils.util import wait_until from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer 
import ConsoleConsumer +from kafkatest.utils.remote_account import line_count, file_exists import time - -def file_exists(node, file): - """Quick and dirty check for existence of remote file.""" - try: - node.account.ssh("cat " + file, allow_fail=False) - return True - except: - return False - - -def line_count(node, file): - """Return the line count of file on node""" - out = [line for line in node.account.ssh_capture("wc -l %s" % file)] - if len(out) != 1: - raise Exception("Expected single line of output from wc -l") - - return int(out[0].strip().split(" ")[0]) - - class ConsoleConsumerTest(Test): """Sanity checks on console consumer service class.""" def __init__(self, test_context): http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/services/kafka_log4j_appender.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py new file mode 100644 index 0000000..11369aa --- /dev/null +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.services.background_thread import BackgroundThreadService + + +class KafkaLog4jAppender(BackgroundThreadService): + + logs = { + "producer_log": { + "path": "/mnt/kafka_log4j_appender.log", + "collect_default": False} + } + + def __init__(self, context, num_nodes, kafka, topic, max_messages=-1): + super(KafkaLog4jAppender, self).__init__(context, num_nodes) + + self.kafka = kafka + self.topic = topic + self.max_messages = max_messages + + def _worker(self, idx, node): + cmd = self.start_cmd + self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd)) + node.account.ssh(cmd) + + @property + def start_cmd(self): + cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableLog4jAppender" \ + " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + if self.max_messages > 0: + cmd += " --max-messages %s" % str(self.max_messages) + + cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &" + return cmd + + def stop_node(self, node): + node.account.kill_process("VerifiableKafkaLog4jAppender", allow_fail=False) + if self.worker_threads is None: + return + + # block until the corresponding thread exits + if len(self.worker_threads) >= self.idx(node): + # Need to guard this because stop is preemptively called before the worker threads are added and started + self.worker_threads[self.idx(node) - 1].join() + + def clean_node(self, node): + node.account.kill_process("VerifiableKafkaLog4jAppender", clean_shutdown=False, allow_fail=False) + node.account.ssh("rm -rf /mnt/kafka_log4j_appender.log", allow_fail=False) http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/tests/log4j_appender_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py new file mode 100644 index 0000000..0875dbe --- /dev/null +++ 
b/tests/kafkatest/tests/log4j_appender_test.py @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until + +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender + +import time + +TOPIC = "topic-log4j-appender" +MAX_MESSAGES = 100 + +class Log4jAppenderTest(KafkaTest): + """ + Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic + """ + def __init__(self, test_context): + super(Log4jAppenderTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + TOPIC: {'partitions': 1, 'replication-factor': 1} + }) + self.num_nodes = 1 + + self.appender = KafkaLog4jAppender(self.test_context, self.num_nodes, self.kafka, TOPIC, MAX_MESSAGES) + self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_nodes, kafka=self.kafka, topic=TOPIC, consumer_timeout_ms=1000) + + def test_log4j_appender(self): + """ + Tests if KafkaLog4jAppender is producing to Kafka topic + :return: None + """ + self.appender.start() + self.appender.wait() + + t0 = time.time() + self.consumer.start() + node = 
self.consumer.nodes[0] + + wait_until(lambda: self.consumer.alive(node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") + self.logger.info("consumer started in %s seconds " % str(time.time() - t0)) + + # Verify consumed messages count + expected_lines_count = MAX_MESSAGES * 2 # two times to account for new lines introduced by log4j + wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10, + err_msg="Timed out waiting to consume expected number of messages.") + + self.consumer.stop_node(node) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/utils/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py new file mode 100644 index 0000000..cff6d2b --- /dev/null +++ b/tests/kafkatest/utils/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.server.KafkaConfig for additional details and defaults \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/utils/remote_account.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/utils/remote_account.py b/tests/kafkatest/utils/remote_account.py new file mode 100644 index 0000000..b69a591 --- /dev/null +++ b/tests/kafkatest/utils/remote_account.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +def file_exists(node, file): + """Quick and dirty check for existence of remote file.""" + try: + node.account.ssh("cat " + file, allow_fail=False) + return True + except: + return False + + +def line_count(node, file): + """Return the line count of file on node""" + out = [line for line in node.account.ssh_capture("wc -l %s" % file)] + if len(out) != 1: + raise Exception("Expected single line of output from wc -l") + + return int(out[0].strip().split(" ")[0]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java new file mode 100644 index 0000000..e78f96a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.tools; + + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.log4j.Logger; +import org.apache.log4j.PropertyConfigurator; + +import java.io.IOException; +import java.util.Properties; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +/** + * Primarily intended for use with system testing, this appender produces message + * to Kafka on each "append" request. For example, this helps with end-to-end tests + * of KafkaLog4jAppender. + * + * When used as a command-line tool, it appends increasing integers. It will produce a + * fixed number of messages unless the default max-messages -1 is used, in which case + * it appends indefinitely. + */ + +public class VerifiableLog4jAppender { + Logger logger = Logger.getLogger(VerifiableLog4jAppender.class); + + // If maxMessages < 0, log until the process is killed externally + private long maxMessages = -1; + + // Hook to trigger logging thread to stop logging messages + private volatile boolean stopLogging = false; + + /** Get the command-line argument parser. 
*/ + private static ArgumentParser argParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("verifiable-log4j-appender") + .defaultHelp(true) + .description("This tool produces increasing integers to the specified topic using KafkaLog4jAppender."); + + parser.addArgument("--topic") + .action(store()) + .required(true) + .type(String.class) + .metavar("TOPIC") + .help("Produce messages to this topic."); + + parser.addArgument("--broker-list") + .action(store()) + .required(true) + .type(String.class) + .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") + .dest("brokerList") + .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); + + parser.addArgument("--max-messages") + .action(store()) + .required(false) + .setDefault(-1) + .type(Integer.class) + .metavar("MAX-MESSAGES") + .dest("maxMessages") + .help("Produce this many messages. If -1, produce messages until the process is killed externally."); + + parser.addArgument("--acks") + .action(store()) + .required(false) + .setDefault("-1") + .type(String.class) + .choices("0", "1", "-1") + .metavar("ACKS") + .help("Acks required on each produced message. See Kafka docs on request.required.acks for details."); + + return parser; + } + + /** Construct a VerifiableLog4jAppender object from command-line arguments. 
*/ + public static VerifiableLog4jAppender createFromArgs(String[] args) { + ArgumentParser parser = argParser(); + VerifiableLog4jAppender producer = null; + + try { + Namespace res = parser.parseArgs(args); + + int maxMessages = res.getInt("maxMessages"); + String topic = res.getString("topic"); + + + Properties props = new Properties(); + props.setProperty("log4j.rootLogger", "INFO, KAFKA"); + props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender"); + props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); + props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); + props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList")); + props.setProperty("log4j.appender.KAFKA.Topic", topic); + props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks")); + props.setProperty("log4j.appender.KAFKA.SyncSend", "true"); + props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA"); + + producer = new VerifiableLog4jAppender(props, maxMessages); + } catch (ArgumentParserException e) { + if (args.length == 0) { + parser.printHelp(); + System.exit(0); + } else { + parser.handleError(e); + System.exit(1); + } + } + + return producer; + } + + + public VerifiableLog4jAppender(Properties props, int maxMessages) { + this.maxMessages = maxMessages; + PropertyConfigurator.configure(props); + } + + public static void main(String[] args) throws IOException { + + final VerifiableLog4jAppender appender = createFromArgs(args); + boolean infinite = appender.maxMessages < 0; + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Trigger main thread to stop producing messages + appender.stopLogging = true; + } + }); + + long maxMessages = infinite ? 
Long.MAX_VALUE: appender.maxMessages; + for (long i = 0; i < maxMessages; i++) { + if (appender.stopLogging) { + break; + } + appender.append(String.format("%d", i)); + } + } + + private void append(String msg) { + logger.info(msg); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java index b04876f..b195093 100644 --- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java @@ -291,7 +291,8 @@ public class VerifiableProducer { }); ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs); - for (int i = 0; i < producer.maxMessages || infinite; i++) { + long maxMessages = infinite ? Long.MAX_VALUE: producer.maxMessages; + for (long i = 0; i < maxMessages; i++) { if (producer.stopProducing) { break; }
