Updated Branches: refs/heads/trunk 9c739eb75 -> d37696ca7
Thrift socket listen backlog (Partial resolution) patch by Nenad Merdanovic; reviewed by Vijay for CASSANDRA-6206 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d37696ca Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d37696ca Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d37696ca Branch: refs/heads/trunk Commit: d37696ca74fdf2306e4d78d52ae1142768716be9 Parents: 9c739eb Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Sun Nov 10 13:33:45 2013 -0800 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Sun Nov 10 13:33:45 2013 -0800 ---------------------------------------------------------------------- src/java/org/apache/cassandra/config/Config.java | 1 + src/java/org/apache/cassandra/config/DatabaseDescriptor.java | 5 +++++ src/java/org/apache/cassandra/service/CassandraDaemon.java | 3 ++- .../org/apache/cassandra/thrift/CustomTThreadPoolServer.java | 2 +- .../org/apache/cassandra/thrift/TCustomServerSocket.java | 4 ++-- src/java/org/apache/cassandra/thrift/TServerFactory.java | 1 + src/java/org/apache/cassandra/thrift/ThriftServer.java | 8 ++++++-- 7 files changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 2f9c146..f41a112 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -84,6 +84,7 @@ public class Config public Boolean start_rpc = true; public String rpc_address; public Integer rpc_port = 9160; + public Integer rpc_listen_backlog = 50; public String rpc_server_type = "sync"; public Boolean rpc_keepalive = true; public Integer rpc_min_threads = 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bae8a4f..6abbd0c 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -730,6 +730,11 @@ public class DatabaseDescriptor return Integer.parseInt(System.getProperty("cassandra.rpc_port", conf.rpc_port.toString())); } + public static int getRpcListenBacklog() + { + return conf.rpc_listen_backlog; + } + public static long getRpcTimeout() { return conf.request_timeout_in_ms; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b99d625..cb3cc88 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -305,7 +305,8 @@ public class CassandraDaemon // Thift InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); int rpcPort = DatabaseDescriptor.getRpcPort(); - thriftServer = new ThriftServer(rpcAddr, rpcPort); + int listenBacklog = DatabaseDescriptor.getRpcListenBacklog(); + thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); // Native transport InetAddress nativeAddr = DatabaseDescriptor.getNativeTransportAddress(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java index 7dc1bd9..7bcfdf2 100644 --- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java @@ -254,7 +254,7 @@ public class CustomTThreadPoolServer extends TServer } else { - serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize); + serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize, args.listenBacklog); } } catch (TTransportException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java index c30cec0..d88cf71 100644 --- a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java +++ b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java @@ -57,7 +57,7 @@ public class TCustomServerSocket extends TServerTransport * @throws TTransportException */ public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, - Integer recvBufferSize) + Integer recvBufferSize, Integer listenBacklog) throws TTransportException { try @@ -67,7 +67,7 @@ public class TCustomServerSocket extends TServerTransport // Prevent 2MSL delay problem on server restarts serverSocket.setReuseAddress(true); // Bind to listening port - serverSocket.bind(bindAddr); + serverSocket.bind(bindAddr, listenBacklog); } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/thrift/TServerFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TServerFactory.java b/src/java/org/apache/cassandra/thrift/TServerFactory.java index 2e2acb8..ec3d61b 100644 --- a/src/java/org/apache/cassandra/thrift/TServerFactory.java +++ b/src/java/org/apache/cassandra/thrift/TServerFactory.java @@ -32,6 +32,7 @@ public interface TServerFactory public static class Args { public InetSocketAddress addr; + public Integer listenBacklog; public CassandraServer cassandraServer; public TProcessor processor; public TProtocolFactory tProtocolFactory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d37696ca/src/java/org/apache/cassandra/thrift/ThriftServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java index dbd3824..593fcf0 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftServer.java +++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java @@ -40,12 +40,14 @@ public class ThriftServer implements CassandraDaemon.Server protected final InetAddress address; protected final int port; + protected final int backlog; private volatile ThriftServerThread server; - public ThriftServer(InetAddress address, int port) + public ThriftServer(InetAddress address, int port, int backlog) { this.address = address; this.port = port; + this.backlog = backlog; } public void start() @@ -53,7 +55,7 @@ public class ThriftServer implements CassandraDaemon.Server if (server == null) { CassandraServer iface = getCassandraServer(); - server = new ThriftServerThread(address, port, iface, getProcessor(iface), getTransportFactory()); + server = new ThriftServerThread(address, port, backlog, iface, getProcessor(iface), getTransportFactory()); server.start(); } } @@ -110,6 +112,7 @@ public class ThriftServer implements CassandraDaemon.Server public ThriftServerThread(InetAddress listenAddr, int listenPort, + int listenBacklog, CassandraServer server, TProcessor processor, TTransportFactory transportFactory) @@ -120,6 +123,7 @@ public class ThriftServer implements CassandraDaemon.Server TServerFactory.Args args = new TServerFactory.Args(); args.tProtocolFactory = new TBinaryProtocol.Factory(true, true); args.addr = new InetSocketAddress(listenAddr, listenPort); + args.listenBacklog = listenBacklog; args.cassandraServer = server; args.processor = processor; args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();