This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 8dcaa12  Allow instance class loaders to be garbage collected for 
inJVM dtest
8dcaa12 is described below

commit 8dcaa12baa97ce870f23ff9045f968f2fa28b2cc
Author: Jon Meredith <jmeredit...@gmail.com>
AuthorDate: Thu Aug 15 10:12:06 2019 -0600

    Allow instance class loaders to be garbage collected for inJVM dtest
    
    Backport support for optional network/gossip test features,
    instance generations and subnet support from trunk.
    
    Backport MessageFilter and cluster builder API changes to match
    trunk (except for the change in namespace for Verbs).
    
    Add a test for repeatedly creating/tearing down in-JVM dtest
    clusters to help find resource leaks.
    
    Change IsolatedExecutor to clean up on an executor with zero
    core threads so that it exits sooner enabling GC on the
    InstanceClassLoader.  Moved classloader close after shutdown to
    improve logging when the isolated executor is shut down.
    
    Update the logback config for dtests to make it obvious
    which log threads are for instances vs the main logger.
    
    Disable native library loading until it can be reinstated
    (tracked in CASSANDRA-15170).
    
    Shut down various executors and threads that were
    preventing the instance classloader from being unloaded.
    
    Add test-jvm-dtest-some ant target
    
    Patch by Jon Meredith; Reviewed by Alex Petrov and
    Benedict Elliott Smith for CASSANDRA-15170
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |  22 ++-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/BatchlogManager.java   |  11 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  14 +-
 .../apache/cassandra/db/HintedHandOffManager.java  |   9 +
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +-
 .../org/apache/cassandra/net/MessagingService.java |   7 +-
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  29 ++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../cassandra/utils/BackgroundActivityMonitor.java |  12 +-
 src/java/org/apache/cassandra/utils/CLibrary.java  |   9 +-
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 ++++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillis.java         |  14 +-
 .../org/apache/cassandra/utils/concurrent/Ref.java |   8 +-
 .../cassandra/utils/memory/MemtablePool.java       |   8 +-
 test/conf/logback-dtest.xml                        |  20 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  20 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  22 +--
 .../api/{IMessageFilters.java => Feature.java}     |  28 +--
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 138 +++++++++-----
 .../cassandra/distributed/impl/Instance.java       | 165 ++++++++++++++---
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  28 +++
 .../distributed/test/ResourceLeakTest.java         | 201 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 41 files changed, 893 insertions(+), 231 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index eabdcaa..caea0f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
  * Fixing invalid CQL in security documentation (CASSANDRA-15020)
  * Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012)
  * Multi-version in-JVM dtests (CASSANDRA-14937)
+ * Allow instance class loaders to be garbage collected for inJVM dtest 
(CASSANDRA-15170)
 
 
 2.2.14
diff --git a/build.xml b/build.xml
index d522b59..b42c3ed 100644
--- a/build.xml
+++ b/build.xml
@@ -1818,6 +1818,10 @@
       <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
       <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/>
       <jvmarg value="-Dcassandra.skip_sync=true" />
+      <jvmarg value="-XX:MaxMetaspaceSize=256M" />
+      <jvmarg value="-XX:SoftRefLRUPolicyMSPerMB=0" />
+      <jvmarg value="-XX:+HeapDumpOnOutOfMemoryError" />
+      <jvmarg value="-XX:HeapDumpPath=build/test/oom.hprof" />
     </testmacro>
   </target>
 
@@ -1851,10 +1855,24 @@
       </jar>
   </target>
 
-  <target name="test-jvm-dtest" depends="build-test" description="Execute unit 
tests">
+  <target name="test-jvm-dtest" depends="build-test" description="Execute 
in-jvm dtests">
     <testmacro inputdir="${test.distributed.src}" 
timeout="${test.distributed.timeout}" forkmode="once" showoutput="true" 
filter="**/test/*Test.java">
       <jvmarg value="-Dlogback.configurationFile=test/conf/logback-dtest.xml"/>
-      <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
+      <jvmarg value="-Dcassandra.ring_delay_ms=10000"/>
+      <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+      <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/>
+      <jvmarg value="-Dcassandra.skip_sync=true" />
+    </testmacro>
+  </target>
+
+  <!-- Use this with an FQDN for test class, and a csv list of methods like 
this:
+      ant test-jvm-dtest-some 
-Dtest.name=org.apache.cassandra.distributed.test.ResourceLeakTest 
-Dtest.methods=looperTest
+    -->
+  <target name="test-jvm-dtest-some" depends="build-test" description="Execute 
some in-jvm dtests">
+    <testmacro inputdir="${test.distributed.src}" 
timeout="${test.distributed.timeout}" forkmode="once" showoutput="true">
+      <test name="${test.name}" methods="${test.methods}" 
outfile="build/test/output/TEST-${test.name}-${test.methods}"/>
+      <jvmarg value="-Dlogback.configurationFile=test/conf/logback-dtest.xml"/>
+      <jvmarg value="-Dcassandra.ring_delay_ms=10000"/>
       <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
       <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/>
       <jvmarg value="-Dcassandra.skip_sync=true" />
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java 
b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
index 473edb7..af41513 100644
--- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -76,7 +76,7 @@ public class InfiniteLoopExecutor
         return this;
     }
 
-    public void shutdown()
+    public void shutdownNow()
     {
         isShutdown = true;
         thread.interrupt();
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java 
b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 489f58e..13d27a8 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.utils.ExecutorUtils;
+
 /**
  * Centralized location for shared executors
  */
@@ -43,12 +46,8 @@ public class ScheduledExecutors
     public static final DebuggableScheduledThreadPoolExecutor optionalTasks = 
new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
 
     @VisibleForTesting
-    public static void shutdownAndWait() throws InterruptedException
+    public static void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        ExecutorService[] executors = new ExecutorService[] { scheduledTasks, 
nonPeriodicTasks, optionalTasks };
-        for (ExecutorService executor : executors)
-            executor.shutdownNow();
-        for (ExecutorService executor : executors)
-            executor.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, scheduledTasks, 
nonPeriodicTasks, optionalTasks);
     }
 }
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java 
b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index d355d77..50cc5a3 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -114,7 +114,7 @@ public class SharedExecutorPool
         return executor;
     }
 
-    public void shutdown() throws InterruptedException
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException
     {
         shuttingDown = true;
         for (SEPExecutor executor : executors)
@@ -122,7 +122,7 @@ public class SharedExecutorPool
 
         terminateWorkers();
 
-        long until = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L);
+        long until = System.nanoTime() + unit.toNanos(timeout);
         for (SEPExecutor executor : executors)
             executor.shutdown.await(until - System.nanoTime(), 
TimeUnit.NANOSECONDS);
     }
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java 
b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 5e0a667..01bfb7c 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -121,12 +122,9 @@ public class StageManager
     };
 
     @VisibleForTesting
-    public static void shutdownAndWait() throws InterruptedException
+    public static void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        for (Stage stage : Stage.values())
-            StageManager.stages.get(stage).shutdown();
-        for (Stage stage : Stage.values())
-            StageManager.stages.get(stage).awaitTermination(60, 
TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, 
StageManager.stages.values());
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java 
b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9a2d1f6..40f8ce0 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -19,13 +19,11 @@ package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,7 +52,9 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
@@ -74,7 +74,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void start()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        MBeanWrapper mbs = MBeanWrapper.instance;
         try
         {
             mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
@@ -95,10 +95,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         batchlogTasks.scheduleWithFixedDelay(runnable, 
StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
-    public static void shutdown() throws InterruptedException
+    public static void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        batchlogTasks.shutdown();
-        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownAndWait(timeout, unit, batchlogTasks);
     }
 
     public int countAllBatches()
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d26cd61..01330a6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -88,6 +88,8 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import com.clearspring.analytics.stream.Counter;
 
+import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@ -192,24 +194,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public volatile long sampleLatencyNanos;
     private final ScheduledFuture<?> latencyCalculator;
 
-    public static void shutdownFlushExecutor() throws InterruptedException
-    {
-        flushExecutor.shutdown();
-        flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
-    }
-
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
         postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
     }
 
