Repository: kafka
Updated Branches:
  refs/heads/trunk fbc518554 -> 61c568d83
MINOR: Added simple streams benchmark to system tests Author: Eno Thereska <[email protected]> Reviewers: Geoff Anderson, Guozhang Wang, Ismael Juma Closes #1621 from enothereska/simple-benchmark-streams-system-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/61c568d8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/61c568d8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/61c568d8 Branch: refs/heads/trunk Commit: 61c568d8391de6fceb6c8d6a33d349def8d2ada8 Parents: fbc5185 Author: Eno Thereska <[email protected]> Authored: Mon Jul 18 12:16:58 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Jul 18 12:16:58 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/perf/SimpleBenchmark.java | 14 +- .../streams/streams_simple_benchmark_test.py | 43 ++++++ .../services/performance/streams_performance.py | 132 +++++++++++++++++++ 3 files changed, 184 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index a92fb1b..93bb571 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -74,15 +74,19 @@ public class SimpleBenchmark { } public static void main(String[] args) throws Exception { - final File stateDir = new File("/tmp/kafka-streams-simple-benchmark"); - stateDir.mkdir(); + String kafka = args.length > 0 ? args[0] : "localhost:9092"; + String zookeeper = args.length > 1 ? 
args[1] : "localhost:2181"; + String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark"; + final File stateDir = new File(stateDirStr); + stateDir.mkdir(); final File rocksdbDir = new File(stateDir, "rocksdb-test"); rocksdbDir.mkdir(); - - final String kafka = "localhost:9092"; - final String zookeeper = "localhost:2181"; + System.out.println("SimpleBenchmark instance started"); + System.out.println("kafka=" + kafka); + System.out.println("zookeeper=" + zookeeper); + System.out.println("stateDir=" + stateDir); SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper); http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py new file mode 100644 index 0000000..5eb2663 --- /dev/null +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.mark import ignore + +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService +import time + +class StreamsSimpleBenchmarkTest(KafkaTest): + """ + Simple benchmark of Kafka Streams. + """ + + def __init__(self, test_context): + super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1) + + self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka) + + def test_simple_benchmark(self): + """ + Run simple Kafka Streams benchmark + """ + + self.driver.start() + self.driver.wait() + self.driver.stop() + node = self.driver.node + node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False) + + return self.driver.collect_data(node) http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/tests/kafkatest/services/performance/streams_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py new file mode 100644 index 0000000..8002bbe --- /dev/null +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path
import signal

from ducktape.services.service import Service
from ducktape.utils.util import wait_until

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin

#
# Class used to start the simple Kafka Streams benchmark
#
class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
    """Base class for simple Kafka Streams benchmark

    Runs org.apache.kafka.streams.perf.SimpleBenchmark on a single node via
    kafka-run-class.sh, redirecting its stdout, stderr and PID into files
    under PERSISTENT_ROOT on that node.
    """

    PERSISTENT_ROOT = "/mnt/streams"
    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
    LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
    PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")

    # Files on the node that the framework collects after the test.
    logs = {
        "streams_log": {
            "path": LOG_FILE,
            "collect_default": True},
        "streams_stdout": {
            "path": STDOUT_FILE,
            "collect_default": True},
        "streams_stderr": {
            "path": STDERR_FILE,
            "collect_default": True},
    }

    def __init__(self, context, kafka):
        """
        :param context: ducktape test context
        :param kafka: Kafka service to benchmark against; its bootstrap servers
                      and ZooKeeper connect string are passed to the benchmark
        """
        super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
        self.kafka = kafka

    @property
    def node(self):
        """The one node this service runs on (it is constructed with a single node)."""
        return self.nodes[0]

    def pids(self, node):
        """Return the PIDs recorded in PID_FILE on ``node``, or [] if they cannot be read."""
        try:
            # list() consumes ssh_capture's output inside the try so read/parse errors are caught too.
            return list(node.account.ssh_capture("cat " + self.PID_FILE, callback=int))
        except Exception:
            # Best effort: treat an unreadable/missing PID file as "no processes".
            # (Exception rather than a bare except, so KeyboardInterrupt/SystemExit propagate.)
            return []

    def stop_node(self, node, clean_shutdown=True):
        """Signal the benchmark process(es) on ``node`` and delete the PID file.

        :param clean_shutdown: if True send SIGTERM and wait up to 60s per process
                               for it to exit; otherwise send SIGKILL without waiting
        """
        self.logger.info(("Cleanly" if clean_shutdown else "Forcibly") + " stopping SimpleBenchmark on " + str(node.account))
        pids = self.pids(node)
        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL

        for pid in pids:
            node.account.signal(pid, sig, allow_fail=True)
        if clean_shutdown:
            for pid in pids:
                # Bind pid explicitly so the predicate never depends on the loop variable's later value.
                wait_until(lambda pid=pid: not node.account.alive(pid), timeout_sec=60,
                           err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")

        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)

    def wait(self, timeout_sec=600):
        """Block until every benchmark process on every node has exited.

        :param timeout_sec: maximum time to wait for each process (default 600s)
        """
        for node in self.nodes:
            for pid in self.pids(node):
                wait_until(lambda node=node, pid=pid: not node.account.alive(pid), timeout_sec=timeout_sec,
                           err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")

    def clean_node(self, node):
        """Kill any leftover "streams" processes and remove PERSISTENT_ROOT from ``node``."""
        node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)

    def start_cmd(self, node):
        """Build the shell command that launches SimpleBenchmark in the background.

        The benchmark receives <bootstrap servers> <zk connect> <state dir> as
        positional arguments. Inside the subshell, ``echo $! >&3`` writes the
        background PID to fd 3, which is redirected to PID_FILE; stdout/stderr
        are appended to STDOUT_FILE/STDERR_FILE.
        """
        args = {
            'kafka': self.kafka.bootstrap_servers(),
            'zk': self.kafka.zk.connect_setting(),
            'state_dir': self.PERSISTENT_ROOT,
            'stdout': self.STDOUT_FILE,
            'stderr': self.STDERR_FILE,
            'pidfile': self.PID_FILE,
            'log4j': self.LOG4J_CONFIG_FILE,
            'kafka_run_class': self.path.script("kafka-run-class.sh", node),
        }

        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark " \
              " %(kafka)s %(zk)s %(state_dir)s " \
              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

        return cmd

    def start_node(self, node):
        """Create PERSISTENT_ROOT, write the log4j config, launch the benchmark and wait for its startup line.

        :raises RuntimeError: if no PID was recorded after startup
        """
        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)

        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))

        self.logger.info("Starting SimpleBenchmark process on " + str(node.account))
        # Open the log monitor before launching so the startup line cannot be missed.
        with node.account.monitor_log(self.STDOUT_FILE) as monitor:
            node.account.ssh(self.start_cmd(node))
            monitor.wait_until('SimpleBenchmark instance started', timeout_sec=15, err_msg="Never saw message indicating SimpleBenchmark finished startup on " + str(node.account))

        if len(self.pids(node)) == 0:
            raise RuntimeError("No process ids recorded")

    def collect_data(self, node):
        """Collect the benchmark's "Performance" result lines from STDOUT_FILE as a dict.

        Each matching line is split at its FIRST ':' into a metric name and a
        value (assumed shape ``<metric name>: <number>`` — confirm against
        SimpleBenchmark's output). Values that parse as floats are stored as
        float, anything else as the stripped string. Lines without ':' are
        skipped.

        :returns: dict mapping metric name -> value, suitable as ducktape test data
        """
        output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
        data = {}
        for line in output:
            name, sep, raw_value = line.partition(':')
            if not sep:
                continue
            raw_value = raw_value.strip()
            try:
                value = float(raw_value)
            except ValueError:
                value = raw_value
            data[name.strip()] = value
        return data
