KAFKA-3617; Unit tests for SASL authenticator

Unit tests for SASL authenticator, tests for SASL/PLAIN and multiple 
mechanisms, authorization test for SASL/PLAIN

Author: Rajini Sivaram <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1273 from rajinisivaram/KAFKA-3617


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a496f48
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a496f48
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a496f48

Branch: refs/heads/0.10.0
Commit: 3a496f480d002a4512273477eda9d92731e600c3
Parents: 316389d
Author: Rajini Sivaram <[email protected]>
Authored: Thu Apr 28 13:39:22 2016 -0700
Committer: Ismael Juma <[email protected]>
Committed: Thu Apr 28 13:39:22 2016 -0700

----------------------------------------------------------------------
 .../common/network/SaslChannelBuilder.java      |  13 +-
 .../apache/kafka/common/security/JaasUtils.java |   2 +-
 .../apache/kafka/common/network/CertStores.java |  46 +++
 .../kafka/common/network/NetworkTestUtils.java  |  80 ++++
 .../kafka/common/network/NioEchoServer.java     | 145 +++++++
 .../common/network/SslTransportLayerTest.java   | 293 +++----------
 .../authenticator/SaslAuthenticatorTest.java    | 408 +++++++++++++++++++
 .../authenticator/TestDigestLoginModule.java    | 109 +++++
 .../security/authenticator/TestJaasConfig.java  |  89 ++++
 .../kafka/api/EndToEndAuthorizationTest.scala   |   5 +-
 .../api/SaslMultiMechanismConsumerTest.scala    |   4 +-
 .../api/SaslPlainPlaintextConsumerTest.scala    |   2 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala |  28 ++
 .../scala/integration/kafka/api/SaslSetup.scala |  13 +
 .../integration/kafka/api/SaslTestHarness.scala |  13 -
 15 files changed, 982 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index a0464bc..5c907ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -63,14 +63,13 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
             }
 