-    public static void shutdownReclaimExecutor() throws InterruptedException
+    public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException
     {
-        reclaimExecutor.shutdown();
-        reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, 
postFlushExecutor, flushExecutor);
     }
 
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members 
jibe with new settings.
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 95af9ba..7a570d2 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -68,6 +68,9 @@ import java.util.List;
 
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
+
 /**
  * The hint schema looks like this:
  *
@@ -619,4 +622,10 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
         }
     }
 
+    @VisibleForTesting
+    public void shutdownAndWait(long timeout, TimeUnit units) throws 
InterruptedException, TimeoutException
+    {
+        shutdown(executor, hintDeliveryExecutor);
+        awaitTermination(timeout, units, executor, hintDeliveryExecutor);
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 2f0179d..6dd519a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -382,6 +382,7 @@ public class CommitLog implements CommitLogMBean
 
     /**
      * Shuts down the threads used by the commit log, blocking until 
completion.
+     * TODO this should accept a timeout, and throw TimeoutException
      */
     public void shutdownBlocking() throws InterruptedException
     {
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 831c252..bd4fe13 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
@@ -47,6 +48,8 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 
 /**
  * This module is responsible for Gossiping information for the local 
endpoint. This abstraction
@@ -1547,4 +1550,10 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    @VisibleForTesting
+    public void stopShutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        stop();
+        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 3ebbb6e..9317132 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
@@ -43,11 +44,15 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
+
 /**
  * Manages the fixed-size memory pool for index summaries, periodically 
resizing them
  * in order to give more memory to hot sstables and less memory to cold 
sstables.
@@ -257,4 +262,10 @@ public class IndexSummaryManager implements 
IndexSummaryManagerMBean
     {
         return 
CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
     }
+
+    @VisibleForTesting
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 81af9f0..16fa6c9 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2326,10 +2326,9 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
 
     }
 
-    public static void shutdownBlocking() throws InterruptedException
+    public static void shutdownBlocking(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        syncExecutor.shutdownNow();
-        syncExecutor.awaitTermination(0, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
         resetTidying();
     }
 }
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index e7ce964..e42b91b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -775,6 +775,8 @@ public final class MessagingService implements 
MessagingServiceMBean
                     // see 
https://issues.apache.org/jira/browse/CASSANDRA-10545
                     handleIOException(e);
                 }
+
+            
connectionManagers.values().forEach(OutboundTcpConnectionPool::close);
         }
         catch (IOException e)
         {
@@ -1063,7 +1065,10 @@ public final class MessagingService implements 
MessagingServiceMBean
     {
         // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20
         // see https://bugs.openjdk.java.net/browse/JDK-8050499
-        if (!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS 
X".equals(System.getProperty("os.name")))
+        if ((!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS 
X".equals(System.getProperty("os.name"))) &&
+            !"Thread signal failed".equals(e.getMessage()) && // handle 
shutdown for in-JVM dtests
+            !"Bad file descriptor".equals(e.getMessage()) &&
+            !"No such file or directory".equals(e.getMessage()))
             throw e;
     }
 
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4cfe019..e8346d8 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -392,7 +392,7 @@ public class OutboundTcpConnection extends Thread
 
         long start = System.nanoTime();
         long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
-        while (System.nanoTime() - start < timeout)
+        while (System.nanoTime() - start < timeout && !isStopped)
         {
             targetVersion = 
MessagingService.instance().getVersion(poolReference.endPoint());
             try
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 8f6c9c2..1380f43 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -601,7 +601,8 @@ public class CassandraDaemon
         }
     }
 
-    private void waitForGossipToSettle()
+    @VisibleForTesting
+    public static void waitForGossipToSettle()
     {
         int forceAfter = 
Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
         if (forceAfter == 0)
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index e82b0bb..a7ee333 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -23,15 +23,21 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.utils.ExecutorUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
+
 public class PendingRangeCalculatorService
 {
     public static final PendingRangeCalculatorService instance = new 
PendingRangeCalculatorService();
@@ -112,9 +118,8 @@ public class PendingRangeCalculatorService
     }
 
     @VisibleForTesting
-    public void shutdownExecutor() throws InterruptedException
+    public void shutdownExecutor(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        executor.shutdown();
-        executor.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index c5c1ca6..0a9a8da 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.sql.Time;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -79,6 +80,8 @@ import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 /**
  * This abstraction contains the token/identifier of this node
  * on the identifier space. This token gets gossiped around.
@@ -659,7 +662,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
                 // wait for miscellaneous tasks like sstable and commitlog 
segment deletion
                 ScheduledExecutors.nonPeriodicTasks.shutdown();
-                if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, 
TimeUnit.MINUTES))
+                if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, 
MINUTES))
                     logger.warn("Miscellaneous task executor still busy after 
one minute; proceeding with shutdown");
             }
         }, "StorageServiceShutdownHook");
@@ -1365,9 +1368,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return bgMonitor.getSeverity(endpoint);
     }
 
-    public void shutdownBGMonitor()
+    public void shutdownBGMonitorAndWait(long timeout, TimeUnit units) throws 
TimeoutException, InterruptedException
     {
-        bgMonitor.shutdown();
+        bgMonitor.shutdownAndWait(timeout, units);
     }
 
     /**
@@ -4067,7 +4070,15 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             remainingCFs--;
         }
 
-        BatchlogManager.shutdown();
+        try
+        {
+            /* not clear this is reasonable time, but propagated from prior 
embedded behaviour */
+            BatchlogManager.shutdownAndWait(1L, MINUTES);
+        }
+        catch (TimeoutException t)
+        {
+            logger.error("Batchlog manager timed out shutting down", t);
+        }
 
         // Interrupt on going compaction and shutdown to prevent further 
compaction
         CompactionManager.instance.forceShutdown();
@@ -4093,7 +4104,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         // wait for miscellaneous tasks like sstable and commitlog segment 
deletion
         ScheduledExecutors.nonPeriodicTasks.shutdown();
-        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, 
TimeUnit.MINUTES))
+        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
             logger.warn("Failed to wait for non periodic tasks to shutdown");
 
         ColumnFamilyStore.shutdownPostFlushExecutor();
@@ -4551,4 +4562,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.info(String.format("Updated hinted_handoff_throttle_in_kb to 
%d", throttleInKB));
     }
 
+    @VisibleForTesting
+    public void shutdownServer()
+    {
+        if (drainOnShutdown != null)
+        {
+            Runtime.getRuntime().removeShutdownHook(drainOnShutdown);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 603366d..e0948c9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -19,11 +19,17 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -290,4 +296,11 @@ public class StreamCoordinator
             return sessionInfos.values();
         }
     }
+
+    @VisibleForTesting
+    public static void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor);
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java 
b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
index ab81c20..711c5dd 100644
--- a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
+++ b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
@@ -28,6 +28,7 @@ import java.net.InetAddress;
 import java.util.StringTokenizer;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.gms.ApplicationState;
@@ -143,16 +144,9 @@ public class BackgroundActivityMonitor
         return 0.0;
     }
 
