KAFKA-3617; Unit tests for SASL authenticator. Adds unit tests for the SASL authenticator, tests for SASL/PLAIN and multiple mechanisms, and an authorization test for SASL/PLAIN.
Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1273 from rajinisivaram/KAFKA-3617 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a496f48 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a496f48 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a496f48 Branch: refs/heads/0.10.0 Commit: 3a496f480d002a4512273477eda9d92731e600c3 Parents: 316389d Author: Rajini Sivaram <[email protected]> Authored: Thu Apr 28 13:39:22 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Thu Apr 28 13:39:22 2016 -0700 ---------------------------------------------------------------------- .../common/network/SaslChannelBuilder.java | 13 +- .../apache/kafka/common/security/JaasUtils.java | 2 +- .../apache/kafka/common/network/CertStores.java | 46 +++ .../kafka/common/network/NetworkTestUtils.java | 80 ++++ .../kafka/common/network/NioEchoServer.java | 145 +++++++ .../common/network/SslTransportLayerTest.java | 293 +++---------- .../authenticator/SaslAuthenticatorTest.java | 408 +++++++++++++++++++ .../authenticator/TestDigestLoginModule.java | 109 +++++ .../security/authenticator/TestJaasConfig.java | 89 ++++ .../kafka/api/EndToEndAuthorizationTest.scala | 5 +- .../api/SaslMultiMechanismConsumerTest.scala | 4 +- .../api/SaslPlainPlaintextConsumerTest.scala | 2 +- .../SaslPlainSslEndToEndAuthorizationTest.scala | 28 ++ .../scala/integration/kafka/api/SaslSetup.scala | 13 + .../integration/kafka/api/SaslTestHarness.scala | 13 - 15 files changed, 982 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index a0464bc..5c907ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -63,14 +63,13 @@ public class SaslChannelBuilder implements ChannelBuilder { hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM); } - String defaultRealm; - try { - defaultRealm = JaasUtils.defaultRealm(); - } catch (Exception ke) { - defaultRealm = ""; - } - if (hasKerberos) { + String defaultRealm; + try { + defaultRealm = JaasUtils.defaultKerberosRealm(); + } catch (Exception ke) { + defaultRealm = ""; + } @SuppressWarnings("unchecked") List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES); if (principalToLocalRules != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java index ff5e008..63bbafc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java @@ -58,7 +58,7 @@ public class JaasUtils { return null; } - public static String defaultRealm() + public static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException, IllegalArgumentException, IllegalAccessException, InvocationTargetException { http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/CertStores.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java 
b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java new file mode 100644 index 0000000..6f108b5 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.test.TestSslUtils; + +public class CertStores { + + private final Map<String, Object> sslConfig; + + public CertStores(boolean server, String host) throws Exception { + String name = server ? "server" : "client"; + Mode mode = server ? 
Mode.SERVER : Mode.CLIENT; + File truststoreFile = File.createTempFile(name + "TS", ".jks"); + sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, host); + if (server) + sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + } + + public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) { + Map<String, Object> config = new HashMap<>(sslConfig); + config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + return config; + } + + public Map<String, Object> getUntrustingConfig() { + return sslConfig; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java new file mode 100644 index 0000000..53ba954 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; + +/** + * Common utility functions used by transport layer and authenticator tests. + */ +public class NetworkTestUtils { + + public static NioEchoServer createEchoServer(SecurityProtocol securityProtocol, Map<String, Object> serverConfigs) throws Exception { + NioEchoServer server = new NioEchoServer(securityProtocol, serverConfigs, "localhost"); + server.start(); + return server; + } + + public static Selector createSelector(ChannelBuilder channelBuilder) { + return new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); + } + + public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception { + + String prefix = TestUtils.randomString(minMessageSize); + int requests = 0; + int responses = 0; + // wait for handshake to finish + while (!selector.isChannelReady(node)) { + selector.poll(1000L); + } + selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes()))); + requests++; + while (responses < messageCount) { + selector.poll(0L); + assertEquals("No disconnects should have occurred.", 0, 
selector.disconnected().size()); + + for (NetworkReceive receive : selector.completedReceives()) { + assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload()))); + responses++; + } + + for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) { + selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes()))); + } + } + } + + public static void waitForChannelClose(Selector selector, String node) throws IOException { + boolean closed = false; + for (int i = 0; i < 30; i++) { + selector.poll(1000L); + if (selector.channel(node) == null) { + closed = true; + break; + } + } + assertTrue(closed); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java new file mode 100644 index 0000000..e99a399 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.utils.MockTime; + +/** + * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels + * with the configured security protocol. + * + */ +public class NioEchoServer extends Thread { + private final int port; + private final ServerSocketChannel serverSocketChannel; + private final List<SocketChannel> newChannels; + private final List<SocketChannel> socketChannels; + private final AcceptorThread acceptorThread; + private final Selector selector; + private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>(); + + public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0)); + this.port = serverSocketChannel.socket().getLocalPort(); + this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); + this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); + ChannelBuilder channelBuilder = ChannelBuilders.create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true); + this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", 
channelBuilder); + setName("echoserver"); + setDaemon(true); + acceptorThread = new AcceptorThread(); + } + + public int port() { + return port; + } + + @Override + public void run() { + try { + acceptorThread.start(); + while (serverSocketChannel.isOpen()) { + selector.poll(1000); + for (SocketChannel socketChannel : newChannels) { + String id = id(socketChannel); + selector.register(id, socketChannel); + socketChannels.add(socketChannel); + } + newChannels.clear(); + while (true) { + NetworkSend send = inflightSends.peek(); + if (send != null && !selector.channel(send.destination()).hasSend()) { + send = inflightSends.poll(); + selector.send(send); + } else + break; + } + List<NetworkReceive> completedReceives = selector.completedReceives(); + for (NetworkReceive rcv : completedReceives) { + NetworkSend send = new NetworkSend(rcv.source(), rcv.payload()); + if (!selector.channel(send.destination()).hasSend()) + selector.send(send); + else + inflightSends.add(send); + } + } + } catch (IOException e) { + // ignore + } + } + + private String id(SocketChannel channel) { + return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" + + channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort(); + } + + public void closeConnections() throws IOException { + for (SocketChannel channel : socketChannels) + channel.close(); + socketChannels.clear(); + } + + public void close() throws IOException, InterruptedException { + this.serverSocketChannel.close(); + closeConnections(); + acceptorThread.interrupt(); + acceptorThread.join(); + interrupt(); + join(); + } + + private class AcceptorThread extends Thread { + public AcceptorThread() throws IOException { + setName("acceptor"); + } + public void run() { + try { + java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open(); + serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); + while (serverSocketChannel.isOpen()) { + 
if (acceptSelector.select(1000) > 0) { + Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + if (key.isAcceptable()) { + SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); + socketChannel.configureBlocking(false); + newChannels.add(socketChannel); + selector.wakeup(); + } + it.remove(); + } + } + } + } catch (IOException e) { + // ignore + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d3302c8..4e96411 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -12,25 +12,15 @@ */ package org.apache.kafka.common.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; import java.io.IOException; -import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import javax.net.ssl.SSLContext; @@ -40,11 +30,9 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.ssl.SslFactory; import 
org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.test.TestSslUtils; -import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,7 +44,7 @@ public class SslTransportLayerTest { private static final int BUFFER_SIZE = 4 * 1024; - private SslEchoServer server; + private NioEchoServer server; private Selector selector; private ChannelBuilder channelBuilder; private CertStores serverCertStores; @@ -91,13 +79,13 @@ public class SslTransportLayerTest { @Test public void testValidEndpointIdentification() throws Exception { String node = "0"; - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -113,12 +101,12 @@ public class SslTransportLayerTest { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); 
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -129,14 +117,14 @@ public class SslTransportLayerTest { public void testEndpointIdentificationDisabled() throws Exception { String node = "0"; String serverHost = InetAddress.getLocalHost().getHostAddress(); - server = new SslEchoServer(sslServerConfigs, serverHost); + server = new NioEchoServer(SecurityProtocol.SSL, sslServerConfigs, serverHost); server.start(); sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress(serverHost, server.port); + InetSocketAddress addr = new InetSocketAddress(serverHost, server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -147,12 +135,12 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequiredValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -164,12 +152,12 @@ public class SslTransportLayerTest { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); 
createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -180,16 +168,16 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequiredNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -201,12 +189,12 @@ public class SslTransportLayerTest { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -217,16 +205,16 @@ public class SslTransportLayerTest { public void 
testClientAuthenticationDisabledNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -237,12 +225,12 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequestedValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -253,16 +241,16 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequestedNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); 
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 100, 10); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } /** @@ -303,12 +291,12 @@ public class SslTransportLayerTest { public void testInvalidKeyPassword() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid")); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -318,14 +306,14 @@ public class SslTransportLayerTest { public void testUnsupportedTLSVersion() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1")); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -336,14 +324,14 @@ public class SslTransportLayerTest { String 
node = "0"; String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites(); sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0])); - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - waitForChannelClose(node); + NetworkTestUtils.waitForChannelClose(selector, node); } /** @@ -352,12 +340,12 @@ public class SslTransportLayerTest { @Test public void testNetReadBufferResize() throws Exception { String node = "0"; - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs, 10, null, null); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 64000, 10); + NetworkTestUtils.checkClientConnection(selector, node, 64000, 10); } /** @@ -366,12 +354,12 @@ public class SslTransportLayerTest { @Test public void testNetWriteBufferResize() throws Exception { String node = "0"; - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs, null, 10, null); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 64000, 10); + 
NetworkTestUtils.checkClientConnection(selector, node, 64000, 10); } /** @@ -380,55 +368,12 @@ public class SslTransportLayerTest { @Test public void testApplicationBufferResize() throws Exception { String node = "0"; - createEchoServer(sslServerConfigs); + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); createSelector(sslClientConfigs, null, null, 10); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - testClientConnection(node, 64000, 10); - } - - private void testClientConnection(String node, int minMessageSize, int messageCount) throws Exception { - - String prefix = TestUtils.randomString(minMessageSize); - int requests = 0; - int responses = 0; - // wait for handshake to finish - while (!selector.isChannelReady(node)) { - selector.poll(1000L); - } - selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes()))); - requests++; - while (responses < messageCount) { - selector.poll(0L); - assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); - - for (NetworkReceive receive : selector.completedReceives()) { - assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload()))); - responses++; - } - - for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) { - selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes()))); - } - } - } - - private void waitForChannelClose(String node) throws IOException { - boolean closed = false; - for (int i = 0; i < 30; i++) { - selector.poll(1000L); - if (selector.channel(node) == null) { - closed = true; - break; - } - } - assertTrue(closed); - } - - private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception { - server = new 
SslEchoServer(sslServerConfigs, "localhost"); - server.start(); + NetworkTestUtils.checkClientConnection(selector, node, 64000, 10); } private void createSelector(Map<String, Object> sslClientConfigs) { @@ -455,32 +400,6 @@ public class SslTransportLayerTest { this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); } - private static class CertStores { - - Map<String, Object> sslConfig; - - CertStores(boolean server, String host) throws Exception { - String name = server ? "server" : "client"; - Mode mode = server ? Mode.SERVER : Mode.CLIENT; - File truststoreFile = File.createTempFile(name + "TS", ".jks"); - sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, host); - if (server) - sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); - } - - private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) { - Map<String, Object> config = new HashMap<>(sslConfig); - config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); - config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); - return config; - } - - private Map<String, Object> getUntrustingConfig() { - return sslConfig; - } - } - /** * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize * code path. 
The overridden buffer size starts with a small value and increases in size when the buffer @@ -537,117 +456,5 @@ public class SslTransportLayerTest { } } } - - // Non-blocking EchoServer implementation that uses SSLTransportLayer - private class SslEchoServer extends Thread { - private final int port; - private final ServerSocketChannel serverSocketChannel; - private final List<SocketChannel> newChannels; - private final List<SocketChannel> socketChannels; - private final AcceptorThread acceptorThread; - private SslFactory sslFactory; - private final Selector selector; - private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>(); - - public SslEchoServer(Map<String, ?> configs, String serverHost) throws Exception { - this.sslFactory = new SslFactory(Mode.SERVER); - this.sslFactory.configure(configs); - serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.configureBlocking(false); - serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0)); - this.port = serverSocketChannel.socket().getLocalPort(); - this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); - this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); - SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER); - channelBuilder.configure(sslServerConfigs); - this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); - setName("echoserver"); - setDaemon(true); - acceptorThread = new AcceptorThread(); - } - - @Override - public void run() { - try { - acceptorThread.start(); - while (serverSocketChannel.isOpen()) { - selector.poll(1000); - for (SocketChannel socketChannel : newChannels) { - String id = id(socketChannel); - selector.register(id, socketChannel); - socketChannels.add(socketChannel); - } - newChannels.clear(); - while (true) { - NetworkSend send = inflightSends.peek(); - if (send != null && 
!selector.channel(send.destination()).hasSend()) { - send = inflightSends.poll(); - selector.send(send); - } else - break; - } - List<NetworkReceive> completedReceives = selector.completedReceives(); - for (NetworkReceive rcv : completedReceives) { - NetworkSend send = new NetworkSend(rcv.source(), rcv.payload()); - if (!selector.channel(send.destination()).hasSend()) - selector.send(send); - else - inflightSends.add(send); - } - } - } catch (IOException e) { - // ignore - } - } - - private String id(SocketChannel channel) { - return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" + - channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort(); - } - - public void closeConnections() throws IOException { - for (SocketChannel channel : socketChannels) - channel.close(); - socketChannels.clear(); - } - - public void close() throws IOException, InterruptedException { - this.serverSocketChannel.close(); - closeConnections(); - acceptorThread.interrupt(); - acceptorThread.join(); - interrupt(); - join(); - } - - private class AcceptorThread extends Thread { - public AcceptorThread() throws IOException { - setName("acceptor"); - } - public void run() { - try { - - java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open(); - serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); - while (serverSocketChannel.isOpen()) { - if (acceptSelector.select(1000) > 0) { - Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey key = it.next(); - if (key.isAcceptable()) { - SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); - socketChannel.configureBlocking(false); - newChannels.add(socketChannel); - selector.wakeup(); - } - } - } - } - } catch (IOException e) { - // ignore - } - } - } - } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java new file mode 100644 index 0000000..0a4928b --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.security.authenticator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import static org.junit.Assert.fail; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.CertStores; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.NetworkTestUtils; +import org.apache.kafka.common.network.NioEchoServer; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.SaslHandshakeRequest; +import org.apache.kafka.common.security.JaasUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the Sasl authenticator. These use a test harness that runs a simple socket server that echoes back responses. 
+ */ +public class SaslAuthenticatorTest { + + private static final int BUFFER_SIZE = 4 * 1024; + + private NioEchoServer server; + private Selector selector; + private ChannelBuilder channelBuilder; + private CertStores serverCertStores; + private CertStores clientCertStores; + private Map<String, Object> saslClientConfigs; + private Map<String, Object> saslServerConfigs; + + @Before + public void setup() throws Exception { + serverCertStores = new CertStores(true, "localhost"); + clientCertStores = new CertStores(false, "localhost"); + saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); + saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); + } + + @After + public void teardown() throws Exception { + if (server != null) + this.server.close(); + if (selector != null) + this.selector.close(); + } + + /** + * Tests good path SASL/PLAIN client and server channels using SSL transport layer. + */ + @Test + public void testValidSaslPlainOverSsl() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createAndCheckClientConnection(securityProtocol, node); + } + + /** + * Tests good path SASL/PLAIN client and server channels using PLAINTEXT transport layer. + */ + @Test + public void testValidSaslPlainOverPlaintext() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createAndCheckClientConnection(securityProtocol, node); + } + + /** + * Tests that SASL/PLAIN clients with invalid password fail authentication. 
+ */ + @Test + public void testInvalidPasswordSaslPlain() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword"); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createClientConnection(securityProtocol, node); + NetworkTestUtils.waitForChannelClose(selector, node); + } + + /** + * Tests that SASL/PLAIN clients with invalid username fail authentication. + */ + @Test + public void testInvalidUsernameSaslPlain() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createClientConnection(securityProtocol, node); + NetworkTestUtils.waitForChannelClose(selector, node); + } + + /** + * Tests that SASL/PLAIN clients with missing username in JAAS configuration fail authentication. 
+ */ + @Test + public void testMissingUsernameSaslPlain() throws Exception { + String node = "0"; + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + jaasConfig.setPlainClientOptions(null, "mypassword"); + + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createSelector(securityProtocol, saslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + try { + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + fail("SASL/PLAIN channel created without username"); + } catch (KafkaException e) { + // Expected exception + } + } + + /** + * Tests that SASL/PLAIN clients with missing password in JAAS configuration fail authentication. + */ + @Test + public void testMissingPasswordSaslPlain() throws Exception { + String node = "0"; + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + jaasConfig.setPlainClientOptions("myuser", null); + + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createSelector(securityProtocol, saslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + try { + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + fail("SASL/PLAIN channel created without password"); + } catch (KafkaException e) { + // Expected exception + } + } + + /** + * Tests that mechanisms that are not supported in Kafka can be plugged in without modifying + * Kafka code if Sasl client and server providers are available. 
+ */ + @Test + public void testMechanismPluggability() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5")); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createAndCheckClientConnection(securityProtocol, node); + } + + /** + * Tests that servers supporting multiple SASL mechanisms work with clients using + * any of the enabled mechanisms. + */ + @Test + public void testMultipleServerMechanisms() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + String node1 = "1"; + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + createAndCheckClientConnection(securityProtocol, node1); + + String node2 = "2"; + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "DIGEST-MD5"); + createSelector(securityProtocol, saslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(selector, node2, 100, 10); + } + + /** + * Tests that any invalid data during Kafka SASL handshake request flow + * or the actual SASL authentication flow result in authentication failure + * and do not cause any failures in the server. 
+ */ + @Test + public void testInvalidSaslPacket() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + // Send invalid SASL packet after valid handshake request + String node1 = "invalid1"; + createClientConnection(SecurityProtocol.PLAINTEXT, node1); + sendHandshakeRequest(node1); + Random random = new Random(); + byte[] bytes = new byte[1024]; + random.nextBytes(bytes); + selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes))); + NetworkTestUtils.waitForChannelClose(selector, node1); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good1"); + + // Send invalid SASL packet before handshake request + String node2 = "invalid2"; + createClientConnection(SecurityProtocol.PLAINTEXT, node2); + random.nextBytes(bytes); + selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes))); + NetworkTestUtils.waitForChannelClose(selector, node2); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good2"); + } + + /** + * Tests that packets that are too big during Kafka SASL handshake request flow + * or the actual SASL authentication flow result in authentication failure + * and do not cause any failures in the server. 
+ */ + @Test + public void testPacketSizeTooBig() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + // Send SASL packet with large size after valid handshake request + String node1 = "invalid1"; + createClientConnection(SecurityProtocol.PLAINTEXT, node1); + sendHandshakeRequest(node1); + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.putInt(Integer.MAX_VALUE); + buffer.put(new byte[buffer.capacity() - 4]); + buffer.rewind(); + selector.send(new NetworkSend(node1, buffer)); + NetworkTestUtils.waitForChannelClose(selector, node1); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good1"); + + // Send packet with large size before handshake request + String node2 = "invalid2"; + createClientConnection(SecurityProtocol.PLAINTEXT, node2); + buffer.clear(); + buffer.putInt(Integer.MAX_VALUE); + buffer.put(new byte[buffer.capacity() - 4]); + buffer.rewind(); + selector.send(new NetworkSend(node2, buffer)); + NetworkTestUtils.waitForChannelClose(selector, node2); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good2"); + } + + /** + * Tests that Kafka requests that are forbidden until successful authentication result + * in authentication failure and do not cause any failures in the server. 
+ */ + @Test + public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + // Send metadata request before Kafka SASL handshake request + String node1 = "invalid1"; + createClientConnection(SecurityProtocol.PLAINTEXT, node1); + RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 1); + MetadataRequest metadataRequest1 = new MetadataRequest(Collections.singletonList("sometopic")); + selector.send(new NetworkSend(node1, RequestSend.serialize(metadataRequestHeader1, metadataRequest1.toStruct()))); + NetworkTestUtils.waitForChannelClose(selector, node1); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good1"); + + // Send metadata request after Kafka SASL handshake request + String node2 = "invalid2"; + createClientConnection(SecurityProtocol.PLAINTEXT, node2); + sendHandshakeRequest(node2); + RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2); + MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic")); + selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct()))); + NetworkTestUtils.waitForChannelClose(selector, node2); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good2"); + } + + /** + * Tests that connections cannot be created if the login module class is unavailable. 
+ */ + @Test + public void testInvalidLoginModule() throws Exception { + TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions()); + + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + try { + createSelector(securityProtocol, saslClientConfigs); + fail("SASL/PLAIN channel created without valid login module"); + } catch (KafkaException e) { + // Expected exception + } + } + + /** + * Tests that mechanisms with default implementation in Kafka may be disabled in + * the Kafka server by removing from the enabled mechanism list. + */ + @Test + public void testDisabledMechanism() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5")); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createClientConnection(securityProtocol, node); + NetworkTestUtils.waitForChannelClose(selector, node); + } + + /** + * Tests that clients using invalid SASL mechanisms fail authentication. 
+ */ + @Test + public void testInvalidMechanism() throws Exception { + String node = "0"; + SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID"); + + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + createClientConnection(securityProtocol, node); + NetworkTestUtils.waitForChannelClose(selector, node); + } + + private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) { + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism); + saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms); + return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms); + } + + private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) { + String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM); + this.channelBuilder = ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, clientConfigs, saslMechanism, true); + this.selector = NetworkTestUtils.createSelector(channelBuilder); + } + + private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception { + createSelector(securityProtocol, saslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + } + + private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String node) throws Exception { + createClientConnection(securityProtocol, node); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + selector.close(); + selector = null; + } + + private void sendHandshakeRequest(String node) throws Exception { + RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, "someclient", 1); + SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest("PLAIN"); + selector.send(new NetworkSend(node, RequestSend.serialize(header, handshakeRequest.toStruct()))); + int waitSeconds = 10; + do { + selector.poll(1000); + } while (selector.completedSends().isEmpty() && waitSeconds-- > 0); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java new file mode 100644 index 0000000..2923a5a --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.security.authenticator; + +import java.io.IOException; +import java.security.Provider; +import java.security.Security; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; + +import org.apache.kafka.common.security.plain.PlainLoginModule; + +/** + * Digest-MD5 login module for multi-mechanism tests. Since callback handlers are not configurable in Kafka + * yet, this replaces the standard Digest-MD5 SASL server provider with one that invokes the test callback handler. + * This login module uses the same format as PlainLoginModule and hence simply reuses the same methods. 
+ * + */ +public class TestDigestLoginModule extends PlainLoginModule { + + private static final SaslServerFactory STANDARD_DIGEST_SASL_SERVER_FACTORY; + static { + SaslServerFactory digestSaslServerFactory = null; + Enumeration<SaslServerFactory> factories = Sasl.getSaslServerFactories(); + Map<String, Object> emptyProps = new HashMap<>(); + while (factories.hasMoreElements()) { + SaslServerFactory factory = factories.nextElement(); + if (Arrays.asList(factory.getMechanismNames(emptyProps)).contains("DIGEST-MD5")) { + digestSaslServerFactory = factory; + break; + } + } + STANDARD_DIGEST_SASL_SERVER_FACTORY = digestSaslServerFactory; + Security.insertProviderAt(new DigestSaslServerProvider(), 1); + } + + public static class DigestServerCallbackHandler implements CallbackHandler { + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof NameCallback) { + NameCallback nameCallback = (NameCallback) callback; + nameCallback.setName(nameCallback.getDefaultName()); + } else if (callback instanceof PasswordCallback) { + PasswordCallback passwordCallback = (PasswordCallback) callback; + passwordCallback.setPassword(TestJaasConfig.PASSWORD.toCharArray()); + } else if (callback instanceof RealmCallback) { + RealmCallback realmCallback = (RealmCallback) callback; + realmCallback.setText(realmCallback.getDefaultText()); + } else if (callback instanceof AuthorizeCallback) { + AuthorizeCallback authCallback = (AuthorizeCallback) callback; + if (TestJaasConfig.USERNAME.equals(authCallback.getAuthenticationID())) { + authCallback.setAuthorized(true); + authCallback.setAuthorizedID(authCallback.getAuthenticationID()); + } + } + } + } + } + + public static class DigestSaslServerFactory implements SaslServerFactory { + + @Override + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) + 
throws SaslException { + return STANDARD_DIGEST_SASL_SERVER_FACTORY.createSaslServer(mechanism, protocol, serverName, props, new DigestServerCallbackHandler()); + } + + @Override + public String[] getMechanismNames(Map<String, ?> props) { + return new String[] {"DIGEST-MD5"}; + } + } + + public static class DigestSaslServerProvider extends Provider { + + private static final long serialVersionUID = 1L; + + protected DigestSaslServerProvider() { + super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test SASL/Digest-MD5 Server Provider for Kafka"); + super.put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java new file mode 100644 index 0000000..2291cc1 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; + +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.plain.PlainLoginModule; + +public class TestJaasConfig extends Configuration { + + static final String USERNAME = "myuser"; + static final String PASSWORD = "mypassword"; + + private Map<String, AppConfigurationEntry[]> entryMap = new HashMap<>(); + + public static TestJaasConfig createConfiguration(String clientMechanism, List<String> serverMechanisms) { + TestJaasConfig config = new TestJaasConfig(); + config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions()); + for (String mechanism : serverMechanisms) { + config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions()); + } + Configuration.setConfiguration(config); + return config; + } + + public void setPlainClientOptions(String clientUsername, String clientPassword) { + Map<String, Object> options = new HashMap<>(); + if (clientUsername != null) + options.put("username", clientUsername); + if (clientPassword != null) + options.put("password", clientPassword); + createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options); + } + + public void createOrUpdateEntry(String name, String loginModule, Map<String, Object> options) { + AppConfigurationEntry entry = new AppConfigurationEntry(loginModule, LoginModuleControlFlag.REQUIRED, options); + entryMap.put(name, new AppConfigurationEntry[] {entry}); + } + + @Override + public AppConfigurationEntry[] 
getAppConfigurationEntry(String name) { + return entryMap.get(name); + } + + private static String loginModule(String mechanism) { + String loginModule; + switch (mechanism) { + case "PLAIN": + loginModule = PlainLoginModule.class.getName(); + break; + case "DIGEST-MD5": + loginModule = TestDigestLoginModule.class.getName(); + break; + default: + throw new IllegalArgumentException("Unsupported mechanism " + mechanism); + } + return loginModule; + } + + public static Map<String, Object> defaultClientOptions() { + Map<String, Object> options = new HashMap<>(); + options.put("username", USERNAME); + options.put("password", PASSWORD); + return options; + } + + public static Map<String, Object> defaultServerOptions() { + Map<String, Object> options = new HashMap<>(); + options.put("user_" + USERNAME, PASSWORD); + return options; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 870caca..fec96cd 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -78,6 +78,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { val kafkaPrincipal: String override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + protected def kafkaClientSaslMechanism = "GSSAPI" + protected def kafkaServerSaslMechanisms = List("GSSAPI") + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) val topicResource = new Resource(Topic, topic) val groupResource = new Resource(Group, group) @@ -143,7 +146,7 @@ 
trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { case SecurityProtocol.SSL => startSasl(ZkSasl, null, null) case _ => - startSasl(Both, List("GSSAPI"), List("GSSAPI")) + startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms) } super.setUp AclCommand.main(topicBrokerReadAclArgs) http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index d203245..fc79c60 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -27,7 +27,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms)) + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) @Test def testMultipleBrokerMechanisms() { @@ -35,7 +35,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne val plainSaslProducer = producers(0) val plainSaslConsumer = consumers(0) - val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", kafkaServerSaslMechanisms) + val gssapiSaslProperties = kafkaSaslProperties("GSSAPI") val gssapiSaslProducer = TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index 687cfc3..bdca577 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -23,5 +23,5 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarne this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms)) + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..63636c0 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import kafka.server.KafkaConfig +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslPlainSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override protected def kafkaClientSaslMechanism = "PLAIN" + override protected def kafkaServerSaslMechanisms = List("PLAIN") + override val clientPrincipal = "testuser" + override val kafkaPrincipal = "admin" +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index acc86e3..765f191 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -18,11 +18,14 @@ package kafka.api import java.io.File +import java.util.Properties import javax.security.auth.login.Configuration import kafka.security.minikdc.MiniKdc +import kafka.server.KafkaConfig import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.authenticator.LoginManager +import org.apache.kafka.common.config.SaslConfigs /* * Implements an enumeration 
for the modes enabled here: @@ -81,4 +84,14 @@ trait SaslSetup { System.clearProperty("zookeeper.authProvider.1") Configuration.setConfiguration(null) } + + def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: Option[Seq[String]] = None) = { + val props = new Properties + props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism) + serverSaslMechanisms.foreach { serverMechanisms => + props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, clientSaslMechanism) + props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms.mkString(",")) + } + props + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3a496f48/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 5531919..8fd3eb4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -13,11 +13,7 @@ package kafka.api import kafka.zk.ZooKeeperTestHarness -import kafka.server.KafkaConfig import org.junit.{After, Before} -import java.util.Properties -import scala.collection.JavaConverters._ -import org.apache.kafka.common.config.SaslConfigs trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { protected val zkSaslEnabled: Boolean @@ -42,13 +38,4 @@ trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { super.tearDown closeSasl() } - - def kafkaSaslProperties(kafkaClientSaslMechanism: String, kafkaServerSaslMechanisms: List[String]) = { - val props = new Properties - props.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) - props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism) - props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, kafkaServerSaslMechanisms.asJava) - props - } - }
