Repository: kafka Updated Branches: refs/heads/trunk 667ff7ef7 -> c1694833d
KAFKA-3490; Multiple version support for ducktape performance tests Author: Ismael Juma <[email protected]> Author: Geoff Anderson <[email protected]> Reviewers: Geoff Anderson <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1173 from ijuma/kafka-3490-multiple-version-support-perf-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1694833 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1694833 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1694833 Branch: refs/heads/trunk Commit: c1694833d5c095e47e5767f38c3e85bbe927a0a7 Parents: 667ff7e Author: Ismael Juma <[email protected]> Authored: Wed Apr 13 13:50:49 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Apr 13 13:50:49 2016 -0700 ---------------------------------------------------------------------- .../kafkatest/benchmarks/core/benchmark_test.py | 85 +++++----- .../sanity_checks/test_performance_services.py | 88 ++++++++++ .../kafkatest/services/performance/__init__.py | 4 +- .../performance/consumer_performance.py | 45 ++++-- .../services/performance/end_to_end_latency.py | 62 ++++++-- .../services/performance/performance.py | 23 +++ .../performance/producer_performance.py | 159 ++++++++++++++----- .../services/templates/tools_log4j.properties | 2 +- vagrant/base.sh | 2 + 9 files changed, 364 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/benchmarks/core/benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 9c2e32d..d252e5d 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -20,7 +20,8 @@ from ducktape.mark import 
matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService -from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService +from kafkatest.services.kafka.version import TRUNK, KafkaVersion +from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput TOPIC_REP_ONE = "topic-replication-factor-one" @@ -54,11 +55,12 @@ class Benchmark(Test): def setUp(self): self.zk.start() - def start_kafka(self, security_protocol, interbroker_security_protocol): + def start_kafka(self, security_protocol, interbroker_security_protocol, version): self.kafka = KafkaService( self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, + version=version) self.kafka.log_level = "INFO" # We don't DEBUG logging here self.kafka.start() @@ -67,7 +69,8 @@ class Benchmark(Test): @parametrize(acks=-1, topic=TOPIC_REP_THREE) @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3) @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL']) - def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'): + def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT', + client_version=str(TRUNK), broker_version=str(TRUNK)): """ Setup: 1 node zk + 3 node kafka cluster Produce ~128MB worth of messages to a topic with 6 partitions. 
Required acks, topic replication factor, @@ -76,13 +79,16 @@ class Benchmark(Test): Collect and return aggregate throughput statistics after all messages have been acknowledged. (This runs ProducerPerformance.java under the hood) """ - self.start_kafka(security_protocol, security_protocol) + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) + self.start_kafka(security_protocol, security_protocol, broker_version) # Always generate the same total amount of data nrecords = int(self.target_data_size / message_size) self.producer = ProducerPerformanceService( self.test_context, num_producers, self.kafka, topic=topic, - num_records=nrecords, record_size=message_size, throughput=-1, + num_records=nrecords, record_size=message_size, throughput=-1, version=client_version, settings={ 'acks': acks, 'batch.size': self.batch_size, @@ -92,7 +98,8 @@ class Benchmark(Test): @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None): + def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol=None, + client_version=str(TRUNK), broker_version=str(TRUNK)): """ Setup: 1 node zk + 3 node kafka cluster Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. 
@@ -101,13 +108,16 @@ class Benchmark(Test): (This runs ProducerPerformance.java under the hood) """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, - throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, + throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, intermediate_stats=True ) self.producer.run() @@ -135,10 +145,10 @@ class Benchmark(Test): self.logger.info("\n".join(summary)) return data - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) - def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None): + def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol=None, + client_version=str(TRUNK), broker_version=str(TRUNK)): """ Setup: 1 node zk + 3 node kafka cluster Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, @@ -148,13 +158,16 @@ class Benchmark(Test): (Under the hood, this simply runs EndToEndLatency.scala) """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, 
interbroker_security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=10000 + topic=TOPIC_REP_THREE, num_records=10000, version=client_version ) self.perf.run() return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) @@ -162,7 +175,8 @@ class Benchmark(Test): @parametrize(security_protocol='PLAINTEXT', new_consumer=False) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True): + def test_producer_and_consumer(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, + client_version=str(TRUNK), broker_version=str(TRUNK)): """ Setup: 1 node zk + 3 node kafka cluster Concurrently produce and consume 10e6 messages with a single producer and a single consumer, @@ -172,15 +186,18 @@ class Benchmark(Test): (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) num_records = 10 * 1000 * 1000 # 10e6 self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, - num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, 
version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.consumer = ConsumerPerformanceService( @@ -200,21 +217,25 @@ class Benchmark(Test): @parametrize(security_protocol='PLAINTEXT', new_consumer=False) @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1): + def test_consumer_throughput(self, security_protocol, interbroker_security_protocol=None, new_consumer=True, num_consumers=1, + client_version=str(TRUNK), broker_version=str(TRUNK)): """ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions (using new consumer iff new_consumer == True), and report throughput. """ + client_version = KafkaVersion(client_version) + broker_version = KafkaVersion(broker_version) + self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) num_records = 10 * 1000 * 1000 # 10e6 # seed kafka w/messages self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, - num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, version=client_version, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.producer.run() @@ -227,27 +248,5 @@ class Benchmark(Test): self.consumer.run() return compute_aggregate_throughput(self.consumer) - -def throughput(records_per_sec, mb_per_sec): - """Helper method to ensure uniform representation of throughput data""" - return { - "records_per_sec": 
records_per_sec, - "mb_per_sec": mb_per_sec - } - - -def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms): - """Helper method to ensure uniform representation of latency data""" - return { - "latency_50th_ms": latency_50th_ms, - "latency_99th_ms": latency_99th_ms, - "latency_999th_ms": latency_999th_ms - } - - -def compute_aggregate_throughput(perf): - """Helper method for computing throughput after running a performance service.""" - aggregate_rate = sum([r['records_per_sec'] for r in perf.results]) - aggregate_mbps = sum([r['mbps'] for r in perf.results]) - - return throughput(aggregate_rate, aggregate_mbps) + def validate_versions(self, client_version, broker_version): + assert client_version <= broker_version, "Client version %s should be <= broker version %s" % (client_version, broker_version) http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/sanity_checks/test_performance_services.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py new file mode 100644 index 0000000..16d5d32 --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_performance_services.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark import parametrize + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion +from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService +from kafkatest.services.performance import latency, compute_aggregate_throughput + + +class PerformanceServiceTest(Test): + def __init__(self, test_context): + super(PerformanceServiceTest, self).__init__(test_context) + self.record_size = 100 + self.num_records = 10000 + self.topic = "topic" + + self.zk = ZookeeperService(test_context, 1) + + def setUp(self): + self.zk.start() + + # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, + # the overhead should be manageable. + @parametrize(version=str(LATEST_0_8_2)) + @parametrize(version=str(LATEST_0_9), new_consumer=False) + @parametrize(version=str(LATEST_0_9), new_consumer=True) + @parametrize(version=str(TRUNK), new_consumer=False) + @parametrize(version=str(TRUNK), new_consumer=True) + def test_version(self, version=str(LATEST_0_9), new_consumer=False): + """ + Sanity check out producer performance service - verify that we can run the service with a small + number of messages. The actual stats here are pretty meaningless since the number of messages is quite small. 
+ """ + version = KafkaVersion(version) + self.kafka = KafkaService( + self.test_context, 1, + self.zk, topics={self.topic: {'partitions': 1, 'replication-factor': 1}}, version=version) + self.kafka.start() + + # check basic run of producer performance + self.producer_perf = ProducerPerformanceService( + self.test_context, 1, self.kafka, topic=self.topic, + num_records=self.num_records, record_size=self.record_size, + throughput=1000000000, # Set impossibly for no throttling for equivalent behavior between 0.8.X and 0.9.X + version=version, + settings={ + 'acks': 1, + 'batch.size': 8*1024, + 'buffer.memory': 64*1024*1024}) + self.producer_perf.run() + producer_perf_data = compute_aggregate_throughput(self.producer_perf) + + # check basic run of end to end latency + self.end_to_end = EndToEndLatencyService( + self.test_context, 1, self.kafka, + topic=self.topic, num_records=self.num_records, version=version) + self.end_to_end.run() + end_to_end_data = latency(self.end_to_end.results[0]['latency_50th_ms'], self.end_to_end.results[0]['latency_99th_ms'], self.end_to_end.results[0]['latency_999th_ms']) + + # check basic run of consumer performance service + self.consumer_perf = ConsumerPerformanceService( + self.test_context, 1, self.kafka, new_consumer=new_consumer, + topic=self.topic, version=version, messages=self.num_records) + self.consumer_perf.group = "test-consumer-group" + self.consumer_perf.run() + consumer_perf_data = compute_aggregate_throughput(self.consumer_perf) + + return { + "producer_performance": producer_perf_data, + "end_to_end_latency": end_to_end_data, + "consumer_performance": consumer_perf_data + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py index a72e3b7..9eddcaa 100644 --- 
a/tests/kafkatest/services/performance/__init__.py +++ b/tests/kafkatest/services/performance/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from performance import PerformanceService +from performance import PerformanceService, throughput, latency, compute_aggregate_throughput from end_to_end_latency import EndToEndLatencyService from producer_performance import ProducerPerformanceService -from consumer_performance import ConsumerPerformanceService \ No newline at end of file +from consumer_performance import ConsumerPerformanceService http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/consumer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index f8289bc..def27b1 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -14,8 +14,9 @@ # limitations under the License. 
from kafkatest.services.performance import PerformanceService -from kafkatest.services.kafka.directory import kafka_dir from kafkatest.services.security.security_config import SecurityConfig +from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0 import os @@ -69,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}): + def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=False, settings={}): super(ConsumerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka self.security_config = kafka.security_config.client_config() @@ -78,6 +79,13 @@ class ConsumerPerformanceService(PerformanceService): self.new_consumer = new_consumer self.settings = settings + assert version >= V_0_9_0_0 or (not new_consumer), \ + "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) + + security_protocol = self.security_config.security_protocol + assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \ + "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) + # These less-frequently used settings can be updated manually after instantiation self.fetch_size = None self.socket_buffer_size = None @@ -86,6 +94,9 @@ class ConsumerPerformanceService(PerformanceService): self.group = None self.from_latest = None + for node in self.nodes: + node.version = version + @property def args(self): """Dictionary of arguments used to start the Consumer Performance script.""" @@ -127,7 +138,10 @@ class ConsumerPerformanceService(PerformanceService): cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node) for key, value in self.args.items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % 
ConsumerPerformanceService.CONFIG_FILE + + if node.version >= V_0_9_0_0: + # This is only used for security settings + cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) @@ -136,6 +150,22 @@ class ConsumerPerformanceService(PerformanceService): 'stderr': ConsumerPerformanceService.STDERR_CAPTURE} return cmd + def parse_results(self, line, version): + parts = line.split(',') + if version >= V_0_9_0_0: + result = { + 'total_mb': float(parts[2]), + 'mbps': float(parts[3]), + 'records_per_sec': float(parts[5]), + } + else: + result = { + 'total_mb': float(parts[3]), + 'mbps': float(parts[4]), + 'records_per_sec': float(parts[6]), + } + return result + def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False) @@ -149,11 +179,6 @@ class ConsumerPerformanceService(PerformanceService): last = None for line in node.account.ssh_capture(cmd): last = line - # Parse and save the last line's information - parts = last.split(',') - self.results[idx-1] = { - 'total_mb': float(parts[2]), - 'mbps': float(parts[3]), - 'records_per_sec': float(parts[5]), - } + # Parse and save the last line's information + self.results[idx-1] = self.parse_results(last, node.version) http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/end_to_end_latency.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 049eebc..08eff70 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -17,9 +17,11 @@ from kafkatest.services.performance import PerformanceService from kafkatest.services.security.security_config import SecurityConfig from 
kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0 class EndToEndLatencyService(PerformanceService): + MESSAGE_BYTES = 21 # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions logs = { "end_to_end_latency_log": { @@ -27,37 +29,79 @@ class EndToEndLatencyService(PerformanceService): "collect_default": True}, } - def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + def __init__(self, context, num_nodes, kafka, topic, num_records, version=TRUNK, consumer_fetch_max_wait=100, acks=1): super(EndToEndLatencyService, self).__init__(context, num_nodes) self.kafka = kafka self.security_config = kafka.security_config.client_config() + + security_protocol = self.security_config.security_protocol + assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \ + "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) + self.args = { 'topic': topic, 'num_records': num_records, 'consumer_fetch_max_wait': consumer_fetch_max_wait, 'acks': acks, - 'kafka_opts': self.security_config.kafka_opts + 'kafka_opts': self.security_config.kafka_opts, + 'message_bytes': EndToEndLatencyService.MESSAGE_BYTES } - def _worker(self, idx, node): - args = self.args.copy() - self.security_config.setup_node(node) + for node in self.nodes: + node.version = version + + @property + def security_config_file(self): if self.security_config.security_protocol != SecurityConfig.PLAINTEXT: security_config_file = SecurityConfig.CONFIG_DIR + "/security.properties" - node.account.create_file(security_config_file, str(self.security_config)) else: security_config_file = "" + return security_config_file + + def start_cmd(self, node): + args = self.args.copy() args.update({ 'zk_connect': self.kafka.zk.connect_setting(), 'bootstrap_servers': 
self.kafka.bootstrap_servers(self.security_config.security_protocol), - 'security_config_file': security_config_file, + 'security_config_file': self.security_config_file, 'kafka_dir': kafka_dir(node) }) - cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args - cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(security_config_file)s" % args + if node.version >= V_0_9_0_0: + """ + val brokerList = args(0) + val topic = args(1) + val numMessages = args(2).toInt + val producerAcks = args(3) + val messageLen = args(4).toInt + """ + + cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args + cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(security_config_file)s" % args + else: + """ + val brokerList = args(0) + val zkConnect = args(1) + val topic = args(2) + val numMessages = args(3).toInt + val consumerFetchMaxWait = args(4).toInt + val producerAcks = args(5).toInt + """ + + # Set fetch max wait to 0 to match behavior in later versions + cmd = "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args + cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args + cmd += " | tee /mnt/end-to-end-latency.log" + return cmd + + def _worker(self, idx, node): + self.security_config.setup_node(node) + if self.security_config.security_protocol != SecurityConfig.PLAINTEXT: + node.account.create_file(self.security_config_file, str(self.security_config)) + + cmd = self.start_cmd(node) self.logger.debug("End-to-end latency %d command: %s", idx, cmd) results = {} for line in node.account.ssh_capture(cmd): http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/performance.py ---------------------------------------------------------------------- diff --git 
a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py index 6d286f6..1eab197 100644 --- a/tests/kafkatest/services/performance/performance.py +++ b/tests/kafkatest/services/performance/performance.py @@ -27,3 +27,26 @@ class PerformanceService(BackgroundThreadService): node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf /mnt/*", allow_fail=False) +def throughput(records_per_sec, mb_per_sec): + """Helper method to ensure uniform representation of throughput data""" + return { + "records_per_sec": records_per_sec, + "mb_per_sec": mb_per_sec + } + + +def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms): + """Helper method to ensure uniform representation of latency data""" + return { + "latency_50th_ms": latency_50th_ms, + "latency_99th_ms": latency_99th_ms, + "latency_999th_ms": latency_999th_ms + } + + +def compute_aggregate_throughput(perf): + """Helper method for computing throughput after running a performance service.""" + aggregate_rate = sum([r['records_per_sec'] for r in perf.results]) + aggregate_mbps = sum([r['mbps'] for r in perf.results]) + + return throughput(aggregate_rate, aggregate_mbps) http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/performance/producer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 7cbc7bb..f4887ed 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -13,26 +13,56 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from ducktape.utils.util import wait_until + from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.performance import PerformanceService -import itertools from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK +from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0 + +import os +import subprocess + class ProducerPerformanceService(JmxMixin, PerformanceService): - logs = { - "producer_performance_log": { - "path": "/mnt/producer-performance.log", - "collect_default": True}, - } + PERSISTENT_ROOT = "/mnt/producer_performance" + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stderr") + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "producer_performance.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") - def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, + def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings={}, intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]): JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) PerformanceService.__init__(self, context, num_nodes) + + self.logs = { + "producer_performance_stdout": { + "path": ProducerPerformanceService.STDOUT_CAPTURE, + "collect_default": True}, + "producer_performance_stderr": { + "path": ProducerPerformanceService.STDERR_CAPTURE, + "collect_default": True}, + "producer_performance_log": { + "path": ProducerPerformanceService.LOG_FILE, + "collect_default": True}, + "jmx_log": { + "path": "/mnt/jmx_tool.log", + "collect_default": jmx_object_names is not None + } + + } + self.kafka = kafka 
self.security_config = kafka.security_config.client_config() + + security_protocol = self.security_config.security_protocol + assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \ + "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) + self.args = { 'topic': topic, 'kafka_opts': self.security_config.kafka_opts, @@ -44,7 +74,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): self.intermediate_stats = intermediate_stats self.client_id = client_id - def _worker(self, idx, node): + for node in self.nodes: + node.version = version + + def start_cmd(self, node): args = self.args.copy() args.update({ 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), @@ -52,48 +85,92 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): 'client_id': self.client_id, 'kafka_directory': kafka_dir(node) }) - cmd = "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ + + cmd = "" + + if node.version < TRUNK: + # In order to ensure more consistent configuration between versions, always use the ProducerPerformance + # tool from trunk + cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK + cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK + cmd += "export CLASSPATH; " + + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG + cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s 
client.id=%(client_id)s" % args self.security_config.setup_node(node) if self.security_config.security_protocol != SecurityConfig.PLAINTEXT: self.settings.update(self.security_config.properties) + for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) - cmd += " | tee /mnt/producer-performance.log" + cmd += " 2>>%s | tee %s" % (ProducerPerformanceService.STDERR_CAPTURE, ProducerPerformanceService.STDOUT_CAPTURE) + return cmd + + def pids(self, node): + try: + cmd = "jps | grep -i ProducerPerformance | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (subprocess.CalledProcessError, ValueError) as e: + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + + def _worker(self, idx, node): + + node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False) + + # Create and upload log properties + log_config = self.render('tools_log4j.properties', log_file=ProducerPerformanceService.LOG_FILE) + node.account.create_file(ProducerPerformanceService.LOG4J_CONFIG, log_config) + + cmd = self.start_cmd(node) self.logger.debug("Producer performance %d command: %s", idx, cmd) - def parse_stats(line): - parts = line.split(',') - return { - 'records': int(parts[0].split()[0]), - 'records_per_sec': float(parts[1].split()[0]), - 'mbps': float(parts[1].split('(')[1].split()[0]), - 'latency_avg_ms': float(parts[2].split()[0]), - 'latency_max_ms': float(parts[3].split()[0]), - 'latency_50th_ms': float(parts[4].split()[0]), - 'latency_95th_ms': float(parts[5].split()[0]), - 'latency_99th_ms': float(parts[6].split()[0]), - 'latency_999th_ms': float(parts[7].split()[0]), - } - last = None + # start ProducerPerformance process producer_output = node.account.ssh_capture(cmd) + wait_until(lambda: self.alive(node), timeout_sec=20, err_msg="ProducerPerformance failed to start") + # block until there is at least one line of output 
first_line = next(producer_output, None) + if first_line is None: + raise Exception("No output from ProducerPerformance") + + self.start_jmx_tool(idx, node) + wait_until(lambda: not self.alive(node), timeout_sec=1200, err_msg="ProducerPerformance failed to finish") + self.read_jmx_output(idx, node) + + # parse producer output from file + last = None + producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE) + for line in producer_output: + if self.intermediate_stats: + try: + self.stats[idx-1].append(self.parse_stats(line)) + except: + # Sometimes there are extraneous log messages + pass - if first_line is not None: - self.start_jmx_tool(idx, node) - for line in itertools.chain([first_line], producer_output): - if self.intermediate_stats: - try: - self.stats[idx-1].append(parse_stats(line)) - except: - # Sometimes there are extraneous log messages - pass - - last = line - try: - self.results[idx-1] = parse_stats(last) - except: - raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last)) - self.read_jmx_output(idx, node) + last = line + try: + self.results[idx-1] = self.parse_stats(last) + except: + raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last)) + + def parse_stats(self, line): + + parts = line.split(',') + return { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/tests/kafkatest/services/templates/tools_log4j.properties ---------------------------------------------------------------------- diff --git 
a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index 6fec1d6..55ae4e0 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -22,4 +22,4 @@ log4j.appender.FILE.ImmediateFlush=true # Set the append to true log4j.appender.FILE.Append=true log4j.appender.FILE.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n \ No newline at end of file +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n http://git-wip-us.apache.org/repos/asf/kafka/blob/c1694833/vagrant/base.sh ---------------------------------------------------------------------- diff --git a/vagrant/base.sh b/vagrant/base.sh index d271f87..da7737c 100644 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -63,7 +63,9 @@ get_kafka() { } get_kafka 0.8.2.2 +chmod a+rw /opt/kafka-0.8.2.2 get_kafka 0.9.0.1 +chmod a+rw /opt/kafka-0.9.0.1 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use