-    public void shutdown()
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
TimeoutException, InterruptedException
     {
-        reportThread.shutdown();
-        try
-        {
-            reportThread.awaitTermination(1L, TimeUnit.MINUTES);
-        } catch (InterruptedException e)
-        {
-            throw new IllegalStateException();
-        }
+        ExecutorUtils.shutdownAndWait(timeout, unit, reportThread);
     }
 
     public class BackgroundActivityReporter implements Runnable
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java 
b/src/java/org/apache/cassandra/utils/CLibrary.java
index b6598ec..e3bec4f 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -60,7 +60,14 @@ public final class CLibrary
     {
         try
         {
-            Native.register("c");
+            if (Boolean.getBoolean("cassandra.disable_clibrary"))
+            {
+                jnaAvailable = false;
+            }
+            else
+            {
+                Native.register("c");
+            }
         }
         catch (NoClassDefFoundError e)
         {
diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java 
b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
new file mode 100644
index 0000000..21933a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Helpers for shutting down, and waiting for the termination of, heterogeneous
+ * collections of "executors". An element may be an {@link ExecutorService}, an
+ * {@link InfiniteLoopExecutor} or a plain {@link Thread}; {@code null} elements are
+ * skipped and any other type is rejected with an {@link IllegalArgumentException}.
+ */
+public class ExecutorUtils
+{
+
+    /**
+     * Wraps a task so that the executing thread is renamed while the task runs.
+     *
+     * @param runnable   the task to run
+     * @param threadName name given to the executing thread for the duration of the task
+     * @return a runnable that sets the thread name, runs {@code runnable}, and restores
+     *         the previous name afterwards, even if the task throws
+     */
+    public static Runnable runWithThreadName(Runnable runnable, String threadName)
+    {
+        return () -> {
+            String oldThreadName = Thread.currentThread().getName();
+            try
+            {
+                Thread.currentThread().setName(threadName);
+                runnable.run();
+            }
+            finally
+            {
+                Thread.currentThread().setName(oldThreadName);
+            }
+        };
+    }
+
+    /**
+     * Shuts down every element, interrupting running work
+     * (see {@link #shutdown(boolean, Iterable)} with {@code interrupt = true}).
+     */
+    public static void shutdownNow(Iterable<?> executors)
+    {
+        shutdown(true, executors);
+    }
+
+    /**
+     * Requests shutdown of every element
+     * (see {@link #shutdown(boolean, Iterable)} with {@code interrupt = false}).
+     */
+    public static void shutdown(Iterable<?> executors)
+    {
+        shutdown(false, executors);
+    }
+
+    /**
+     * Initiates shutdown of every element without waiting for termination.
+     *
+     * @param interrupt if true, {@link ExecutorService}s are stopped with {@code shutdownNow()},
+     *                  otherwise with {@code shutdown()}. {@link InfiniteLoopExecutor}s are
+     *                  always stopped with {@code shutdownNow()} and {@link Thread}s are always
+     *                  interrupted, regardless of this flag.
+     * @param executors the elements to shut down; {@code null} elements are ignored
+     * @throws IllegalArgumentException if a non-null element is of an unsupported type
+     */
+    public static void shutdown(boolean interrupt, Iterable<?> executors)
+    {
+        for (Object executor : executors)
+        {
+            if (executor instanceof ExecutorService)
+            {
+                if (interrupt) ((ExecutorService) executor).shutdownNow();
+                else ((ExecutorService) executor).shutdown();
+            }
+            else if (executor instanceof InfiniteLoopExecutor)
+                ((InfiniteLoopExecutor) executor).shutdownNow();
+            else if (executor instanceof Thread)
+                ((Thread) executor).interrupt();
+            else if (executor != null)
+                throw new IllegalArgumentException(executor.toString());
+        }
+    }
+
+    /** Varargs convenience for {@link #shutdown(Iterable)}. */
+    public static void shutdown(ExecutorService ... executors)
+    {
+        shutdown(Arrays.asList(executors));
+    }
+
+    /** Varargs convenience for {@link #shutdownNow(Iterable)}. */
+    public static void shutdownNow(ExecutorService ... executors)
+    {
+        shutdownNow(Arrays.asList(executors));
+    }
+
+    /** Varargs convenience for {@link #awaitTermination(long, TimeUnit, Collection)}. */
+    public static void awaitTermination(long timeout, TimeUnit unit, ExecutorService ... executors) throws InterruptedException, TimeoutException
+    {
+        awaitTermination(timeout, unit, Arrays.asList(executors));
+    }
+
+    /**
+     * Waits for every element to terminate, sharing a single overall timeout between them.
+     *
+     * @param timeout   total time to wait across all elements
+     * @param unit      unit of {@code timeout}
+     * @param executors the elements to wait for; {@code null} elements are ignored
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws TimeoutException     if any element has not terminated before the timeout elapses
+     */
+    public static void awaitTermination(long timeout, TimeUnit unit, Collection<?> executors) throws InterruptedException, TimeoutException
+    {
+        long deadline = System.nanoTime() + unit.toNanos(timeout);
+        awaitTerminationUntil(deadline, executors);
+    }
+
+    /**
+     * Waits for every element to terminate before an absolute deadline.
+     *
+     * @param deadline  deadline expressed on the {@link System#nanoTime()} clock
+     * @param executors the elements to wait for, in iteration order; {@code null} elements are ignored
+     * @throws InterruptedException     if the calling thread is interrupted while waiting
+     * @throws TimeoutException         if the deadline has passed, or passes while waiting, before
+     *                                  an element has terminated
+     * @throws IllegalArgumentException if a non-null element is of an unsupported type
+     */
+    public static void awaitTerminationUntil(long deadline, Collection<?> executors) throws InterruptedException, TimeoutException
+    {
+        for (Object executor : executors)
+        {
+            // remaining budget is recomputed per element so the deadline is shared, not per-element
+            long wait = deadline - System.nanoTime();
+            if (executor instanceof ExecutorService)
+            {
+                if (wait <= 0 || !((ExecutorService)executor).awaitTermination(wait, NANOSECONDS))
+                    throw new TimeoutException(executor + " did not terminate on time");
+            }
+            else if (executor instanceof InfiniteLoopExecutor)
+            {
+                if (wait <= 0 || !((InfiniteLoopExecutor)executor).awaitTermination(wait, NANOSECONDS))
+                    throw new TimeoutException(executor + " did not terminate on time");
+            }
+            else if (executor instanceof Thread)
+            {
+                Thread t = (Thread) executor;
+                if (wait <= 0)
+                    throw new TimeoutException(executor + " did not terminate on time");
+                // timedJoin splits the remaining nanos into whole millis plus the sub-millisecond
+                // excess. Rounding the millis up AND passing the remainder as nanos (as a
+                // hand-rolled split would) counts the fraction twice and can block past the deadline.
+                // wait > 0 here, so this never degenerates into an unbounded join.
+                NANOSECONDS.timedJoin(t, wait);
+                if (t.isAlive())
+                    throw new TimeoutException(executor + " did not terminate on time");
+            }
+            else if (executor != null)
+            {
+                throw new IllegalArgumentException(executor.toString());
+            }
+        }
+    }
+
+    /**
+     * Requests shutdown of every element (see {@link #shutdown(Iterable)}) and then waits
+     * for them all to terminate within the given timeout.
+     */
+    public static void shutdownAndWait(long timeout, TimeUnit unit, Collection<?> executors) throws TimeoutException, InterruptedException
+    {
+        shutdown(executors);
+        awaitTermination(timeout, unit, executors);
+    }
+
+    /**
+     * Shuts down every element with interruption (see {@link #shutdownNow(Iterable)}) and then
+     * waits for them all to terminate within the given timeout.
+     */
+    public static void shutdownNowAndWait(long timeout, TimeUnit unit, Collection<?> executors) throws TimeoutException, InterruptedException
+    {
+        shutdownNow(executors);
+        awaitTermination(timeout, unit, executors);
+    }
+
+    /** Varargs convenience for {@link #shutdownAndWait(long, TimeUnit, Collection)}. */
+    public static void shutdownAndWait(long timeout, TimeUnit unit, Object ... executors) throws TimeoutException, InterruptedException
+    {
+        shutdownAndWait(timeout, unit, Arrays.asList(executors));
+    }
+
+    /** Varargs convenience for {@link #shutdownNowAndWait(long, TimeUnit, Collection)}. */
+    public static void shutdownNowAndWait(long timeout, TimeUnit unit, Object ... executors) throws TimeoutException, InterruptedException
+    {
+        shutdownNowAndWait(timeout, unit, Arrays.asList(executors));
+    }
+}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java 
b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
index f124383..9d42acb 100644
--- a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
+++ b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
@@ -39,6 +39,8 @@ public class NanoTimeToCurrentTimeMillis
     @VisibleForTesting
     public static final Object TIMESTAMP_UPDATE = new Object();
 
+    private static final Thread updater;
+
     /*
      * System.currentTimeMillis() is 25 nanoseconds. This is 2 nanoseconds 
(maybe) according to JMH.
      * Faster than calling both currentTimeMillis() and nanoTime().
@@ -57,7 +59,7 @@ public class NanoTimeToCurrentTimeMillis
     static
     {
         //Pick up updates from NTP periodically
-        Thread t = new Thread("NanoTimeToCurrentTimeMillis updater")
+        updater = new Thread("NanoTimeToCurrentTimeMillis updater")
         {
             @Override
             public void run()
@@ -82,7 +84,13 @@ public class NanoTimeToCurrentTimeMillis
                 }
             }
         };
-        t.setDaemon(true);
-        t.start();
+        updater.setDaemon(true);
+        updater.start();
+    }
+
+    /**
+     * Interrupts the background updater thread and waits for it to die.
+     *
+     * This method does not report whether the thread actually stopped: it returns
+     * normally once the join completes or times out.
+     * NOTE(review): presumably the updater's run loop exits on interruption; its body
+     * is not visible here, so confirm.
+     *
+     * @param millis maximum time to wait for the updater thread, in milliseconds;
+     *               per {@link Thread#join(long)}, 0 means wait indefinitely
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public static void shutdown(long millis) throws InterruptedException
+    {
+        updater.interrupt();
+        updater.join(millis);
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java 
b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index e1cc7ff..c009032 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
@@ -35,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.ExecutorUtils;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
@@ -343,9 +344,8 @@ public final class Ref<T> implements RefCounted<T>
     }
 
     @VisibleForTesting
-    public static void shutdownReferenceReaper() throws InterruptedException
+    public static void shutdownReferenceReaper(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException
     {
-        EXEC.shutdown();
-        EXEC.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC);
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java 
b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index b4efaa6..9c4824a 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -19,10 +19,12 @@
 package org.apache.cassandra.utils.memory;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 
@@ -62,12 +64,12 @@ public abstract class MemtablePool
     public abstract boolean needToCopyOnHeap();
 
     @VisibleForTesting
-    public void shutdown() throws InterruptedException
+    public void shutdownAndWait(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
     {
-        cleaner.shutdown();
-        cleaner.awaitTermination(60, TimeUnit.SECONDS);
+        ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
     }
 
+
     public abstract MemtableAllocator newAllocator();
 
     /**
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
index b8899a3..4282fee 100644
--- a/test/conf/logback-dtest.xml
+++ b/test/conf/logback-dtest.xml
@@ -23,7 +23,7 @@
   <!-- Shutdown hook ensures that async appender flushes -->
   <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
-  <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+  <appender name="INSTANCEFILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
 
     <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
     <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -38,18 +38,18 @@
 
     <encoder>
       <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} 
%msg%n</pattern>
-      <immediateFlush>false</immediateFlush>
     </encoder>
+    <immediateFlush>false</immediateFlush>
   </appender>
 
-  <appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
+  <appender name="INSTANCEASYNCFILE" 
class="ch.qos.logback.classic.AsyncAppender">
     <discardingThreshold>0</discardingThreshold>
     <maxFlushTime>0</maxFlushTime>
     <queueSize>1024</queueSize>
-    <appender-ref ref="FILE"/>
+    <appender-ref ref="INSTANCEFILE"/>
   </appender>
 
-  <appender name="STDERR" target="System.err" 
class="ch.qos.logback.core.ConsoleAppender">
+  <appender name="INSTANCESTDERR" target="System.err" 
class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
     </encoder>
@@ -58,7 +58,7 @@
     </filter>
   </appender>
 
-  <appender name="STDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
+  <appender name="INSTANCESTDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
     </encoder>
@@ -67,7 +67,7 @@
     </filter>
   </appender>
 
-  <appender name="STDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
+  <appender name="INSTANCESTDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - 
%msg%n</pattern>
     </encoder>
@@ -79,8 +79,8 @@
   <logger name="org.apache.hadoop" level="WARN"/>
 
   <root level="DEBUG">
-    <appender-ref ref="ASYNCFILE" />
-    <appender-ref ref="STDERR" />
-    <appender-ref ref="STDOUT" />
+    <appender-ref ref="INSTANCEASYNCFILE" />
+    <appender-ref ref="INSTANCESTDERR" />
+    <appender-ref ref="INSTANCESTDOUT" />
   </root>
 </configuration>
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java 
b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index c7f7675..95862b6 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.distributed;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
@@ -40,18 +40,24 @@ public class Cluster extends 
AbstractCluster<IInvokableInstance> implements IClu
         super(root, version, configs, sharedClassLoader);
     }
 
-    protected IInvokableInstance newInstanceWrapper(Versions.Version version, 
InstanceConfig config)
+    protected IInvokableInstance newInstanceWrapper(int generation, 
Versions.Version version, InstanceConfig config)
     {
-        return new Wrapper(version, config);
+        return new Wrapper(generation, version, config);
     }
 
-    public static Cluster create(int nodeCount) throws Throwable
+    /**
+     * Creates a builder for a cluster of the given size whose instances are
+     * constructed through {@code Cluster::new}.
+     *
+     * @param nodeCount number of nodes the built cluster will contain
+     * @return a new builder; nothing is created or started until the builder is asked to
+     */
+    public static Builder<IInvokableInstance, Cluster> build(int nodeCount)
+    {
+        return new Builder<>(nodeCount, Cluster::new);
+    }
+
+    public static Cluster create(int nodeCount, Consumer<InstanceConfig> 
configUpdater) throws IOException
     {
-        return create(nodeCount, Cluster::new);
+        return build(nodeCount).withConfig(configUpdater).start();
     }
-    public static Cluster create(int nodeCount, File root)
+
+    public static Cluster create(int nodeCount) throws Throwable
     {
-        return create(nodeCount, Versions.CURRENT, root, Cluster::new);
+        return build(nodeCount).start();
     }
 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java 
b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 0c8e63a..232ef0b 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -43,28 +43,24 @@ public class UpgradeableCluster extends 
AbstractCluster<IUpgradeableInstance> im
         super(root, version, configs, sharedClassLoader);
     }
 
-    protected IUpgradeableInstance newInstanceWrapper(Versions.Version 
version, InstanceConfig config)
+    protected IUpgradeableInstance newInstanceWrapper(int generation, 
Versions.Version version, InstanceConfig config)
     {
-        return new Wrapper(version, config);
+        return new Wrapper(generation, version, config);
     }
 
-    public static UpgradeableCluster create(int nodeCount) throws Throwable
-    {
-        return create(nodeCount, UpgradeableCluster::new);
-    }
-    public static UpgradeableCluster create(int nodeCount, File root)
+    public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int 
nodeCount)
     {
-        return create(nodeCount, Versions.CURRENT, root, 
UpgradeableCluster::new);
+        return new Builder<>(nodeCount, UpgradeableCluster::new);
     }
 
