Repository: kafka
Updated Branches:
  refs/heads/0.10.2 e3f4cdd0e -> 2b19ad9d8


KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use 
the new sasl.jaas.config property instead of a static JAAS configuration file 
when used with SASL_PLAINTEXT.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>, Ismael Juma 
<ism...@juma.me.uk>

Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b19ad9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b19ad9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b19ad9d

Branch: refs/heads/0.10.2
Commit: 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Parents: e3f4cdd
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jan 17 18:42:55 2017 +0000
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Jan 17 18:43:25 2017 +0000

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py    |  4 +--
 .../services/security/security_config.py        | 28 +++++++++++++++-----
 .../services/security/templates/jaas.conf       |  4 +++
 tests/kafkatest/services/verifiable_consumer.py |  5 ++--
 tests/kafkatest/services/verifiable_producer.py |  9 +++----
 5 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 17ddb6b..cdc46cd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
 
         # Add security properties to the config. If security protocol is not 
specified,
         # use the default in the template properties.
-        self.security_config = 
self.kafka.security_config.client_config(prop_file)
+        self.security_config = 
self.kafka.security_config.client_config(prop_file, node)
+        self.security_config.setup_node(node)
 
         prop_file += str(self.security_config)
         return prop_file
@@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
         prop_file = self.prop_file(node)
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
-        self.security_config.setup_node(node)
 
         # Create and upload log properties
         log_config = self.render('tools_log4j.properties', 
log_file=ConsoleConsumer.LOG_FILE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py 
b/tests/kafkatest/services/security/security_config.py
index 864e0a3..846d9b1 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
 
     def __init__(self, context, security_protocol=None, 
interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
-                 zk_sasl=False, template_props=""):
+                 zk_sasl=False, template_props="", static_jaas_conf=True):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
         self.has_sasl = self.is_sasl(security_protocol) or 
self.is_sasl(interbroker_security_protocol) or zk_sasl
         self.has_ssl = self.is_ssl(security_protocol) or 
self.is_ssl(interbroker_security_protocol)
         self.zk_sasl = zk_sasl
+        self.static_jaas_conf = static_jaas_conf
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
             'sasl.kerberos.service.name' : 'kafka'
         }
 
-    def client_config(self, template_props=""):
-        return SecurityConfig(self.context, self.security_protocol, 
client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+    def client_config(self, template_props="", node=None):
+        # If node is not specified, use static jaas config which will be 
created later.
+        # Otherwise use static JAAS configuration files with SASL_SSL and 
sasl.jaas.config
+        # property with SASL_PLAINTEXT so that both code paths are tested by 
existing tests.
+        # Note that this is an arbitrary choice and it is possible to run all 
tests with
+        # either static or dynamic jaas config files if required.
+        static_jaas_conf = node is None or (self.has_sasl and self.has_ssl)
+        return SecurityConfig(self.context, self.security_protocol, 
client_sasl_mechanism=self.client_sasl_mechanism, 
template_props=template_props, static_jaas_conf=static_jaas_conf)
 
     def setup_ssl(self, node):
         node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, 
allow_fail=False)
@@ -175,8 +182,12 @@ class SecurityConfig(TemplateRenderer):
         jaas_conf = self.render(jaas_conf_file,  node=node, 
is_ibm_jdk=is_ibm_jdk,
                                 SecurityConfig=SecurityConfig,
                                 
client_sasl_mechanism=self.client_sasl_mechanism,
-                                
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
-        node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
+                                
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms,
+                                static_jaas_conf=self.static_jaas_conf)
+        if self.static_jaas_conf:
+            node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
+        else:
+            self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " 
\\\n")
         if self.has_sasl_kerberos:
             node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, 
SecurityConfig.KEYTAB_PATH)
             node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, 
SecurityConfig.KRB5CONF_PATH)
@@ -251,7 +262,10 @@ class SecurityConfig(TemplateRenderer):
     @property
     def kafka_opts(self):
         if self.has_sasl:
-            return "\"-Djava.security.auth.login.config=%s 
-Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, 
SecurityConfig.KRB5CONF_PATH)
+            if self.static_jaas_conf:
+                return "\"-Djava.security.auth.login.config=%s 
-Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, 
SecurityConfig.KRB5CONF_PATH)
+            else:
+                return "\"-Djava.security.krb5.conf=%s\"" % 
SecurityConfig.KRB5CONF_PATH
         else:
             return ""
 
@@ -265,6 +279,8 @@ class SecurityConfig(TemplateRenderer):
         """
         if self.security_protocol == SecurityConfig.PLAINTEXT:
             return ""
+        if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' 
not in self.properties:
+            raise Exception("JAAS configuration property has not yet been 
initialized")
         config_lines = (prefix + key + "=" + value for key, value in 
self.properties.iteritems())
         # Extra blank lines ensure this can be appended/prepended safely
         return "\n".join(itertools.chain([""], config_lines, [""]))

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/templates/jaas.conf
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/templates/jaas.conf 
b/tests/kafkatest/services/security/templates/jaas.conf
index 3667f87..e251145 100644
--- a/tests/kafkatest/services/security/templates/jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -12,7 +12,9 @@
   */
 
 
+{% if static_jaas_conf %}
 KafkaClient {
+{% endif %}
 {% if client_sasl_mechanism == "GSSAPI" %}
 {% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
@@ -37,6 +39,7 @@ KafkaClient {
        password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
 {% endif %}
 
+{% if static_jaas_conf %}
 };
 
 KafkaServer {
@@ -102,3 +105,4 @@ Server {
 {% endif %}
 };
 {% endif %}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index c593e2a..090bcda 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -148,8 +148,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
BackgroundThreadService):
         self.enable_autocommit = enable_autocommit
         self.assignment_strategy = assignment_strategy
         self.prop_file = ""
-        self.security_config = 
kafka.security_config.client_config(self.prop_file)
-        self.prop_file += str(self.security_config)
         self.stop_timeout_sec = stop_timeout_sec
 
         self.event_handlers = {}
@@ -171,6 +169,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
BackgroundThreadService):
         node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config)
 
         # Create and upload config file
+        self.security_config = 
self.kafka.security_config.client_config(self.prop_file, node)
+        self.security_config.setup_node(node)
+        self.prop_file += str(self.security_config)
         self.logger.info("verifiable_consumer.properties:")
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableConsumer.CONFIG_FILE, 
self.prop_file)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index 205143e..d873e1f 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -83,10 +83,6 @@ class VerifiableProducer(KafkaPathResolverMixin, 
BackgroundThreadService):
         self.acks = acks
         self.stop_timeout_sec = stop_timeout_sec
 
-    @property
-    def security_config(self):
-        return self.kafka.security_config.client_config()
-
     def prop_file(self, node):
         idx = self.idx(node)
         prop_file = str(self.security_config)
@@ -104,6 +100,10 @@ class VerifiableProducer(KafkaPathResolverMixin, 
BackgroundThreadService):
         log_config = self.render('tools_log4j.properties', 
log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 
+        # Configure security
+        self.security_config = 
self.kafka.security_config.client_config(node=node)
+        self.security_config.setup_node(node)
+
         # Create and upload config file
         producer_prop_file = self.prop_file(node)
         if self.acks is not None:
@@ -112,7 +112,6 @@ class VerifiableProducer(KafkaPathResolverMixin, 
BackgroundThreadService):
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, 
producer_prop_file)
-        self.security_config.setup_node(node)
 
         cmd = self.start_cmd(node, idx)
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))

Reply via email to