Repository: kafka Updated Branches: refs/heads/0.9.0 f3b38cb6f -> 220d49849
KAFKA-3169; Limit receive buffer size for SASL packets in broker Limit receive buffer size to avoid OOM in broker with invalid SASL packets Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Sriharsha Chintalapani <har...@hortonworks.com> Closes #831 from rajinisivaram/KAFKA-3169 (cherry picked from commit b433b4a2470896d90d9dc596bb932030869d5d67) Signed-off-by: Sriharsha Chintalapani <har...@hortonworks.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/220d4984 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/220d4984 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/220d4984 Branch: refs/heads/0.9.0 Commit: 220d498497a2d5a3d99af1d3a46c631417d22858 Parents: f3b38cb Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Sat Jan 30 17:05:01 2016 +0530 Committer: Sriharsha Chintalapani <har...@hortonworks.com> Committed: Sat Jan 30 17:05:22 2016 +0530 ---------------------------------------------------------------------- .../org/apache/kafka/common/network/SaslChannelBuilder.java | 2 +- .../common/security/authenticator/SaslServerAuthenticator.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/220d4984/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 34a87c9..b3db4e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -81,7 +81,7 @@ public class SaslChannelBuilder implements ChannelBuilder { TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); Authenticator authenticator; if (mode == Mode.SERVER) - authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer); + authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize); else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), socketChannel.socket().getInetAddress().getHostName()); http://git-wip-us.apache.org/repos/asf/kafka/blob/220d4984/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index b4d99d2..1f925f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -60,6 +60,7 @@ public class SaslServerAuthenticator implements Authenticator { private final Subject subject; private final String node; private final KerberosShortNamer kerberosNamer; + private final int maxReceiveSize; // assigned in `configure` private TransportLayer transportLayer; @@ -68,7 +69,7 @@ public class SaslServerAuthenticator implements Authenticator { private NetworkReceive netInBuffer; private NetworkSend netOutBuffer; - public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser) throws IOException { + public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException { if (subject == null) throw new IllegalArgumentException("subject cannot be null"); if (subject.getPrincipals().isEmpty()) @@ -76,6 +77,7 @@ public class SaslServerAuthenticator implements Authenticator { this.node = node; this.subject = subject; this.kerberosNamer = kerberosNameParser; + this.maxReceiveSize = maxReceiveSize; saslServer = createSaslServer(); } @@ -149,7 +151,7 @@ public class SaslServerAuthenticator implements Authenticator { return; } - if (netInBuffer == null) netInBuffer = new NetworkReceive(node); + if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node); netInBuffer.readFrom(transportLayer);