KAFKA-4590; SASL/SCRAM system tests

Runs sanity test and one replication test using SASL/SCRAM.
Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ewen Cheslack-Postava <e...@confluent.io>, Ismael Juma <ism...@juma.me.uk> Closes #2355 from rajinisivaram/KAFKA-4590 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3f4cdd0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3f4cdd0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3f4cdd0 Branch: refs/heads/0.10.2 Commit: e3f4cdd0e249f78a7f4e8f064533bcd15eb11cbf Parents: 621dff2 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Tue Jan 17 12:55:07 2017 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Jan 17 12:56:48 2017 +0000 ---------------------------------------------------------------------- .../sanity_checks/test_console_consumer.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 6 +++++ .../services/security/security_config.py | 25 ++++++++++++++++++++ .../services/security/templates/jaas.conf | 9 +++++++ tests/kafkatest/tests/core/replication_test.py | 6 ++++- 5 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e3f4cdd0/tests/kafkatest/sanity_checks/test_console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 38db057..066d6d4 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -47,7 +47,7 @@ class ConsoleConsumerTest(Test): @parametrize(security_protocol='PLAINTEXT', new_consumer=False) @matrix(security_protocol=['PLAINTEXT', 'SSL']) @cluster(num_nodes=4) - @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN') + @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 
'SCRAM-SHA-512']) @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL']) def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'): """Check that console consumer starts/stops properly, and that we are capturing log output.""" http://git-wip-us.apache.org/repos/asf/kafka/blob/e3f4cdd0/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 716c2d2..8ef0f35 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -208,6 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR)) self.security_config.setup_node(node) + self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=True) cmd = self.start_cmd(node) self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) @@ -215,6 +216,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): node.account.ssh(cmd) monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup") + # Credentials for inter-broker communication are created before starting Kafka. + # Client credentials are created after starting Kafka so that both loading of + # existing credentials from ZK and dynamic update of credentials in Kafka are tested. 
+ self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=False) + self.start_jmx_tool(self.idx(node), node) if len(self.pids(node)) == 0: raise Exception("No process ids recorded on node %s" % str(node)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e3f4cdd0/tests/kafkatest/services/security/security_config.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index 9b29217..864e0a3 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -94,6 +94,12 @@ class SecurityConfig(TemplateRenderer): SASL_SSL = 'SASL_SSL' SASL_MECHANISM_GSSAPI = 'GSSAPI' SASL_MECHANISM_PLAIN = 'PLAIN' + SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256' + SASL_MECHANISM_SCRAM_SHA_512 = 'SCRAM-SHA-512' + SCRAM_CLIENT_USER = "kafka-client" + SCRAM_CLIENT_PASSWORD = "client-secret" + SCRAM_BROKER_USER = "kafka-broker" + SCRAM_BROKER_PASSWORD = "broker-secret" CONFIG_DIR = "/mnt/security" KEYSTORE_PATH = "/mnt/security/test.keystore.jks" TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks" @@ -167,6 +173,7 @@ class SecurityConfig(TemplateRenderer): else: is_ibm_jdk = False jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk, + SecurityConfig=SecurityConfig, client_sasl_mechanism=self.client_sasl_mechanism, enabled_sasl_mechanisms=self.enabled_sasl_mechanisms) node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf) @@ -181,6 +188,21 @@ class SecurityConfig(TemplateRenderer): if self.has_sasl: self.setup_sasl(node) + def setup_credentials(self, node, path, zk_connect, broker): + if broker: + self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism, + SecurityConfig.SCRAM_BROKER_USER, SecurityConfig.SCRAM_BROKER_PASSWORD) + else: + self.maybe_create_scram_credentials(node, zk_connect, 
path, self.client_sasl_mechanism, + SecurityConfig.SCRAM_CLIENT_USER, SecurityConfig.SCRAM_CLIENT_PASSWORD) + + def maybe_create_scram_credentials(self, node, zk_connect, path, mechanism, user_name, password): + if self.has_sasl and self.is_sasl_scram(mechanism): + cmd = "%s --zookeeper %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \ + (path.script("kafka-configs.sh", node), zk_connect, + user_name, mechanism, password) + node.account.ssh(cmd) + def clean_node(self, node): if self.security_protocol != SecurityConfig.PLAINTEXT: node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False) @@ -203,6 +225,9 @@ class SecurityConfig(TemplateRenderer): def is_sasl(self, security_protocol): return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL + def is_sasl_scram(self, sasl_mechanism): + return sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_256 or sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_512 + @property def security_protocol(self): return self.properties['security.protocol'] http://git-wip-us.apache.org/repos/asf/kafka/blob/e3f4cdd0/tests/kafkatest/services/security/templates/jaas.conf ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf index fbfa8af..3667f87 100644 --- a/tests/kafkatest/services/security/templates/jaas.conf +++ b/tests/kafkatest/services/security/templates/jaas.conf @@ -31,6 +31,10 @@ KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret"; +{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} + org.apache.kafka.common.security.scram.ScramLoginModule required + username="{{ SecurityConfig.SCRAM_CLIENT_USER }}" + password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}"; {% endif %} }; 
@@ -58,6 +62,11 @@ KafkaServer { user_client="client-secret" user_kafka="kafka-secret"; {% endif %} +{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} + org.apache.kafka.common.security.scram.ScramLoginModule required + username="{{ SecurityConfig.SCRAM_BROKER_USER }}" + password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}"; +{% endif %} }; {% if zk_sasl %} http://git-wip-us.apache.org/repos/asf/kafka/blob/e3f4cdd0/tests/kafkatest/tests/core/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index a95e9e5..3e17d56 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -16,6 +16,7 @@ from ducktape.utils.util import wait_until from ducktape.mark import matrix +from ducktape.mark import parametrize from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService @@ -122,13 +123,16 @@ class ReplicationTest(ProduceConsumeValidateTest): @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], - security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]) + security_protocol=["PLAINTEXT", "SASL_SSL"]) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["controller"], security_protocol=["PLAINTEXT", "SASL_SSL"]) @matrix(failure_mode=["hard_bounce"], broker_type=["leader"], security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"]) + @parametrize(failure_mode="hard_bounce", + broker_type="leader", + security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, 
client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by