This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 015d7aede6cbd350d56d75006930dd2bf89a4a5a
Author: Manikumar Reddy <manikumar.re...@gmail.com>
AuthorDate: Mon May 16 19:25:02 2022 +0530

    MINOR: Add configurable max receive size for SASL authentication requests
    
    This adds a new configuration `sasl.server.max.receive.size` that sets the 
maximum receive size for requests before and during authentication.
    
    Reviewers: Tom Bentley <tbent...@redhat.com>, Mickael Maison 
<mickael.mai...@gmail.com>
    
    Co-authored-by: Manikumar Reddy <manikumar.re...@gmail.com>
    Co-authored-by: Mickael Maison <mickael.mai...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 +
 .../config/internals/BrokerSecurityConfigs.java    |  6 +++
 .../authenticator/SaslServerAuthenticator.java     | 16 ++++++--
 .../kafka/common/security/TestSecurityConfig.java  |  2 +
 .../authenticator/SaslAuthenticatorTest.java       | 46 ++++++++++++++++++++++
 .../authenticator/SaslServerAuthenticatorTest.java |  6 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 ++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +
 8 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 55fcd1a9e5..7c32223961 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -49,6 +49,8 @@
               
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
+    <suppress checks="NPath"
+              files="SaslServerAuthenticator.java"/>
     <suppress checks="ClassFanOutComplexity"
               files="Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0b90da8f80..8b7a9649c2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -36,6 +36,8 @@ public class BrokerSecurityConfigs {
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = 
"sasl.server.callback.handler.class";
     public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = 
"ssl.principal.mapping.rules";
     public static final String CONNECTIONS_MAX_REAUTH_MS = 
"connections.max.reauth.ms";
+    public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = 
"sasl.server.max.receive.size";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully 
qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the 
KafkaPrincipal object used during " +
@@ -89,4 +91,8 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum 
receive size allowed before and during initial SASL authentication." +
+            " Default receive size is 512KB. GSSAPI limits requests to 64K, 
but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
+            " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 019723b6b4..51b02952a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -88,8 +89,6 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 public class SaslServerAuthenticator implements Authenticator {
-    // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL 
mechanisms
-    static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = 
LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     /**
@@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
     private String saslMechanism;
 
     // buffers used in `authenticate`
+    private Integer saslAuthRequestMaxReceiveSize;
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
     private Send authenticationFailureSend = null;
@@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements 
Authenticator {
         // Note that the old principal builder does not support SASL, so we do 
not need to pass the
         // authenticator or the transport layer
         this.principalBuilder = 
ChannelBuilders.createPrincipalBuilder(configs, kerberosNameParser, null);
+
+        saslAuthRequestMaxReceiveSize = (Integer) 
configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
+        if (saslAuthRequestMaxReceiveSize == null)
+            saslAuthRequestMaxReceiveSize = 
BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
     }
 
     private void createSaslServer(String mechanism) throws IOException {
@@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements 
Authenticator {
             }
 
             // allocate on heap (as opposed to any socket server memory pool)
-            if (netInBuffer == null) netInBuffer = new 
NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+            if (netInBuffer == null) netInBuffer = new 
NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId);
 
-            netInBuffer.readFrom(transportLayer);
+            try {
+                netInBuffer.readFrom(transportLayer);
+            } catch (InvalidReceiveException e) {
+                throw new SaslAuthenticationException("Failing SASL 
authentication due to invalid receive size", e);
+            }
             if (!netInBuffer.complete())
                 return;
             netInBuffer.payload().rewind();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
 
b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 07cbb7856d..197151f5fb 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig {
                     null, Importance.MEDIUM, 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
             .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, 
Type.LONG, 0L, Importance.MEDIUM,
                     BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
+            .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, 
Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
+                    Importance.LOW, 
BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
             .withClientSslSupport()
             .withClientSaslSupport();
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 988a0f2823..40a27935f3 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -212,6 +212,52 @@ public class SaslAuthenticatorTest {
         checkAuthenticationAndReauthentication(securityProtocol, node);
     }
 
+    /**
+     * Test SASL/PLAIN with sasl.server.max.receive.size config
+     */
+    @Test
+    public void testSaslAuthenticationMaxReceiveSize() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+
+        // test auth with 1KB receive size
+        
saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG,
 "1024");
