Allow configuration of internode socket buffer.
Patch by Michael Michalski and Jason Brown for CASSANDRA-3378, reviewed
by Jason Brown.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4923f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4923f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4923f2

Branch: refs/heads/cassandra-1.2
Commit: 0d4923f246d5f6450d2908b59b4b7e70f9ebbfbe
Parents: aa90c88
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Fri Feb 1 05:59:19 2013 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Fri Feb 1 05:59:19 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    4 ++++
 src/java/org/apache/cassandra/config/Config.java   |    2 ++
 .../cassandra/config/DatabaseDescriptor.java       |   10 ++++++++++
 .../cassandra/net/IncomingTcpConnection.java       |   12 ++++++++++++
 .../cassandra/net/OutboundTcpConnection.java       |   12 ++++++++++++
 6 files changed, 41 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index faefd55..43eb1f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Validate login for USE queries (CASSANDRA-5207)
  * cli: remove default username and password (CASSANDRA-5208)
  * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694)
+ * allow configuration of internode socket buffer (CASSANDRA-3378)
 
 1.2.1
  * stream undelivered hints on decommission (CASSANDRA-5128)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bfc65d8..14f4e96 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -370,6 +370,10 @@ rpc_server_type: sync
 # rpc_send_buff_size_in_bytes:
 # rpc_recv_buff_size_in_bytes:
 
+# uncomment to set socket buffer size for internode communication
+# internode_send_buff_size_in_bytes:
+# internode_recv_buff_size_in_bytes:
+
 # Frame size for thrift (maximum field length).
 thrift_framed_transport_size_in_mb: 15
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index edd4439..d8a8afd 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -87,6 +87,8 @@ public class Config
     public Integer rpc_max_threads = null;
     public Integer rpc_send_buff_size_in_bytes;
     public Integer rpc_recv_buff_size_in_bytes;
+    public Integer internode_send_buff_size_in_bytes;
+    public Integer internode_recv_buff_size_in_bytes;
 
     public Boolean start_native_transport = false;
     public Integer native_transport_port = 9042;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 18d856c..42bff03 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -982,6 +982,16 @@ public class DatabaseDescriptor
         return conf.rpc_recv_buff_size_in_bytes;
     }
 
+    public static Integer getInternodeSendBufferSize()
+    {
+        return conf.internode_send_buff_size_in_bytes;
+    }
+
+    public static Integer getInternodeRecvBufferSize()
+    {
+        return conf.internode_recv_buff_size_in_bytes;
+    }
+
     public static boolean startNativeTransport()
     {
         return conf.start_native_transport;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index cb989c2..aa8378e 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 import java.io.*;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.net.SocketException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,17 @@ public class IncomingTcpConnection extends Thread
     {
         assert socket != null;
         this.socket = socket;
+        if (DatabaseDescriptor.getInternodeRecvBufferSize() != null)
+        {
+            try
+            {
+                this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize().intValue());
+            }
+            catch (SocketException se)
+            {
+                logger.warn("Failed to set receive buffer size on internode socket.", se);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4923f2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 9de38d3..003bddb 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -275,6 +276,17 @@ public class OutboundTcpConnection extends Thread
                 {
                     socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay());
                 }
+                if (DatabaseDescriptor.getInternodeSendBufferSize() != null)
+                {
+                    try
+                    {
+                        socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize().intValue());
+                    }
+                    catch (SocketException se)
+                    {
+                        logger.warn("Failed to set send buffer size on internode socket.", se);
+                    }
+                }
                 out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
 
                 if (targetVersion >= MessagingService.VERSION_12)

Reply via email to