Repository: kafka
Updated Branches:
  refs/heads/trunk c9114488b -> 2adeb214b
KAFKA-3077: Enable KafkaLog4jAppender to work with SASL enabled brokers Author: Ashish Singh <[email protected]> Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #740 from SinghAsDev/KAFKA-3077 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2adeb214 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2adeb214 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2adeb214 Branch: refs/heads/trunk Commit: 2adeb214b1e366e36deef045f8049406f7b3773d Parents: c911448 Author: Ashish Singh <[email protected]> Authored: Mon Jan 11 23:14:14 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Jan 11 23:14:14 2016 -0800 ---------------------------------------------------------------------- .../kafka/log4jappender/KafkaLog4jAppender.java | 42 +++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2adeb214/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java ---------------------------------------------------------------------- diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index dbbee3c..5759105 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import 
org.apache.log4j.AppenderSkeleton; import org.apache.log4j.helpers.LogLog; @@ -51,6 +52,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton { private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; + private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME; private String brokerList = null; private String topic = null; @@ -61,6 +63,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton { private String sslKeystoreType = null; private String sslKeystoreLocation = null; private String sslKeystorePassword = null; + private String saslKerberosServiceName = null; + private String clientJaasConfPath = null; + private String kerb5ConfPath = null; private int retries = 0; private int requiredNumAcks = Integer.MAX_VALUE; @@ -155,6 +160,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton { this.sslKeystoreLocation = sslKeystoreLocation; } + public void setSaslKerberosServiceName(String saslKerberosServiceName) { + this.saslKerberosServiceName = saslKerberosServiceName; + } + + public void setClientJaasConfPath(String clientJaasConfPath) { + this.clientJaasConfPath = clientJaasConfPath; + } + + public void setKerb5ConfPath(String kerb5ConfPath) { + this.kerb5ConfPath = kerb5ConfPath; + } + public String getSslKeystoreLocation() { return sslKeystoreLocation; } @@ -167,6 +184,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton { return sslKeystorePassword; } + public String getSaslKerberosServiceName() { + return saslKerberosServiceName; + } + + public String getClientJaasConfPath() { + return clientJaasConfPath; + } + + public String getKerb5ConfPath() { + return kerb5ConfPath; + } + @Override public void activateOptions() { // check for config parameter validity @@ -183,9 +212,11 @@ 
public class KafkaLog4jAppender extends AppenderSkeleton { props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks)); if (retries > 0) props.put(RETRIES_CONFIG, retries); - if (securityProtocol != null && sslTruststoreLocation != null && - sslTruststorePassword != null) { + if (securityProtocol != null) { props.put(SECURITY_PROTOCOL, securityProtocol); + } + if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && + sslTruststorePassword != null) { props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation); props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword); @@ -196,6 +227,13 @@ public class KafkaLog4jAppender extends AppenderSkeleton { props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword); } } + if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { + props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName); + System.setProperty("java.security.auth.login.config", clientJaasConfPath); + if (kerb5ConfPath != null) { + System.setProperty("java.security.krb5.conf", kerb5ConfPath); + } + } props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
