Fix handling of RejectedExecution in sync Thrift server; patch by Christian Rolf and jbellis for CASSANDRA-6788
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/56e2b4ac Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/56e2b4ac Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/56e2b4ac Branch: refs/heads/cassandra-2.1 Commit: 56e2b4ac02f31ff14be36d73597cfcf95fb4815e Parents: 1e64972 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Mar 17 10:50:08 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Mar 17 10:50:19 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../thrift/CustomTThreadPoolServer.java | 25 ++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e2b4ac/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c4e1bd4..2f708dc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788) * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865) * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864) * Fix schema concurrency exceptions (CASSANDRA-6841) http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e2b4ac/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java index cf48502..d1a3304 100644 --- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import 
java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -97,7 +98,7 @@ public class CustomTThreadPoolServer extends TServer // block until we are under max clients while (activeClients.get() >= args.maxWorkerThreads) { - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); } try @@ -117,6 +118,12 @@ public class CustomTThreadPoolServer extends TServer logger.warn("Transport error occurred during acceptance of message.", ttx); } } + catch (RejectedExecutionException e) + { + // worker thread decremented activeClients but hadn't finished exiting + logger.debug("Dropping client connection because our limit of {} has been reached", args.maxWorkerThreads); + continue; + } if (activeClients.get() >= args.maxWorkerThreads) logger.warn("Maximum number of clients " + args.maxWorkerThreads + " reached"); @@ -213,19 +220,13 @@ public class CustomTThreadPoolServer extends TServer } finally { - activeClients.decrementAndGet(); if (socket != null) ThriftSessionManager.instance.connectionComplete(socket); - } - - if (inputTransport != null) - { - inputTransport.close(); - } - - if (outputTransport != null) - { - outputTransport.close(); + if (inputTransport != null) + inputTransport.close(); + if (outputTransport != null) + outputTransport.close(); + activeClients.decrementAndGet(); } } }