KAFKA-2693: Ducktape tests for SASL/PLAIN and multiple mechanisms

Run a sanity test with SASL/PLAIN and a couple of replication tests with 
SASL/PLAIN and multiple mechanisms.

Author: Rajini Sivaram <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #1282 from rajinisivaram/KAFKA-2693


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cea01af1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cea01af1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cea01af1

Branch: refs/heads/0.10.0
Commit: cea01af125a33b81f973a96501fe41ca9d698197
Parents: 4ab4e4a
Author: Rajini Sivaram <[email protected]>
Authored: Fri Apr 29 09:41:12 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Fri Apr 29 09:41:12 2016 -0700

----------------------------------------------------------------------
 .../sanity_checks/test_console_consumer.py      |  5 +-
 tests/kafkatest/services/kafka/kafka.py         | 14 +--
 .../services/kafka/templates/kafka.properties   |  5 +-
 .../services/security/security_config.py        | 27 ++++--
 .../security/templates/gssapi_jaas.conf         | 86 ------------------
 .../services/security/templates/jaas.conf       | 95 ++++++++++++++++++++
 tests/kafkatest/tests/core/replication_test.py  |  7 +-
 7 files changed, 136 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 139c74a..d6a152a 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -45,11 +45,14 @@ class ConsoleConsumerTest(Test):
         self.zk.start()
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
     @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_lifecycle(self, security_protocol, new_consumer=True):
+    def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
         self.kafka.security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = sasl_mechanism
         self.kafka.start()
 
         self.consumer.security_protocol = security_protocol

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 33ece35..a74bb00 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -63,7 +63,8 @@ class KafkaService(JmxMixin, Service):
     }
 
     def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, 
authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, 
jmx_object_names=None,
+                 client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
+                 authorizer_class_name=None, topics=None, version=TRUNK, 
quota_config=None, jmx_object_names=None,
                  jmx_attributes=[], zk_connect_timeout=5000):
         """
         :type context
@@ -78,7 +79,8 @@ class KafkaService(JmxMixin, Service):
 
         self.security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol
-        self.sasl_mechanism = sasl_mechanism
+        self.client_sasl_mechanism = client_sasl_mechanism
+        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
@@ -108,7 +110,9 @@ class KafkaService(JmxMixin, Service):
 
     @property
     def security_config(self):
-        return SecurityConfig(self.security_protocol, 
self.interbroker_security_protocol, zk_sasl = self.zk.zk_sasl , 
sasl_mechanism=self.sasl_mechanism)
+        return SecurityConfig(self.security_protocol, 
self.interbroker_security_protocol,
+                              zk_sasl = self.zk.zk_sasl,
+                              
client_sasl_mechanism=self.client_sasl_mechanism, 
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
 
     def open_port(self, protocol):
         self.port_mappings[protocol] = 
self.port_mappings[protocol]._replace(open=True)
@@ -163,9 +167,7 @@ class KafkaService(JmxMixin, Service):
         # TODO - clean up duplicate configuration logic
         prop_file = cfg.render()
         prop_file += self.render('kafka.properties', node=node, 
broker_id=self.idx(node),
-                                 security_config=self.security_config,
-                                 
interbroker_security_protocol=self.interbroker_security_protocol,
-                                 sasl_mechanism=self.sasl_mechanism)
+                                 security_config=self.security_config)
         return prop_file
 
     def start_cmd(self, node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties 
b/tests/kafkatest/services/kafka/templates/kafka.properties
index a718ee2..1e4f17c 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -50,7 +50,7 @@ quota.producer.bytes.per.second.overrides={{ 
quota_config.quota_producer_bytes_p
 quota.consumer.bytes.per.second.overrides={{ 
quota_config.quota_consumer_bytes_per_second_overrides }}
 {% endif %}
 
-security.inter.broker.protocol={{ interbroker_security_protocol }}
+security.inter.broker.protocol={{ 
security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
@@ -59,7 +59,8 @@ ssl.keystore.type=JKS
 ssl.truststore.location=/mnt/security/test.truststore.jks
 ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
-sasl.mechanism={{ sasl_mechanism }}
+sasl.mechanism.inter.broker.protocol={{ 
security_config.interbroker_sasl_mechanism }}
+sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
 sasl.kerberos.service.name=kafka
 {% if authorizer_class_name is not none %}
 ssl.client.auth=required

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py 
b/tests/kafkatest/services/security/security_config.py
index 1bbabd2..d7cc3c0 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -75,7 +75,9 @@ class SecurityConfig(TemplateRenderer):
 
     ssl_stores = Keytool.generate_keystore_truststore('.')
 
-    def __init__(self, security_protocol=None, 
interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, 
zk_sasl=False, template_props=""):
+    def __init__(self, security_protocol=None, 
interbroker_security_protocol=None,
+                 client_sasl_mechanism=SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
+                 zk_sasl=False, template_props=""):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -104,13 +106,14 @@ class SecurityConfig(TemplateRenderer):
             'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
             'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
             'ssl.truststore.password' : 
SecurityConfig.ssl_stores['ssl.truststore.password'],
-            'sasl.mechanism' : sasl_mechanism,
+            'sasl.mechanism' : client_sasl_mechanism,
+            'sasl.mechanism.inter.broker.protocol' : 
interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
 
 
     def client_config(self, template_props=""):
-        return SecurityConfig(self.security_protocol, 
sasl_mechanism=self.sasl_mechanism, template_props=template_props)
+        return SecurityConfig(self.security_protocol, 
client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
 
     def setup_node(self, node):
         if self.has_ssl:
@@ -120,13 +123,15 @@ class SecurityConfig(TemplateRenderer):
 
         if self.has_sasl:
             node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, 
allow_fail=False)
-            jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
+            jaas_conf_file = "jaas.conf"
             java_version = node.account.ssh_capture("java -version")
             if any('IBM' in line for line in java_version):
                 is_ibm_jdk = True
             else:
                 is_ibm_jdk = False
-            jaas_conf = self.render(jaas_conf_file,  node=node, 
is_ibm_jdk=is_ibm_jdk)
+            jaas_conf = self.render(jaas_conf_file,  node=node, 
is_ibm_jdk=is_ibm_jdk,
+                                    
client_sasl_mechanism=self.client_sasl_mechanism,
+                                    
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
             if self.has_sasl_kerberos:
                 node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, 
SecurityConfig.KEYTAB_PATH)
@@ -159,12 +164,20 @@ class SecurityConfig(TemplateRenderer):
         return self.properties['security.protocol']
 
     @property
-    def sasl_mechanism(self):
+    def client_sasl_mechanism(self):
         return self.properties['sasl.mechanism']
 
     @property
+    def interbroker_sasl_mechanism(self):
+        return self.properties['sasl.mechanism.inter.broker.protocol']
+
+    @property
+    def enabled_sasl_mechanisms(self):
+        return set([self.client_sasl_mechanism, 
self.interbroker_sasl_mechanism])
+
+    @property
     def has_sasl_kerberos(self):
-        return self.has_sasl and self.sasl_mechanism == 
SecurityConfig.SASL_MECHANISM_GSSAPI
+        return self.has_sasl and (SecurityConfig.SASL_MECHANISM_GSSAPI in 
self.enabled_sasl_mechanisms)
 
     @property
     def kafka_opts(self):

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/services/security/templates/gssapi_jaas.conf
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/gssapi_jaas.conf 
b/tests/kafkatest/services/security/templates/gssapi_jaas.conf
deleted file mode 100644
index 6a629d9..0000000
--- a/tests/kafkatest/services/security/templates/gssapi_jaas.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-
-{% if is_ibm_jdk %}
-
-KafkaClient {
-    com.ibm.security.auth.module.Krb5LoginModule required debug=false
-    credsType=both
-    useKeytab="file:/mnt/security/keytab"
-    principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
-    com.ibm.security.auth.module.Krb5LoginModule required debug=false
-    credsType=both
-    useKeytab="file:/mnt/security/keytab"
-    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-{% if zk_sasl %}
-Client {
-    com.ibm.security.auth.module.Krb5LoginModule required debug=false
-    credsType=both
-    useKeytab="file:/mnt/security/keytab"
-    principal="zkclient@EXAMPLE.COM";
-};
-
-Server {
-       com.ibm.security.auth.module.Krb5LoginModule required debug=false
-       credsType=both
-       useKeyTab="file:/mnt/security/keytab"
-       principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-{% endif %}
-{% else %}
-
-KafkaClient {
-    com.sun.security.auth.module.Krb5LoginModule required debug=false
-    doNotPrompt=true
-    useKeyTab=true
-    storeKey=true
-    keyTab="/mnt/security/keytab"
-    principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
-    com.sun.security.auth.module.Krb5LoginModule required debug=false
-    doNotPrompt=true
-    useKeyTab=true
-    storeKey=true
-    keyTab="/mnt/security/keytab"
-    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-
-{% if zk_sasl %}
-Client {
-       com.sun.security.auth.module.Krb5LoginModule required
-       useKeyTab=true
-       keyTab="/mnt/security/keytab"
-       storeKey=true
-       useTicketCache=false
-       principal="zkclient@EXAMPLE.COM";
-};
-
-Server {
-       com.sun.security.auth.module.Krb5LoginModule required
-       useKeyTab=true
-       keyTab="/mnt/security/keytab"
-       storeKey=true
-       useTicketCache=false
-       principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-{% endif %}
-{% endif %}
-
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/services/security/templates/jaas.conf
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/jaas.conf 
b/tests/kafkatest/services/security/templates/jaas.conf
new file mode 100644
index 0000000..fbfa8af
--- /dev/null
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -0,0 +1,95 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+
+
+KafkaClient {
+{% if client_sasl_mechanism == "GSSAPI" %}
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="client@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="client@EXAMPLE.COM";
+{% endif %}
+{% elif client_sasl_mechanism == "PLAIN" %}
+       org.apache.kafka.common.security.plain.PlainLoginModule required
+       username="client"
+       password="client-secret";
+{% endif %}
+
+};
+
+KafkaServer {
+{% if "GSSAPI" in enabled_sasl_mechanisms %}
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
+{% endif %}
+{% if "PLAIN" in enabled_sasl_mechanisms %}
+       org.apache.kafka.common.security.plain.PlainLoginModule required
+       username="kafka"
+       password="kafka-secret"
+       user_client="client-secret"
+       user_kafka="kafka-secret";
+{% endif %}
+};
+
+{% if zk_sasl %}
+Client {
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="zkclient@EXAMPLE.COM";
+{% else %}
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="/mnt/security/keytab"
+   storeKey=true
+   useTicketCache=false
+   principal="zkclient@EXAMPLE.COM";
+{% endif %}
+};
+
+Server {
+{% if is_ibm_jdk %}
+   com.ibm.security.auth.module.Krb5LoginModule required debug=false
+   credsType=both
+   useKeyTab="file:/mnt/security/keytab"
+   principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="/mnt/security/keytab"
+   storeKey=true
+   useTicketCache=false
+   principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
+};
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea01af1/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py 
b/tests/kafkatest/tests/core/replication_test.py
index 7b360ab..8e9474a 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -128,7 +128,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
             broker_type=["controller"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, 
security_protocol, broker_type):
+    @matrix(failure_mode=["hard_bounce"],
+            broker_type=["leader"],
+            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
         """Replication tests.
         These tests verify that replication provides simple durability 
guarantees by checking that data acked by
         brokers is still available for consumption in the face of various 
failure scenarios.
@@ -144,6 +147,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" 
else True
         self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, 
message_validator=is_int)

Reply via email to