Repository: kafka Updated Branches: refs/heads/trunk a35334908 -> 6b4cc2ea2
KAFKA-2771: Added rolling upgrade system test (ducktape) for Secured Cluster. Tests rolling upgrade from PLAINTEXT to SSL. Author: Ben Stopford <[email protected]> Reviewers: Geoff Anderson, Ismael Juma Closes #496 from benstopford/security-upgrade-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b4cc2ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b4cc2ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b4cc2ea Branch: refs/heads/trunk Commit: 6b4cc2ea2b141b25852e2110ac4a400905154b92 Parents: a353349 Author: Ben Stopford <[email protected]> Authored: Mon Nov 30 14:13:50 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Nov 30 14:13:50 2015 -0800 ---------------------------------------------------------------------- tests/kafkatest/services/console_consumer.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 57 ++++++++- .../services/kafka/templates/kafka.properties | 16 +-- .../kafkatest/services/kafka_log4j_appender.py | 2 +- .../performance/consumer_performance.py | 2 +- .../services/performance/end_to_end_latency.py | 2 +- .../performance/producer_performance.py | 2 +- tests/kafkatest/services/verifiable_consumer.py | 5 +- tests/kafkatest/services/verifiable_producer.py | 2 +- .../kafkatest/tests/produce_consume_validate.py | 8 +- .../tests/security_rolling_upgrade_test.py | 124 +++++++++++++++++++ 11 files changed, 197 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index e42b20e..b8ad8ab 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -164,7 +164,7 @@ 
class ConsoleConsumer(JmxMixin, BackgroundThreadService): args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE args['jmx_port'] = self.jmx_port args['kafka_dir'] = kafka_dir(node) - args['broker_list'] = self.kafka.bootstrap_servers() + args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) args['kafka_opts'] = self.security_config.kafka_opts cmd = "export JMX_PORT=%(jmx_port)s; " \ http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 4669a35..809e87f 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -30,6 +30,9 @@ import signal import subprocess import time import os.path +import collections + +Port = collections.namedtuple('Port', ['name', 'number', 'open']) class KafkaService(JmxMixin, Service): @@ -73,6 +76,13 @@ class KafkaService(JmxMixin, Service): self.topics = topics self.minikdc = None + self.port_mappings = { + 'PLAINTEXT': Port('PLAINTEXT', 9092, False), + 'SSL': Port('SSL', 9093, False), + 'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False), + 'SASL_SSL': Port('SASL_SSL', 9095, False) + } + for node in self.nodes: node.version = version node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)}) @@ -81,11 +91,25 @@ class KafkaService(JmxMixin, Service): def security_config(self): return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, sasl_mechanism=self.sasl_mechanism) - def start(self): + def open_port(self, protocol): + self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True) + + def close_port(self, protocol): + self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False) + + def start_minikdc(self): if self.security_config.has_sasl_kerberos: if self.minikdc is None: 
self.minikdc = MiniKdc(self.context, self.nodes) self.minikdc.start() + else: + self.minikdc = None + + def start(self): + self.open_port(self.security_protocol) + self.open_port(self.interbroker_security_protocol) + + self.start_minikdc() Service.start(self) # Create topics if necessary @@ -97,17 +121,32 @@ class KafkaService(JmxMixin, Service): topic_cfg["topic"] = topic self.create_topic(topic_cfg) + def set_protocol_and_port(self, node): + listeners = [] + advertised_listeners = [] + + for protocol in self.port_mappings: + port = self.port_mappings[protocol] + if port.open: + listeners.append(port.name + "://:" + str(port.number)) + advertised_listeners.append(port.name + "://" + node.account.hostname + ":" + str(port.number)) + + self.listeners = ','.join(listeners) + self.advertised_listeners = ','.join(advertised_listeners) + def prop_file(self, node): cfg = KafkaConfig(**node.config) cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting() + self.set_protocol_and_port(node) + # TODO - clean up duplicate configuration logic prop_file = cfg.render() prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node), - security_config=self.security_config, - interbroker_security_protocol=self.interbroker_security_protocol, - sasl_mechanism=self.sasl_mechanism) + security_config=self.security_config, + interbroker_security_protocol=self.interbroker_security_protocol, + sasl_mechanism=self.sasl_mechanism) return prop_file def start_cmd(self, node): @@ -308,9 +347,15 @@ class KafkaService(JmxMixin, Service): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) - def bootstrap_servers(self): + def bootstrap_servers(self, protocol='PLAINTEXT'): """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... This is the format expected by many config files. 
""" - return ','.join([node.account.hostname + ":9092" for node in self.nodes]) + port_mapping = self.port_mappings[protocol] + self.logger.info("Bootstrap client port is: " + str(port_mapping.number)) + + if not port_mapping.open: + raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping)) + + return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index e938ac8..a2baac1 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -17,13 +17,9 @@ advertised.host.name={{ node.account.hostname }} -{% if security_protocol == interbroker_security_protocol %} -listeners={{ security_protocol }}://:9092 -advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092 -{% else %} -listeners={{ security_protocol }}://:9092,{{ interbroker_security_protocol }}://:9093 -advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092,{{ interbroker_security_protocol }}://{{ node.account.hostname }}:9093 -{% endif %} + +listeners={{ listeners }} +advertised.listeners={{ advertised_listeners }} num.network.threads=3 num.io.threads=8 @@ -65,3 +61,9 @@ ssl.truststore.password=test-ts-passwd ssl.truststore.type=JKS sasl.mechanism={{ sasl_mechanism }} sasl.kerberos.service.name=kafka + +{% if replica_lag is defined %} +replica.lag.time.max.ms={{replica_lag}} +{% endif %} + + http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka_log4j_appender.py 
---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index af65eea..0cc39c0 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -45,7 +45,7 @@ class KafkaLog4jAppender(BackgroundThreadService): def start_cmd(self, node): cmd = "/opt/%s/bin/" % kafka_dir(node) cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" - cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol)) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/consumer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 4d24628..f8289bc 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -96,7 +96,7 @@ class ConsumerPerformanceService(PerformanceService): if self.new_consumer: args['new-consumer'] = "" - args['broker-list'] = self.kafka.bootstrap_servers() + args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) else: args['zookeeper'] = self.kafka.zk.connect_setting() http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/end_to_end_latency.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index e7147c8..049eebc 100644 --- 
a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -49,7 +49,7 @@ class EndToEndLatencyService(PerformanceService): security_config_file = "" args.update({ 'zk_connect': self.kafka.zk.connect_setting(), - 'bootstrap_servers': self.kafka.bootstrap_servers(), + 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'security_config_file': security_config_file, 'kafka_dir': kafka_dir(node) }) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/producer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index b94aab6..7cbc7bb 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -47,7 +47,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): def _worker(self, idx, node): args = self.args.copy() args.update({ - 'bootstrap_servers': self.kafka.bootstrap_servers(), + 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'jmx_port': self.jmx_port, 'client_id': self.client_id, 'kafka_directory': kafka_dir(node) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 23b0586..51013c0 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -223,9 +223,10 @@ class VerifiableConsumer(BackgroundThreadService): cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \ - " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \ - (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout, + " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \ + (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout, "--enable-autocommit" if self.enable_autocommit else "") + if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index c0dec4d..62c4002 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -118,7 +118,7 @@ class VerifiableProducer(BackgroundThreadService): cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \ - " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol)) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) if self.throughput > 0: http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/produce_consume_validate.py ---------------------------------------------------------------------- diff --git 
a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index 3d5d565..e01c70f 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -38,7 +38,7 @@ class ProduceConsumeValidateTest(Test): wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10, err_msg="Producer failed to start in a reasonable amount of time.") self.consumer.start() - wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10, + wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=30, err_msg="Consumer failed to start in a reasonable amount of time.") def stop_producer_and_consumer(self): @@ -51,19 +51,19 @@ class ProduceConsumeValidateTest(Test): # Check that producer is still successfully producing currently_acked = self.producer.num_acked - wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10, + wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=30, err_msg="Expected producer to still be producing.") self.producer.stop() self.consumer.wait() - def run_produce_consume_validate(self, core_test_action=None): + def run_produce_consume_validate(self, core_test_action=None, *args): """Top-level template for simple produce/consume/validate tests.""" self.start_producer_and_consumer() if core_test_action is not None: - core_test_action() + core_test_action(*args) self.stop_producer_and_consumer() self.validate() http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/security_rolling_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py new file mode 100644 index 0000000..279cd26 --- /dev/null +++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer, is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from ducktape.mark import matrix +import time +import random + + +class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): + """Tests a rolling upgrade from PLAINTEXT to a secured cluster + """ + + def __init__(self, test_context): + super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.producer_throughput = 100 + self.num_producers = 1 + self.num_consumers = 1 + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2}}) + self.zk.start() + + #reduce replica.lag.time.max.ms due to KAFKA-2827 + self.kafka.replica_lag = 2000 + + def create_producer_and_consumer(self): + self.producer = VerifiableProducer( + self.test_context, self.num_producers, 
self.kafka, self.topic, + throughput=self.producer_throughput) + + self.consumer = ConsoleConsumer( + self.test_context, self.num_consumers, self.kafka, self.topic, + consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True) + + self.consumer.group_id = "unique-test-group-" + str(random.random()) + + def bounce(self): + #Sleeps reduce the intermittent failures reported in KAFKA-2891. Should be removed once resolved. + for node in self.kafka.nodes: + self.kafka.stop_node(node) + time.sleep(10) + self.kafka.start_node(node) + time.sleep(10) + + def roll_in_secured_settings(self, upgrade_protocol): + self.kafka.interbroker_security_protocol = upgrade_protocol + + # Roll cluster to include inter broker security protocol. + self.kafka.open_port(upgrade_protocol) + self.bounce() + + # Roll cluster to disable PLAINTEXT port + self.kafka.close_port('PLAINTEXT') + self.bounce() + + def open_secured_port(self, upgrade_protocol): + self.kafka.security_protocol = upgrade_protocol + self.kafka.open_port(upgrade_protocol) + self.kafka.start_minikdc() + self.bounce() + + @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"]) + def test_rolling_upgrade_phase_one(self, upgrade_protocol): + """ + Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce + and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port. 
+ """ + self.kafka.interbroker_security_protocol = "PLAINTEXT" + self.kafka.security_protocol = "PLAINTEXT" + self.kafka.start() + + #Create PLAINTEXT producer and consumer + self.create_producer_and_consumer() + + # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run + self.run_produce_consume_validate(self.open_secured_port, upgrade_protocol) + + # Now we can produce and consume via the secured port + self.kafka.security_protocol = upgrade_protocol + self.create_producer_and_consumer() + self.run_produce_consume_validate(lambda: time.sleep(1)) + + @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"]) + def test_rolling_upgrade_phase_two(self, upgrade_protocol): + """ + Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one). + Start an Producer and Consumer via the SECURED port + Rolling upgrade to add inter-broker be the secure protocol + Rolling upgrade again to disable PLAINTEXT + Ensure the producer and consumer ran throughout + """ + #Given we have a broker that has both secure and PLAINTEXT ports open + self.kafka.security_protocol = upgrade_protocol + self.kafka.interbroker_security_protocol = "PLAINTEXT" + self.kafka.start() + + #Create Secured Producer and Consumer + self.create_producer_and_consumer() + + #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout + self.run_produce_consume_validate(self.roll_in_secured_settings, upgrade_protocol)
