Repository: kafka Updated Branches: refs/heads/trunk 2adeb214b -> 3e5afbfa0
KAFKA-3078: Add ducktape tests for KafkaLog4jAppender producing to SASL enabled Kafka cluster Note that KAFKA-3077 will be required to run these tests. Author: Ashish Singh <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #747 from SinghAsDev/KAFKA-3078 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e5afbfa Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e5afbfa Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e5afbfa Branch: refs/heads/trunk Commit: 3e5afbfa0dd4ddfca65fae1f3b2a268ae1ed2025 Parents: 2adeb21 Author: Ashish Singh <[email protected]> Authored: Mon Jan 11 23:15:42 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Jan 11 23:15:42 2016 -0800 ---------------------------------------------------------------------- .../kafkatest/services/kafka_log4j_appender.py | 12 +++++-- tests/kafkatest/tests/log4j_appender_test.py | 17 ++++++--- .../kafka/tools/VerifiableLog4jAppender.java | 36 ++++++++++++++++++-- 3 files changed, 56 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/services/kafka_log4j_appender.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index 0cc39c0..3732bb0 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -49,10 +49,18 @@ class KafkaLog4jAppender(BackgroundThreadService): if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) - if self.security_protocol == SecurityConfig.SSL: - cmd += " --security-protocol SSL" + if self.security_protocol != SecurityConfig.PLAINTEXT: + cmd += " --security-protocol %s" % str(self.security_protocol) + if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL: cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH) cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password']) + if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \ + self.security_protocol == SecurityConfig.SASL_SSL or \ + self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \ + self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN: + cmd += " --sasl-kerberos-service-name %s" % str('kafka') + cmd += " --client-jaas-conf-path %s" % str(SecurityConfig.JAAS_CONF_PATH) + cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH) cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &" return cmd http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/tests/log4j_appender_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py index db33d76..42cfeea 100644 --- a/tests/kafkatest/tests/log4j_appender_test.py +++ b/tests/kafkatest/tests/log4j_appender_test.py @@ -35,6 +35,7 @@ class Log4jAppenderTest(Test): super(Log4jAppenderTest, self).__init__(test_context) self.num_zk = 1 self.num_brokers = 1 + self.messages_received_count = 0 self.topics = { TOPIC: {'partitions': 1, 'replication-factor': 1} } @@ -56,13 +57,20 @@ class Log4jAppenderTest(Test): security_protocol=security_protocol) self.appender.start() + def custom_message_validator(self, msg): + if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg: + self.logger.debug("Received message: %s" % msg) + self.messages_received_count += 1 + + def start_consumer(self, security_protocol): - enable_new_consumer = security_protocol == SecurityConfig.SSL + enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, - consumer_timeout_ms=1000, new_consumer=enable_new_consumer) + consumer_timeout_ms=1000, new_consumer=enable_new_consumer, + message_validator=self.custom_message_validator) self.consumer.start() - @matrix(security_protocol=['PLAINTEXT', 'SSL']) + @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) def test_log4j_appender(self, security_protocol='PLAINTEXT'): """ Tests if KafkaLog4jAppender is producing to Kafka topic @@ -79,8 +87,7 @@ class Log4jAppenderTest(Test): timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") # Verify consumed messages count - expected_lines_count = MAX_MESSAGES * 2 # two times to account for new lines introduced by log4j - wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10, + wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10, err_msg="Timed out waiting to consume expected number of messages.") self.consumer.stop() http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java index a48b301..ffbf7dc 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java @@ -21,6 +21,7 @@ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; @@ -96,7 +97,7 @@ public class VerifiableLog4jAppender { .required(false) .setDefault("PLAINTEXT") .type(String.class) - .choices("PLAINTEXT", "SSL") + .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL") .metavar("SECURITY-PROTOCOL") .dest("securityProtocol") .help("Security protocol to be used while communicating with Kafka brokers."); @@ -124,6 +125,30 @@ public class VerifiableLog4jAppender { .metavar("CONFIG_FILE") .help("Log4jAppender config properties file."); + parser.addArgument("--sasl-kerberos-service-name") + .action(store()) + .required(false) + .type(String.class) + .metavar("SASL-KERBEROS-SERVICE-NAME") + .dest("saslKerberosServiceName") + .help("Name of sasl kerberos service."); + + parser.addArgument("--client-jaas-conf-path") + .action(store()) + .required(false) + .type(String.class) + .metavar("CLIENT-JAAS-CONF-PATH") + .dest("clientJaasConfPath") + .help("Path of JAAS config file of Kafka client."); + + parser.addArgument("--kerb5-conf-path") + .action(store()) + .required(false) + .type(String.class) + .metavar("KERB5-CONF-PATH") + .dest("kerb5ConfPath") + .help("Path of Kerb5 config file."); + return parser; } @@ -171,11 +196,18 @@ public class VerifiableLog4jAppender { props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks")); props.setProperty("log4j.appender.KAFKA.SyncSend", "true"); final String securityProtocol = res.getString("securityProtocol"); - if (securityProtocol != null && securityProtocol.equals("SSL")) { + if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) { props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol); + } + if (securityProtocol != null && securityProtocol.contains("SSL")) { props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation")); props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword")); } + if (securityProtocol != null && securityProtocol.contains("SASL")) { + props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName")); + props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath")); + props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath")); + } props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA"); if (configFile != null) {
