Thread local pools never cleaned up patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f Branch: refs/heads/cassandra-3.11 Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c Parents: da94781 Author: Robert Stupp <sn...@snazy.de> Authored: Mon Dec 12 20:28:31 2016 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Mon Dec 12 20:38:56 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../concurrent/NamedThreadFactory.java | 24 +++++++++++- .../db/commitlog/AbstractCommitLogService.java | 3 +- .../db/commitlog/CommitLogSegmentManager.java | 3 +- .../cassandra/net/OutboundTcpConnection.java | 41 +++++++++----------- .../apache/cassandra/repair/RepairRunnable.java | 4 +- .../scheduler/RoundRobinScheduler.java | 3 +- .../cassandra/service/StorageService.java | 7 ++-- .../cassandra/streaming/ConnectionHandler.java | 4 +- .../compress/CompressedInputStream.java | 3 +- 10 files changed, 59 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bbd47c1..5bc30be 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Thread local pools never cleaned up (CASSANDRA-13033) * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781) * CQL often queries static columns unnecessarily (CASSANDRA-12768) * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index 33c80d5..22193c4 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; + /** * This class is an implementation of the <i>ThreadFactory</i> interface. This * is useful to give Java threads meaningful names which is useful when using @@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory public Thread newThread(Runnable runnable) { - String name = id + ":" + n.getAndIncrement(); - Thread thread = new Thread(threadGroup, runnable, name); + String name = id + ':' + n.getAndIncrement(); + Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name); thread.setPriority(priority); thread.setDaemon(true); if (contextClassLoader != null) thread.setContextClassLoader(contextClassLoader); return thread; } + + /** + * Ensures that {@link FastThreadLocal#remove() FastThreadLocal.remove()} is called when the {@link Runnable#run()} + * method of the given {@link Runnable} instance completes to ensure cleanup of {@link FastThreadLocal} instances. + * This is especially important for direct byte buffers allocated locally for a thread. + */ + public static Runnable threadLocalDeallocator(Runnable r) + { + return () -> + { + try { + r.run(); + } finally { + FastThreadLocal.removeAll(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 113d1ba..e5a5887 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.commitlog; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.*; @@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService } }; - thread = new Thread(runnable, name); + thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); thread.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 82cee50..79dd316 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; @@ -174,7 +175,7 @@ public class CommitLogSegmentManager run = true; - managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR"); managerThread.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index f573787..a9dfcdc 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; @@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread { final AtomicInteger version = new AtomicInteger(NO_VERSION); final CountDownLatch versionLatch = new CountDownLatch(1); - new Thread("HANDSHAKE-" + poolReference.endPoint()) + new Thread(NamedThreadFactory.threadLocalDeallocator(() -> { - @Override - public void run() + try { - try - { - logger.info("Handshaking version with {}", poolReference.endPoint()); - version.set(inputStream.readInt()); - } - catch (IOException ex) - { - final String msg = "Cannot handshake version with " + poolReference.endPoint(); - if (logger.isTraceEnabled()) - logger.trace(msg, ex); - else - logger.info(msg); - } - finally - { - //unblock the waiting thread on either success or fail - versionLatch.countDown(); - } + logger.info("Handshaking version with {}", poolReference.endPoint()); + version.set(inputStream.readInt()); + } + catch (IOException ex) + { + final String msg = "Cannot handshake version with " + poolReference.endPoint(); + if (logger.isTraceEnabled()) + logger.trace(msg, ex); + else + logger.info(msg); + } + finally + { + //unblock the waiting thread on either success or fail + versionLatch.countDown(); } - }.start(); + }),"HANDSHAKE-" + poolReference.endPoint()).start(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 7dd1b31..213e5c5 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti private Thread createQueryThread(final int cmd, final UUID sessionId) { - return new Thread(new WrappedRunnable() + return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable() { // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. @@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti seen[si].clear(); } } - }); + })); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java index c98c0fe..61dfa50 100644 --- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java +++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.RequestSchedulerOptions; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler } } }; - Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER"); + Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER"); scheduler.start(); logger.info("Started the RoundRobin Request Scheduler"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d70c8dc..71cbc35 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener; import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; import org.apache.cassandra.batchlog.BatchStoreVerbHandler; import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -613,7 +614,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // daemon threads, like our executors', continue to run while shutdown hooks are invoked - drainOnShutdown = new Thread(new WrappedRunnable() + drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable() { @Override public void runMayThrow() throws InterruptedException, ExecutionException, IOException @@ -628,7 +629,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory()); logbackHook.run(); } - }, "StorageServiceShutdownHook"); + }), "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); replacing = DatabaseDescriptor.isReplacing(); @@ -3195,7 +3196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return 0; int cmd = nextRepairCommand.incrementAndGet(); - new Thread(createRepairTask(cmd, keyspace, options, legacy)).start(); + new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start(); return cmd; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index d3d8ed2..b83c089 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; @@ -214,7 +216,7 @@ public class ConnectionHandler this.socket = socket; this.protocolVersion = protocolVersion; - new Thread(this, name() + "-" + session.peer).start(); + new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start(); } public ListenableFuture<?> close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index d59849f..6577980 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -33,6 +33,7 @@ import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.WrappedRunnable; @@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024)); this.crcCheckChanceSupplier = crcCheckChanceSupplier; - new Thread(new Reader(source, info, dataBuffer)).start(); + new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, dataBuffer))).start(); } public int read() throws IOException