-    public static UpgradeableCluster create(int nodeCount, Versions.Version 
version) throws IOException
+    public static UpgradeableCluster create(int nodeCount) throws Throwable
     {
-        return create(nodeCount, version, 
Files.createTempDirectory("dtests").toFile(), UpgradeableCluster::new);
+        return build(nodeCount).start();
     }
-    public static UpgradeableCluster create(int nodeCount, Versions.Version 
version, File root)
+
+    public static UpgradeableCluster create(int nodeCount, Versions.Version 
version) throws Throwable
     {
-        return create(nodeCount, version, root, UpgradeableCluster::new);
+        return build(nodeCount).withVersion(version).start();
     }
-
 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java 
b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
similarity index 56%
copy from 
test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
copy to test/distributed/org/apache/cassandra/distributed/api/Feature.java
index b5fde84..a5c9316 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
@@ -18,31 +18,7 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessagingService;
-
-import java.util.function.BiConsumer;
-
-public interface IMessageFilters
+public enum Feature
 {
-    public interface Filter
-    {
-        Filter restore();
-        Filter drop();
-    }
-
-    public interface Builder
-    {
-        Builder from(int ... nums);
-        Builder to(int ... nums);
-        Filter ready();
-        Filter drop();
-    }
-
-    Builder verbs(MessagingService.Verb... verbs);
-    Builder allVerbs();
-    void reset();
-
-    // internal
-    BiConsumer<InetAddressAndPort, IMessage> 
filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered);
+    NETWORK, GOSSIP
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java 
b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 3834093..d5382b4 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -37,7 +37,9 @@ public interface IInstance extends IIsolatedExecutor
     UUID schemaVersion();
 
     void startup();
+    boolean isShutdown();
     Future<Void> shutdown();
+    Future<Void> shutdown(boolean graceful);
 
     // these methods are not for external use, but for simplicity we leave 
them public and on the normal IInstance interface
     void startup(ICluster cluster);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java 
