Fix Thread Leak in OutboundTcpConnection

patch by jasobrown, reviewed by aweisberg for CASSANDRA-13204


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6237bf6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6237bf6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6237bf6

Branch: refs/heads/cassandra-3.0
Commit: a6237bf65a95d654b7e702e81fd0d353460d0c89
Parents: f6a7057
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Thu Feb 9 14:34:08 2017 -0800
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Sat Feb 11 07:47:29 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../org/apache/cassandra/net/OutboundTcpConnection.java | 12 ++++++++++--
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6237bf6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c86687..9ce8d49 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.17
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
  * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
  * Upgrade netty version to fix memory leak with client encryption 
(CASSANDRA-13114)
  * Fix paging for DISTINCT queries on partition keys and static columns 
(CASSANDRA-13017)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6237bf6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1a88220..ff2d929 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -161,8 +161,11 @@ public class OutboundTcpConnection extends Thread
 
     void closeSocket(boolean destroyThread)
     {
-        backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
+        backlog.clear();
+        // in the "destroyThread = true" case, enqueuing the sentinel is 
important mostly to unblock the backlog.take()
+        // (via the CoalescingStrategy) in case there's a data race between 
this method enqueuing the sentinel
+        // and run() clearing the backlog on connection failure.
         enqueue(CLOSE_SENTINEL, -1);
     }
 
@@ -183,7 +186,7 @@ public class OutboundTcpConnection extends Thread
         final List<QueuedMessage> drainedMessages = new 
ArrayList<>(drainedMessageSize);
 
         outer:
-        while (true)
+        while (!isStopped)
         {
             try
             {
@@ -199,6 +202,7 @@ public class OutboundTcpConnection extends Thread
             int count = drainedMessages.size();
             //The timestamp of the first message has already been provided to 
the coalescing strategy
             //so skip logging it.
+            inner:
             for (QueuedMessage qm : drainedMessages)
             {
                 try
@@ -217,8 +221,12 @@ public class OutboundTcpConnection extends Thread
                     else if (socket != null || connect())
                         writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
+                    {
                         // clear out the queue, else gossip messages back up.
+                        drainedMessages.clear();
                         backlog.clear();
+                        break inner;
+                    }
                 }
                 catch (Exception e)
                 {

Reply via email to