Fix Thread Leak in OutboundTcpConnection. Patch by jasobrown; reviewed by aweisberg for CASSANDRA-13204.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6237bf6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6237bf6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6237bf6 Branch: refs/heads/cassandra-3.0 Commit: a6237bf65a95d654b7e702e81fd0d353460d0c89 Parents: f6a7057 Author: Jason Brown <jasedbr...@gmail.com> Authored: Thu Feb 9 14:34:08 2017 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Sat Feb 11 07:47:29 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/net/OutboundTcpConnection.java | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6237bf6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5c86687..9ce8d49 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.17 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204) * Coalescing strategy can enter infinite loop (CASSANDRA-13159) * Upgrade netty version to fix memory leak with client encryption (CASSANDRA-13114) * Fix paging for DISTINCT queries on partition keys and static columns (CASSANDRA-13017) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6237bf6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1a88220..ff2d929 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -161,8 +161,11 @@ public class OutboundTcpConnection extends Thread void closeSocket(boolean 
destroyThread) { - backlog.clear(); isStopped = destroyThread; // Exit loop to stop the thread + backlog.clear(); + // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take() + // (via the CoalescingStrategy) in case there's a data race between this method enqueuing the sentinel + // and run() clearing the backlog on connection failure. enqueue(CLOSE_SENTINEL, -1); } @@ -183,7 +186,7 @@ public class OutboundTcpConnection extends Thread final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize); outer: - while (true) + while (!isStopped) { try { @@ -199,6 +202,7 @@ public class OutboundTcpConnection extends Thread int count = drainedMessages.size(); //The timestamp of the first message has already been provided to the coalescing strategy //so skip logging it. + inner: for (QueuedMessage qm : drainedMessages) { try @@ -217,8 +221,12 @@ public class OutboundTcpConnection extends Thread else if (socket != null || connect()) writeConnected(qm, count == 1 && backlog.isEmpty()); else + { // clear out the queue, else gossip messages back up. + drainedMessages.clear(); backlog.clear(); + break inner; + } } catch (Exception e) {