b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
index 6741b3f..3e5a18f 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -38,4 +38,5 @@ public interface IInstanceConfig
     Object get(String fieldName);
     String getString(String fieldName);
     int getInt(String fieldName);
+    boolean has(Feature featureFlag);
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java 
b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
index b5fde84..f7c8094 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -18,11 +18,8 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 
-import java.util.function.BiConsumer;
-
 public interface IMessageFilters
 {
     public interface Filter
@@ -44,5 +41,6 @@ public interface IMessageFilters
     void reset();
 
     // internal
-    BiConsumer<InetAddressAndPort, IMessage> 
filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered);
+    boolean permit(IInstance from, IInstance to, int verb);
+
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index c27d9bf..19fb7e5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -30,11 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -91,6 +89,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster,
     // to ensure we have instantiated the main classloader's LoggerFactory 
(and any LogbackStatusListener)
     // before we instantiate any for a new instance
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractCluster.class);
+    private static final AtomicInteger generation = new AtomicInteger();
 
     private final File root;
     private final ClassLoader sharedClassLoader;
@@ -104,6 +103,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster,
 
     protected class Wrapper extends DelegatingInvokableInstance implements 
IUpgradeableInstance
     {
+        private final int generation;
         private final InstanceConfig config;
         private volatile IInvokableInstance delegate;
         private volatile Versions.Version version;
@@ -112,21 +112,22 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster,
         protected IInvokableInstance delegate()
         {
             if (delegate == null)
-                delegate = newInstance();
+                delegate = newInstance(generation);
             return delegate;
         }
 
-        public Wrapper(Versions.Version version, InstanceConfig config)
+        public Wrapper(int generation, Versions.Version version, 
InstanceConfig config)
         {
+            this.generation = generation;
             this.config = config;
             this.version = version;
             // we ensure there is always a non-null delegate, so that the 
executor may be used while the node is offline
-            this.delegate = newInstance();
+            this.delegate = newInstance(generation);
         }
 
-        private IInvokableInstance newInstance()
+        private IInvokableInstance newInstance(int generation)
         {
-            ClassLoader classLoader = new InstanceClassLoader(config.num(), 
version.classpath, sharedClassLoader);
+            ClassLoader classLoader = new InstanceClassLoader(generation, 
version.classpath, sharedClassLoader);
             return 
Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, 
Instance>)Instance::new, classLoader)
                            .apply(config, classLoader);
         }
@@ -136,6 +137,11 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster,
             return config;
         }
 
+        public boolean isShutdown()
+        {
+            return isShutdown;
+        }
+
         @Override
         public synchronized void startup()
         {
@@ -149,10 +155,16 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster,
         @Override
         public synchronized Future<Void> shutdown()
         {
+            return shutdown(true);
+        }
+
+        @Override
+        public synchronized Future<Void> shutdown(boolean graceful)
+        {
             if (isShutdown)
                 throw new IllegalStateException();
             isShutdown = true;
-            Future<Void> future = delegate.shutdown();
+            Future<Void> future = delegate.shutdown(graceful);
             delegate = null;
             return future;
         }
@@ -187,19 +199,20 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster,
         this.sharedClassLoader = sharedClassLoader;
         this.instances = new ArrayList<>();
         this.instanceMap = new HashMap<>();
+        int generation = AbstractCluster.generation.incrementAndGet();
         for (InstanceConfig config : configs)
         {
-            I instance = newInstanceWrapper(version, config);
+            I instance = newInstanceWrapper(generation, version, config);
             instances.add(instance);
             // we use the config().broadcastAddressAndPort() here because we 
have not initialised the Instance
             I prev = instanceMap.put(instance.broadcastAddressAndPort(), 
instance);
             if (null != prev)
                 throw new IllegalStateException("Cluster cannot have multiple 
nodes with same InetAddressAndPort: " + instance.broadcastAddressAndPort() + " 
vs " + prev.broadcastAddressAndPort());
         }
-        this.filters = new MessageFilters(this);
+        this.filters = new MessageFilters();
     }
 
-    protected abstract I newInstanceWrapper(Versions.Version version, 
InstanceConfig config);
+    protected abstract I newInstanceWrapper(int generation, Versions.Version 
version, InstanceConfig config);
 
     /**
      * WARNING: we index from 1 here, for consistency with inet address!
@@ -257,9 +270,12 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster,
     {
         for (IInstance reportTo: instances)
         {
+            if (reportTo.isShutdown())
+                continue;
+
             for (IInstance reportFrom: instances)
             {
-                if (reportFrom == reportTo)
+                if (reportFrom == reportTo || reportFrom.isShutdown())
                     continue;
 
                 int minVersion = Math.min(reportFrom.getMessagingVersion(), 
reportTo.getMessagingVersion());
@@ -335,46 +351,83 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster,
         C newCluster(File root, Versions.Version version, List<InstanceConfig> 
configs, ClassLoader sharedClassLoader);
     }
 
-    protected static <I extends IInstance, C extends AbstractCluster<I>> C
-    create(int nodeCount, Factory<I, C> factory) throws Throwable
+    public static class Builder<I extends IInstance, C extends 
AbstractCluster<I>>
     {
-        return create(nodeCount, Files.createTempDirectory("dtests").toFile(), 
factory);
-    }
+        private final int nodeCount;
+        private final Factory<I, C> factory;
+        private int subnet;
+        private File root;
+        private Versions.Version version;
+        private Consumer<InstanceConfig> configUpdater;
+        public Builder(int nodeCount, Factory<I, C> factory)
+        {
+            this.nodeCount = nodeCount;
+            this.factory = factory;
+        }
 
-    protected static <I extends IInstance, C extends AbstractCluster<I>> C
-    create(int nodeCount, File root, Factory<I, C> factory)
-    {
-        return create(nodeCount, Versions.CURRENT, root, factory);
-    }
+        public Builder<I, C> withSubnet(int subnet)
+        {
+            this.subnet = subnet;
+            return this;
+        }
 
-    protected static <I extends IInstance, C extends AbstractCluster<I>> C
-    create(int nodeCount, Versions.Version version, Factory<I, C> factory) 
throws IOException
-    {
-        return create(nodeCount, version, 
Files.createTempDirectory("dtests").toFile(), factory);
-    }
+        public Builder<I, C> withRoot(File root)
+        {
+            this.root = root;
+            return this;
+        }
 
-    protected static <I extends IInstance, C extends AbstractCluster<I>> C
-    create(int nodeCount, Versions.Version version, File root, Factory<I, C> 
factory)
-    {
-        root.mkdirs();
-        setupLogging(root);
+        public Builder<I, C> withVersion(Versions.Version version)
+        {
+            this.version = version;
+            return this;
+        }
 
-        ClassLoader sharedClassLoader = 
Thread.currentThread().getContextClassLoader();
+        public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
+        {
+            this.configUpdater = updater;
+            return this;
+        }
 
-        List<InstanceConfig> configs = new ArrayList<>();
-        long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / 
nodeCount);
-        for (int i = 0 ; i < nodeCount ; ++i)
+        public C createWithoutStarting() throws IOException
         {
-            InstanceConfig config = InstanceConfig.generate(i + 1, root, 
String.valueOf(token));
-            configs.add(config);
-            token += increment;
+            File root = this.root;
+            Versions.Version version = this.version;
+
+            if (root == null)
+                root = Files.createTempDirectory("dtests").toFile();
+            if (version == null)
+                version = Versions.CURRENT;
+
+            root.mkdirs();
+            setupLogging(root);
+
+            ClassLoader sharedClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+            List<InstanceConfig> configs = new ArrayList<>();
+            long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / 
nodeCount);
+            for (int i = 0; i < nodeCount; ++i)
+            {
+                InstanceConfig config = InstanceConfig.generate(i + 1, subnet, 
root, String.valueOf(token));
+                if (configUpdater != null)
+                    configUpdater.accept(config);
+                configs.add(config);
+                token += increment;
+            }
+
+            C cluster = factory.newCluster(root, version, configs, 
sharedClassLoader);
+            return cluster;
         }
 
-        C cluster = factory.newCluster(root, version, configs, 
sharedClassLoader);
-        cluster.startup();
-        return cluster;
+        public C start() throws IOException
+        {
+            C cluster = createWithoutStarting();
+            cluster.startup();
+            return cluster;
+        }
     }
 
+
     private static void setupLogging(File root)
     {
         try
@@ -398,6 +451,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster,
     public void close()
     {
         FBUtilities.waitOnFutures(instances.stream()
+                                           .filter(i -> !i.isShutdown())
                                            .map(IInstance::shutdown)
                                            .collect(Collectors.toList()),
                                   1L, TimeUnit.MINUTES);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index dce03ca..29426cb 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -35,9 +35,6 @@ import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
 import org.apache.cassandra.concurrent.StageManager;
@@ -48,7 +45,9 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.db.BatchlogManager;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -58,12 +57,14 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -76,11 +77,17 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamCoordinator;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Ref;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
     public final IInstanceConfig config;
@@ -93,6 +100,10 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         this.config = config;
         InstanceIDDefiner.setInstanceId(config.num());
         
FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
+        // Set the config at instance creation, possibly before startup() has 
run on all other instances.
+        // setMessagingVersions below will call runOnInstance which will 
instantiate
+        // the MessagingService and dependencies preventing later changes to 
network parameters.
+        Config.setOverrideLoadConfig(() -> loadConfig(config));
     }
 
     public IInstanceConfig config()
@@ -140,6 +151,11 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         throw new UnsupportedOperationException();
     }
 
+    public boolean isShutdown()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void schemaChangeInternal(String query)
     {
@@ -166,7 +182,10 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     private void registerMockMessaging(ICluster cluster)
     {
         BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, 
message) -> cluster.get(to).receiveMessage(message);
-        BiConsumer<InetAddressAndPort, IMessage> 
deliverToInstanceIfNotFiltered = cluster.filters().filter(deliverToInstance);
+        BiConsumer<InetAddressAndPort, IMessage> 
deliverToInstanceIfNotFiltered = (to, message) -> {
+            if (cluster.filters().permit(this, cluster.get(to), 
message.verb()))
+                deliverToInstance.accept(to, message);
+        };
 
         Map<InetAddress, InetAddressAndPort> addressAndPortMap = new 
HashMap<>();
         cluster.stream().forEach(instance -> {
@@ -182,6 +201,26 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 new MessageDeliverySink(deliverToInstanceIfNotFiltered, 
addressAndPortMap::get));
     }
 
+    // unnecessary if registerMockMessaging used
+    private void registerFilter(ICluster cluster)
+    {
+        IInstance instance = this;
+        MessagingService.instance().addMessageSink(new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress toAddress)
+            {
+                // Port is not passed in, so take a best guess at the 
destination port from this instance
+                IInstance to = 
cluster.get(InetAddressAndPort.getByAddressOverrideDefaults(toAddress, 
instance.config().broadcastAddressAndPort().port));
+                return cluster.filters().permit(instance, to, 
message.verb.ordinal());
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return true;
+            }
+        });
+    }
+
     private class MessageDeliverySink implements IMessageSink
     {
         private final BiConsumer<InetAddressAndPort, IMessage> deliver;
@@ -200,6 +239,11 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 assert 
from.equals(lookupAddressAndPort.apply(messageOut.from));
                 InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
                 int version = MessagingService.instance().getVersion(to);
+
+                out.writeInt(MessagingService.PROTOCOL_MAGIC);
+                out.writeInt(id);
+                long timestamp = System.currentTimeMillis();
+                out.writeInt((int) timestamp);
                 messageOut.serialize(out, version);
                 deliver.accept(toFull, new Message(messageOut.verb.ordinal(), 
out.toByteArray(), id, version, from));
             }
@@ -217,14 +261,45 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         }
     }
 
-    public void receiveMessage(IMessage message)
+    public void receiveMessage(IMessage imessage)
     {
         sync(() -> {
-            try (DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(message.bytes())))
+            // Based on 
org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
+            try (DataInputStream input = new DataInputStream(new 
ByteArrayInputStream(imessage.bytes())))
             {
-                MessageIn<?> messageIn = MessageIn.read(in, message.version(), 
message.id());
-                Runnable deliver = new MessageDeliveryTask(messageIn, 
message.id(), System.currentTimeMillis(), false);
-                deliver.run();
+                int version = imessage.version();
+
+                MessagingService.validateMagic(input.readInt());
+                int id;
+                if (version < MessagingService.VERSION_20)
+                    id = Integer.parseInt(input.readUTF());
+                else
+                    id = input.readInt();
+                assert imessage.id() == id;
+
+                long timestamp = System.currentTimeMillis();
+                boolean isCrossNodeTimestamp = false;
+
+                // make sure to readInt, even if cross_node_timeout is not enabled
+                int partial = input.readInt();
+                if (DatabaseDescriptor.hasCrossNodeTimeout())
+                {
+                    long crossNodeTimestamp = (timestamp & 
0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+                    isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+                    timestamp = crossNodeTimestamp;
+                }
+
+                MessageIn message = MessageIn.read(input, version, id);
+                if (message == null)
+                {
+                    // callback expired; nothing to do
+                    return;
+                }
+                if (version <= MessagingService.current_version)
+                {
+                    MessagingService.instance().receive(message, id, 
timestamp, isCrossNodeTimestamp);
+                }
+                // else ignore message
             }
             catch (Throwable t)
             {
@@ -251,7 +326,6 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             {
                 mkdirs();
 
-                Config.setOverrideLoadConfig(() -> loadConfig(config));
                 DatabaseDescriptor.setDaemonInitialized();
                 DatabaseDescriptor.createAllDirectories();
 
@@ -281,8 +355,28 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                     throw new RuntimeException(e);
                 }
 
-                initializeRing(cluster);
-                registerMockMessaging(cluster);
+                if (config.has(NETWORK))
+                {
+                    registerFilter(cluster);
+                    MessagingService.instance().listen();
+                }
+                else
+                {
+                    // Even though we don't use MessagingService, access the 
static SocketFactory
+                    // instance here so that we start the static event loop 
state
+//                    -- not sure what that means?  
SocketFactory.instance.getClass();
+                    registerMockMessaging(cluster);
+                }
+
+                // TODO: this is more than just gossip
+                if (config.has(GOSSIP))
+                {
+                    StorageService.instance.initServer();
+                }
+                else
+                {
+                    initializeRing(cluster);
+                }
 
                 SystemKeyspace.finishStartup();
 
@@ -366,32 +460,47 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
 
     public Future<Void> shutdown()
     {
+        return shutdown(true);
+    }
+
+    public Future<Void> shutdown(boolean graceful)
+    {
         Future<?> future = async((ExecutorService executor) -> {
             Throwable error = null;
+
+            if (config.has(GOSSIP) || config.has(NETWORK))
+            {
+                StorageService.instance.shutdownServer();
+
+                error = parallelRun(error, executor,
+                    () -> 
NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
+                );
+            }
+
             error = parallelRun(error, executor,
-                    Gossiper.instance::stop,
-                    CompactionManager.instance::forceShutdown,
-                    CommitLog.instance::shutdownBlocking,
-                    ColumnFamilyStore::shutdownFlushExecutor,
-                    ColumnFamilyStore::shutdownPostFlushExecutor,
-                    ColumnFamilyStore::shutdownReclaimExecutor,
-                    PendingRangeCalculatorService.instance::shutdownExecutor,
-                    StorageService.instance::shutdownBGMonitor,
-                    Ref::shutdownReferenceReaper,
-                    Memtable.MEMORY_POOL::shutdown,
-                    ScheduledExecutors::shutdownAndWait,
-                    SSTableReader::shutdownBlocking
+                                () -> 
Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
+                                CompactionManager.instance::forceShutdown,
+                                () -> BatchlogManager.shutdownAndWait(1L, 
MINUTES),
+                                () -> 
HintedHandOffManager.instance.shutdownAndWait(1L, MINUTES),
+                                () -> StreamCoordinator.shutdownAndWait(1L, 
MINUTES),
+                                () -> 
IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
+                                () -> 
ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
+                                () -> 
PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
+                                () -> 
StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
+                                () -> Ref.shutdownReferenceReaper(1L, MINUTES),
+                                () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, 
MINUTES),
+                                () -> SSTableReader.shutdownBlocking(1L, 
MINUTES),
+                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES)
             );
             error = parallelRun(error, executor,
+                                CommitLog.instance::shutdownBlocking,
                                 MessagingService.instance()::shutdown
             );
             error = parallelRun(error, executor,
-                                StageManager::shutdownAndWait,
-                                SharedExecutorPool.SHARED::shutdown
+                                () -> StageManager.shutdownAndWait(1L, 
MINUTES),
+                                () -> 
SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
             );
 
-            LoggerContext loggerContext = (LoggerContext) 
LoggerFactory.getILoggerFactory();
-            loggerContext.stop();
             Throwables.maybeFail(error);
         }).apply(isolatedExecutor);
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 6fd5c7e..363a1df 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -48,6 +48,7 @@ public class InstanceClassLoader extends URLClassLoader
                name.startsWith("org.apache.cassandra.distributed.api.")
             || name.startsWith("sun.")
             || name.startsWith("oracle.")
+            || name.startsWith("com.intellij.")
             || name.startsWith("com.sun.")
             || name.startsWith("com.oracle.")
             || name.startsWith("java.")
@@ -63,16 +64,16 @@ public class InstanceClassLoader extends URLClassLoader
         InstanceClassLoader create(int id, URL[] urls, ClassLoader 
sharedClassLoader);
     }
 
-    private final int id;
     private final URL[] urls;
+    private final int generation; // used to help debug class loader leaks, by 
helping determine which classloaders should have been collected
     private final ClassLoader sharedClassLoader;
 
-    InstanceClassLoader(int id, URL[] urls, ClassLoader sharedClassLoader)
+    InstanceClassLoader(int generation, URL[] urls, ClassLoader 
sharedClassLoader)
     {
         super(urls, null);
-        this.id = id;
         this.urls = urls;
         this.sharedClassLoader = sharedClassLoader;
+        this.generation = generation;
     }
 
     @Override
@@ -109,7 +110,7 @@ public class InstanceClassLoader extends URLClassLoader
     public String toString()
     {
         return "InstanceClassLoader{" +
-               "id=" + id +
+               "generation=" + generation +
                ", urls=" + Arrays.toString(urls) +
                '}';
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index ab15fed..efe9a0f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
@@ -30,6 +31,7 @@ import java.lang.reflect.Field;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -45,6 +47,8 @@ public class InstanceConfig implements IInstanceConfig
     public UUID hostId() { return hostId; }
     private final Map<String, Object> params = new TreeMap<>();
 
+    private EnumSet featureFlags;
+
     private volatile InetAddressAndPort broadcastAddressAndPort;
 
     @Override
@@ -103,6 +107,7 @@ public class InstanceConfig implements IInstanceConfig
                         Collections.singletonMap("seeds", "127.0.0.1")))
                 // legacy parameters
                 .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
+        this.featureFlags = EnumSet.noneOf(Feature.class);
     }
 
     private InstanceConfig(InstanceConfig copy)
@@ -110,6 +115,18 @@ public class InstanceConfig implements IInstanceConfig
         this.num = copy.num;
         this.params.putAll(copy.params);
         this.hostId = copy.hostId;
+        this.featureFlags = copy.featureFlags;
+    }
+
+    public InstanceConfig with(Feature featureFlag)
+    {
+        featureFlags.add(featureFlag);
+        return this;
+    }
+
+    public boolean has(Feature featureFlag)
+    {
+        return featureFlags.contains(featureFlag);
     }
 
     public InstanceConfig set(String fieldName, Object value)
@@ -200,13 +217,14 @@ public class InstanceConfig implements IInstanceConfig
         return (String)params.get(name);
     }
 
-    public static InstanceConfig generate(int nodeNum, File root, String token)
+    public static InstanceConfig generate(int nodeNum, int subnet, File root, 
String token)
     {
+        String ipPrefix = "127.0." + subnet + ".";
         return new InstanceConfig(nodeNum,
-                                  "127.0.0." + nodeNum,
-                                  "127.0.0." + nodeNum,
-                                  "127.0.0." + nodeNum,
-                                  "127.0.0." + nodeNum,
+                                  ipPrefix + nodeNum,
+                                  ipPrefix + nodeNum,
+                                  ipPrefix + nodeNum,
+                                  ipPrefix + nodeNum,
                                   String.format("%s/node%d/saved_caches", 
root, nodeNum),
                                   new String[] { 
String.format("%s/node%d/data", root, nodeNum) },
                                   String.format("%s/node%d/commitlog", root, 
nodeNum),
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java 
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index d82c9e4..1d26c5d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -27,28 +27,36 @@ import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLClassLoader;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.utils.ExecutorUtils;
 
 public class IsolatedExecutor implements IIsolatedExecutor
 {
     final ExecutorService isolatedExecutor;
+    private final String name;
     private final ClassLoader classLoader;
     private final Method deserializeOnInstance;
 
     IsolatedExecutor(String name, ClassLoader classLoader)
     {
+        this.name = name;
         this.isolatedExecutor = Executors.newCachedThreadPool(new 
NamedThreadFactory("isolatedExecutor", Thread.NORM_PRIORITY, classLoader, new 
ThreadGroup(name)));
         this.classLoader = classLoader;
         this.deserializeOnInstance = lookupDeserializeOneObject(classLoader);
@@ -57,9 +65,40 @@ public class IsolatedExecutor implements IIsolatedExecutor
     public Future<Void> shutdown()
     {
         isolatedExecutor.shutdown();
-        ThrowingRunnable.toRunnable(((URLClassLoader) 
classLoader)::close).run();
-        return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(() -> 
isolatedExecutor.awaitTermination(60, TimeUnit.SECONDS)),
-                                          Executors.newSingleThreadExecutor());
+
+        /* Use a thread pool with a core pool size of zero to terminate the 
thread as soon as possible
+        ** so the instance class loader can be garbage collected.  Uses a 
custom thread factory
+        ** rather than NamedThreadFactory to avoid calling 
FastThreadLocal.removeAll() in 3.0 and up
+        ** as it was observed crashing during test failures and made it harder 
to find the real cause.
+        */
+        ThreadFactory threadFactory = (Runnable r) -> {
+            Thread t = new Thread(r, name + "_shutdown");
+            t.setDaemon(true);
+            return t;
+        };
+        ExecutorService shutdownExecutor = new ThreadPoolExecutor(0, 
Integer.MAX_VALUE, 0, TimeUnit.SECONDS,
+                                                                  new 
LinkedBlockingQueue<Runnable>(), threadFactory);
+        return shutdownExecutor.submit(() -> {
+            try
+            {
+                ExecutorUtils.awaitTermination(60, TimeUnit.SECONDS, 
isolatedExecutor);
+
+                // Shutdown logging last - this is not ideal as the logging 
subsystem is initialized
+                // outside of this class, however doing it this way provides 
access to the full
+                // logging system while termination is taking place.
+                LoggerContext loggerContext = (LoggerContext) 
LoggerFactory.getILoggerFactory();
+                loggerContext.stop();
+
+                // Close the instance class loader after shutting down the 
isolatedExecutor and logging
+                // in case error handling triggers loading additional classes
+                ((URLClassLoader) classLoader).close();
+            }
+            finally
+            {
+                shutdownExecutor.shutdownNow();
+            }
+            return null;
+        });
     }
 
     public <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call) { 
return () -> isolatedExecutor.submit(call); }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java 
b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index a72c7a5..c1607f8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@ -32,33 +32,20 @@ import org.apache.cassandra.net.MessagingService;
 
 public class MessageFilters implements IMessageFilters
 {
-    private final ICluster cluster;
     private final Set<Filter> filters = new CopyOnWriteArraySet<>();
 
-    public MessageFilters(AbstractCluster cluster)
+    public boolean permit(IInstance from, IInstance to, int verb)
     {
-        this.cluster = cluster;
-    }
+        if (from == null || to == null)
+            return false; // cannot deliver
+        int fromNum = from.config().num();
+        int toNum = to.config().num();
 
-    public BiConsumer<InetAddressAndPort, IMessage> 
filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered)
-    {
-        return (toAddress, message) ->
-        {
-            IInstance from = cluster.get(message.from());
-            IInstance to = cluster.get(toAddress);
-            if (from == null || to == null)
-                return; // cannot deliver
-            int fromNum = from.config().num();
-            int toNum = to.config().num();
-            int verb = message.verb();
-            for (Filter filter : filters)
-            {
-                if (filter.matches(fromNum, toNum, verb))
-                    return;
-            }
+        for (Filter filter : filters)
+            if (filter.matches(fromNum, toNum, verb))
+                return false;
 
-            applyIfNotFiltered.accept(toAddress, message);
-        };
+        return true;
     }
 
     public class Filter implements IMessageFilters.Filter
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 18ca17f..757c17f 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -29,6 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 
 import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.IsolatedExecutor;
 
 public class DistributedTestBase
 {
@@ -41,10 +42,37 @@ public class DistributedTestBase
 
     public static String KEYSPACE = "distributed_test_keyspace";
 
+    public static void nativeLibraryWorkaround()
+    {
+        // Disable the C library for in-JVM dtests otherwise it holds a gcroot 
against the InstanceClassLoader
+        System.setProperty("cassandra.disable_clibrary", "true");
+
+        // Disable the Netty tcnative library otherwise the 
io.netty.internal.tcnative.CertificateCallbackTask,
+        // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, 
SSLPrivateKeyMethodSignTask,
+        // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the 
InstanceClassLoader.
+        System.setProperty("cassandra.disable_tcactive_openssl", "true");
+        System.setProperty("io.netty.transport.noNative", "true");
+    }
+
+    public static void processReaperWorkaround()
+    {
+        // Make sure the 'process reaper' thread is initially created under 
the main classloader,
+        // otherwise it gets created with the contextClassLoader pointing to 
an InstanceClassLoader
+        // which prevents it from being garbage collected.
+        IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new 
ProcessBuilder().command("true").start().waitFor()).run();
+    }
+
     @BeforeClass
     public static void setup()
     {
         System.setProperty("org.apache.cassandra.disable_mbean_registration", 
"true");
+        nativeLibraryWorkaround();
+        processReaperWorkaround();
+    }
+
+    static String withKeyspace(String replaceIn)
+    {
+        return String.format(replaceIn, KEYSPACE);
     }
 
     protected static <C extends AbstractCluster<?>> C init(C cluster)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