-            String defaultRealm;
-            try {
-                defaultRealm = JaasUtils.defaultRealm();
-            } catch (Exception ke) {
-                defaultRealm = "";
-            }
-
             if (hasKerberos) {
+                String defaultRealm;
+                try {
+                    defaultRealm = JaasUtils.defaultKerberosRealm();
+                } catch (Exception ke) {
+                    defaultRealm = "";
+                }
                 @SuppressWarnings("unchecked")
                 List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
                 if (principalToLocalRules != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index ff5e008..63bbafc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -58,7 +58,7 @@ public class JaasUtils {
         return null;
     }
 
-    public static String defaultRealm()
+    public static String defaultKerberosRealm()
         throws ClassNotFoundException, NoSuchMethodException,
                IllegalArgumentException, IllegalAccessException,
                InvocationTargetException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
new file mode 100644
index 0000000..6f108b5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+
+public class CertStores {
+
+    private final Map<String, Object> sslConfig;
+
+    public CertStores(boolean server, String host) throws Exception {
+        String name = server ? "server" : "client";
+        Mode mode = server ? Mode.SERVER : Mode.CLIENT;
+        File truststoreFile = File.createTempFile(name + "TS", ".jks");
+        sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, host);
+        if (server)
+            sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+    }
+
+    public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
+        Map<String, Object> config = new HashMap<>(sslConfig);
+        config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+        config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+        config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+        return config;
+    }
+
+    public Map<String, Object> getUntrustingConfig() {
+        return sslConfig;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
new file mode 100644
index 0000000..53ba954
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+
+/**
+ * Common utility functions used by transport layer and authenticator tests.
+ */
+public class NetworkTestUtils {
+
+    public static NioEchoServer createEchoServer(SecurityProtocol securityProtocol, Map<String, Object> serverConfigs) throws Exception {
+        NioEchoServer server = new NioEchoServer(securityProtocol, serverConfigs, "localhost");
+        server.start();
+        return server;
+    }
+
+    public static Selector createSelector(ChannelBuilder channelBuilder) {
+        return new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+    }
+
+    public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {
+
+        String prefix = TestUtils.randomString(minMessageSize);
+        int requests = 0;
+        int responses = 0;
+        // wait for handshake to finish
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
+        requests++;
+        while (responses < messageCount) {
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            for (NetworkReceive receive : selector.completedReceives()) {
+                assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
+                responses++;
+            }
+
+            for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
+                selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
+            }
+        }
+    }
+
+    public static void waitForChannelClose(Selector selector, String node) throws IOException {
+        boolean closed = false;
+        for (int i = 0; i < 30; i++) {
+            selector.poll(1000L);
+            if (selector.channel(node) == null) {
+                closed = true;
+                break;
+            }
+        }
+        assertTrue(closed);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
new file mode 100644
index 0000000..e99a399
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.MockTime;
+
+/**
+ * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
+ * with the configured security protocol.
+ *
+ */
+public class NioEchoServer extends Thread {
+    private final int port;
+    private final ServerSocketChannel serverSocketChannel;
+    private final List<SocketChannel> newChannels;
+    private final List<SocketChannel> socketChannels;
+    private final AcceptorThread acceptorThread;
+    private final Selector selector;
+    private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
+
+    public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
+        this.port = serverSocketChannel.socket().getLocalPort();
+        this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+        this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+        ChannelBuilder channelBuilder = ChannelBuilders.create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        setName("echoserver");
+        setDaemon(true);
+        acceptorThread = new AcceptorThread();
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public void run() {
+        try {
+            acceptorThread.start();
+            while (serverSocketChannel.isOpen()) {
+                selector.poll(1000);
+                for (SocketChannel socketChannel : newChannels) {
+                    String id = id(socketChannel);
+                    selector.register(id, socketChannel);
+                    socketChannels.add(socketChannel);
+                }
+                newChannels.clear();
+                while (true) {
+                    NetworkSend send = inflightSends.peek();
+                    if (send != null && !selector.channel(send.destination()).hasSend()) {
+                        send = inflightSends.poll();
+                        selector.send(send);
+                    } else
+                        break;
+                }
+                List<NetworkReceive> completedReceives = selector.completedReceives();
+                for (NetworkReceive rcv : completedReceives) {
+                    NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
+                    if (!selector.channel(send.destination()).hasSend())
+                        selector.send(send);
+                    else
+                        inflightSends.add(send);
+                }
+            }
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    private String id(SocketChannel channel) {
+        return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
+                channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
+    }
+
+    public void closeConnections() throws IOException {
+        for (SocketChannel channel : socketChannels)
+            channel.close();
+        socketChannels.clear();
+    }
+
+    public void close() throws IOException, InterruptedException {
+        this.serverSocketChannel.close();
+        closeConnections();
+        acceptorThread.interrupt();
+        acceptorThread.join();
+        interrupt();
+        join();
+    }
+
+    private class AcceptorThread extends Thread {
+        public AcceptorThread() throws IOException {
+            setName("acceptor");
+        }
+        public void run() {
+            try {
+                java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
+                serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
+                while (serverSocketChannel.isOpen()) {
+                    if (acceptSelector.select(1000) > 0) {
+                        Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
+                        while (it.hasNext()) {
+                            SelectionKey key = it.next();
+                            if (key.isAcceptable()) {
+                                SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
+                                socketChannel.configureBlocking(false);
+                                newChannels.add(socketChannel);
+                                selector.wakeup();
+                            }
+                            it.remove();
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index d3302c8..4e96411 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -12,25 +12,15 @@
  */
 package org.apache.kafka.common.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.io.IOException;
-import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
 import javax.net.ssl.SSLContext;
@@ -40,11 +30,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestSslUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +44,7 @@ public class SslTransportLayerTest {
 
     private static final int BUFFER_SIZE = 4 * 1024;
 
-    private SslEchoServer server;
+    private NioEchoServer server;
     private Selector selector;
     private ChannelBuilder channelBuilder;
     private CertStores serverCertStores;
@@ -91,13 +79,13 @@ public class SslTransportLayerTest {
     @Test
     public void testValidEndpointIdentification() throws Exception {
         String node = "0";
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -113,12 +101,12 @@ public class SslTransportLayerTest {
         sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
         sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
     
     /**
@@ -129,14 +117,14 @@ public class SslTransportLayerTest {
     public void testEndpointIdentificationDisabled() throws Exception {
         String node = "0";
         String serverHost = InetAddress.getLocalHost().getHostAddress();
-        server = new SslEchoServer(sslServerConfigs, serverHost);
+        server = new NioEchoServer(SecurityProtocol.SSL, sslServerConfigs, serverHost);
         server.start();
         sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
+        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -147,12 +135,12 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequiredValidProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -164,12 +152,12 @@ public class SslTransportLayerTest {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);        
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
     
     /**
@@ -180,16 +168,16 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequiredNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
     
     /**
@@ -201,12 +189,12 @@ public class SslTransportLayerTest {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        createEchoServer(sslServerConfigs);      
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -217,16 +205,16 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationDisabledNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -237,12 +225,12 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequestedValidProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -253,16 +241,16 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequestedNotProvided() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
         sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 100, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
     }
     
     /**
@@ -303,12 +291,12 @@ public class SslTransportLayerTest {
     public void testInvalidKeyPassword() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid"));
-        createEchoServer(sslServerConfigs);        
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
     
     /**
@@ -318,14 +306,14 @@ public class SslTransportLayerTest {
     public void testUnsupportedTLSVersion() throws Exception {
         String node = "0";
         sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         
         sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
     
     /**
@@ -336,14 +324,14 @@ public class SslTransportLayerTest {
         String node = "0";
         String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
         sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
         
         sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        waitForChannelClose(node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
     }
 
     /**
@@ -352,12 +340,12 @@ public class SslTransportLayerTest {
     @Test
     public void testNetReadBufferResize() throws Exception {
         String node = "0";
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, 
sslServerConfigs);
         createSelector(sslClientConfigs, 10, null, null);
-        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 64000, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
     
     /**
@@ -366,12 +354,12 @@ public class SslTransportLayerTest {
     @Test
     public void testNetWriteBufferResize() throws Exception {
         String node = "0";
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, 
sslServerConfigs);
         createSelector(sslClientConfigs, null, 10, null);
-        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 64000, 10);
+        NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
 
     /**
@@ -380,55 +368,12 @@ public class SslTransportLayerTest {
     @Test
     public void testApplicationBufferResize() throws Exception {
         String node = "0";
-        createEchoServer(sslServerConfigs);
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, 
sslServerConfigs);
         createSelector(sslClientConfigs, null, null, 10);
-        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        testClientConnection(node, 64000, 10);
-    }
-
-    private void testClientConnection(String node, int minMessageSize, int 
messageCount) throws Exception {
-
-        String prefix = TestUtils.randomString(minMessageSize);
-        int requests = 0;
-        int responses = 0;
-        // wait for handshake to finish
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
-        selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + 
"-0").getBytes())));
-        requests++;
-        while (responses < messageCount) {
-            selector.poll(0L);
-            assertEquals("No disconnects should have occurred.", 0, 
selector.disconnected().size());
-
-            for (NetworkReceive receive : selector.completedReceives()) {
-                assertEquals(prefix + "-" + responses, new 
String(Utils.toArray(receive.payload())));
-                responses++;
-            }
-
-            for (int i = 0; i < selector.completedSends().size() && requests < 
messageCount && selector.isChannelReady(node); i++, requests++) {
-                selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + 
"-" + requests).getBytes())));
-            }
-        }
-    }
-    
-    private void waitForChannelClose(String node) throws IOException {
-        boolean closed = false;
-        for (int i = 0; i < 30; i++) {
-            selector.poll(1000L);
-            if (selector.channel(node) == null) {
-                closed = true;
-                break;
-            }
-        }
-        assertTrue(closed);
-    }
-    
-    private void createEchoServer(Map<String, Object> sslServerConfigs) throws 
Exception {
-        server = new SslEchoServer(sslServerConfigs, "localhost");
-        server.start();
+        NetworkTestUtils.checkClientConnection(selector, node, 64000, 10);
     }
     
     private void createSelector(Map<String, Object> sslClientConfigs) {
@@ -455,32 +400,6 @@ public class SslTransportLayerTest {
         this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder);
     }
     
-    private static class CertStores {
-        
-        Map<String, Object> sslConfig;
-        
-        CertStores(boolean server, String host) throws Exception {
-            String name = server ? "server" : "client";
-            Mode mode = server ? Mode.SERVER : Mode.CLIENT;
-            File truststoreFile = File.createTempFile(name + "TS", ".jks");
-            sslConfig = TestSslUtils.createSslConfig(!server, true, mode, 
truststoreFile, name, host);
-            if (server)
-                sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
-        }
-       
-        private Map<String, Object> getTrustingConfig(CertStores 
truststoreConfig) {
-            Map<String, Object> config = new HashMap<>(sslConfig);
-            config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-            config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-            config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
-            return config;
-        }
-        
-        private Map<String, Object> getUntrustingConfig() {
-            return sslConfig;
-        }
-    }
-
     /**
      * SSLTransportLayer with overrides for packet and application buffer size 
to test buffer resize
      * code path. The overridden buffer size starts with a small value and 
increases in size when the buffer
@@ -537,117 +456,5 @@ public class SslTransportLayerTest {
             }
         }
     }
-    
-    // Non-blocking EchoServer implementation that uses SSLTransportLayer
-    private class SslEchoServer extends Thread {
-        private final int port;
-        private final ServerSocketChannel serverSocketChannel;
-        private final List<SocketChannel> newChannels;
-        private final List<SocketChannel> socketChannels;
-        private final AcceptorThread acceptorThread;
-        private SslFactory sslFactory;
-        private final Selector selector;
-        private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new 
ConcurrentLinkedQueue<NetworkSend>();
-
-        public SslEchoServer(Map<String, ?> configs, String serverHost) throws 
Exception {
-            this.sslFactory = new SslFactory(Mode.SERVER);
-            this.sslFactory.configure(configs);
-            serverSocketChannel = ServerSocketChannel.open();
-            serverSocketChannel.configureBlocking(false);
-            serverSocketChannel.socket().bind(new 
InetSocketAddress(serverHost, 0));
-            this.port = serverSocketChannel.socket().getLocalPort();
-            this.socketChannels = Collections.synchronizedList(new 
ArrayList<SocketChannel>());
-            this.newChannels = Collections.synchronizedList(new 
ArrayList<SocketChannel>());
-            SslChannelBuilder channelBuilder = new 
SslChannelBuilder(Mode.SERVER);
-            channelBuilder.configure(sslServerConfigs);
-            this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder);
-            setName("echoserver");
-            setDaemon(true);
-            acceptorThread = new AcceptorThread();
-        }
-
-        @Override
-        public void run() {
-            try {
-                acceptorThread.start();
-                while (serverSocketChannel.isOpen()) {
-                    selector.poll(1000);
-                    for (SocketChannel socketChannel : newChannels) {
-                        String id = id(socketChannel);
-                        selector.register(id, socketChannel);
-                        socketChannels.add(socketChannel);
-                    }
-                    newChannels.clear();
-                    while (true) {
-                        NetworkSend send = inflightSends.peek();
-                        if (send != null && 
!selector.channel(send.destination()).hasSend()) {
-                            send = inflightSends.poll();
-                            selector.send(send);
-                        } else
-                            break;
-                    }
-                    List<NetworkReceive> completedReceives = 
selector.completedReceives();
-                    for (NetworkReceive rcv : completedReceives) {
-                        NetworkSend send = new NetworkSend(rcv.source(), 
rcv.payload());
-                        if (!selector.channel(send.destination()).hasSend())
-                            selector.send(send);
-                        else
-                            inflightSends.add(send);
-                    }
-                }
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-        
-        private String id(SocketChannel channel) {
-            return channel.socket().getLocalAddress().getHostAddress() + ":" + 
channel.socket().getLocalPort() + "-" +
-                    channel.socket().getInetAddress().getHostAddress() + ":" + 
channel.socket().getPort();
-        }
-
-        public void closeConnections() throws IOException {
-            for (SocketChannel channel : socketChannels)
-                channel.close();
-            socketChannels.clear();
-        }
-
-        public void close() throws IOException, InterruptedException {
-            this.serverSocketChannel.close();
-            closeConnections();
-            acceptorThread.interrupt();
-            acceptorThread.join();
-            interrupt();
-            join();
-        }
-        
-        private class AcceptorThread extends Thread {
-            public AcceptorThread() throws IOException {
-                setName("acceptor");
-            }
-            public void run() {
-                try {
-
-                    java.nio.channels.Selector acceptSelector = 
java.nio.channels.Selector.open();
-                    serverSocketChannel.register(acceptSelector, 
SelectionKey.OP_ACCEPT);
-                    while (serverSocketChannel.isOpen()) {
-                        if (acceptSelector.select(1000) > 0) {
-                            Iterator<SelectionKey> it = 
acceptSelector.selectedKeys().iterator();
-                            while (it.hasNext()) {
-                                SelectionKey key = it.next();
-                                if (key.isAcceptable()) {
-                                    SocketChannel socketChannel = 
((ServerSocketChannel) key.channel()).accept();
-                                    socketChannel.configureBlocking(false);
-                                    newChannels.add(socketChannel);
-                                    selector.wakeup();
-                                }
-                            }
-                        }
-                    }
-                } catch (IOException e) {
-                    // ignore
-                }
-            }
-        }
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
new file mode 100644
index 0000000..0a4928b
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.CertStores;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.NetworkTestUtils;
+import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.security.JaasUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the Sasl authenticator. These use a test harness that runs a 
simple socket server that echoes back responses.
+ */
+public class SaslAuthenticatorTest {
+
+    private static final int BUFFER_SIZE = 4 * 1024;
+
+    private NioEchoServer server;
+    private Selector selector;
+    private ChannelBuilder channelBuilder;
+    private CertStores serverCertStores;
+    private CertStores clientCertStores;
+    private Map<String, Object> saslClientConfigs;
+    private Map<String, Object> saslServerConfigs;
+
+    @Before
+    public void setup() throws Exception {
+        serverCertStores = new CertStores(true, "localhost");
+        clientCertStores = new CertStores(false, "localhost");
+        saslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
+        saslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (server != null)
+            this.server.close();
+        if (selector != null)
+            this.selector.close();
+    }
+
+    /**
+     * Tests good path SASL/PLAIN client and server channels using SSL 
transport layer.
+     */
+    @Test
+    public void testValidSaslPlainOverSsl() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createAndCheckClientConnection(securityProtocol, node);
+    }
+
+    /**
+     * Tests good path SASL/PLAIN client and server channels using PLAINTEXT 
transport layer.
+     */
+    @Test
+    public void testValidSaslPlainOverPlaintext() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createAndCheckClientConnection(securityProtocol, node);
+    }
+
+    /**
+     * Tests that SASL/PLAIN clients with invalid password fail authentication.
+     */
+    @Test
+    public void testInvalidPasswordSaslPlain() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, 
"invalidpassword");
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    /**
+     * Tests that SASL/PLAIN clients with invalid username fail authentication.
+     */
+    @Test
+    public void testInvalidUsernameSaslPlain() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setPlainClientOptions("invaliduser", 
TestJaasConfig.PASSWORD);
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    /**
+     * Tests that SASL/PLAIN clients with missing username in JAAS 
configuration fail authentication.
+     */
+    @Test
+    public void testMissingUsernameSaslPlain() throws Exception {
+        String node = "0";
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setPlainClientOptions(null, "mypassword");
+
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createSelector(securityProtocol, saslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        try {
+            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+            fail("SASL/PLAIN channel created without username");
+        } catch (KafkaException e) {
+            // Expected exception
+        }
+    }
+
+    /**
+     * Tests that SASL/PLAIN clients with missing password in JAAS 
configuration fail authentication.
+     */
+    @Test
+    public void testMissingPasswordSaslPlain() throws Exception {
+        String node = "0";
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.setPlainClientOptions("myuser", null);
+
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createSelector(securityProtocol, saslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        try {
+            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+            fail("SASL/PLAIN channel created without password");
+        } catch (KafkaException e) {
+            // Expected exception
+        }
+    }
+
+    /**
+     * Tests that mechanisms that are not supported in Kafka can be plugged in 
without modifying
+     * Kafka code if Sasl client and server providers are available.
+     */
+    @Test
+    public void testMechanismPluggability() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createAndCheckClientConnection(securityProtocol, node);
+    }
+
+    /**
+     * Tests that servers supporting multiple SASL mechanisms work with 
clients using
+     * any of the enabled mechanisms.
+     */
+    @Test
+    public void testMultipleServerMechanisms() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", 
"PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+
+        String node1 = "1";
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        createAndCheckClientConnection(securityProtocol, node1);
+
+        String node2 = "2";
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "DIGEST-MD5");
+        createSelector(securityProtocol, saslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
+    }
+
+    /**
+     * Tests that any invalid data during Kafka SASL handshake request flow
+     * or the actual SASL authentication flow results in authentication failure
+     * and does not cause any failures in the server.
+     */
+    @Test
+    public void testInvalidSaslPacket() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+
+        // Send invalid SASL packet after valid handshake request
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        sendHandshakeRequest(node1);
+        Random random = new Random();
+        byte[] bytes = new byte[1024];
+        random.nextBytes(bytes);
+        selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+
+        // Send invalid SASL packet before handshake request
+        String node2 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        random.nextBytes(bytes);
+        selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
+        NetworkTestUtils.waitForChannelClose(selector, node2);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good2");
+    }
+
+    /**
+     * Tests that packets that are too big during Kafka SASL handshake request 
flow
+     * or the actual SASL authentication flow result in authentication failure
+     * and do not cause any failures in the server.
+     */
+    @Test
+    public void testPacketSizeTooBig() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+
+        // Send SASL packet with large size after valid handshake request
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        sendHandshakeRequest(node1);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.putInt(Integer.MAX_VALUE);
+        buffer.put(new byte[buffer.capacity() - 4]);
+        buffer.rewind();
+        selector.send(new NetworkSend(node1, buffer));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+
+        // Send packet with large size before handshake request
+        String node2 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        buffer.clear();
+        buffer.putInt(Integer.MAX_VALUE);
+        buffer.put(new byte[buffer.capacity() - 4]);
+        buffer.rewind();
+        selector.send(new NetworkSend(node2, buffer));
+        NetworkTestUtils.waitForChannelClose(selector, node2);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good2");
+    }
+
+    /**
+     * Tests that Kafka requests that are forbidden until successful 
authentication result
+     * in authentication failure and do not cause any failures in the server.
+     */
+    @Test
+    public void testDisallowedKafkaRequestsBeforeAuthentication() throws 
Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+
+        // Send metadata request before Kafka SASL handshake request
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        RequestHeader metadataRequestHeader1 = new 
RequestHeader(ApiKeys.METADATA.id, "someclient", 1);
+        MetadataRequest metadataRequest1 = new 
MetadataRequest(Collections.singletonList("sometopic"));
+        selector.send(new NetworkSend(node1, 
RequestSend.serialize(metadataRequestHeader1, metadataRequest1.toStruct())));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+
+        // Send metadata request after Kafka SASL handshake request
+        String node2 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        sendHandshakeRequest(node2);
+        RequestHeader metadataRequestHeader2 = new 
RequestHeader(ApiKeys.METADATA.id, "someclient", 2);
+        MetadataRequest metadataRequest2 = new 
MetadataRequest(Collections.singletonList("sometopic"));
+        selector.send(new NetworkSend(node2, 
RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct())));
+        NetworkTestUtils.waitForChannelClose(selector, node2);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good2");
+    }
+
+    /**
+     * Tests that connections cannot be created if the login module class is 
unavailable.
+     */
+    @Test
+    public void testInvalidLoginModule() throws Exception {
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", 
Arrays.asList("PLAIN"));
+        jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, 
"InvalidLoginModule", TestJaasConfig.defaultClientOptions());
+
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        try {
+            createSelector(securityProtocol, saslClientConfigs);
+            fail("SASL/PLAIN channel created without valid login module");
+        } catch (KafkaException e) {
+            // Expected exception
+        }
+    }
+
+    /**
+     * Tests that mechanisms with default implementation in Kafka may be 
disabled in
+     * the Kafka server by removing them from the enabled mechanism list.
+     */
+    @Test
+    public void testDisabledMechanism() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    /**
+     * Tests that clients using invalid SASL mechanisms fail authentication.
+     */
+    @Test
+    public void testInvalidMechanism() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
+
+        server = NetworkTestUtils.createEchoServer(securityProtocol, 
saslServerConfigs);
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.waitForChannelClose(selector, node);
+    }
+
+    private TestJaasConfig configureMechanisms(String clientMechanism, 
List<String> serverMechanisms) {
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
+        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
serverMechanisms);
+        return TestJaasConfig.createConfiguration(clientMechanism, 
serverMechanisms);
+    }
+
+    private void createSelector(SecurityProtocol securityProtocol, Map<String, 
Object> clientConfigs) {
+        String saslMechanism = (String) 
saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
+        this.channelBuilder = ChannelBuilders.create(securityProtocol, 
Mode.CLIENT, LoginType.CLIENT, clientConfigs, saslMechanism, true);
+        this.selector = NetworkTestUtils.createSelector(channelBuilder);
+    }
+
+    private void createClientConnection(SecurityProtocol securityProtocol, 
String node) throws Exception {
+        createSelector(securityProtocol, saslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+    }
+
+    private void createAndCheckClientConnection(SecurityProtocol 
securityProtocol, String node) throws Exception {
+        createClientConnection(securityProtocol, node);
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+        selector.close();
+        selector = null;
+    }
+
+    private void sendHandshakeRequest(String node) throws Exception {
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, 
"someclient", 1);
+        SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest("PLAIN");
+        selector.send(new NetworkSend(node, RequestSend.serialize(header, 
handshakeRequest.toStruct())));
+        int waitSeconds = 10;
+        do {
+            selector.poll(1000);
+        } while (selector.completedSends().isEmpty() && waitSeconds-- > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
new file mode 100644
index 0000000..2923a5a
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.io.IOException;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+
+/**
+ * Digest-MD5 login module for multi-mechanism tests. Since callback handlers 
are not configurable in Kafka
+ * yet, this replaces the standard Digest-MD5 SASL server provider with one 
that invokes the test callback handler.
+ * This login module uses the same format as PlainLoginModule and hence simply 
reuses the same methods.
+ *
+ * <p>The replacement is performed by this class's static initializer: it first records a
+ * DIGEST-MD5 capable factory from {@code Sasl.getSaslServerFactories()} and then inserts
+ * {@link DigestSaslServerProvider} at provider position 1.
+ *
+ */
+public class TestDigestLoginModule extends PlainLoginModule {
+    /**
+     * First factory from {@code Sasl.getSaslServerFactories()} whose mechanism names (queried
+     * with an empty property map) contain "DIGEST-MD5"; {@code null} if no such factory exists.
+     * It is looked up before the test provider is inserted, presumably so that the lookup
+     * cannot pick up {@link DigestSaslServerFactory} itself.
+     */
+    private static final SaslServerFactory STANDARD_DIGEST_SASL_SERVER_FACTORY;
+    static { // runs once, when this class is initialized
+        SaslServerFactory digestSaslServerFactory = null;
+        Enumeration<SaslServerFactory> factories = 
Sasl.getSaslServerFactories();
+        Map<String, Object> emptyProps = new HashMap<>();
+        while (factories.hasMoreElements()) {
+            SaslServerFactory factory = factories.nextElement();
+            if 
(Arrays.asList(factory.getMechanismNames(emptyProps)).contains("DIGEST-MD5")) {
+                digestSaslServerFactory = factory;
+                break; // keep the first match only
+            }
+        }
+        STANDARD_DIGEST_SASL_SERVER_FACTORY = digestSaslServerFactory;
+        Security.insertProviderAt(new DigestSaslServerProvider(), 1); // 1 = most preferred position
+    }
+    /**
+     * Callback handler passed to the standard DIGEST-MD5 server in place of the caller's handler.
+     * It echoes the default name and default realm text, answers every password request with
+     * {@code TestJaasConfig.PASSWORD}, and marks an {@code AuthorizeCallback} as authorized
+     * (authorized id = authentication id) only when the authentication id equals
+     * {@code TestJaasConfig.USERNAME}. Other authentication ids and other callback types are
+     * left untouched.
+     */
+    public static class DigestServerCallbackHandler implements CallbackHandler 
{
+
+        @Override
+        public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nameCallback = (NameCallback) callback;
+                    nameCallback.setName(nameCallback.getDefaultName());
+                } else if (callback instanceof PasswordCallback) {
+                    PasswordCallback passwordCallback = (PasswordCallback) 
callback;
+                    
passwordCallback.setPassword(TestJaasConfig.PASSWORD.toCharArray());
+                } else if (callback instanceof RealmCallback) {
+                    RealmCallback realmCallback = (RealmCallback) callback;
+                    realmCallback.setText(realmCallback.getDefaultText());
+                } else if (callback instanceof AuthorizeCallback) {
+                    AuthorizeCallback authCallback = (AuthorizeCallback) 
callback;
+                    if 
(TestJaasConfig.USERNAME.equals(authCallback.getAuthenticationID())) {
+                        authCallback.setAuthorized(true);
+                        
authCallback.setAuthorizedID(authCallback.getAuthenticationID());
+                    }
+                }
+            }
+        }
+    }
+    /**
+     * Factory registered by {@link DigestSaslServerProvider} for "DIGEST-MD5". Server creation is
+     * delegated to the recorded standard factory with the caller's arguments, except that the
+     * supplied callback handler is ignored and a new {@link DigestServerCallbackHandler} is used.
+     *
+     * <p>NOTE(review): if no standard DIGEST-MD5 factory was found during static initialization,
+     * {@code createSaslServer} dereferences {@code null} and throws NullPointerException.
+     */
+    public static class DigestSaslServerFactory implements SaslServerFactory {
+
+        @Override
+        public SaslServer createSaslServer(String mechanism, String protocol, 
String serverName, Map<String, ?> props, CallbackHandler cbh)
+                throws SaslException {
+            return 
STANDARD_DIGEST_SASL_SERVER_FACTORY.createSaslServer(mechanism, protocol, 
serverName, props, new DigestServerCallbackHandler());
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            return new String[] {"DIGEST-MD5"}; // same answer regardless of props
+        }
+    }
+    /**
+     * Security provider whose only entry maps the service key "SaslServerFactory.DIGEST-MD5" to
+     * the class name of {@link DigestSaslServerFactory}.
+     */
+    public static class DigestSaslServerProvider extends Provider {
+
+        private static final long serialVersionUID = 1L;
+
+        protected DigestSaslServerProvider() {
+            super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test 
SASL/Digest-MD5 Server Provider for Kafka");
+            super.put("SaslServerFactory.DIGEST-MD5", 
TestDigestLoginModule.DigestSaslServerFactory.class.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
new file mode 100644
index 0000000..2291cc1
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+/**
+ * In-memory JAAS {@link Configuration} for the SASL authenticator tests.
+ *
+ * <p>Exactly one {@link AppConfigurationEntry} is kept per login context name;
+ * {@link #createOrUpdateEntry} replaces whatever was stored under that name.
+ *
+ * <p>NOTE(review): because of that replace behaviour, the loop over {@code serverMechanisms} in
+ * {@link #createConfiguration} leaves only the login module of the last mechanism registered
+ * under {@code JaasUtils.LOGIN_CONTEXT_SERVER} -- confirm this is intended for tests that
+ * enable several server mechanisms.
+ */
+public class TestJaasConfig extends Configuration {
+    /** Credentials used by the default client options and by the default server user entry. */
+    static final String USERNAME = "myuser";
+    static final String PASSWORD = "mypassword";
+    /** Login context name -> single-element entry array handed out by getAppConfigurationEntry. */
+    private Map<String, AppConfigurationEntry[]> entryMap = new HashMap<>();
+    /**
+     * Builds a configuration holding a client entry for {@code clientMechanism} (with the default
+     * client options) and a server entry for {@code serverMechanisms} (with the default server
+     * options), then installs it through {@code Configuration.setConfiguration}.
+     *
+     * @param clientMechanism SASL mechanism of the client; "PLAIN" or "DIGEST-MD5"
+     * @param serverMechanisms SASL mechanisms of the server; each "PLAIN" or "DIGEST-MD5"
+     * @return the configuration that was installed
+     * @throws IllegalArgumentException if any mechanism is not one of the two supported names
+     */
+    public static TestJaasConfig createConfiguration(String clientMechanism, 
List<String> serverMechanisms) {
+        TestJaasConfig config = new TestJaasConfig();
+        config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, 
loginModule(clientMechanism), defaultClientOptions());
+        for (String mechanism : serverMechanisms) {
+            config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, 
loginModule(mechanism), defaultServerOptions());
+        }
+        Configuration.setConfiguration(config);
+        return config;
+    }
+    /**
+     * Replaces the client login context entry with a {@code PlainLoginModule} entry. The options
+     * "username" and "password" are only present for arguments that are not {@code null}.
+     *
+     * @param clientUsername value for the "username" option, or {@code null} to omit it
+     * @param clientPassword value for the "password" option, or {@code null} to omit it
+     */
+    public void setPlainClientOptions(String clientUsername, String 
clientPassword) {
+        Map<String, Object> options = new HashMap<>();
+        if (clientUsername != null)
+            options.put("username", clientUsername);
+        if (clientPassword != null)
+            options.put("password", clientPassword);
+        createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, 
PlainLoginModule.class.getName(), options);
+    }
+    /**
+     * Stores a single {@code REQUIRED} entry for {@code loginModule} under {@code name},
+     * replacing any entry previously stored under that name.
+     *
+     * @param name login context name
+     * @param loginModule fully qualified login module class name
+     * @param options login module options
+     */
+    public void createOrUpdateEntry(String name, String loginModule, 
Map<String, Object> options) {
+        AppConfigurationEntry entry = new AppConfigurationEntry(loginModule, 
LoginModuleControlFlag.REQUIRED, options);
+        entryMap.put(name, new AppConfigurationEntry[] {entry});
+    }
+    /** Returns the entry array stored for {@code name}, or {@code null} if none was registered. */
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        return entryMap.get(name);
+    }
+    /**
+     * Maps a SASL mechanism name to a login module class name: "PLAIN" to
+     * {@code PlainLoginModule}, "DIGEST-MD5" to {@code TestDigestLoginModule}.
+     *
+     * @throws IllegalArgumentException for any other mechanism name
+     */
+    private static String loginModule(String mechanism) {
+        String loginModule;
+        switch (mechanism) {
+            case "PLAIN":
+                loginModule = PlainLoginModule.class.getName();
+                break;
+            case "DIGEST-MD5":
+                loginModule = TestDigestLoginModule.class.getName();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported mechanism " + 
mechanism);
+        }
+        return loginModule;
+    }
+    /** Returns a new map with "username" and "password" set to USERNAME and PASSWORD. */
+    public static Map<String, Object> defaultClientOptions() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("username", USERNAME);
+        options.put("password", PASSWORD);
+        return options;
+    }
+    /** Returns a new map with the single option "user_" + USERNAME set to PASSWORD. */
+    public static Map<String, Object> defaultServerOptions() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("user_" + USERNAME, PASSWORD);
+        return options;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 870caca..fec96cd 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -78,6 +78,9 @@ trait EndToEndAuthorizationTest extends 
IntegrationTestHarness with SaslSetup {
   val kafkaPrincipal: String
 
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
+  protected def kafkaClientSaslMechanism = "GSSAPI"
+  protected def kafkaServerSaslMechanisms = List("GSSAPI")
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
 
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
@@ -143,7 +146,7 @@ trait EndToEndAuthorizationTest extends 
IntegrationTestHarness with SaslSetup {
       case SecurityProtocol.SSL =>
         startSasl(ZkSasl, null, null)
       case _ =>
-        startSasl(Both, List("GSSAPI"), List("GSSAPI"))
+        startSasl(Both, List(kafkaClientSaslMechanism), 
kafkaServerSaslMechanisms)
     }
     super.setUp
     AclCommand.main(topicBrokerReadAclArgs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index d203245..fc79c60 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -27,7 +27,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest 
with SaslTestHarne
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
-  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms))
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
 
   @Test
   def testMultipleBrokerMechanisms() {
@@ -35,7 +35,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest 
with SaslTestHarne
     val plainSaslProducer = producers(0)
     val plainSaslConsumer = consumers(0)
 
-    val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", 
kafkaServerSaslMechanisms)
+    val gssapiSaslProperties = kafkaSaslProperties("GSSAPI")
     val gssapiSaslProducer = TestUtils.createNewProducer(brokerList,
                                                          securityProtocol = 
this.securityProtocol,
                                                          trustStoreFile = 
this.trustStoreFile,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 687cfc3..bdca577 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -23,5 +23,5 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest 
with SaslTestHarne
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
-  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms))
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, 
Some(kafkaServerSaslMechanisms)))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..63636c0
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -0,0 +1,28 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import org.apache.kafka.common.protocol.SecurityProtocol
+/**
+  * Concrete EndToEndAuthorizationTest that uses the SASL_SSL security protocol with "PLAIN" as
+  * the client mechanism and as the only server mechanism.
+  *
+  * NOTE(review): the principals "testuser" and "admin" presumably match users defined in the
+  * JAAS test configuration for PLAIN -- confirm against JaasTestUtils.
+  */
+class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected def kafkaClientSaslMechanism = "PLAIN"
+  override protected def kafkaServerSaslMechanisms = List("PLAIN")
+  override val clientPrincipal = "testuser"
+  override val kafkaPrincipal = "admin"
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index acc86e3..765f191 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -18,11 +18,14 @@
 package kafka.api
 
 import java.io.File
+import java.util.Properties
 import javax.security.auth.login.Configuration
 import kafka.security.minikdc.MiniKdc
+import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.authenticator.LoginManager
+import org.apache.kafka.common.config.SaslConfigs
 
 /*
  * Implements an enumeration for the modes enabled here:
@@ -81,4 +84,14 @@ trait SaslSetup {
     System.clearProperty("zookeeper.authProvider.1")
     Configuration.setConfiguration(null)
   }
+  /**
+    * Builds the SASL-related Properties used by the tests.
+    *
+    * SaslConfigs.SASL_MECHANISM is always set to clientSaslMechanism. Only when
+    * serverSaslMechanisms is defined are the broker-side settings added as well:
+    * KafkaConfig.SaslMechanismInterBrokerProtocolProp (set to clientSaslMechanism) and
+    * SaslConfigs.SASL_ENABLED_MECHANISMS (the server mechanisms joined with ",").
+    *
+    * @param clientSaslMechanism mechanism used by clients and, if broker settings are added,
+    *                            for inter-broker communication
+    * @param serverSaslMechanisms mechanisms enabled on the broker; None (the default) leaves
+    *                             the broker-side settings out
+    * @return a new Properties instance holding the settings described above
+    */
+  def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: 
Option[Seq[String]] = None) = {
+    val props = new Properties
+    props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism)
+    serverSaslMechanisms.foreach { serverMechanisms =>
+        props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
clientSaslMechanism)
+        props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
serverMechanisms.mkString(","))
+    }
+    props
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index 5531919..8fd3eb4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -13,11 +13,7 @@
 package kafka.api
 
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.KafkaConfig
 import org.junit.{After, Before}
-import java.util.Properties
-import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.SaslConfigs
 
 trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
   protected val zkSaslEnabled: Boolean
@@ -42,13 +38,4 @@ trait SaslTestHarness extends ZooKeeperTestHarness with 
SaslSetup {
     super.tearDown
     closeSasl()
   }
-
-  def kafkaSaslProperties(kafkaClientSaslMechanism: String, 
kafkaServerSaslMechanisms: List[String]) = {
-    val props = new Properties
-    props.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
-    props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
kafkaClientSaslMechanism)
-    props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
kafkaServerSaslMechanisms.asJava)
-    props
-  }
-
 }

Reply via email to