Repository: kafka
Updated Branches:
  refs/heads/0.9.0 f3b38cb6f -> 220d49849


KAFKA-3169; Limit receive buffer size for SASL packets in broker

Limit receive buffer size to avoid OOM in broker with invalid SASL packets

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Sriharsha Chintalapani 
<har...@hortonworks.com>

Closes #831 from rajinisivaram/KAFKA-3169

(cherry picked from commit b433b4a2470896d90d9dc596bb932030869d5d67)
Signed-off-by: Sriharsha Chintalapani <har...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/220d4984
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/220d4984
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/220d4984

Branch: refs/heads/0.9.0
Commit: 220d498497a2d5a3d99af1d3a46c631417d22858
Parents: f3b38cb
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sat Jan 30 17:05:01 2016 +0530
Committer: Sriharsha Chintalapani <har...@hortonworks.com>
Committed: Sat Jan 30 17:05:22 2016 +0530

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/SaslChannelBuilder.java    | 2 +-
 .../common/security/authenticator/SaslServerAuthenticator.java | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/220d4984/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 34a87c9..b3db4e1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -81,7 +81,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             TransportLayer transportLayer = buildTransportLayer(id, key, 
socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, 
loginManager.subject(), kerberosShortNamer);
+                authenticator = new SaslServerAuthenticator(id, 
loginManager.subject(), kerberosShortNamer, maxReceiveSize);
             else
                 authenticator = new SaslClientAuthenticator(id, 
loginManager.subject(), loginManager.serviceName(),
                         socketChannel.socket().getInetAddress().getHostName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/220d4984/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b4d99d2..1f925f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -60,6 +60,7 @@ public class SaslServerAuthenticator implements Authenticator 
{
     private final Subject subject;
     private final String node;
     private final KerberosShortNamer kerberosNamer;
+    private final int maxReceiveSize;
 
     // assigned in `configure`
     private TransportLayer transportLayer;
@@ -68,7 +69,7 @@ public class SaslServerAuthenticator implements Authenticator 
{
     private NetworkReceive netInBuffer;
     private NetworkSend netOutBuffer;
 
-    public SaslServerAuthenticator(String node, final Subject subject, 
KerberosShortNamer kerberosNameParser) throws IOException {
+    public SaslServerAuthenticator(String node, final Subject subject, 
KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         if (subject.getPrincipals().isEmpty())
@@ -76,6 +77,7 @@ public class SaslServerAuthenticator implements Authenticator 
{
         this.node = node;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
+        this.maxReceiveSize = maxReceiveSize;
         saslServer = createSaslServer();
     }
 
@@ -149,7 +151,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             return;
         }
 
-        if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+        if (netInBuffer == null) netInBuffer = new 
NetworkReceive(maxReceiveSize, node);
 
         netInBuffer.readFrom(transportLayer);
 

Reply via email to