Repository: kafka
Updated Branches:
  refs/heads/trunk b4d8668d6 -> 55abe65e0


KAFKA-4590; SASL/SCRAM system tests

Adds SASL/SCRAM (SCRAM-SHA-256 and SCRAM-SHA-512) coverage to the console consumer sanity test and to one replication test.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #2355 from rajinisivaram/KAFKA-4590


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55abe65e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55abe65e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55abe65e

Branch: refs/heads/trunk
Commit: 55abe65e0996008d54bd5bb8440906ac4a359937
Parents: b4d8668
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jan 17 12:55:07 2017 +0000
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Jan 17 12:55:07 2017 +0000

----------------------------------------------------------------------
 .../sanity_checks/test_console_consumer.py      |  2 +-
 tests/kafkatest/services/kafka/kafka.py         |  6 +++++
 .../services/security/security_config.py        | 25 ++++++++++++++++++++
 .../services/security/templates/jaas.conf       |  9 +++++++
 tests/kafkatest/tests/core/replication_test.py  |  6 ++++-
 5 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 38db057..066d6d4 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -47,7 +47,7 @@ class ConsoleConsumerTest(Test):
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     @cluster(num_nodes=4)
-    @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
+    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])
     @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 716c2d2..8ef0f35 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -208,6 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
 
         self.security_config.setup_node(node)
+        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=True)
 
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
@@ -215,6 +216,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node.account.ssh(cmd)
             monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
 
+        # Credentials for inter-broker communication are created before starting Kafka.
+        # Client credentials are created after starting Kafka so that both loading of
+        # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
+        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=False)
+
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 9b29217..864e0a3 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -94,6 +94,12 @@ class SecurityConfig(TemplateRenderer):
     SASL_SSL = 'SASL_SSL'
     SASL_MECHANISM_GSSAPI = 'GSSAPI'
     SASL_MECHANISM_PLAIN = 'PLAIN'
+    SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256'
+    SASL_MECHANISM_SCRAM_SHA_512 = 'SCRAM-SHA-512'
+    SCRAM_CLIENT_USER = "kafka-client"
+    SCRAM_CLIENT_PASSWORD = "client-secret"
+    SCRAM_BROKER_USER = "kafka-broker"
+    SCRAM_BROKER_PASSWORD = "broker-secret"
     CONFIG_DIR = "/mnt/security"
     KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
     TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
@@ -167,6 +173,7 @@ class SecurityConfig(TemplateRenderer):
         else:
             is_ibm_jdk = False
         jaas_conf = self.render(jaas_conf_file,  node=node, is_ibm_jdk=is_ibm_jdk,
+                                SecurityConfig=SecurityConfig,
                                 client_sasl_mechanism=self.client_sasl_mechanism,
                                 enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
         node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
@@ -181,6 +188,21 @@ class SecurityConfig(TemplateRenderer):
         if self.has_sasl:
             self.setup_sasl(node)
 
+    def setup_credentials(self, node, path, zk_connect, broker):
+        if broker:
+            self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism,
+                 SecurityConfig.SCRAM_BROKER_USER, SecurityConfig.SCRAM_BROKER_PASSWORD)
+        else:
+            self.maybe_create_scram_credentials(node, zk_connect, path, self.client_sasl_mechanism,
+                 SecurityConfig.SCRAM_CLIENT_USER, SecurityConfig.SCRAM_CLIENT_PASSWORD)
+
+    def maybe_create_scram_credentials(self, node, zk_connect, path, mechanism, user_name, password):
+        if self.has_sasl and self.is_sasl_scram(mechanism):
+            cmd = "%s --zookeeper %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \
+                  (path.script("kafka-configs.sh", node), zk_connect,
+                  user_name, mechanism, password)
+            node.account.ssh(cmd)
+
     def clean_node(self, node):
         if self.security_protocol != SecurityConfig.PLAINTEXT:
             node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
@@ -203,6 +225,9 @@ class SecurityConfig(TemplateRenderer):
     def is_sasl(self, security_protocol):
         return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL
 
+    def is_sasl_scram(self, sasl_mechanism):
+        return sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_256 or sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_512
+
     @property
     def security_protocol(self):
         return self.properties['security.protocol']

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/security/templates/jaas.conf
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf
index fbfa8af..3667f87 100644
--- a/tests/kafkatest/services/security/templates/jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -31,6 +31,10 @@ KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="client"
        password="client-secret";
+{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %}
+       org.apache.kafka.common.security.scram.ScramLoginModule required
+       username="{{ SecurityConfig.SCRAM_CLIENT_USER }}"
+       password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
 {% endif %}
 
 };
@@ -58,6 +62,11 @@ KafkaServer {
        user_client="client-secret"
        user_kafka="kafka-secret";
 {% endif %}
+{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %}
+       org.apache.kafka.common.security.scram.ScramLoginModule required
+       username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
+       password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
+{% endif %}
 };
 
 {% if zk_sasl %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index a95e9e5..3e17d56 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -16,6 +16,7 @@
 from ducktape.utils.util import wait_until
 
 from ducktape.mark import matrix
+from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.zookeeper import ZookeeperService
@@ -122,13 +123,16 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
-            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+            security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["hard_bounce"],
             broker_type=["leader"],
             security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
+    @parametrize(failure_mode="hard_bounce",
+            broker_type="leader",
+            security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
     def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by

Reply via email to