Repository: kafka Updated Branches: refs/heads/0.10.2 e3f4cdd0e -> 2b19ad9d8
KAFKA-4580; Use sasl.jaas.config for some system tests Switched console_consumer, verifiable_consumer and verifiable_producer to use new sasl.jaas.config property instead of static JAAS configuration file when used with SASL_PLAINTEXT. Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ewen Cheslack-Postava <e...@confluent.io>, Ismael Juma <ism...@juma.me.uk> Closes #2323 from rajinisivaram/KAFKA-4580 (cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a) Signed-off-by: Ismael Juma <ism...@juma.me.uk> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b19ad9d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b19ad9d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b19ad9d Branch: refs/heads/0.10.2 Commit: 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f Parents: e3f4cdd Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Tue Jan 17 18:42:55 2017 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Jan 17 18:43:25 2017 +0000 ---------------------------------------------------------------------- tests/kafkatest/services/console_consumer.py | 4 +-- .../services/security/security_config.py | 28 +++++++++++++++----- .../services/security/templates/jaas.conf | 4 +++ tests/kafkatest/services/verifiable_consumer.py | 5 ++-- tests/kafkatest/services/verifiable_producer.py | 9 +++---- 5 files changed, 35 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 17ddb6b..cdc46cd 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -150,7 +150,8 @@ class 
ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) # Add security properties to the config. If security protocol is not specified, # use the default in the template properties. - self.security_config = self.kafka.security_config.client_config(prop_file) + self.security_config = self.kafka.security_config.client_config(prop_file, node) + self.security_config.setup_node(node) prop_file += str(self.security_config) return prop_file @@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) prop_file = self.prop_file(node) self.logger.info(prop_file) node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) - self.security_config.setup_node(node) # Create and upload log properties log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/security_config.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index 864e0a3..846d9b1 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer): def __init__(self, context, security_protocol=None, interbroker_security_protocol=None, client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI, - zk_sasl=False, template_props=""): + zk_sasl=False, template_props="", static_jaas_conf=True): """ Initialize the security properties for the node and copy keystore and truststore to the remote node if the transport protocol @@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer): self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl self.has_ssl = self.is_ssl(security_protocol) or 
self.is_ssl(interbroker_security_protocol) self.zk_sasl = zk_sasl + self.static_jaas_conf = static_jaas_conf self.properties = { 'security.protocol' : security_protocol, 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH, @@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer): 'sasl.kerberos.service.name' : 'kafka' } - def client_config(self, template_props=""): - return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props) + def client_config(self, template_props="", node=None): + # If node is not specified, use static jaas config which will be created later. + # Otherwise use static JAAS configuration files with SASL_SSL and sasl.jaas.config + # property with SASL_PLAINTEXT so that both code paths are tested by existing tests. + # Note that this is an arbitrary choice and it is possible to run all tests with + # either static or dynamic jaas config files if required. + static_jaas_conf = node is None or (self.has_sasl and self.has_ssl) + return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props, static_jaas_conf=static_jaas_conf) def setup_ssl(self, node): node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False) @@ -175,8 +182,12 @@ class SecurityConfig(TemplateRenderer): jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk, SecurityConfig=SecurityConfig, client_sasl_mechanism=self.client_sasl_mechanism, - enabled_sasl_mechanisms=self.enabled_sasl_mechanisms) - node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf) + enabled_sasl_mechanisms=self.enabled_sasl_mechanisms, + static_jaas_conf=self.static_jaas_conf) + if self.static_jaas_conf: + node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf) + else: + self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n") if self.has_sasl_kerberos: 
node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH) node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH) @@ -251,7 +262,10 @@ class SecurityConfig(TemplateRenderer): @property def kafka_opts(self): if self.has_sasl: - return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH) + if self.static_jaas_conf: + return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH) + else: + return "\"-Djava.security.krb5.conf=%s\"" % SecurityConfig.KRB5CONF_PATH else: return "" @@ -265,6 +279,8 @@ class SecurityConfig(TemplateRenderer): """ if self.security_protocol == SecurityConfig.PLAINTEXT: return "" + if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties: + raise Exception("JAAS configuration property has not yet been initialized") config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems()) # Extra blank lines ensure this can be appended/prepended safely return "\n".join(itertools.chain([""], config_lines, [""])) http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/templates/jaas.conf ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf index 3667f87..e251145 100644 --- a/tests/kafkatest/services/security/templates/jaas.conf +++ b/tests/kafkatest/services/security/templates/jaas.conf @@ -12,7 +12,9 @@ */ +{% if static_jaas_conf %} KafkaClient { +{% endif %} {% if client_sasl_mechanism == "GSSAPI" %} {% if is_ibm_jdk %} com.ibm.security.auth.module.Krb5LoginModule required debug=false @@ -37,6 +39,7 @@ KafkaClient { password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}"; {% endif %} +{% if static_jaas_conf %} }; 
KafkaServer { @@ -102,3 +105,4 @@ Server { {% endif %} }; {% endif %} +{% endif %} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/verifiable_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index c593e2a..090bcda 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -148,8 +148,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService): self.enable_autocommit = enable_autocommit self.assignment_strategy = assignment_strategy self.prop_file = "" - self.security_config = kafka.security_config.client_config(self.prop_file) - self.prop_file += str(self.security_config) self.stop_timeout_sec = stop_timeout_sec self.event_handlers = {} @@ -171,6 +169,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService): node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config) # Create and upload config file + self.security_config = self.kafka.security_config.client_config(self.prop_file, node) + self.security_config.setup_node(node) + self.prop_file += str(self.security_config) self.logger.info("verifiable_consumer.properties:") self.logger.info(self.prop_file) node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file) http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 205143e..d873e1f 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -83,10 +83,6 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): self.acks = acks self.stop_timeout_sec = 
stop_timeout_sec - @property - def security_config(self): - return self.kafka.security_config.client_config() - def prop_file(self, node): idx = self.idx(node) prop_file = str(self.security_config) @@ -104,6 +100,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE) node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config) + # Configure security + self.security_config = self.kafka.security_config.client_config(node=node) + self.security_config.setup_node(node) + # Create and upload config file producer_prop_file = self.prop_file(node) if self.acks is not None: @@ -112,7 +112,6 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService): self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file) - self.security_config.setup_node(node) cmd = self.start_cmd(node, idx) self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))