new file mode 100644
index 0000000..55c700c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.management.MBeanServer;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.sun.management.HotSpotDiagnosticMXBean;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SigarLibrary;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+/* Resource Leak Test - useful when tracking down issues with in-JVM framework 
cleanup.
+ * All objects referencing the InstanceClassLoader need to be garbage 
collected or
+ * the JVM runs out of metaspace. This test also calls out to lsof to check 
which
+ * file handles are still open.
+ *
+ * This is intended to be a burn type test where it is run outside of the test 
suites
+ * when a problem is detected (like OutOfMetaspace exceptions).
+ *
+ * Currently this test demonstrates that the InstanceClassLoader is cleaned up 
(load up
+ * the final hprof and check that the class loaders are not reachable from a 
GC root),
+ * but it shows that the file handles for Data/Index files are being leaked.
+ */
+@Ignore
+public class ResourceLeakTest extends DistributedTestBase
+{
+    // Parameters to adjust while hunting for leaks
+    final int numTestLoops = 1;            // Set this value high to crash on 
leaks, or low when tracking down an issue.
+    final boolean dumpEveryLoop = false;   // Dump heap & possibly files every 
loop
+    final boolean dumpFileHandles = false; // Call lsof whenever dumping 
resources
+    final boolean forceCollection = false; // Whether to explicitly force 
finalization/gc for smaller heap dumps
+    final long finalWaitMillis = 0l;       // Number of millis to wait before 
final resource dump to give gc a chance
+
+    static final SimpleDateFormat format = new 
SimpleDateFormat("yyyyMMddHHmmss");
+    static final String when = format.format(Date.from(Instant.now()));
+
+    static String outputFilename(String base, String description, String 
extension)
+    {
+        Path p = FileSystems.getDefault().getPath("build", "test",
+                                                  String.join("-", when, base, 
description) + extension);
+        return p.toString();
+    }
+
+    /**
+     * Retrieves the process ID or <code>null</code> if the process ID cannot 
be retrieved.
+     * @return the process ID or <code>null</code> if the process ID cannot be 
retrieved.
+     *
+     * (Duplicated from HeapUtils to avoid refactoring older releases where 
this test is useful).
+     */
+    private static Long getProcessId()
+    {
+        // Once Java 9 is ready the process API should provide a better way to 
get the process ID.
+        long pid = SigarLibrary.instance.getPid();
+
+        if (pid >= 0)
+            return Long.valueOf(pid);
+
+        return getProcessIdFromJvmName();
+    }
+
+    /**
+     * Retrieves the process ID from the JVM name.
+     * @return the process ID or <code>null</code> if the process ID cannot be 
retrieved.
+     */
+    private static Long getProcessIdFromJvmName()
+    {
+        // the JVM name in Oracle JVMs is: '<pid>@<hostname>' but this might 
not be the case on all JVMs
+        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+        try
+        {
+            return Long.parseLong(jvmName.split("@")[0]);
+        }
+        catch (NumberFormatException e)
+        {
+            // ignore
+        }
+        return null;
+    }
+
+    static void dumpHeap(String description, boolean live) throws IOException
+    {
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        HotSpotDiagnosticMXBean mxBean = 
ManagementFactory.newPlatformMXBeanProxy(
+        server, "com.sun.management:type=HotSpotDiagnostic", 
HotSpotDiagnosticMXBean.class);
+        mxBean.dumpHeap(outputFilename("heap", description, ".hprof"), live);
+    }
+
+    static void dumpOpenFiles(String description) throws IOException, 
InterruptedException
+    {
+        long pid = getProcessId();
+        ProcessBuilder map = new ProcessBuilder("/usr/sbin/lsof", "-p", 
Long.toString(pid));
+        File output = new File(outputFilename("lsof", description, ".txt"));
+        map.redirectOutput(output);
+        map.redirectErrorStream(true);
+        map.start().waitFor();
+    }
+
+    void dumpResources(String description) throws IOException, 
InterruptedException
+    {
+        dumpHeap(description, false);
+        if (dumpFileHandles)
+        {
+            dumpOpenFiles(description);
+        }
+    }
+
+    void doTest(int numClusterNodes, Consumer<InstanceConfig> updater) throws 
Throwable
+    {
+        for (int loop = 0; loop < numTestLoops; loop++)
+        {
+            try (Cluster cluster = 
Cluster.build(numClusterNodes).withConfig(updater).start())
+            {
+                if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to 
settle on the seed node
+                    cluster.get(1).runOnInstance(() -> 
CassandraDaemon.waitForGossipToSettle());
+
+                init(cluster);
+                String tableName = "tbl" + loop;
+                cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + 
tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." 
+ tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
+                cluster.get(1).callOnInstance(() -> 
FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush()));
+                if (dumpEveryLoop)
+                {
+                    dumpResources(String.format("loop%03d", loop));
+                }
+            }
+            catch (Throwable tr)
+            {
+                System.out.println("Dumping resources for exception: " + 
tr.getMessage());
+                tr.printStackTrace();
+                dumpResources("exception");
+            }
+            if (forceCollection)
+            {
+                System.runFinalization();
+                System.gc();
+            }
+        }
+    }
+
+    @Test
+    public void looperTest() throws Throwable
+    {
+        doTest(1, config -> {});
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final");
+    }
+
+    @Test
+    public void looperGossipNetworkTest() throws Throwable
+    {
+        doTest(2, config -> config.with(GOSSIP).with(NETWORK));
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final-gossip-network");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java 
b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 7bb4a51..0d61ad8 100644
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -62,7 +63,7 @@ public class SEPExecutorTest
         }
 
         // shutdown does not guarantee that threads are actually dead once it 
exits, only that they will stop promptly afterwards
-        sharedPool.shutdown();
+        sharedPool.shutdownAndWait(1L, TimeUnit.MINUTES);
         for (Thread thread : Thread.getAllStackTraces().keySet())
         {
             if (thread.getName().contains(MAGIC))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to