+        server = createEchoServer(securityProtocol);
+
+        // test valid sasl authentication
+        String node1 = "valid";
+        checkAuthenticationAndReauthentication(securityProtocol, node1);
+
+        // test with handshake request with large mechanism string
+        byte[] bytes = new byte[1024];
+        new Random().nextBytes(bytes);
+        String mechanism = new String(bytes, StandardCharsets.UTF_8);
+        String node2 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        SaslHandshakeRequest handshakeRequest = 
buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, 
handshakeRequest.version(), "someclient", nextCorrelationId++);
+        NetworkSend send = new NetworkSend(node2, 
handshakeRequest.toSend(header));
+        selector.send(send);
+        // the server will throw an exception and the connection will be closed.
+        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY.state());
+        selector.close();
+
+        String node3 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node3);
+        sendHandshakeRequestReceiveResponse(node3, 
ApiKeys.SASL_HANDSHAKE.latestVersion());
+
+        // test with sasl authenticate request with large auth_bytes string
+        String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" +  
new String(bytes, StandardCharsets.UTF_8);
+        ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString));
+        SaslAuthenticateRequestData data = new 
SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+        SaslAuthenticateRequest request = new 
SaslAuthenticateRequest.Builder(data).build();
+        header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, 
request.version(), "someclient", nextCorrelationId++);
+        send = new NetworkSend(node3, request.toSend(header));
+        selector.send(send);
+        NetworkTestUtils.waitForChannelClose(selector, node3, 
ChannelState.READY.state());
+        server.verifyAuthenticationMetrics(1, 2);
+    }
+
     /**
      * Tests that SASL/PLAIN clients with invalid password fail authentication.
      */
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 50696ecf05..b0dec3e522 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -25,7 +26,6 @@ import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
-import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -96,10 +96,10 @@ public class SaslServerAuthenticatorTest {
                 SCRAM_SHA_256.mechanismName(), new 
DefaultChannelMetadataRegistry());
 
         when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
-            
invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE
 + 1);
+            
invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
 + 1);
             return 4;
         });
-        assertThrows(InvalidReceiveException.class, 
authenticator::authenticate);
+        assertThrows(SaslAuthenticationException.class, 
authenticator::authenticate);
         verify(transportLayer).read(any(ByteBuffer.class));
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 860056f9a3..8f4806185b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -258,6 +258,7 @@ object Defaults {
 
     /** ********* General Security configuration ***********/
   val ConnectionsMaxReauthMsDefault = 0L
+  val DefaultServerMaxMaxReceiveSize = 
BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
   val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
 
   /** ********* Sasl configuration ***********/
@@ -565,6 +566,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassProp = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
   val ConnectionsMaxReauthMsProp = 
BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+  val SaslServerMaxReceiveSizeProp = 
BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG
   val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
 
   /** ********* SSL Configuration ****************/
@@ -1009,6 +1011,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassDoc = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
   val ConnectionsMaxReauthMsDoc = 
BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+  val SaslServerMaxReceiveSizeDoc = 
BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC
   val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
 
   /** ********* SSL Configuration ****************/
@@ -1315,6 +1318,7 @@ object KafkaConfig {
 
       /** ********* General Security Configuration ****************/
       .define(ConnectionsMaxReauthMsProp, LONG, 
Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+      .define(SaslServerMaxReceiveSizeProp, INT, 
Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc)
       .define(securityProviderClassProp, STRING, null, LOW, 
securityProviderClassDoc)
 
       /** ********* SSL Configuration ****************/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ee638ba893..78adff5f8f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -946,6 +946,8 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        case KafkaConfig.SaslServerMaxReceiveSizeProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+
         // Raft Quorum Configs
         case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
         case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")

Reply via email to