Allow configuration of internode socket buffer. Patch by Michael Michalski and Jason Brown for CASSANDRA-3378, reviewed by Jason Brown.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4923f2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4923f2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4923f2 Branch: refs/heads/cassandra-1.2 Commit: 0d4923f246d5f6450d2908b59b4b7e70f9ebbfbe Parents: aa90c88 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Feb 1 05:59:19 2013 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Feb 1 05:59:19 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 4 ++++ src/java/org/apache/cassandra/config/Config.java | 2 ++ .../cassandra/config/DatabaseDescriptor.java | 10 ++++++++++ .../cassandra/net/IncomingTcpConnection.java | 12 ++++++++++++ .../cassandra/net/OutboundTcpConnection.java | 12 ++++++++++++ 6 files changed, 41 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index faefd55..43eb1f5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * Validate login for USE queries (CASSANDRA-5207) * cli: remove default username and password (CASSANDRA-5208) * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694) + * allow configuration of internode socket buffer (CASSANDRA-3378) 1.2.1 * stream undelivered hints on decommission (CASSANDRA-5128) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index bfc65d8..14f4e96 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -370,6 +370,10 @@ rpc_server_type: sync # rpc_send_buff_size_in_bytes: # 
rpc_recv_buff_size_in_bytes: +# uncomment to set socket buffer size for internode communication +# internode_send_buff_size_in_bytes: +# internode_recv_buff_size_in_bytes: + # Frame size for thrift (maximum field length). thrift_framed_transport_size_in_mb: 15 http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index edd4439..d8a8afd 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -87,6 +87,8 @@ public class Config public Integer rpc_max_threads = null; public Integer rpc_send_buff_size_in_bytes; public Integer rpc_recv_buff_size_in_bytes; + public Integer internode_send_buff_size_in_bytes; + public Integer internode_recv_buff_size_in_bytes; public Boolean start_native_transport = false; public Integer native_transport_port = 9042; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 18d856c..42bff03 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -982,6 +982,16 @@ public class DatabaseDescriptor return conf.rpc_recv_buff_size_in_bytes; } + public static Integer getInternodeSendBufferSize() + { + return conf.internode_send_buff_size_in_bytes; + } + + public static Integer getInternodeRecvBufferSize() + { + return conf.internode_recv_buff_size_in_bytes; + } + public static boolean startNativeTransport() { return conf.start_native_transport; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index cb989c2..aa8378e 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -20,6 +20,7 @@ package org.apache.cassandra.net; import java.io.*; import java.net.InetAddress; import java.net.Socket; +import java.net.SocketException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,17 @@ public class IncomingTcpConnection extends Thread { assert socket != null; this.socket = socket; + if (DatabaseDescriptor.getInternodeRecvBufferSize() != null) + { + try + { + this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize().intValue()); + } + catch (SocketException se) + { + logger.warn("Failed to set receive buffer size on internode socket.", se); + } + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 9de38d3..003bddb 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -23,6 +23,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; +import java.net.SocketException; import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -275,6 +276,17 @@ public class OutboundTcpConnection extends Thread { 
socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay()); } + if (DatabaseDescriptor.getInternodeSendBufferSize() != null) + { + try + { + socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize().intValue()); + } + catch (SocketException se) + { + logger.warn("Failed to set send buffer size on internode socket.", se); + } + } out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); if (targetVersion >= MessagingService.VERSION_12)