Repository: kafka
Updated Branches:
  refs/heads/trunk f72203ee9 -> 6508e63c7


KAFKA-4180; Support clients with different authentication credentials in the 
same JVM

Changed caching in LoginManager to allow one LoginManager per client
JAAS configuration.

Added test to End2EndAuthorization for SASL Plain and GSSAPI with two
consumers with different credentials.

Developed with mimaison.

Author: Edoardo Comar <eco...@uk.ibm.com>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma 
<ism...@juma.me.uk>

Closes #2293 from edoardocomar/KAFKA-4180d


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6508e63c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6508e63c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6508e63c

Branch: refs/heads/trunk
Commit: 6508e63c7ab1fbabb322754fabf41eb75bbbcec2
Parents: f72203e
Author: Edoardo Comar <eco...@uk.ibm.com>
Authored: Wed Jan 11 22:01:34 2017 +0000
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Jan 12 01:36:31 2017 +0000

----------------------------------------------------------------------
 .../security/authenticator/LoginManager.java    | 57 +++++++++----
 .../common/security/kerberos/KerberosLogin.java | 59 ++++++-------
 .../kafka/api/EndToEndAuthorizationTest.scala   | 22 ++---
 .../api/SaslEndToEndAuthorizationTest.scala     | 89 ++++++++++++++++++++
 ...SaslGssapiSslEndToEndAuthorizationTest.scala | 36 ++++++++
 .../SaslPlainSslEndToEndAuthorizationTest.scala | 19 +----
 .../SaslScramSslEndToEndAuthorizationTest.scala |  5 +-
 .../scala/integration/kafka/api/SaslSetup.scala |  4 +-
 .../api/SaslSslEndToEndAuthorizationTest.scala  | 32 -------
 .../api/SslEndToEndAuthorizationTest.scala      |  8 +-
 .../integration/kafka/api/UserQuotaTest.scala   |  3 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala  | 47 +++++++----
 12 files changed, 250 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 3b1af1c..a28afae 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -25,22 +25,30 @@ import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.LoginType;
 import org.apache.kafka.common.security.auth.Login;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
 
 public class LoginManager {
 
-    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = 
new EnumMap<>(LoginType.class);
+    // static configs (broker or client)
+    private static final EnumMap<LoginType, LoginManager> LOGIN_TYPE_INSTANCES 
= new EnumMap<>(LoginType.class);
+
+    // dynamic configs (client-only)
+    private static final Map<Password, LoginManager> JAAS_CONF_INSTANCES = new 
HashMap<>();
 
     private final Login login;
-    private final LoginType loginType;
+    private final Object cacheKey;
     private int refCount;
 
-    private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, 
?> configs, Configuration jaasConfig) throws IOException, LoginException {
-        this.loginType = loginType;
+    private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, 
?> configs, Configuration jaasConfig,
+                         Password jaasConfigValue) throws IOException, 
LoginException {
+        this.cacheKey = jaasConfigValue != null ? jaasConfigValue : loginType;
         String loginContext = loginType.contextName();
         login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
         login.configure(configs, jaasConfig, loginContext);
@@ -51,8 +59,7 @@ public class LoginManager {
      * Returns an instance of `LoginManager` and increases its reference count.
      *
      * `release()` should be invoked when the `LoginManager` is no longer 
needed. This method will try to reuse an
-     * existing `LoginManager` for the provided `mode` if available. However, 
it expects `configs` to be the same for
-     * every invocation and it will ignore them in the case where it's 
returning a cached instance of `LoginManager`.
+     * existing `LoginManager` for the provided `loginType` and 
`SaslConfigs.SASL_JAAS_CONFIG` in `configs`, if available. 
      *
      * This is a bit ugly and it would be nicer if we could pass the 
`LoginManager` to `ChannelBuilders.create` and
      * shut it down when the broker or clients are closed. It's 
straightforward to do the former, but it's more
@@ -60,14 +67,28 @@ public class LoginManager {
      *
      * @param loginType the type of the login context, it should be SERVER for 
the broker and CLIENT for the clients
      *                  (i.e. consumer and producer)
+     * @param hasKerberos 
      * @param configs configuration as key/value pairs
+     * @param jaasConfig JAAS Configuration object
      */
-    public static final LoginManager acquireLoginManager(LoginType loginType, 
boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig) throws 
IOException, LoginException {
+    public static LoginManager acquireLoginManager(LoginType loginType, 
boolean hasKerberos, Map<String, ?> configs,
+                                                   Configuration jaasConfig) 
throws IOException, LoginException {
         synchronized (LoginManager.class) {
-            LoginManager loginManager = CACHED_INSTANCES.get(loginType);
-            if (loginManager == null) {
-                loginManager = new LoginManager(loginType, hasKerberos, 
configs, jaasConfig);
-                CACHED_INSTANCES.put(loginType, loginManager);
+            // SASL_JAAS_CONFIG is only supported by clients
+            LoginManager loginManager;
+            Password jaasConfigValue = (Password) 
configs.get(SaslConfigs.SASL_JAAS_CONFIG);
+            if (loginType == LoginType.CLIENT && jaasConfigValue != null) {
+                loginManager = JAAS_CONF_INSTANCES.get(jaasConfigValue);
+                if (loginManager == null) {
+                    loginManager = new LoginManager(loginType, hasKerberos, 
configs, jaasConfig, jaasConfigValue);
+                    JAAS_CONF_INSTANCES.put(jaasConfigValue, loginManager);
+                }
+            } else {
+                loginManager = LOGIN_TYPE_INSTANCES.get(loginType);
+                if (loginManager == null) {
+                    loginManager = new LoginManager(loginType, hasKerberos, 
configs, jaasConfig, jaasConfigValue);
+                    LOGIN_TYPE_INSTANCES.put(loginType, loginManager);
+                }
             }
             return loginManager.acquire();
         }
@@ -94,7 +115,11 @@ public class LoginManager {
             if (refCount == 0)
                 throw new IllegalStateException("release called on 
LoginManager with refCount == 0");
             else if (refCount == 1) {
-                CACHED_INSTANCES.remove(loginType);
+                if (cacheKey instanceof Password) {
+                    JAAS_CONF_INSTANCES.remove(cacheKey);
+                } else {
+                    LOGIN_TYPE_INSTANCES.remove(cacheKey);
+                }
                 login.close();
             }
             --refCount;
@@ -104,10 +129,10 @@ public class LoginManager {
     /* Should only be used in tests. */
     public static void closeAll() {
         synchronized (LoginManager.class) {
-            for (LoginType loginType : new 
ArrayList<>(CACHED_INSTANCES.keySet())) {
-                LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
-                loginManager.login.close();
-            }
+            for (LoginType key : new 
ArrayList<>(LOGIN_TYPE_INSTANCES.keySet()))
+                LOGIN_TYPE_INSTANCES.remove(key).login.close();
+            for (Password key : new ArrayList<>(JAAS_CONF_INSTANCES.keySet()))
+                JAAS_CONF_INSTANCES.remove(key).login.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index d2baf91..32a0929 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -129,20 +129,20 @@ public class KerberosLogin extends AbstractLogin {
         }
 
         if (!isKrbTicket) {
-            log.debug("It is not a Kerberos ticket");
+            log.debug("[Principal={}]: It is not a Kerberos ticket", 
principal);
             t = null;
             // if no TGT, do not bother with ticket management.
             return loginContext;
         }
-        log.debug("It is a Kerberos ticket");
+        log.debug("[Principal={}]: It is a Kerberos ticket", principal);
 
         // Refresh the Ticket Granting Ticket (TGT) periodically. How often to 
refresh is determined by the
         // TGT's existing expiry date and the configured minTimeBeforeRelogin. 
For testing and development,
         // you can decrease the interval of expiration of tickets (for 
example, to 3 minutes) by running:
         //  "modprinc -maxlife 3mins <principal>" in kadmin.
-        t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() {
+        t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", 
principal), new Runnable() {
             public void run() {
-                log.info("TGT refresh thread started.");
+                log.info("[Principal={}]: TGT refresh thread started.", 
principal);
                 while (true) {  // renewal thread's main loop. if it exits 
from here, thread will exit.
                     KerberosTicket tgt = getTGT();
                     long now = currentWallTime();
@@ -151,7 +151,7 @@ public class KerberosLogin extends AbstractLogin {
                     if (tgt == null) {
                         nextRefresh = now + minTimeBeforeRelogin;
                         nextRefreshDate = new Date(nextRefresh);
-                        log.warn("No TGT found: will try again at {}", 
nextRefreshDate);
+                        log.warn("[Principal={}]: No TGT found: will try again 
at {}", principal, nextRefreshDate);
                     } else {
                         nextRefresh = getRefreshTime(tgt);
                         long expiry = tgt.getEndTime().getTime();
@@ -173,41 +173,41 @@ public class KerberosLogin extends AbstractLogin {
                         // would cause ticket expiration.
                         if ((nextRefresh > expiry) || (now + 
minTimeBeforeRelogin > expiry)) {
                             // expiry is before next scheduled refresh).
-                            log.info("Refreshing now because expiry is before 
next scheduled refresh time.");
+                            log.info("[Principal={}]: Refreshing now because 
expiry is before next scheduled refresh time.", principal);
                             nextRefresh = now;
                         } else {
                             if (nextRefresh < (now + minTimeBeforeRelogin)) {
                                 // next scheduled refresh is sooner than (now 
+ MIN_TIME_BEFORE_LOGIN).
                                 Date until = new Date(nextRefresh);
                                 Date newUntil = new Date(now + 
minTimeBeforeRelogin);
-                                log.warn("TGT refresh thread time adjusted 
from {} to {} since the former is sooner " +
+                                log.warn("[Principal={}]: TGT refresh thread 
time adjusted from {} to {} since the former is sooner " +
                                         "than the minimum refresh interval ({} 
seconds) from now.",
-                                        until, newUntil, minTimeBeforeRelogin 
/ 1000);
+                                        principal, until, newUntil, 
minTimeBeforeRelogin / 1000);
                             }
                             nextRefresh = Math.max(nextRefresh, now + 
minTimeBeforeRelogin);
                         }
                         nextRefreshDate = new Date(nextRefresh);
                         if (nextRefresh > expiry) {
-                            log.error("Next refresh: {} is later than expiry 
{}. This may indicate a clock skew problem." +
+                            log.error("[Principal={}]: Next refresh: {} is 
later than expiry {}. This may indicate a clock skew problem." +
                                     "Check that this host and the KDC hosts' 
clocks are in sync. Exiting refresh thread.",
-                                    nextRefreshDate, expiryDate);
+                                    principal, nextRefreshDate, expiryDate);
                             return;
                         }
                     }
                     if (now < nextRefresh) {
                         Date until = new Date(nextRefresh);
-                        log.info("TGT refresh sleeping until: {}", until);
+                        log.info("[Principal={}]: TGT refresh sleeping until: 
{}", principal, until);
                         try {
                             Thread.sleep(nextRefresh - now);
                         } catch (InterruptedException ie) {
-                            log.warn("TGT renewal thread has been interrupted 
and will exit.");
+                            log.warn("[Principal={}]: TGT renewal thread has 
been interrupted and will exit.", principal);
                             return;
                         }
                     } else {
-                        log.error("NextRefresh: {} is in the past: exiting 
refresh thread. Check"
+                        log.error("[Principal={}]: NextRefresh: {} is in the 
past: exiting refresh thread. Check"
                                 + " clock sync between this host and KDC - 
(KDC's clock is likely ahead of this host)."
                                 + " Manual intervention will be required for 
this client to successfully authenticate."
-                                + " Exiting refresh thread.", nextRefreshDate);
+                                + " Exiting refresh thread.", principal, 
nextRefreshDate);
                         return;
                     }
                     if (isUsingTicketCache) {
@@ -215,7 +215,7 @@ public class KerberosLogin extends AbstractLogin {
                         int retry = 1;
                         while (retry >= 0) {
                             try {
-                                log.debug("Running ticket cache refresh 
command: {} {}", kinitCmd, kinitArgs);
+                                log.debug("[Principal={}]: Running ticket 
cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
                                 Shell.execCommand(kinitCmd, kinitArgs);
                                 break;
                             } catch (Exception e) {
@@ -225,12 +225,13 @@ public class KerberosLogin extends AbstractLogin {
                                     try {
                                         Thread.sleep(10 * 1000);
                                     } catch (InterruptedException ie) {
-                                        log.error("Interrupted while renewing 
TGT, exiting Login thread");
+                                        log.error("[Principal={}]: Interrupted 
while renewing TGT, exiting Login thread", principal);
                                         return;
                                     }
                                 } else {
-                                    log.warn("Could not renew TGT due to 
problem running shell command: '" + kinitCmd
-                                            + " " + kinitArgs + "'" + "; 
exception was: " + e + ". Exiting refresh thread.", e);
+                                    log.warn("[Principal={}]: Could not renew 
TGT due to problem running shell command: '{} {}'; " +
+                                            "exception was: %s. Exiting 
refresh thread.", 
+                                            principal, kinitCmd, kinitArgs, e, 
e);
                                     return;
                                 }
                             }
@@ -249,16 +250,16 @@ public class KerberosLogin extends AbstractLogin {
                                     try {
                                         Thread.sleep(10 * 1000);
                                     } catch (InterruptedException e) {
-                                        log.error("Interrupted during login 
retry after LoginException:", le);
+                                        log.error("[Principal={}]: Interrupted 
during login retry after LoginException:", principal, le);
                                         throw le;
                                     }
                                 } else {
-                                    log.error("Could not refresh TGT for 
principal: " + principal + ".", le);
+                                    log.error("[Principal={}]: Could not 
refresh TGT.", principal, le);
                                 }
                             }
                         }
                     } catch (LoginException le) {
-                        log.error("Failed to refresh TGT: refresh thread 
exiting now.", le);
+                        log.error("[Principal={}]: Failed to refresh TGT: 
refresh thread exiting now.", principal, le);
                         return;
                     }
                 }
@@ -275,7 +276,7 @@ public class KerberosLogin extends AbstractLogin {
             try {
                 t.join();
             } catch (InterruptedException e) {
-                log.warn("Error while waiting for Login thread to shutdown: " 
+ e, e);
+                log.warn("[Principal={}]: Error while waiting for Login thread 
to shutdown.", principal, e);
                 Thread.currentThread().interrupt();
             }
         }
@@ -300,8 +301,8 @@ public class KerberosLogin extends AbstractLogin {
         }
         String configServiceName = (String) 
configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
         if (jaasServiceName != null && configServiceName != null && 
!jaasServiceName.equals(configServiceName)) {
-            String message = "Conflicting serviceName values found in JAAS and 
Kafka configs " +
-                "value in JAAS file " + jaasServiceName + ", value in Kafka 
config " + configServiceName;
+            String message = String.format("Conflicting serviceName values 
found in JAAS and Kafka configs " +
+                "value in JAAS file %s, value in Kafka config %s", 
jaasServiceName, configServiceName);
             throw new IllegalArgumentException(message);
         }
 
@@ -317,8 +318,8 @@ public class KerberosLogin extends AbstractLogin {
     private long getRefreshTime(KerberosTicket tgt) {
         long start = tgt.getStartTime().getTime();
         long expires = tgt.getEndTime().getTime();
-        log.info("TGT valid starting at: {}", tgt.getStartTime());
-        log.info("TGT expires: {}", tgt.getEndTime());
+        log.info("[Principal={}]: TGT valid starting at: {}", principal, 
tgt.getStartTime());
+        log.info("[Principal={}]: TGT expires: {}", principal, 
tgt.getEndTime());
         long proposedRefresh = start + (long) ((expires - start) *
                 (ticketRenewWindowFactor + (ticketRenewJitter * 
RNG.nextDouble())));
 
@@ -345,15 +346,15 @@ public class KerberosLogin extends AbstractLogin {
     private boolean hasSufficientTimeElapsed() {
         long now = currentElapsedTime();
         if (now - lastLogin < minTimeBeforeRelogin) {
-            log.warn("Not attempting to re-login since the last re-login was 
attempted less than {} seconds before.",
-                    minTimeBeforeRelogin / 1000);
+            log.warn("[Principal={}]: Not attempting to re-login since the 
last re-login was attempted less than {} seconds before.",
+                    principal, minTimeBeforeRelogin / 1000);
             return false;
         }
         return true;
     }
 
     /**
-     * Re-login a principal. This method assumes that {@link #login(String)} 
has happened already.
+     * Re-login a principal. This method assumes that {@link #login()} has 
happened already.
      * @throws javax.security.auth.login.LoginException on a failure
      */
     private synchronized void reLogin() throws LoginException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4d86040..3e391d3 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.consumer.{Consumer, 
ConsumerConfig, ConsumerReco
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.errors.{GroupAuthorizationException, 
TimeoutException, TopicAuthorizationException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -75,9 +74,6 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   val kafkaPrincipal: String
 
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
-  protected def kafkaClientSaslMechanism = "GSSAPI"
-  protected def kafkaServerSaslMechanisms = List("GSSAPI")
-  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
 
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
@@ -159,12 +155,6 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     */
   @Before
   override def setUp {
-    securityProtocol match {
-      case SecurityProtocol.SSL =>
-        startSasl(ZkSasl, null, null)
-      case _ =>
-        startSasl(Both, List(kafkaClientSaslMechanism), 
kafkaServerSaslMechanisms)
-    }
     super.setUp
     AclCommand.main(topicBrokerReadAclArgs)
     servers.foreach { s =>
@@ -210,7 +200,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     consumeRecords(this.consumers.head, numRecords)
   }
 
-  private def setAclsAndProduce() {
+  protected def setAclsAndProduce() {
     AclCommand.main(produceAclArgs)
     AclCommand.main(consumeAclArgs)
     servers.foreach { s =>
@@ -276,8 +266,8 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
-    servers.foreach { _ =>
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, 
servers.head.apis.authorizer.get, groupResource)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
groupResource)
     }
   }
  
@@ -318,7 +308,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, 
s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, 
servers.head.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, 
groupResource)
     }
     sendRecords(numRecords, tp)
   }
@@ -343,7 +333,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
         assertEquals(group, e.groupId())
     }
   }
-  
+
   private def sendRecords(numRecords: Int, tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
       val record = new ProducerRecord(tp.topic(), tp.partition(), 
s"$i".getBytes, s"$i".getBytes)
@@ -357,7 +347,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     }
   }
 
-  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+  protected def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,
                              topic: String = topic,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..992649a
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -0,0 +1,89 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+import java.util.Properties
+
+import kafka.utils.{JaasTestUtils,TestUtils}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.GroupAuthorizationException
+import org.junit.{Before,Test}
+
+import scala.collection.immutable.List
+import scala.collection.JavaConverters._
+
+abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest 
{
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
+  
+  protected def kafkaClientSaslMechanism: String
+  protected def kafkaServerSaslMechanisms: List[String]
+  
+  @Before
+  override def setUp {
+    startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms)
+    super.setUp
+  }
+
+  // Use JAAS configuration properties for clients so that dynamic JAAS 
configuration is also tested by this set of tests
+  override protected def setJaasConfiguration(mode: SaslSetupMode, 
serverMechanisms: List[String], clientMechanisms: List[String],
+      serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = 
None) {
+    // create static config with client login context with credentials for 
JaasTestUtils 'client2'
+    super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
clientMechanisms, serverKeytabFile, clientKeytabFile) 
+    // set dynamic properties with credentials for JaasTestUtils 'client1'
+    val clientLoginContext = 
JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, clientKeytabFile)
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+  }
+
+  /**
+    * Test with two consumers, each with different valid SASL credentials.
+    * The first consumer succeeds because it is allowed by the ACL, 
+    * the second one connects ok, but fails to consume messages due to the ACL.
+    */
+  @Test(timeout = 15000)
+  def testTwoConsumersWithDifferentSaslCredentials {
+    setAclsAndProduce()
+    val consumer1 = consumers.head
+
+    val consumer2Config = new Properties
+    consumer2Config.putAll(consumerConfig)
+    // consumer2 retrieves its credentials from the static JAAS configuration, 
so we test also this path
+    consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG)
+
+    val consumer2 = TestUtils.createNewConsumer(brokerList,
+                                                securityProtocol = 
securityProtocol,
+                                                trustStoreFile = 
trustStoreFile,
+                                                saslProperties = 
saslProperties,
+                                                props = Some(consumer2Config))
+    consumers += consumer2
+
+    consumer1.assign(List(tp).asJava)
+    consumer2.assign(List(tp).asJava)
+
+    consumeRecords(consumer1, numRecords)
+
+    try {
+      consumeRecords(consumer2)
+      fail("Expected exception as consumer2 has no access to group")
+    } catch {
+      case _: GroupAuthorizationException => //expected
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..a9b4a60
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -0,0 +1,36 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils
+
+import scala.collection.immutable.List
+
+class SaslGssapiSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
+  override val clientPrincipal = 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName
+  override val kafkaPrincipal = 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName
+  
+  override protected def kafkaClientSaslMechanism = "GSSAPI"
+  override protected def kafkaServerSaslMechanisms = List("GSSAPI")
+  
+  // Configure brokers to require SSL client authentication in order to verify 
that SASL_SSL works correctly even if the
+  // client doesn't have a keystore. We want to cover the scenario where a 
broker requires either SSL client
+  // authentication or SASL authentication with SSL as the transport layer 
(but not both).
+  serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 214cd0b..bfccd28 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,24 +16,11 @@
   */
 package kafka.api
 
-import java.io.File
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.config.SaslConfigs
 import kafka.utils.JaasTestUtils
 
-class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
-  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+class SaslPlainSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "PLAIN"
   override protected def kafkaServerSaslMechanisms = List("PLAIN")
-  override val clientPrincipal = "testuser"
-  override val kafkaPrincipal = "admin"
-
-  // Use JAAS configuration properties for clients so that dynamic JAAS 
configuration is also tested by this set of tests
-  override protected def setJaasConfiguration(mode: SaslSetupMode, 
serverMechanisms: List[String], clientMechanisms: List[String],
-      serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = 
None) {
-    super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, List()) // 
create static config without client login contexts
-    val clientLoginModule = 
JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, None)
-    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule)
-    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginModule)
-  }
+  override val clientPrincipal = JaasTestUtils.KafkaPlainUser
+  override val kafkaPrincipal = JaasTestUtils.KafkaPlainAdmin
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index b483884..fe0204a 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -16,15 +16,13 @@
   */
 package kafka.api
 
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramMechanism
 import kafka.utils.JaasTestUtils
 import kafka.admin.ConfigCommand
 import kafka.utils.ZkUtils
 import scala.collection.JavaConverters._
 
-class SaslScramSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
-  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
   override protected def kafkaServerSaslMechanisms = 
ScramMechanism.mechanismNames.asScala.toList
   override val clientPrincipal = JaasTestUtils.KafkaScramUser
@@ -45,5 +43,6 @@ class SaslScramSslEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest {
     }
     ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword))
     ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword))
+    ConfigCommand.main(configCommandArgs(JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 765f191..c1e2da2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -54,8 +54,8 @@ trait SaslSetup {
       setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile))
       kdc = new MiniKdc(kdcConf, workDir)
       kdc.start()
-      kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
-      kdc.createPrincipal(clientKeytabFile, "client")
+      kdc.createPrincipal(serverKeytabFile, 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
+      kdc.createPrincipal(clientKeytabFile, 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName, 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
     } else {
       setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
deleted file mode 100644
index 470ec84..0000000
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.api
-
-import kafka.server.KafkaConfig
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SaslSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
-  override protected def securityProtocol = SecurityProtocol.SASL_SSL
-  override val clientPrincipal = "client"
-  override val kafkaPrincipal = "kafka"
-
-  // Configure brokers to require SSL client authentication in order to verify 
that SASL_SSL works correctly even if the
-  // client doesn't have a keystore. We want to cover the scenario where a 
broker requires either SSL client
-  // authentication or SASL authentication with SSL as the transport layer 
(but not both).
-  serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 812359e..365c0ba 100644
--- 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -19,11 +19,17 @@ package kafka.api
 
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.protocol.SecurityProtocol
-
+import org.junit.Before
 
 class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   override protected def securityProtocol = SecurityProtocol.SSL
   this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
   override val clientPrincipal = "O=A client,CN=localhost"
   override val kafkaPrincipal = "O=A server,CN=localhost"
+
+  @Before
+  override def setUp {
+    startSasl(ZkSasl, List.empty, List.empty)
+    super.setUp
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 3d5d702..4677c8c 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -19,6 +19,7 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.server.{KafkaConfig, ConfigEntityName, QuotaId}
+import kafka.utils.JaasTestUtils
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Before
@@ -30,7 +31,7 @@ class UserQuotaTest extends BaseQuotaTest with 
SaslTestHarness {
   override protected val zkSaslEnabled = false
   override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
 
-  override val userPrincipal = "client"
+  override val userPrincipal = 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
   override val producerQuotaId = QuotaId(Some(userPrincipal), None)
   override val consumerQuotaId = QuotaId(Some(userPrincipal), None)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6508e63c/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index ed08e8a..a7f71b9 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -101,17 +101,25 @@ object JaasTestUtils {
   private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
 
   private val KafkaServerContextName = "KafkaServer"
-  private val KafkaServerPrincipal = "kafka/localh...@example.com"
+  val KafkaServerPrincipalUnqualifiedName = "kafka"
+  private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + 
"/localh...@example.com"
   private val KafkaClientContextName = "KafkaClient"
-  private val KafkaClientPrincipal = "cli...@example.com"
+  val KafkaClientPrincipalUnqualifiedName = "client"
+  private val KafkaClientPrincipal = KafkaClientPrincipalUnqualifiedName + 
"@EXAMPLE.COM"
+  val KafkaClientPrincipalUnqualifiedName2 = "client2"
+  private val KafkaClientPrincipal2 = KafkaClientPrincipalUnqualifiedName2 + 
"@EXAMPLE.COM"
   
-  private val KafkaPlainUser = "testuser"
-  private val KafkaPlainPassword = "testuser-secret"
-  private val KafkaPlainAdmin = "admin"
-  private val KafkaPlainAdminPassword = "admin-secret"
+  val KafkaPlainUser = "plain-user"
+  private val KafkaPlainPassword = "plain-user-secret"
+  val KafkaPlainUser2 = "plain-user2"
+  val KafkaPlainPassword2 = "plain-user2-secret"
+  val KafkaPlainAdmin = "plain-admin"
+  private val KafkaPlainAdminPassword = "plain-admin-secret"
 
   val KafkaScramUser = "scram-user"
   val KafkaScramPassword = "scram-user-secret"
+  val KafkaScramUser2 = "scram-user2"
+  val KafkaScramPassword2 = "scram-user2-secret"
   val KafkaScramAdmin = "scram-admin"
   val KafkaScramAdminPassword = "scram-admin-secret"
 
@@ -135,8 +143,9 @@ object JaasTestUtils {
     jaasFile.getCanonicalPath
   }
 
+  // Returns the dynamic configuration, using credentials for user #1
   def clientLoginModule(mechanism: String, keytabLocation: Option[File]): 
String =
-    kafkaClientModule(mechanism, keytabLocation).toString
+    kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, 
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
 
   private def zkSections: Seq[JaasSection] = Seq(
     new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, 
Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
@@ -158,7 +167,7 @@ object JaasTestUtils {
           KafkaPlainAdmin,
           KafkaPlainAdminPassword,
           debug = false,
-          Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> 
KafkaPlainPassword)).toJaasModule
+          Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> 
KafkaPlainPassword, KafkaPlainUser2 -> KafkaPlainPassword2)).toJaasModule
       case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
         ScramLoginModule(
           KafkaScramAdmin,
@@ -169,33 +178,41 @@ object JaasTestUtils {
     new JaasSection(KafkaServerContextName, modules)
   }
 
-  def kafkaClientModule(mechanism: String, keytabLocation: Option[File]): 
JaasModule = {
+  // consider refactoring if more mechanisms are added
+  private def kafkaClientModule(mechanism: String, 
+      keytabLocation: Option[File], clientPrincipal: String,
+      plainUser: String, plainPassword: String, 
+      scramUser: String, scramPassword: String): JaasModule = {
     mechanism match {
       case "GSSAPI" =>
         Krb5LoginModule(
           useKeyTab = true,
           storeKey = true,
           keyTab = keytabLocation.getOrElse(throw new 
IllegalArgumentException("Keytab location not specified for 
GSSAPI")).getAbsolutePath,
-          principal = KafkaClientPrincipal,
+          principal = clientPrincipal,
           debug = true,
           serviceName = Some("kafka")
         ).toJaasModule
       case "PLAIN" =>
         PlainLoginModule(
-          KafkaPlainUser,
-          KafkaPlainPassword
+          plainUser,
+          plainPassword
         ).toJaasModule
       case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>
         ScramLoginModule(
-          KafkaScramUser,
-          KafkaScramPassword
+          scramUser,
+          scramPassword
         ).toJaasModule
       case mechanism => throw new IllegalArgumentException("Unsupported client 
mechanism " + mechanism)
     }
   }
 
+  /*
+   * Used for the static JAAS configuration and it uses the credentials for 
client#2
+   */
   private def kafkaClientSection(mechanisms: List[String], keytabLocation: 
Option[File]): JaasSection = {
-    new JaasSection(KafkaClientContextName, mechanisms.map(m => 
kafkaClientModule(m, keytabLocation)))
+    new JaasSection(KafkaClientContextName, mechanisms.map(m => 
+      kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, 
KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)))
   }
 
   private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =

Reply via email to