Fix handling of RejectedExecution in sync Thrift server
patch by Christian Rolf and jbellis for CASSANDRA-6788


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/56e2b4ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/56e2b4ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/56e2b4ac

Branch: refs/heads/cassandra-2.1
Commit: 56e2b4ac02f31ff14be36d73597cfcf95fb4815e
Parents: 1e64972
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Mar 17 10:50:08 2014 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Mar 17 10:50:19 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../thrift/CustomTThreadPoolServer.java         | 25 ++++++++++----------
 2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e2b4ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c4e1bd4..2f708dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
  * Log more information when exceeding tombstone_warn_threshold 
(CASSANDRA-6865)
  * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
  * Fix schema concurrency exceptions (CASSANDRA-6841)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/56e2b4ac/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java 
b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index cf48502..d1a3304 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -97,7 +98,7 @@ public class CustomTThreadPoolServer extends TServer
             // block until we are under max clients
             while (activeClients.get() >= args.maxWorkerThreads)
             {
-                Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
+                Uninterruptibles.sleepUninterruptibly(10, 
TimeUnit.MILLISECONDS);
             }
 
             try
@@ -117,6 +118,12 @@ public class CustomTThreadPoolServer extends TServer
                     logger.warn("Transport error occurred during acceptance of 
message.", ttx);
                 }
             }
+            catch (RejectedExecutionException e)
+            {
+                // worker thread decremented activeClients but hadn't finished 
exiting
+                logger.debug("Dropping client connection because our limit of 
{} has been reached", args.maxWorkerThreads);
+                continue;
+            }
 
             if (activeClients.get() >= args.maxWorkerThreads)
                 logger.warn("Maximum number of clients " + 
args.maxWorkerThreads + " reached");
@@ -213,19 +220,13 @@ public class CustomTThreadPoolServer extends TServer
             }
             finally
             {
-                activeClients.decrementAndGet();
                 if (socket != null)
                     ThriftSessionManager.instance.connectionComplete(socket);
-            }
-
-            if (inputTransport != null)
-            {
-                inputTransport.close();
-            }
-
-            if (outputTransport != null)
-            {
-                outputTransport.close();
+                if (inputTransport != null)
+                    inputTransport.close();
+                if (outputTransport != null)
+                    outputTransport.close();
+                activeClients.decrementAndGet();
             }
         }
     }

Reply via email to