KAFKA-3618; Handle ApiVersionsRequest before SASL authentication

Server-side implementation and tests for handling ApiVersionsRequest before SaslHandshakeRequest.
Author: Rajini Sivaram <[email protected]> Reviewers: Gwen Shapira, Ismael Juma Closes #1286 from rajinisivaram/KAFKA-3618 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69d9a669 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69d9a669 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69d9a669 Branch: refs/heads/0.10.0 Commit: 69d9a669d7bbfec1e33dd6177c5687ef7f9977df Parents: e503273 Author: Rajini Sivaram <[email protected]> Authored: Fri Apr 29 11:15:20 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Fri Apr 29 11:15:20 2016 -0700 ---------------------------------------------------------------------- .../common/requests/ApiVersionsResponse.java | 14 ++ .../authenticator/SaslServerAuthenticator.java | 10 ++ .../kafka/common/network/NetworkTestUtils.java | 14 +- .../authenticator/SaslAuthenticatorTest.java | 159 ++++++++++++++++++- .../src/main/scala/kafka/server/KafkaApis.scala | 12 +- .../kafka/server/ApiVersionsRequestTest.scala | 23 +-- .../unit/kafka/server/ApiVersionsTest.scala | 6 +- .../unit/kafka/server/BaseRequestTest.scala | 24 +-- .../server/SaslApiVersionsRequestTest.scala | 78 +++++++++ 9 files changed, 297 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 36881a3..fe995b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.requests; import 
org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.Protocol; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -29,6 +30,7 @@ import java.util.Map; public class ApiVersionsResponse extends AbstractRequestResponse { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id); + private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(); public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String API_VERSIONS_KEY_NAME = "api_versions"; @@ -106,6 +108,18 @@ public class ApiVersionsResponse extends AbstractRequestResponse { return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList()); } + public static ApiVersionsResponse apiVersionsResponse() { + return API_VERSIONS_RESPONSE; + } + + private static ApiVersionsResponse createApiVersionsResponse() { + List<ApiVersion> versionList = new ArrayList<>(); + for (ApiKeys apiKey : ApiKeys.values()) { + versionList.add(new ApiVersion(apiKey.id, Protocol.MIN_VERSIONS[apiKey.id], Protocol.CURR_VERSION[apiKey.id])); + } + return new ApiVersionsResponse(Errors.NONE.code(), versionList); + } + private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>(); for (ApiVersion apiVersion: apiVersions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java 
index 89c6e6c..a9c19a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -61,6 +61,8 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractRequestResponse; +import org.apache.kafka.common.requests.ApiVersionsRequest; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.requests.ResponseSend; @@ -290,7 +292,11 @@ public class SaslServerAuthenticator implements Authenticator { isKafkaRequest = true; ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); + LOG.debug("Handle Kafka request {}", apiKey); switch (apiKey) { + case API_VERSIONS: + handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request); + break; case SASL_HANDSHAKE: clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request); break; @@ -336,6 +342,10 @@ public class SaslServerAuthenticator implements Authenticator { } } + private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException { + sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse()); + } + private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException { ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId()); netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct())); http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java 
---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java index 53ba954..969055d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java @@ -42,13 +42,10 @@ public class NetworkTestUtils { public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception { + waitForChannelReady(selector, node); String prefix = TestUtils.randomString(minMessageSize); int requests = 0; int responses = 0; - // wait for handshake to finish - while (!selector.isChannelReady(node)) { - selector.poll(1000L); - } selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes()))); requests++; while (responses < messageCount) { @@ -66,6 +63,15 @@ public class NetworkTestUtils { } } + public static void waitForChannelReady(Selector selector, String node) throws IOException { + // wait for handshake to finish + int secondsLeft = 30; + while (!selector.isChannelReady(node) && secondsLeft-- > 0) { + selector.poll(1000L); + } + assertTrue(selector.isChannelReady(node)); + } + public static void waitForChannelClose(Selector selector, String node) throws IOException { boolean closed = false; for (int i = 0; i < 30; i++) { http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 0a4928b..368b5a7 100644 --- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -17,11 +17,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.CertStores; @@ -34,11 +37,18 @@ import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.Protocol; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractRequestResponse; +import org.apache.kafka.common.requests.ApiVersionsRequest; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.SaslHandshakeRequest; +import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.JaasUtils; import org.junit.After; import org.junit.Before; @@ -210,6 +220,30 @@ public class SaslAuthenticatorTest { } /** + * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator + * prior to SASL handshake flow and that subsequent authentication succeeds + * when transport layer is PLAINTEXT. 
This test simulates SASL authentication using a + * (non-SASL) PLAINTEXT client and sends ApiVersionsRequest straight after + * connection to the server is established, before any SASL-related packets are sent. + */ + @Test + public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT); + } + + /** + * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator + * prior to SASL handshake flow and that subsequent authentication succeeds + * when transport layer is SSL. This test simulates SASL authentication using a + * (non-SASL) SSL client and sends ApiVersionsRequest straight after + * SSL handshake, before any SASL-related packets are sent. + */ + @Test + public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception { + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL); + } + + /** * Tests that any invalid data during Kafka SASL handshake request flow * or the actual SASL authentication flow result in authentication failure * and do not cause any failures in the server. @@ -223,7 +257,7 @@ public class SaslAuthenticatorTest { // Send invalid SASL packet after valid handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - sendHandshakeRequest(node1); + sendHandshakeRequestReceiveResponse(node1); Random random = new Random(); byte[] bytes = new byte[1024]; random.nextBytes(bytes); @@ -247,6 +281,33 @@ public class SaslAuthenticatorTest { } /** + * Tests that ApiVersionsRequest after Kafka SASL handshake request flow, + * but prior to actual SASL authentication, results in authentication failure. + * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)} + * where a non-SASL client is used to send requests that are processed by + * {@link SaslServerAuthenticator} of the server prior to client authentication. 
+ */ + @Test + public void testInvalidApiVersionsRequestSequence() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + // Send handshake request followed by ApiVersionsRequest + String node1 = "invalid1"; + createClientConnection(SecurityProtocol.PLAINTEXT, node1); + sendHandshakeRequestReceiveResponse(node1); + + RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2); + selector.send(new NetworkSend(node1, RequestSend.serialize(versionsHeader, new ApiVersionsRequest().toStruct()))); + NetworkTestUtils.waitForChannelClose(selector, node1); + selector.close(); + + // Test good connection still works + createAndCheckClientConnection(securityProtocol, "good1"); + } + + /** * Tests that packets that are too big during Kafka SASL handshake request flow * or the actual SASL authentication flow result in authentication failure * and do not cause any failures in the server. 
@@ -260,7 +321,7 @@ public class SaslAuthenticatorTest { // Send SASL packet with large size after valid handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - sendHandshakeRequest(node1); + sendHandshakeRequestReceiveResponse(node1); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(Integer.MAX_VALUE); buffer.put(new byte[buffer.capacity() - 4]); @@ -312,7 +373,7 @@ public class SaslAuthenticatorTest { // Send metadata request after Kafka SASL handshake request String node2 = "invalid2"; createClientConnection(SecurityProtocol.PLAINTEXT, node2); - sendHandshakeRequest(node2); + sendHandshakeRequestReceiveResponse(node2); RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2); MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic")); selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct()))); @@ -371,6 +432,68 @@ public class SaslAuthenticatorTest { NetworkTestUtils.waitForChannelClose(selector, node); } + /** + * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator + * prior to SASL handshake flow and that subsequent authentication succeeds + * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates + * SASL authentication after ApiVersionsRequest. + * <p> + * Test sequence (using <tt>securityProtocol=PLAINTEXT</tt> as an example): + * <ol> + * <li>Starts a SASL_PLAINTEXT test server that simply echoes back client requests after authentication.</li> + * <li>A (non-SASL) PLAINTEXT test client connects to the SASL server port. Client is now unauthenticated.<./li> + * <li>The unauthenticated non-SASL client sends an ApiVersionsRequest and validates the response. 
+ * A valid response indicates that {@link SaslServerAuthenticator} of the test server responded to + * the ApiVersionsRequest even though the client is not yet authenticated.</li> + * <li>The unauthenticated non-SASL client sends a SaslHandshakeRequest and validates the response. A valid response + * indicates that {@link SaslServerAuthenticator} of the test server responded to the SaslHandshakeRequest + * after processing ApiVersionsRequest.</li> + * <li>The unauthenticated non-SASL client sends the SASL/PLAIN packet containing username/password to authenticate + * itself. The client is now authenticated by the server. At this point this test client is at the + * same state as a regular SASL_PLAINTEXT client that is <tt>ready</tt>.</li> + * <li>The authenticated client sends random data to the server and checks that the data is echoed + * back by the test server (ie, not Kafka request-response) to ensure that the client now + * behaves exactly as a regular SASL_PLAINTEXT client that has completed authentication.</li> + * </ol> + */ + private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception { + configureMechanisms("PLAIN", Arrays.asList("PLAIN")); + server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + + // Create non-SASL connection to manually authenticate after ApiVersionsRequest + String node = "1"; + SecurityProtocol clientProtocol; + switch (securityProtocol) { + case SASL_PLAINTEXT: + clientProtocol = SecurityProtocol.PLAINTEXT; + break; + case SASL_SSL: + clientProtocol = SecurityProtocol.SSL; + break; + default: + throw new IllegalArgumentException("Server protocol " + securityProtocol + " is not SASL"); + } + createClientConnection(clientProtocol, node); + NetworkTestUtils.waitForChannelReady(selector, node); + + // Send ApiVersionsRequest and check response + ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node); + 
assertEquals(Protocol.MIN_VERSIONS[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion); + assertEquals(Protocol.CURR_VERSION[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion); + + // Send SaslHandshakeRequest and check response + SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node); + assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms()); + + // Authenticate using PLAIN username/password + String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD; + selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8")))); + waitForResponse(); + + // Check send/receive on the manually authenticated connection + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + } + private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) { saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism); saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms); @@ -396,13 +519,35 @@ public class SaslAuthenticatorTest { selector = null; } - private void sendHandshakeRequest(String node) throws Exception { - RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, "someclient", 1); + private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse request) throws IOException { + RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1); + selector.send(new NetworkSend(node, RequestSend.serialize(header, request.toStruct()))); + ByteBuffer responseBuffer = waitForResponse(); + return NetworkClient.parseResponse(responseBuffer, header); + } + + private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception { SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN"); - selector.send(new NetworkSend(node, 
RequestSend.serialize(header, handshakeRequest.toStruct()))); + Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); + SaslHandshakeResponse response = new SaslHandshakeResponse(responseStruct); + assertEquals(Errors.NONE.code(), response.errorCode()); + return response; + } + + private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception { + ApiVersionsRequest handshakeRequest = new ApiVersionsRequest(); + Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest); + ApiVersionsResponse response = new ApiVersionsResponse(responseStruct); + assertEquals(Errors.NONE.code(), response.errorCode()); + return response; + } + + private ByteBuffer waitForResponse() throws IOException { int waitSeconds = 10; do { selector.poll(1000); - } while (selector.completedSends().isEmpty() && waitSeconds-- > 0); + } while (selector.completedReceives().isEmpty() && waitSeconds-- > 0); + assertEquals(1, selector.completedReceives().size()); + return selector.completedReceives().get(0).payload(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 67d46fc..cf7814e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -47,16 +47,6 @@ import scala.collection._ import scala.collection.JavaConverters._ import org.apache.kafka.common.requests.SaslHandshakeResponse -object KafkaApis { - val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava) - - private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] = { - ApiKeys.values.map(apiKey => - apiKey.id -> new 
ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id), Protocol.CURR_VERSION(apiKey.id))).toMap - } -} - - /** * Logic to handle the various Kafka requests */ @@ -1041,7 +1031,7 @@ class KafkaApis(val requestChannel: RequestChannel, val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) && request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id) val responseBody = if (isApiVersionsRequestVersionSupported) - KafkaApis.apiVersionsResponse + ApiVersionsResponse.apiVersionsResponse else ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index ed59930..8bf4d73 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -25,16 +25,10 @@ import org.junit.Test import scala.collection.JavaConversions._ -class ApiVersionsRequestTest extends BaseRequestTest { - - override def numBrokers: Int = 1 - - @Test - def testApiVersionsRequest() { - val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0) - +object ApiVersionsRequestTest { + def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse) { assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.values.length, apiVersionsResponse.apiVersions.size) - for (expectedApiVersion: ApiVersion <- KafkaApis.apiVersionsResponse.apiVersions) { + for (expectedApiVersion: ApiVersion <- 
ApiVersionsResponse.apiVersionsResponse.apiVersions) { val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey) assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion) assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey, actualApiVersion.apiKey) @@ -42,6 +36,17 @@ class ApiVersionsRequestTest extends BaseRequestTest { assertEquals(s"Received unexpected max version for API key ${actualApiVersion.apiKey}.", expectedApiVersion.maxVersion, actualApiVersion.maxVersion) } } +} + +class ApiVersionsRequestTest extends BaseRequestTest { + + override def numBrokers: Int = 1 + + @Test + def testApiVersionsRequest() { + val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0) + ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse) + } private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = { val response = send(request, ApiKeys.API_VERSIONS, version) http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala index 4429f26..177b509 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala @@ -17,7 +17,7 @@ package unit.kafka.server -import kafka.server.KafkaApis +import org.apache.kafka.common.requests.ApiVersionsResponse import org.apache.kafka.common.protocol.{Protocol, ApiKeys} import org.junit.Assert._ import org.junit.Test @@ -26,11 +26,11 @@ class ApiVersionsTest { @Test def testApiVersions { - val apiVersions = KafkaApis.apiVersionsResponse.apiVersions + val apiVersions = 
ApiVersionsResponse.apiVersionsResponse.apiVersions assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length) for (key <- ApiKeys.values) { - val version = KafkaApis.apiVersionsResponse.apiVersion(key.id) + val version = ApiVersionsResponse.apiVersionsResponse.apiVersion(key.id) assertNotNull(s"Could not find ApiVersion for API ${key.name}", version) assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id)) assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id)) http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index d92ccea..906c4b2 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -39,7 +39,9 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { protected def propertyOverrides(properties: Properties) {} def generateConfigs() = { - val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false) + val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false, + interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile, saslProperties = saslProperties) props.foreach(propertyOverrides) props.map(KafkaConfig.fromProps) } @@ -57,7 +59,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available")) } - private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { + def connect(s: 
SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { new Socket("localhost", s.boundPort(protocol)) } @@ -76,20 +78,24 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { response } - private def requestAndReceive(request: Array[Byte]): Array[Byte] = { - val plainSocket = connect() + def requestAndReceive(socket: Socket, request: Array[Byte]): Array[Byte] = { + sendRequest(socket, request) + receiveResponse(socket) + } + + def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = { + val socket = connect() try { - sendRequest(plainSocket, request) - receiveResponse(plainSocket) + send(socket, request, apiKey, version) } finally { - plainSocket.close() + socket.close() } } /** * Serializes and send the request to the given api. A ByteBuffer containing the response is returned. */ - def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = { + def send(socket: Socket, request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = { correlationId += 1 val serializedBytes = { val header = new RequestHeader(apiKey.id, version, "", correlationId) @@ -99,7 +105,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { byteBuffer.array() } - val response = requestAndReceive(serializedBytes) + val response = requestAndReceive(socket, serializedBytes) val responseBuffer = ByteBuffer.wrap(response) ResponseHeader.parse(responseBuffer) // Parse the header to ensure its valid and move the buffer forward http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala new file mode 100644 index 0000000..632665a --- /dev/null +++ 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.io.IOException +import java.net.Socket +import java.util.Collections +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} +import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} +import org.apache.kafka.common.requests.SaslHandshakeRequest +import org.apache.kafka.common.requests.SaslHandshakeResponse +import org.apache.kafka.common.protocol.Errors +import org.junit.Test +import org.junit.Assert._ +import kafka.api.SaslTestHarness + +class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + override protected val kafkaClientSaslMechanism = "PLAIN" + override protected val kafkaServerSaslMechanisms = List("PLAIN") + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val zkSaslEnabled = false + override def numBrokers = 1 + + @Test + def testApiVersionsRequestBeforeSaslHandshakeRequest() { + val plaintextSocket 
= connect(protocol = securityProtocol) + try { + val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0) + ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse) + sendSaslHandshakeRequestValidateResponse(plaintextSocket) + } finally { + plaintextSocket.close() + } + } + + @Test + def testApiVersionsRequestAfterSaslHandshakeRequest() { + val plaintextSocket = connect(protocol = securityProtocol) + try { + sendSaslHandshakeRequestValidateResponse(plaintextSocket) + try { + sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0) + fail("Versions Request during Sasl handshake did not fail") + } catch { + case ioe: IOException => // expected exception + } + } finally { + plaintextSocket.close() + } + } + + private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version: Short): ApiVersionsResponse = { + val response = send(socket, request, ApiKeys.API_VERSIONS, version) + ApiVersionsResponse.parse(response) + } + + private def sendSaslHandshakeRequestValidateResponse(socket: Socket) { + val response = send(socket, new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, 0) + val handshakeResponse = SaslHandshakeResponse.parse(response) + assertEquals(Errors.NONE.code, handshakeResponse.errorCode()) + assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms()) + } +}
