This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 015d7aede6cbd350d56d75006930dd2bf89a4a5a Author: Manikumar Reddy <manikumar.re...@gmail.com> AuthorDate: Mon May 16 19:25:02 2022 +0530 MINOR: Add configurable max receive size for SASL authentication requests This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication. Reviewers: Tom Bentley <tbent...@redhat.com>, Mickael Maison <mickael.mai...@gmail.com> Co-authored-by: Manikumar Reddy <manikumar.re...@gmail.com> Co-authored-by: Mickael Maison <mickael.mai...@gmail.com> --- checkstyle/suppressions.xml | 2 + .../config/internals/BrokerSecurityConfigs.java | 6 +++ .../authenticator/SaslServerAuthenticator.java | 16 ++++++-- .../kafka/common/security/TestSecurityConfig.java | 2 + .../authenticator/SaslAuthenticatorTest.java | 46 ++++++++++++++++++++++ .../authenticator/SaslServerAuthenticatorTest.java | 6 +-- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++ .../scala/unit/kafka/server/KafkaConfigTest.scala | 2 + 8 files changed, 77 insertions(+), 7 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 55fcd1a9e5..7c32223961 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -49,6 +49,8 @@ files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/> <suppress checks="ClassFanOutComplexity" files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/> + <suppress checks="NPath" + files="SaslServerAuthenticator.java"/> <suppress checks="ClassFanOutComplexity" files="Errors.java"/> <suppress checks="ClassFanOutComplexity" diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java index 0b90da8f80..8b7a9649c2 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -36,6 +36,8 @@ public class BrokerSecurityConfigs { public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class"; public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules"; public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms"; + public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288; + public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size"; public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + @@ -89,4 +91,8 @@ public class BrokerSecurityConfigs { + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently " + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL " + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000"; + + public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." + + " Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," + + " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits."; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 019723b6b4..51b02952a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeResponseData; +import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.ByteBufferSend; import org.apache.kafka.common.network.ChannelBuilders; @@ -88,8 +89,6 @@ import java.util.Optional; import java.util.function.Supplier; public class SaslServerAuthenticator implements Authenticator { - // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms - static final int MAX_RECEIVE_SIZE = 524288; private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class); /** @@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements Authenticator { private String saslMechanism; // buffers used in `authenticate` + private Integer saslAuthRequestMaxReceiveSize; private NetworkReceive netInBuffer; private Send netOutBuffer; private Send authenticationFailureSend = null; @@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements Authenticator { // Note that the old principal builder does not support SASL, so we do not need to pass the // authenticator or the transport layer this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, kerberosNameParser, null); + + saslAuthRequestMaxReceiveSize = (Integer) configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG); + if (saslAuthRequestMaxReceiveSize == null) + saslAuthRequestMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE; } private void createSaslServer(String mechanism) throws IOException { @@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements Authenticator { } // allocate on heap (as opposed to any socket server memory pool) - if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId); + if (netInBuffer == null) netInBuffer = new NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId); - netInBuffer.readFrom(transportLayer); + try { + netInBuffer.readFrom(transportLayer); + } catch (InvalidReceiveException e) { + throw new SaslAuthenticationException("Failing SASL authentication due to invalid receive size", e); + } if (!netInBuffer.complete()) return; netInBuffer.payload().rewind(); diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java index 07cbb7856d..197151f5fb 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java +++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java @@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig { null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC) .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM, BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC) + .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE, + Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC) .withClientSslSupport() .withClientSaslSupport(); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 988a0f2823..40a27935f3 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -212,6 +212,52 @@ public class SaslAuthenticatorTest { checkAuthenticationAndReauthentication(securityProtocol, node); } + /** + * Test SASL/PLAIN with sasl.authentication.max.receive.size config + */ + @Test + public void testSaslAuthenticationMaxReceiveSize() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + configureMechanisms("PLAIN", Collections.singletonList("PLAIN")); + + // test auth with 1KB receive size + saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024"); + server = createEchoServer(securityProtocol); + + // test valid sasl authentication + String node1 = "valid"; + checkAuthenticationAndReauthentication(securityProtocol, node1); + + // test with handshake request with large mechanism string + byte[] bytes = new byte[1024]; + new Random().nextBytes(bytes); + String mechanism = new String(bytes, StandardCharsets.UTF_8); + String node2 = "invalid1"; + createClientConnection(SecurityProtocol.PLAINTEXT, node2); + SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion()); + RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), "someclient", nextCorrelationId++); + NetworkSend send = new NetworkSend(node2, handshakeRequest.toSend(header)); + selector.send(send); + //we will get exception in server and connection gets closed. + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); + selector.close(); + + String node3 = "invalid2"; + createClientConnection(SecurityProtocol.PLAINTEXT, node3); + sendHandshakeRequestReceiveResponse(node3, ApiKeys.SASL_HANDSHAKE.latestVersion()); + + // test with sasl authenticate request with large auth_byes string + String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + new String(bytes, StandardCharsets.UTF_8); + ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString)); + SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array()); + SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build(); + header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, request.version(), "someclient", nextCorrelationId++); + send = new NetworkSend(node3, request.toSend(header)); + selector.send(send); + NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.READY.state()); + server.verifyAuthenticationMetrics(1, 2); + } + /** * Tests that SASL/PLAIN clients with invalid password fail authentication. */ diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 50696ecf05..b0dec3e522 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslHandshakeRequestData; @@ -25,7 +26,6 @@ import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.DefaultChannelMetadataRegistry; -import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; @@ -96,10 +96,10 @@ public class SaslServerAuthenticatorTest { SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry()); when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { - invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1); + invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE + 1); return 4; }); - assertThrows(InvalidReceiveException.class, authenticator::authenticate); + assertThrows(SaslAuthenticationException.class, authenticator::authenticate); verify(transportLayer).read(any(ByteBuffer.class)); } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 860056f9a3..8f4806185b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -258,6 +258,7 @@ object Defaults { /** ********* General Security configuration ***********/ val ConnectionsMaxReauthMsDefault = 0L + val DefaultServerMaxMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder] /** ********* Sasl configuration ***********/ @@ -565,6 +566,7 @@ object KafkaConfig { /** ******** Common Security Configuration *************/ val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS + val SaslServerMaxReceiveSizeProp = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG /** ********* SSL Configuration ****************/ @@ -1009,6 +1011,7 @@ object KafkaConfig { /** ******** Common Security Configuration *************/ val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC + val SaslServerMaxReceiveSizeDoc = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC /** ********* SSL Configuration ****************/ @@ -1315,6 +1318,7 @@ object KafkaConfig { /** ********* General Security Configuration ****************/ .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc) + .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc) .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc) /** ********* SSL Configuration ****************/ diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index ee638ba893..78adff5f8f 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -946,6 +946,8 @@ class KafkaConfigTest { case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore + case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + // Raft Quorum Configs case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")