Repository: kafka Updated Branches: refs/heads/trunk 34a6be2cc -> 69a1cced4
KAFKA-2643: Run mirror maker ducktape tests with SSL and SASL Run tests with SSL, SASL_PLAINTEXT and SASL_SSL. The same security protocol is used for source and target Kafka. Author: Rajini Sivaram <[email protected]> Reviewers: Geoff Anderson, Ben Stopford Closes #559 from rajinisivaram/KAFKA-2643 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69a1cced Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69a1cced Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69a1cced Branch: refs/heads/trunk Commit: 69a1cced49d7d0c805adeb1dfd327f8bb5c7ce9a Parents: 34a6be2 Author: Rajini Sivaram <[email protected]> Authored: Wed Nov 25 15:05:31 2015 -0800 Committer: Gwen Shapira <[email protected]> Committed: Wed Nov 25 15:05:31 2015 -0800 ---------------------------------------------------------------------- tests/kafkatest/services/kafka/kafka.py | 8 ++--- tests/kafkatest/services/mirror_maker.py | 10 +++++++ tests/kafkatest/services/verifiable_producer.py | 6 ++-- tests/kafkatest/tests/mirror_maker_test.py | 31 +++++++++++++++----- 4 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 15f541d..4669a35 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -71,6 +71,7 @@ class KafkaService(JmxMixin, Service): self.interbroker_security_protocol = interbroker_security_protocol self.sasl_mechanism = sasl_mechanism self.topics = topics + self.minikdc = None for node in self.nodes: node.version = version @@ -82,10 +83,9 @@ class KafkaService(JmxMixin, Service): def start(self): if self.security_config.has_sasl_kerberos: - 
self.minikdc = MiniKdc(self.context, self.nodes) - self.minikdc.start() - else: - self.minikdc = None + if self.minikdc is None: + self.minikdc = MiniKdc(self.context, self.nodes) + self.minikdc.start() Service.start(self) # Create topics if necessary http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/mirror_maker.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index 0bba115..4386788 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -18,6 +18,7 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.security.security_config import SecurityConfig import os import subprocess @@ -113,6 +114,7 @@ class MirrorMaker(Service): def start_cmd(self, node): cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node) cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG @@ -147,16 +149,23 @@ class MirrorMaker(Service): node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False) + self.security_config = self.source.security_config.client_config() + self.security_config.setup_node(node) + # Create, upload one consumer config file for source cluster consumer_props = self.render("mirror_maker_consumer.properties") + consumer_props += str(self.security_config) + node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props) self.logger.info("Mirrormaker consumer 
props:\n" + consumer_props) # Create, upload producer properties file for target cluster producer_props = self.render('mirror_maker_producer.properties') + producer_props += str(self.security_config) self.logger.info("Mirrormaker producer props:\n" + producer_props) node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props) + # Create and upload log properties log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE) node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config) @@ -180,3 +189,4 @@ class MirrorMaker(Service): (self.__class__.__name__, node.account)) node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index b2d2b97..c0dec4d 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -55,15 +55,15 @@ class VerifiableProducer(BackgroundThreadService): node.version = version self.acked_values = [] self.not_acked_values = [] - self.prop_file = "" - self.security_config = kafka.security_config.client_config(self.prop_file) - self.prop_file += str(self.security_config) + def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False) # Create and upload log properties + self.security_config = self.kafka.security_config.client_config(self.prop_file) + self.prop_file += str(self.security_config) log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE) node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/69a1cced/tests/kafkatest/tests/mirror_maker_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py index ad252ee..0244f81 100644 --- a/tests/kafkatest/tests/mirror_maker_test.py +++ b/tests/kafkatest/tests/mirror_maker_test.py @@ -21,6 +21,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer import ConsoleConsumer, is_int from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.mirror_maker import MirrorMaker +from kafkatest.services.security.minikdc import MiniKdc from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest import time @@ -39,7 +40,6 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): topics={self.topic: {"partitions": 1, "replication-factor": 1}}) self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - # This will produce to source kafka cluster self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic, throughput=1000) @@ -52,10 +52,21 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): def setUp(self): # Source cluster self.source_zk.start() - self.source_kafka.start() # Target cluster self.target_zk.start() + + def start_kafka(self, security_protocol): + self.source_kafka.security_protocol = security_protocol + self.source_kafka.interbroker_security_protocol = security_protocol + self.target_kafka.security_protocol = security_protocol + self.target_kafka.interbroker_security_protocol = security_protocol + if self.source_kafka.security_config.has_sasl_kerberos: + minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes) + self.source_kafka.minikdc = minikdc + self.target_kafka.minikdc = 
minikdc + minikdc.start() + self.source_kafka.start() self.target_kafka.start() def bounce(self, clean_shutdown=True): @@ -98,9 +109,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10, err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages) - @parametrize(new_consumer=True) - @parametrize(new_consumer=False) - def test_simple_end_to_end(self, new_consumer): + @parametrize(security_protocol='PLAINTEXT', new_consumer=False) + @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True]) + def test_simple_end_to_end(self, security_protocol, new_consumer): """ Test end-to-end behavior under non-failure conditions. @@ -112,6 +123,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): - Consume messages from target. - Verify that number of consumed messages matches the number produced. """ + self.start_kafka(security_protocol) + self.consumer.new_consumer = new_consumer + self.mirror_maker.new_consumer = new_consumer self.mirror_maker.start() @@ -126,8 +140,8 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): self.mirror_maker.stop() @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False]) - @matrix(new_consumer=[True], clean_shutdown=[True, False]) - def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True): + @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) + def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'): """ Test end-to-end behavior under failure conditions. 
@@ -145,6 +159,9 @@ class TestMirrorMakerService(ProduceConsumeValidateTest): # the group until the previous session times out self.consumer.consumer_timeout_ms = 60000 + self.start_kafka(security_protocol) + self.consumer.new_consumer = new_consumer + self.mirror_maker.offsets_storage = offsets_storage self.mirror_maker.new_consumer = new_consumer self.mirror_maker.start()
