Repository: kafka Updated Branches: refs/heads/trunk f72203ee9 -> 6508e63c7
KAFKA-4180; Support clients with different authentication credentials in the same JVM Changed caching in LoginManager to allow one LoginManager per client JAAS configuration. Added test to End2EndAuthorization for SASL Plain and GSSAPI with two consumers with different credentials. Developed with mimaison. Author: Edoardo Comar <eco...@uk.ibm.com> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk> Closes #2293 from edoardocomar/KAFKA-4180d Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6508e63c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6508e63c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6508e63c Branch: refs/heads/trunk Commit: 6508e63c7ab1fbabb322754fabf41eb75bbbcec2 Parents: f72203e Author: Edoardo Comar <eco...@uk.ibm.com> Authored: Wed Jan 11 22:01:34 2017 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu Jan 12 01:36:31 2017 +0000 ---------------------------------------------------------------------- .../security/authenticator/LoginManager.java | 57 +++++++++---- .../common/security/kerberos/KerberosLogin.java | 59 ++++++------- .../kafka/api/EndToEndAuthorizationTest.scala | 22 ++--- .../api/SaslEndToEndAuthorizationTest.scala | 89 ++++++++++++++++++++ ...SaslGssapiSslEndToEndAuthorizationTest.scala | 36 ++++++++ .../SaslPlainSslEndToEndAuthorizationTest.scala | 19 +---- .../SaslScramSslEndToEndAuthorizationTest.scala | 5 +- .../scala/integration/kafka/api/SaslSetup.scala | 4 +- .../api/SaslSslEndToEndAuthorizationTest.scala | 32 ------- .../api/SslEndToEndAuthorizationTest.scala | 8 +- .../integration/kafka/api/UserQuotaTest.scala | 3 +- .../scala/unit/kafka/utils/JaasTestUtils.scala | 47 +++++++---- 12 files changed, 250 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index 3b1af1c..a28afae 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -25,22 +25,30 @@ import javax.security.auth.login.LoginException; import java.io.IOException; import java.util.ArrayList; import java.util.EnumMap; +import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.LoginType; import org.apache.kafka.common.security.auth.Login; import org.apache.kafka.common.security.kerberos.KerberosLogin; public class LoginManager { - private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class); + // static configs (broker or client) + private static final EnumMap<LoginType, LoginManager> LOGIN_TYPE_INSTANCES = new EnumMap<>(LoginType.class); + + // dynamic configs (client-only) + private static final Map<Password, LoginManager> JAAS_CONF_INSTANCES = new HashMap<>(); private final Login login; - private final LoginType loginType; + private final Object cacheKey; private int refCount; - private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig) throws IOException, LoginException { - this.loginType = loginType; + private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig, + Password jaasConfigValue) throws IOException, LoginException { + 
this.cacheKey = jaasConfigValue != null ? jaasConfigValue : loginType; String loginContext = loginType.contextName(); login = hasKerberos ? new KerberosLogin() : new DefaultLogin(); login.configure(configs, jaasConfig, loginContext); @@ -51,8 +59,7 @@ public class LoginManager { * Returns an instance of `LoginManager` and increases its reference count. * * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an - * existing `LoginManager` for the provided `mode` if available. However, it expects `configs` to be the same for - * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`. + * existing `LoginManager` for the provided `loginType` and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`, if available. * * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more @@ -60,14 +67,28 @@ public class LoginManager { * * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients * (i.e. 
consumer and producer) + * @param hasKerberos * @param configs configuration as key/value pairs + * @param jaasConfig JAAS Configuration object */ - public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig) throws IOException, LoginException { + public static LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, + Configuration jaasConfig) throws IOException, LoginException { synchronized (LoginManager.class) { - LoginManager loginManager = CACHED_INSTANCES.get(loginType); - if (loginManager == null) { - loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig); - CACHED_INSTANCES.put(loginType, loginManager); + // SASL_JAAS_CONFIG is only supported by clients + LoginManager loginManager; + Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); + if (loginType == LoginType.CLIENT && jaasConfigValue != null) { + loginManager = JAAS_CONF_INSTANCES.get(jaasConfigValue); + if (loginManager == null) { + loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue); + JAAS_CONF_INSTANCES.put(jaasConfigValue, loginManager); + } + } else { + loginManager = LOGIN_TYPE_INSTANCES.get(loginType); + if (loginManager == null) { + loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue); + LOGIN_TYPE_INSTANCES.put(loginType, loginManager); + } } return loginManager.acquire(); } @@ -94,7 +115,11 @@ public class LoginManager { if (refCount == 0) throw new IllegalStateException("release called on LoginManager with refCount == 0"); else if (refCount == 1) { - CACHED_INSTANCES.remove(loginType); + if (cacheKey instanceof Password) { + JAAS_CONF_INSTANCES.remove(cacheKey); + } else { + LOGIN_TYPE_INSTANCES.remove(cacheKey); + } login.close(); } --refCount; @@ -104,10 +129,10 @@ public class LoginManager { /* Should only be used in tests. 
*/ public static void closeAll() { synchronized (LoginManager.class) { - for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) { - LoginManager loginManager = CACHED_INSTANCES.remove(loginType); - loginManager.login.close(); - } + for (LoginType key : new ArrayList<>(LOGIN_TYPE_INSTANCES.keySet())) + LOGIN_TYPE_INSTANCES.remove(key).login.close(); + for (Password key : new ArrayList<>(JAAS_CONF_INSTANCES.keySet())) + JAAS_CONF_INSTANCES.remove(key).login.close(); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java index d2baf91..32a0929 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -129,20 +129,20 @@ public class KerberosLogin extends AbstractLogin { } if (!isKrbTicket) { - log.debug("It is not a Kerberos ticket"); + log.debug("[Principal={}]: It is not a Kerberos ticket", principal); t = null; // if no TGT, do not bother with ticket management. return loginContext; } - log.debug("It is a Kerberos ticket"); + log.debug("[Principal={}]: It is a Kerberos ticket", principal); // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development, // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running: // "modprinc -maxlife 3mins <principal>" in kadmin. 
- t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() { + t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() { public void run() { - log.info("TGT refresh thread started."); + log.info("[Principal={}]: TGT refresh thread started.", principal); while (true) { // renewal thread's main loop. if it exits from here, thread will exit. KerberosTicket tgt = getTGT(); long now = currentWallTime(); @@ -151,7 +151,7 @@ public class KerberosLogin extends AbstractLogin { if (tgt == null) { nextRefresh = now + minTimeBeforeRelogin; nextRefreshDate = new Date(nextRefresh); - log.warn("No TGT found: will try again at {}", nextRefreshDate); + log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate); } else { nextRefresh = getRefreshTime(tgt); long expiry = tgt.getEndTime().getTime(); @@ -173,41 +173,41 @@ public class KerberosLogin extends AbstractLogin { // would cause ticket expiration. if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) { // expiry is before next scheduled refresh). - log.info("Refreshing now because expiry is before next scheduled refresh time."); + log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal); nextRefresh = now; } else { if (nextRefresh < (now + minTimeBeforeRelogin)) { // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN). 
Date until = new Date(nextRefresh); Date newUntil = new Date(now + minTimeBeforeRelogin); - log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " + + log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " + "than the minimum refresh interval ({} seconds) from now.", - until, newUntil, minTimeBeforeRelogin / 1000); + principal, until, newUntil, minTimeBeforeRelogin / 1000); } nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin); } nextRefreshDate = new Date(nextRefresh); if (nextRefresh > expiry) { - log.error("Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." + + log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." + "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.", - nextRefreshDate, expiryDate); + principal, nextRefreshDate, expiryDate); return; } } if (now < nextRefresh) { Date until = new Date(nextRefresh); - log.info("TGT refresh sleeping until: {}", until); + log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until); try { Thread.sleep(nextRefresh - now); } catch (InterruptedException ie) { - log.warn("TGT renewal thread has been interrupted and will exit."); + log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal); return; } } else { - log.error("NextRefresh: {} is in the past: exiting refresh thread. Check" + log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check" + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)." + " Manual intervention will be required for this client to successfully authenticate." 
- + " Exiting refresh thread.", nextRefreshDate); + + " Exiting refresh thread.", principal, nextRefreshDate); return; } if (isUsingTicketCache) { @@ -215,7 +215,7 @@ public class KerberosLogin extends AbstractLogin { int retry = 1; while (retry >= 0) { try { - log.debug("Running ticket cache refresh command: {} {}", kinitCmd, kinitArgs); + log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs); Shell.execCommand(kinitCmd, kinitArgs); break; } catch (Exception e) { @@ -225,12 +225,13 @@ public class KerberosLogin extends AbstractLogin { try { Thread.sleep(10 * 1000); } catch (InterruptedException ie) { - log.error("Interrupted while renewing TGT, exiting Login thread"); + log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal); return; } } else { - log.warn("Could not renew TGT due to problem running shell command: '" + kinitCmd - + " " + kinitArgs + "'" + "; exception was: " + e + ". Exiting refresh thread.", e); + log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'; " + + "exception was: %s. 
Exiting refresh thread.", + principal, kinitCmd, kinitArgs, e, e); return; } } @@ -249,16 +250,16 @@ public class KerberosLogin extends AbstractLogin { try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { - log.error("Interrupted during login retry after LoginException:", le); + log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le); throw le; } } else { - log.error("Could not refresh TGT for principal: " + principal + ".", le); + log.error("[Principal={}]: Could not refresh TGT.", principal, le); } } } } catch (LoginException le) { - log.error("Failed to refresh TGT: refresh thread exiting now.", le); + log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le); return; } } @@ -275,7 +276,7 @@ public class KerberosLogin extends AbstractLogin { try { t.join(); } catch (InterruptedException e) { - log.warn("Error while waiting for Login thread to shutdown: " + e, e); + log.warn("[Principal={}]: Error while waiting for Login thread to shutdown.", principal, e); Thread.currentThread().interrupt(); } } @@ -300,8 +301,8 @@ public class KerberosLogin extends AbstractLogin { } String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME); if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) { - String message = "Conflicting serviceName values found in JAAS and Kafka configs " + - "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName; + String message = String.format("Conflicting serviceName values found in JAAS and Kafka configs " + + "value in JAAS file %s, value in Kafka config %s", jaasServiceName, configServiceName); throw new IllegalArgumentException(message); } @@ -317,8 +318,8 @@ public class KerberosLogin extends AbstractLogin { private long getRefreshTime(KerberosTicket tgt) { long start = tgt.getStartTime().getTime(); long expires = tgt.getEndTime().getTime(); - 
log.info("TGT valid starting at: {}", tgt.getStartTime()); - log.info("TGT expires: {}", tgt.getEndTime()); + log.info("[Principal={}]: TGT valid starting at: {}", principal, tgt.getStartTime()); + log.info("[Principal={}]: TGT expires: {}", principal, tgt.getEndTime()); long proposedRefresh = start + (long) ((expires - start) * (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble()))); @@ -345,15 +346,15 @@ public class KerberosLogin extends AbstractLogin { private boolean hasSufficientTimeElapsed() { long now = currentElapsedTime(); if (now - lastLogin < minTimeBeforeRelogin) { - log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.", - minTimeBeforeRelogin / 1000); + log.warn("[Principal={}]: Not attempting to re-login since the last re-login was attempted less than {} seconds before.", + principal, minTimeBeforeRelogin / 1000); return false; } return true; } /** - * Re-login a principal. This method assumes that {@link #login(String)} has happened already. + * Re-login a principal. This method assumes that {@link #login()} has happened already. 
* @throws javax.security.auth.login.LoginException on a failure */ private synchronized void reLogin() throws LoginException { http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 4d86040..3e391d3 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -30,7 +30,6 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerReco import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -75,9 +74,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val kafkaPrincipal: String override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - protected def kafkaClientSaslMechanism = "GSSAPI" - protected def kafkaServerSaslMechanisms = List("GSSAPI") - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) val topicResource = new Resource(Topic, topic) val groupResource = new Resource(Group, group) @@ -159,12 +155,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Before override def setUp { - securityProtocol match { - case SecurityProtocol.SSL => - startSasl(ZkSasl, null, null) - case 
_ => - startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms) - } super.setUp AclCommand.main(topicBrokerReadAclArgs) servers.foreach { s => @@ -210,7 +200,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas consumeRecords(this.consumers.head, numRecords) } - private def setAclsAndProduce() { + protected def setAclsAndProduce() { AclCommand.main(produceAclArgs) AclCommand.main(consumeAclArgs) servers.foreach { s => @@ -276,8 +266,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(deleteDescribeAclArgs) AclCommand.main(deleteWriteAclArgs) - servers.foreach { _ => - TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + servers.foreach { s => + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } } @@ -318,7 +308,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(groupAclArgs) servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) } @@ -343,7 +333,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas assertEquals(group, e.groupId()) } } - + private def sendRecords(numRecords: Int, tp: TopicPartition) { val futures = (0 until numRecords).map { i => val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) @@ -357,7 +347,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } - private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + protected def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int = 0, topic: String 
= topic, http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..992649a --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.api + +import java.io.File +import java.util.Properties + +import kafka.utils.{JaasTestUtils,TestUtils} +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.errors.GroupAuthorizationException +import org.junit.{Before,Test} + +import scala.collection.immutable.List +import scala.collection.JavaConverters._ + +abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + + protected def kafkaClientSaslMechanism: String + protected def kafkaServerSaslMechanisms: List[String] + + @Before + override def setUp { + startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms) + super.setUp + } + + // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests + override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanisms: List[String], + serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) { + // create static config with client login context with credentials for JaasTestUtils 'client2' + super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanisms, serverKeytabFile, clientKeytabFile) + // set dynamic properties with credentials for JaasTestUtils 'client1' + val clientLoginContext = JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, clientKeytabFile) + producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) + consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) + } + + /** + * Test with two consumers, each with different valid SASL credentials. 
+ * The first consumer succeeds because it is allowed by the ACL, + * the second one connects ok, but fails to consume messages due to the ACL. + */ + @Test(timeout = 15000) + def testTwoConsumersWithDifferentSaslCredentials { + setAclsAndProduce() + val consumer1 = consumers.head + + val consumer2Config = new Properties + consumer2Config.putAll(consumerConfig) + // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path + consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG) + + val consumer2 = TestUtils.createNewConsumer(brokerList, + securityProtocol = securityProtocol, + trustStoreFile = trustStoreFile, + saslProperties = saslProperties, + props = Some(consumer2Config)) + consumers += consumer2 + + consumer1.assign(List(tp).asJava) + consumer2.assign(List(tp).asJava) + + consumeRecords(consumer1, numRecords) + + try { + consumeRecords(consumer2) + fail("Expected exception as consumer2 has no access to group") + } catch { + case _: GroupAuthorizationException => //expected + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..a9b4a60 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import kafka.server.KafkaConfig +import kafka.utils.JaasTestUtils + +import scala.collection.immutable.List + +class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { + override val clientPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName + override val kafkaPrincipal = JaasTestUtils.KafkaServerPrincipalUnqualifiedName + + override protected def kafkaClientSaslMechanism = "GSSAPI" + override protected def kafkaServerSaslMechanisms = List("GSSAPI") + + // Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the + // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client + // authentication or SASL authentication with SSL as the transport layer (but not both). 
+ serverConfig.put(KafkaConfig.SslClientAuthProp, "required") + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index 214cd0b..bfccd28 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -16,24 +16,11 @@ */ package kafka.api -import java.io.File -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.config.SaslConfigs import kafka.utils.JaasTestUtils -class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { - override protected def securityProtocol = SecurityProtocol.SASL_SSL +class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override protected def kafkaClientSaslMechanism = "PLAIN" override protected def kafkaServerSaslMechanisms = List("PLAIN") - override val clientPrincipal = "testuser" - override val kafkaPrincipal = "admin" - - // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests - override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanisms: List[String], - serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) { - super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, List()) // create static config without client login contexts - val clientLoginModule = JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, None) - producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule) - 
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule) - } + override val clientPrincipal = JaasTestUtils.KafkaPlainUser + override val kafkaPrincipal = JaasTestUtils.KafkaPlainAdmin } http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index b483884..fe0204a 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -16,15 +16,13 @@ */ package kafka.api -import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.scram.ScramMechanism import kafka.utils.JaasTestUtils import kafka.admin.ConfigCommand import kafka.utils.ZkUtils import scala.collection.JavaConverters._ -class SaslScramSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { - override protected def securityProtocol = SecurityProtocol.SASL_SSL +class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256" override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList override val clientPrincipal = JaasTestUtils.KafkaScramUser @@ -45,5 +43,6 @@ class SaslScramSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { } ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword)) ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword)) + ConfigCommand.main(configCommandArgs(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)) } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 765f191..c1e2da2 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -54,8 +54,8 @@ trait SaslSetup { setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile)) kdc = new MiniKdc(kdcConf, workDir) kdc.start() - kdc.createPrincipal(serverKeytabFile, "kafka/localhost") - kdc.createPrincipal(clientKeytabFile, "client") + kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") + kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) } else { setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala deleted file mode 100644 index 470ec84..0000000 --- a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.api - -import kafka.server.KafkaConfig -import org.apache.kafka.common.protocol.SecurityProtocol - -class SaslSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { - override protected def securityProtocol = SecurityProtocol.SASL_SSL - override val clientPrincipal = "client" - override val kafkaPrincipal = "kafka" - - // Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the - // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client - // authentication or SASL authentication with SSL as the transport layer (but not both). 
- serverConfig.put(KafkaConfig.SslClientAuthProp, "required") - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 812359e..365c0ba 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -19,11 +19,17 @@ package kafka.api import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.protocol.SecurityProtocol - +import org.junit.Before class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { override protected def securityProtocol = SecurityProtocol.SSL this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required") override val clientPrincipal = "O=A client,CN=localhost" override val kafkaPrincipal = "O=A server,CN=localhost" + + @Before + override def setUp { + startSasl(ZkSasl, List.empty, List.empty) + super.setUp + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index 3d5d702..4677c8c 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -19,6 +19,7 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.server.{KafkaConfig, ConfigEntityName, QuotaId} +import kafka.utils.JaasTestUtils import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Before @@ -30,7 +31,7 @@ 
class UserQuotaTest extends BaseQuotaTest with SaslTestHarness { override protected val zkSaslEnabled = false override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) - override val userPrincipal = "client" + override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2 override val producerQuotaId = QuotaId(Some(userPrincipal), None) override val consumerQuotaId = QuotaId(Some(userPrincipal), None) http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index ed08e8a..a7f71b9 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -101,17 +101,25 @@ object JaasTestUtils { private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" private val KafkaServerContextName = "KafkaServer" - private val KafkaServerPrincipal = "kafka/localhost@EXAMPLE.COM" + val KafkaServerPrincipalUnqualifiedName = "kafka" + private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM" private val KafkaClientContextName = "KafkaClient" - private val KafkaClientPrincipal = "client@EXAMPLE.COM" + val KafkaClientPrincipalUnqualifiedName = "client" + private val KafkaClientPrincipal = KafkaClientPrincipalUnqualifiedName + "@EXAMPLE.COM" + val KafkaClientPrincipalUnqualifiedName2 = "client2" + private val KafkaClientPrincipal2 = KafkaClientPrincipalUnqualifiedName2 + "@EXAMPLE.COM" - private val KafkaPlainUser = "testuser" - private val KafkaPlainPassword = "testuser-secret" - private val KafkaPlainAdmin = "admin" - private val KafkaPlainAdminPassword = "admin-secret" + val KafkaPlainUser = "plain-user" + private val 
KafkaPlainPassword = "plain-user-secret" + val KafkaPlainUser2 = "plain-user2" + val KafkaPlainPassword2 = "plain-user2-secret" + val KafkaPlainAdmin = "plain-admin" + private val KafkaPlainAdminPassword = "plain-admin-secret" val KafkaScramUser = "scram-user" val KafkaScramPassword = "scram-user-secret" + val KafkaScramUser2 = "scram-user2" + val KafkaScramPassword2 = "scram-user2-secret" val KafkaScramAdmin = "scram-admin" val KafkaScramAdminPassword = "scram-admin-secret" @@ -135,8 +143,9 @@ object JaasTestUtils { jaasFile.getCanonicalPath } + // Returns the dynamic configuration, using credentials for user #1 def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String = - kafkaClientModule(mechanism, keytabLocation).toString + kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString private def zkSections: Seq[JaasSection] = Seq( new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), @@ -158,7 +167,7 @@ object JaasTestUtils { KafkaPlainAdmin, KafkaPlainAdminPassword, debug = false, - Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword)).toJaasModule + Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword, KafkaPlainUser2 -> KafkaPlainPassword2)).toJaasModule case "SCRAM-SHA-256" | "SCRAM-SHA-512" => ScramLoginModule( KafkaScramAdmin, @@ -169,33 +178,41 @@ object JaasTestUtils { new JaasSection(KafkaServerContextName, modules) } - def kafkaClientModule(mechanism: String, keytabLocation: Option[File]): JaasModule = { + // consider refactoring if more mechanisms are added + private def kafkaClientModule(mechanism: String, + keytabLocation: Option[File], clientPrincipal: String, + plainUser: String, plainPassword: String, + scramUser: String, scramPassword: String): JaasModule = { mechanism match { case 
"GSSAPI" => Krb5LoginModule( useKeyTab = true, storeKey = true, keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, - principal = KafkaClientPrincipal, + principal = clientPrincipal, debug = true, serviceName = Some("kafka") ).toJaasModule case "PLAIN" => PlainLoginModule( - KafkaPlainUser, - KafkaPlainPassword + plainUser, + plainPassword ).toJaasModule case "SCRAM-SHA-256" | "SCRAM-SHA-512" => ScramLoginModule( - KafkaScramUser, - KafkaScramPassword + scramUser, + scramPassword ).toJaasModule case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism) } } + /* + * Used for the static JAAS configuration and it uses the credentials for client#2 + */ private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { - new JaasSection(KafkaClientContextName, mechanisms.map(m => kafkaClientModule(m, keytabLocation))) + new JaasSection(KafkaClientContextName, mechanisms.map(m => + kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2))) } private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =