KAFKA-3382: Add system test for ReplicaVerificationTool Author: Ashish Singh <[email protected]>
Reviewers: Geoff Anderson <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1160 from SinghAsDev/KAFKA-3382 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ada3b1f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ada3b1f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ada3b1f Branch: refs/heads/0.10.0 Commit: 0ada3b1fc215bb8efdf5c7ae27eb52b29e0fbbdc Parents: 346df72 Author: Ashish Singh <[email protected]> Authored: Thu Apr 28 15:49:01 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 28 15:49:01 2016 -0700 ---------------------------------------------------------------------- .../services/replica_verification_tool.py | 81 ++++++++++++++++++ tests/kafkatest/services/verifiable_producer.py | 8 +- .../tests/tools/replica_verification_test.py | 88 ++++++++++++++++++++ 3 files changed, 176 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/services/replica_verification_tool.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py new file mode 100644 index 0000000..f6374fb --- /dev/null +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.security.security_config import SecurityConfig + +import re + +class ReplicaVerificationTool(BackgroundThreadService): + + logs = { + "producer_log": { + "path": "/mnt/replica_verification_tool.log", + "collect_default": False} + } + + def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"): + super(ReplicaVerificationTool, self).__init__(context, num_nodes) + + self.kafka = kafka + self.topic = topic + self.report_interval_ms = report_interval_ms + self.security_protocol = security_protocol + self.security_config = SecurityConfig(security_protocol) + self.partition_lag = {} + + def _worker(self, idx, node): + cmd = self.start_cmd(node) + self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd)) + self.security_config.setup_node(node) + for line in node.account.ssh_capture(cmd): + self.logger.debug("Parsing line:{}".format(line)) + + parsed = re.search('.*max lag is (.+?) 
for partition \[(.+?)\] at', line) + if parsed: + lag = int(parsed.group(1)) + topic_partition = parsed.group(2) + self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) + self.partition_lag[topic_partition] = lag + + def get_lag_for_partition(self, topic, partition): + """ + Get latest lag for given topic-partition + + Args: + topic: a topic + partition: a partition of the topic + """ + topic_partition = topic + ',' + str(partition) + lag = self.partition_lag[topic_partition] + self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag)) + return lag + + def start_cmd(self, node): + cmd = "/opt/%s/bin/" % kafka_dir(node) + cmd += "kafka-run-class.sh kafka.tools.ReplicaVerificationTool" + cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms) + + cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" + return cmd + + def stop_node(self, node): + node.account.kill_process("java", clean_shutdown=True, allow_fail=True) + + def clean_node(self, node): + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 414da84..500410f 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -43,7 +43,7 @@ class VerifiableProducer(BackgroundThreadService): } def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, - message_validator=is_int, compression_types=None, 
version=TRUNK): + message_validator=is_int, compression_types=None, version=TRUNK, acks=None): """ :param max_messages is a number of messages to be produced per producer :param message_validator checks for an expected format of messages produced. There are @@ -71,6 +71,7 @@ class VerifiableProducer(BackgroundThreadService): self.acked_values = [] self.not_acked_values = [] self.produced_count = {} + self.acks = acks @property @@ -96,6 +97,9 @@ class VerifiableProducer(BackgroundThreadService): # Create and upload config file producer_prop_file = self.prop_file(node) + if self.acks is not None: + self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks) + producer_prop_file += "\nacks=%s\n" % self.acks self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file) @@ -156,6 +160,8 @@ class VerifiableProducer(BackgroundThreadService): cmd += " --throughput %s" % str(self.throughput) if self.message_validator == is_int_with_prefix: cmd += " --value-prefix %s" % str(idx) + if self.acks is not None: + cmd += " --acks %s\n" % str(self.acks) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/tests/tools/replica_verification_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py new file mode 100644 index 0000000..1b625e9 --- /dev/null +++ b/tests/kafkatest/tests/tools/replica_verification_test.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from kafkatest.services.verifiable_producer import VerifiableProducer + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.replica_verification_tool import ReplicaVerificationTool + +TOPIC = "topic-replica-verification" +REPORT_INTERVAL_MS = 1000 + +class ReplicaVerificationToolTest(Test): + """ + Tests ReplicaVerificationTool + """ + def __init__(self, test_context): + super(ReplicaVerificationToolTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 2 + self.messages_received_count = 0 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 2} + } + + self.zk = ZookeeperService(test_context, self.num_zk) + self.kafka = None + self.producer = None + self.replica_verifier = None + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + def start_replica_verification_tool(self, security_protocol): + self.replica_verifier 
= ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol) + self.replica_verifier.start() + + def start_producer(self, max_messages, acks, timeout): + # This will produce to kafka cluster + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages) + current_acked = self.producer.num_acked + self.logger.info("current_acked = %s" % current_acked) + self.producer.start() + wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout, + err_msg="Timeout awaiting messages to be produced and acked") + + def stop_producer(self): + self.producer.stop() + + def test_replica_lags(self, security_protocol='PLAINTEXT'): + """ + Tests ReplicaVerificationTool + :return: None + """ + self.start_kafka(security_protocol, security_protocol) + self.start_replica_verification_tool(security_protocol) + self.start_producer(max_messages=10, acks=-1, timeout=15) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, + err_msg="Timed out waiting to reach zero replica lags.") + self.stop_producer() + + self.start_producer(max_messages=1000, acks=0, timeout=5) + # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, + err_msg="Timed out waiting to reach non-zero number of replica lags.") \ No newline at end of file
