Repository: kafka Updated Branches: refs/heads/trunk b09663eee -> 5b5f6bbe6
KAFKA-2825: Add controller failover to existing replication tests Author: Anna Povzner <[email protected]> Reviewers: Geoff Anderson Closes #618 from apovzner/kafka_2825_01 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b5f6bbe Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b5f6bbe Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b5f6bbe Branch: refs/heads/trunk Commit: 5b5f6bbe68a82ce5eae946e0a1a199e9713a6ff7 Parents: b09663e Author: Anna Povzner <[email protected]> Authored: Thu Dec 3 10:43:44 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Dec 3 10:43:44 2015 -0800 ---------------------------------------------------------------------- tests/kafkatest/services/kafka/kafka.py | 36 ++++++++------- tests/kafkatest/services/zookeeper.py | 22 ++++++++- tests/kafkatest/tests/replication_test.py | 62 +++++++++++++++++--------- 3 files changed, 81 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 809e87f..b2dc260 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -321,21 +321,9 @@ class KafkaService(JmxMixin, Service): def leader(self, topic, partition=0): """ Get the leader replica for the given topic and partition. """ - kafka_dir = KAFKA_TRUNK - cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\ - (kafka_dir, self.zk.connect_setting()) - cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition) - self.logger.debug(cmd) - - node = self.zk.nodes[0] - self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic)) - partition_state = None - for line in node.account.ssh_capture(cmd): - # loop through all lines in the output, but only hold on to the first match - if partition_state is None: - match = re.match("^({.+})$", line) - if match is not None: - partition_state = match.groups()[0] + self.logger.debug("Querying zookeeper to find leader replica for topic: \n%s" % (topic)) + zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition) + partition_state = self.zk.query(zk_path) if partition_state is None: raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) @@ -358,4 +346,20 @@ class KafkaService(JmxMixin, Service): if not port_mapping.open: raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping)) - return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes]) \ No newline at end of file + return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes]) + + def controller(self): + """ Get the controller node + """ + self.logger.debug("Querying zookeeper to find controller broker") + controller_info = self.zk.query("/controller") + + if controller_info is None: + raise Exception("Error finding controller info") + + controller_info = json.loads(controller_info) + self.logger.debug(controller_info) + + controller_idx = int(controller_info["brokerid"]) + self.logger.info("Controller's ID: %d" % (controller_idx)) + return self.get_node(controller_idx) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/services/zookeeper.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index cae4268..5b64750 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -16,10 +16,11 @@ from ducktape.services.service import Service -from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK import subprocess import time +import re class ZookeeperService(Service): @@ -83,3 +84,22 @@ class ZookeeperService(Service): def connect_setting(self): return ','.join([node.account.hostname + ':2181' for node in self.nodes]) + + def query(self, path): + """ + Queries zookeeper for data associated with 'path' and returns all fields in the schema + """ + kafka_dir = KAFKA_TRUNK + cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \ + (kafka_dir, self.connect_setting(), path) + self.logger.debug(cmd) + + node = self.nodes[0] + result = None + for line in node.account.ssh_capture(cmd): + # loop through all lines in the output, but only hold on to the first match + if result is None: + match = re.match("^({.+})$", line) + if match is not None: + result = match.groups()[0] + return result \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/tests/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index a8f2337..4909a9a 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -25,41 +25,55 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest import signal +def broker_node(test, broker_type): + """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0 + """ + if broker_type == "leader": + node = test.kafka.leader(test.topic, partition=0) + elif broker_type == "controller": + node = test.kafka.controller() + else: + raise Exception("Unexpected broker type %s." % (broker_type)) + + return node -def clean_shutdown(test): - """Discover leader node for our topic and shut it down cleanly.""" - test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM) +def clean_shutdown(test, broker_type): + """Discover broker node of requested type and shut it down cleanly. + """ + node = broker_node(test, broker_type) + test.kafka.signal_node(node, sig=signal.SIGTERM) -def hard_shutdown(test): - """Discover leader node for our topic and shut it down with a hard kill.""" - test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL) +def hard_shutdown(test, broker_type): + """Discover broker node of requested type and shut it down with a hard kill.""" + node = broker_node(test, broker_type) + test.kafka.signal_node(node, sig=signal.SIGKILL) -def clean_bounce(test): +def clean_bounce(test, broker_type): """Chase the leader of one partition and restart it cleanly.""" for i in range(5): - prev_leader_node = test.kafka.leader(topic=test.topic, partition=0) - test.kafka.restart_node(prev_leader_node, clean_shutdown=True) + prev_broker_node = broker_node(test, broker_type) + test.kafka.restart_node(prev_broker_node, clean_shutdown=True) -def hard_bounce(test): +def hard_bounce(test, broker_type): """Chase the leader and restart it with a hard kill.""" for i in range(5): - prev_leader_node = test.kafka.leader(topic=test.topic, partition=0) - test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL) + prev_broker_node = broker_node(test, broker_type) + test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL) # Since this is a hard kill, we need to make sure the process is down and that - # zookeeper and the broker cluster have registered the loss of the leader. - # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this. + # zookeeper and the broker cluster have registered the loss of the leader/controller. + # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this. - def leader_changed(): - current_leader = test.kafka.leader(topic=test.topic, partition=0) - return current_leader is not None and current_leader != prev_leader_node + def role_reassigned(): + current_elected_broker = broker_node(test, broker_type) + return current_elected_broker is not None and current_elected_broker != prev_broker_node - wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5) - wait_until(leader_changed, timeout_sec=10, backoff_sec=.5) - test.kafka.start_node(prev_leader_node) + wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5) + wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5) + test.kafka.start_node(prev_broker_node) failures = { "clean_shutdown": clean_shutdown, @@ -108,8 +122,12 @@ class ReplicationTest(ProduceConsumeValidateTest): @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], + broker_type=["leader"], security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]) - def test_replication_with_broker_failure(self, failure_mode, security_protocol): + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], + broker_type=["controller"], + security_protocol=["PLAINTEXT", "SASL_SSL"]) + def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. @@ -130,4 +148,4 @@ class ReplicationTest(ProduceConsumeValidateTest): self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) self.kafka.start() - self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self)) + self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
