This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2ae1ec5dd2d98178f3ab4b3ed64a87147e713560
Merge: 0e0056c 34dde96
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Oct 12 11:06:42 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 build.xml                                          |   3 +
 .../apache/cassandra/service/CassandraDaemon.java  | 108 +++++------
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../distributed/impl/AbstractCluster.java          |  18 +-
 .../cassandra/distributed/impl/Instance.java       |  41 +++-
 .../cassandra/distributed/shared/Byteman.java      | 207 +++++++++++++++++++++
 .../cassandra/distributed/shared/Shared.java       |  37 ++++
 .../test/BootstrapBinaryDisabledTest.java          | 165 ++++++++++++++++
 .../test/ClientNetworkStopStartTest.java           |  79 ++++++++
 .../distributed/test/TopologyChangeTest.java       |  45 +++--
 test/resources/byteman/stream_failure.btm          |  14 ++
 12 files changed, 633 insertions(+), 88 deletions(-)

diff --cc CHANGES.txt
index bf80c8c,ee70af5..6829dac
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -29,34 -6,15 +29,36 @@@ Merged from 3.11
  Merged from 3.0:
   * Handle unexpected columns due to schema races (CASSANDRA-15899)
   * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
+ Merged from 2.2:
+  * Fixed a NullPointerException when calling nodetool enablethrift 
(CASSANDRA-16127)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin 
(CASSANDRA-14939)
 + * Always access system properties and environment variables via the new 
CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements 
(CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics 
(CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision 
(CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't 
exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy 
(CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 
storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in 
megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method 
(CASSANDRA-15857)
  Merged from 3.0:
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema 
files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc build.xml
index e026630,191c1c8..5c9ac2f
--- a/build.xml
+++ b/build.xml
@@@ -582,13 -412,20 +582,14 @@@
            <dependency groupId="com.fasterxml.jackson.core" 
artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" 
artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" 
version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="0.3.0"/>
 -
 -          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
 -          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" 
version="0.9.2">
 -               <exclusion groupId="commons-logging" 
artifactId="commons-logging"/>
 -          </dependency>
 -          <dependency groupId="junit" artifactId="junit" version="4.6" />
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="${jamm.version}"/>
 +          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.26"/>
 +          <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.25" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" 
artifactId="java-allocation-instrumenter" 
version="${allocation-instrumenter.version}" />
            <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.5" />
+           <dependency groupId="org.reflections" artifactId="reflections" 
version="0.9.12" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
@@@ -731,19 -542,22 +732,20 @@@
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
          <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
 +        <dependency groupId="org.quicktheories" artifactId="quicktheories" />
+         <dependency groupId="org.reflections" artifactId="reflections" />
 +        <dependency groupId="com.google.code.java-allocation-instrumenter" 
artifactId="java-allocation-instrumenter" 
version="${allocation-instrumenter.version}" />
 +        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
 +        <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" 
/>
          <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
 -              <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
 -              <dependency groupId="com.google.code.findbugs" 
artifactId="jsr305"/>
 +        <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
 +        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
          <dependency groupId="org.antlr" artifactId="antlr"/>
 -        <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded">
 -          <exclusion groupId="io.netty" artifactId="netty-buffer"/>
 -          <exclusion groupId="io.netty" artifactId="netty-codec"/>
 -          <exclusion groupId="io.netty" artifactId="netty-handler"/>
 -          <exclusion groupId="io.netty" artifactId="netty-transport"/>
 -        </dependency>
 +        <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" 
version="0.4.4" />
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" 
version="0.4.4" />
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
          <dependency groupId="org.openjdk.jmh" 
artifactId="jmh-generator-annprocess"/>
          <dependency groupId="net.ju-n.compile-command-annotations" 
artifactId="compile-command-annotations"/>
@@@ -770,7 -572,14 +772,8 @@@
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
          <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
+         <dependency groupId="org.reflections" artifactId="reflections" />
 -        <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded">
 -          <exclusion groupId="io.netty" artifactId="netty-buffer"/>
 -          <exclusion groupId="io.netty" artifactId="netty-codec"/>
 -          <exclusion groupId="io.netty" artifactId="netty-handler"/>
 -          <exclusion groupId="io.netty" artifactId="netty-transport"/>
 -        </dependency>
 +        <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
          <dependency groupId="io.netty" artifactId="netty-all"/>
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4b92d69,d8bd165..6d6bc70
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -38,26 -44,19 +44,20 @@@ import com.codahale.metrics.jvm.BufferP
  import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
  import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
  import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
- import com.google.common.annotations.VisibleForTesting;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
 -import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 +import org.apache.cassandra.audit.AuditLogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
- import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
- import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
- import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.StartupClusterConnectivityChecker;
- import org.apache.cassandra.schema.TableMetadata;
 -import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.schema.Schema;
- import org.apache.cassandra.schema.SchemaConstants;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.cql3.QueryProcessor;
- import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SizeEstimatesRecorder;
+ import org.apache.cassandra.db.SystemKeyspace;
++import org.apache.cassandra.db.SystemKeyspaceMigrator40;
+ import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
  import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
++import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
++import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.exceptions.StartupException;
  import org.apache.cassandra.gms.Gossiper;
@@@ -65,22 -65,21 +65,33 @@@ import org.apache.cassandra.io.FSError
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.SSTableHeaderFix;
  import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.metrics.CassandraMetricsRegistry;
  import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.tracing.Tracing;
- import org.apache.cassandra.utils.*;
 -import org.apache.cassandra.schema.LegacySchemaMigrator;
++import org.apache.cassandra.net.StartupClusterConnectivityChecker;
++import org.apache.cassandra.schema.Schema;
++import org.apache.cassandra.schema.SchemaConstants;
++import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.security.ThreadAwareSecurityManager;
 -import org.apache.cassandra.thrift.ThriftServer;
+ import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JMXServerUtils;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.MBeanWrapper;
+ import org.apache.cassandra.utils.Mx4jTool;
+ import org.apache.cassandra.utils.NativeLibrary;
+ import org.apache.cassandra.utils.WindowsTimer;
  
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND;
- import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT;
++import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_CLASS_PATH;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME;
 +
  /**
   * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
   * service, which defines not only a way to activate and deactivate it, but 
also
@@@ -167,24 -166,10 +178,24 @@@ public class CassandraDaemo
          }
      }
  
 +    @VisibleForTesting
 +    public static Runnable SPECULATION_THRESHOLD_UPDATER = 
 +        () -> 
 +        {
 +            try
 +            {
 +                Keyspace.allExisting().forEach(k -> 
k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold));
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.warn("Failed to update speculative retry thresholds.", 
t);
 +                JVMStabilityInspector.inspectThrowable(t);
 +            }
 +        };
 +    
      static final CassandraDaemon instance = new CassandraDaemon();
  
-     private NativeTransportService nativeTransportService;
 -    private volatile Server thriftServer;
+     private volatile NativeTransportService nativeTransportService;
      private JMXConnectorServer jmxServer;
  
      private final boolean runManaged;
@@@ -445,25 -435,20 +456,25 @@@
          // due to scheduling errors or race conditions
          
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(),
 5, 1, TimeUnit.MINUTES);
  
 +        // schedule periodic recomputation of speculative retry thresholds
 +        
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SPECULATION_THRESHOLD_UPDATER,
 
 +                                                                
DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +                                                                
DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +                                                                NANOSECONDS);
 +
-         initializeNativeTransport();
+         initializeClientTransports();
  
          completeSetup();
      }
  
 -    public synchronized void initializeClientTransports()
 +    public void setupVirtualKeyspaces()
      {
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        if (thriftServer == null)
 -            thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        
VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
 +        
VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
 +    }
  
-     public void initializeNativeTransport()
++    public synchronized void initializeClientTransports()
 +    {
          // Native transport
          if (nativeTransportService == null)
              nativeTransportService = new NativeTransportService();
@@@ -560,34 -530,24 +571,28 @@@
       */
      public void start()
      {
 +        StartupClusterConnectivityChecker connectivityChecker = 
StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
 +                                                                              
                           
DatabaseDescriptor.getBlockForPeersInRemoteDatacenters());
 +        connectivityChecker.execute(Gossiper.instance.getEndpoints(), 
DatabaseDescriptor.getEndpointSnitch()::getDatacenter);
 +
-         // We only start transports if bootstrap has completed and we're not 
in survey mode,
-         // OR if we are in survey mode and streaming has completed but we're 
not using auth
-         // OR if we have not joined the ring yet.
-         if (StorageService.instance.hasJoined())
+         // check to see if transports may start else return without starting. 
 This is needed when in survey mode or
+         // when bootstrap has not completed.
+         try
          {
-             if (StorageService.instance.isSurveyMode())
-             {
-                 if (StorageService.instance.isBootstrapMode() || 
DatabaseDescriptor.getAuthenticator().requireAuthentication())
-                 {
-                     logger.info("Not starting client transports in 
write_survey mode as it's bootstrapping or " +
-                             "auth is enabled");
-                     return;
-                 }
-             }
-             else
-             {
-                 if (!SystemKeyspace.bootstrapComplete())
-                 {
-                     logger.info("Not starting client transports as bootstrap 
has not completed");
-                     return;
-                 }
-             }
+             validateTransportsCanStart();
          }
+         catch (IllegalStateException isx)
+         {
+             // If there are any errors, we just log and return in this case
 -            logger.info(isx.getMessage());
++            logger.warn(isx.getMessage());
+             return;
+         }
+ 
+         startClientTransports();
+     }
  
+     private void startClientTransports()
+     {
          String nativeFlag = 
System.getProperty("cassandra.start_native_transport");
          if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || 
(nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
          {
@@@ -631,16 -596,14 +635,13 @@@
      }
  
      @VisibleForTesting
-     public void destroyNativeTransport() throws InterruptedException
+     public void destroyClientTransports()
      {
 -        stopThriftServer();
+         stopNativeTransport();
          if (nativeTransportService != null)
-         {
              nativeTransportService.destroy();
-             nativeTransportService = null;
-         }
      }
  
- 
      /**
       * Clean up all resources obtained during the lifetime of the daemon. This
       * is a hook for JSVC.
@@@ -765,10 -726,42 +768,9 @@@
  
      public boolean isNativeTransportRunning()
      {
-         return nativeTransportService != null ? 
nativeTransportService.isRunning() : false;
+         return nativeTransportService != null && 
nativeTransportService.isRunning();
      }
  
 -    public void startThriftServer()
 -    {
 -        validateTransportsCanStart();
 -
 -        if (thriftServer == null)
 -            throw new IllegalStateException("setup() must be called first for 
CassandraDaemon");
 -        thriftServer.start();
 -    }
 -
 -    public void stopThriftServer()
 -    {
 -        if (thriftServer != null)
 -        {
 -            thriftServer.stop();
 -        }
 -    }
 -
 -    public boolean isThriftServerRunning()
 -    {
 -        return thriftServer != null && thriftServer.isRunning();
 -    }
 -
 -    public int getMaxNativeProtocolVersion()
 -    {
 -        return nativeTransportService.getMaxProtocolVersion();
 -    }
 -
 -    public void refreshMaxNativeProtocolVersion()
 -    {
 -        if (nativeTransportService != null)
 -            nativeTransportService.refreshMaxNegotiableProtocolVersion();
 -    }
--
      /**
       * A convenience method to stop and destroy the daemon in one shot.
       */
diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 3fee754,5477e36..361519d
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -35,8 -35,8 +35,9 @@@ import java.util.concurrent.atomic.Atom
  import java.util.function.BiConsumer;
  import java.util.function.BiPredicate;
  import java.util.function.Consumer;
+ import java.util.function.Predicate;
  import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  import java.util.stream.Stream;
  
  import com.google.common.collect.Sets;
@@@ -68,12 -70,11 +70,13 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.shared.ShutdownException;
  import org.apache.cassandra.distributed.shared.Versions;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.SimpleCondition;
+ import org.reflections.Reflections;
  
 +import static 
org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
 +
  /**
   * AbstractCluster creates, initializes and manages Cassandra instances 
({@link Instance}.
   *
@@@ -186,11 -174,11 +196,11 @@@ public abstract class AbstractCluster<
  
          private IInvokableInstance newInstance(int generation)
          {
-             ClassLoader classLoader = new InstanceClassLoader(generation, 
config.num(), version.classpath, sharedClassLoader);
+             ClassLoader classLoader = new InstanceClassLoader(generation, 
config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE);
              if (instanceInitializer != null)
                  instanceInitializer.accept(classLoader, config.num());
 -            return 
Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, 
IInvokableInstance>)Instance::new, classLoader)
 -                                        .apply(config, classLoader);
 +            return 
Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, 
Instance>)Instance::new, classLoader)
 +                                        
.apply(config.forVersion(version.major), classLoader);
          }
  
          public IInstanceConfig config()
@@@ -769,5 -729,11 +779,11 @@@
                 .collect(Collectors.toList());
      }
  
+     private static Set<String> findClassesMarkedForSharedClassLoader()
+     {
+         return new 
Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
 -                                                      .map(Class::getName)
 -                                                      
.collect(Collectors.toSet());
++                                .map(Class::getName)
++                                .collect(Collectors.toSet());
+     }
  }
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 83669e2,aa37029..6ad0712
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -152,11 -142,8 +152,12 @@@ public class Instance extends IsolatedE
          // Set the config at instance creation, possibly before startup() has 
run on all other instances.
          // setMessagingVersions below will call runOnInstance which will 
instantiate
          // the MessagingService and dependencies preventing later changes to 
network parameters.
-         Config.setOverrideLoadConfig(() -> loadConfig(config));
+         Config single = loadConfig(config);
+         Config.setOverrideLoadConfig(() -> single);
 +
 +        // Enable streaming inbound handler tracking so they can be closed 
properly without leaking
 +        // the blocking IO thread.
 +        StreamingInboundHandler.trackInboundHandlers();
      }
  
      @Override
@@@ -405,6 -511,6 +406,9 @@@
                      throw e;
                  }
  
++                // Start up virtual table support
++                
CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
++
                  Keyspace.setInitialized();
  
                  // Replay any CommitLogSegments found on disk
@@@ -430,12 -535,9 +434,13 @@@
  //                    -- not sure what that means?  
SocketFactory.instance.getClass();
                      registerMockMessaging(cluster);
                  }
 +                registerInboundFilter(cluster);
 +                registerOutboundFilter(cluster);
 +
 +                JVMStabilityInspector.replaceKiller(new InstanceKiller());
  
                  // TODO: this is more than just gossip
+                 
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                  if (config.has(GOSSIP))
                  {
                      StorageService.instance.initServer();
@@@ -451,21 -552,18 +456,19 @@@
  
                  SystemKeyspace.finishStartup();
  
+                 CassandraDaemon.getInstanceForTesting().setupCompleted();
+ 
                  if (config.has(NATIVE_PROTOCOL))
                  {
-                     // Start up virtual table support
-                     
CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
- 
-                     
CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
-                     
CassandraDaemon.getInstanceForTesting().startNativeTransport();
-                     StorageService.instance.setRpcReady(true);
+                     
CassandraDaemon.getInstanceForTesting().initializeClientTransports();
+                     CassandraDaemon.getInstanceForTesting().start();
                  }
  
 -                if 
(!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
 -                    throw new IllegalStateException();
 -                if (DatabaseDescriptor.getStoragePort() != 
broadcastAddress().getPort())
 -                    throw new IllegalStateException();
 +                if 
(!FBUtilities.getBroadcastAddressAndPort().address.equals(broadcastAddress().getAddress())
 ||
 +                    FBUtilities.getBroadcastAddressAndPort().port != 
broadcastAddress().getPort())
 +                    throw new IllegalStateException(String.format("%s != %s", 
FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
 +
 +                ActiveRepairService.instance.start();
              }
              catch (Throwable t)
              {
@@@ -586,20 -682,15 +589,21 @@@
                                  () -> BufferPool.shutdownLocalCleaner(1L, 
MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, 
MINUTES),
                                  () -> 
Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 +                                () -> 
DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES),
                                  () -> SSTableReader.shutdownBlocking(1L, 
MINUTES),
 -                                () -> 
DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
 +                                () -> 
shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES)
              );
 +
              error = parallelRun(error, executor,
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES),
 -                                (IgnoreThrowingRunnable) 
MessagingService.instance()::shutdown
 +                                CommitLog.instance::shutdownBlocking,
-                                 () -> 
MessagingService.instance().shutdown(1L, MINUTES, false, true)
++                                // can only shut down messaging once, so if the 
test shuts down an instance, then ignore the failure
++                                (IgnoreThrowingRunnable) () -> 
MessagingService.instance().shutdown(1L, MINUTES, false, true)
              );
              error = parallelRun(error, executor,
 -                                () -> StageManager.shutdownAndWait(1L, 
MINUTES),
 +                                () -> 
GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
 +                                () -> Stage.shutdownAndWait(1L, MINUTES),
                                  () -> 
SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
              error = parallelRun(error, executor,
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
index 0000000,3ac5028..a7ac605
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
@@@ -1,0 -1,165 +1,165 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.TimeoutException;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.api.LogResult;
+ import org.apache.cassandra.distributed.api.SimpleQueryResult;
+ import org.apache.cassandra.distributed.api.TokenSupplier;
+ import org.apache.cassandra.distributed.shared.Byteman;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.Shared;
+ 
+ /**
+  * Replaces python dtest 
bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled
+  */
+ public class BootstrapBinaryDisabledTest extends TestBaseImpl
+ {
+     @Test
+     public void test() throws IOException, TimeoutException
+     {
+         Map<String, Object> config = new HashMap<>();
+         config.put("authenticator", 
"org.apache.cassandra.auth.PasswordAuthenticator");
+         config.put("authorizer", 
"org.apache.cassandra.auth.CassandraAuthorizer");
+         config.put("role_manager", 
"org.apache.cassandra.auth.CassandraRoleManager");
+         config.put("permissions_validity_in_ms", 0);
+         config.put("roles_validity_in_ms", 0);
+ 
+         int originalNodeCount = 1;
+         int expandedNodeCount = originalNodeCount + 2;
+         Byteman byteman = 
Byteman.createFromScripts("test/resources/byteman/stream_failure.btm");
+         try (Cluster cluster = init(Cluster.build(originalNodeCount)
+                                            
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                            
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                            .withConfig(c -> {
+                                                config.forEach(c::set);
+                                                c.with(Feature.GOSSIP, 
Feature.NETWORK, Feature.NATIVE_PROTOCOL);
+                                            })
+                                            .withInstanceInitializer((cl, 
nodeNumber) -> {
+                                                switch (nodeNumber) {
+                                                    case 1:
+                                                    case 2:
+                                                        byteman.install(cl);
+                                                        break;
+                                                }
+                                            })
+                                            .start()))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text 
primary key)");
+             populate(cluster.get(1));
+             cluster.forEach(c -> c.flush(KEYSPACE));
+ 
+             bootstrap(cluster, config, false);
+             // Test write survey behaviour
+             bootstrap(cluster, config, true);
+         }
+     }
+ 
+     private static void bootstrap(Cluster cluster,
+                                   Map<String, Object> config,
+                                   boolean isWriteSurvey) throws 
TimeoutException
+     {
+         IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+         nodeConfig.set("auto_bootstrap", true);
+         config.forEach(nodeConfig::set);
+ 
+         //TODO can we make this more isolated?
+         System.setProperty("cassandra.ring_delay_ms", "5000");
+         if (isWriteSurvey)
+             System.setProperty("cassandra.write_survey", "true");
+ 
+         RewriteEnabled.enable();
+         cluster.bootstrap(nodeConfig).startup();
+         IInvokableInstance node = cluster.get(cluster.size());
+         assertLogHas(node, "Some data streaming failed");
+         assertLogHas(node, isWriteSurvey ?
+                            "Not starting client transports in write_survey 
mode as it's bootstrapping or auth is enabled" :
+                            "Node is not yet bootstrapped completely");
+ 
+         node.nodetoolResult("join").asserts()
 -             .failure()
 -             .errorContains("Cannot join the ring until bootstrap completes");
++            .failure()
++            .errorContains("Cannot join the ring until bootstrap completes");
+ 
+         RewriteEnabled.disable();
+         node.nodetoolResult("bootstrap", "resume").asserts().success();
+         if (isWriteSurvey)
+             assertLogHas(node, "Not starting client transports in 
write_survey mode as it's bootstrapping or auth is enabled");
+ 
+         if (isWriteSurvey)
+         {
+             node.nodetoolResult("join").asserts().success();
+             assertLogHas(node, "Leaving write survey mode and joining ring at 
operator request");
+         }
+ 
+         node.logs().watchFor("Starting listening for CQL clients");
+         assertBootstrapState(node, "COMPLETED");
+     }
+ 
+     private static void assertBootstrapState(IInvokableInstance node, String 
expected)
+     {
+         SimpleQueryResult qr = node.executeInternalWithResult("SELECT 
bootstrapped FROM system.local WHERE key='local'");
+         Assert.assertTrue("No rows found", qr.hasNext());
+         Assert.assertEquals(expected, qr.next().getString("bootstrapped"));
+     }
+ 
+     private static void assertLogHas(IInvokableInstance node, String msg)
+     {
+         LogResult<List<String>> results = node.logs().grep(msg);
+         Assert.assertFalse("Unable to find '" + msg + "'", 
results.getResult().isEmpty());
+     }
+ 
+     private void populate(IInvokableInstance inst)
+     {
+         for (int i = 0; i < 10; i++)
+             inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) 
VALUES (?)", Integer.toString(i));
+     }
+ 
+     @Shared
+     public static final class RewriteEnabled
+     {
+         private static volatile boolean enabled = false;
+ 
+         public static boolean isEnabled()
+         {
+             return enabled;
+         }
+ 
+         public static void enable()
+         {
+             enabled = true;
+         }
+ 
+         public static void disable()
+         {
+             enabled = false;
+         }
+     }
+ }
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
index 0000000,1d23ac7..1d9029b
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
@@@ -1,0 -1,192 +1,79 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
 -import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
 -import java.io.PrintStream;
 -import java.util.Arrays;
 -import java.util.Collections;
 -import java.util.Objects;
+ 
 -import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.datastax.driver.core.Session;
 -import org.apache.cassandra.db.marshal.CompositeType;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.apache.cassandra.distributed.api.NodeToolResult;
+ import org.apache.cassandra.distributed.api.QueryResults;
+ import org.apache.cassandra.distributed.api.SimpleQueryResult;
+ import org.apache.cassandra.distributed.shared.AssertUtils;
 -import org.apache.cassandra.thrift.Column;
 -import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 -import org.apache.cassandra.thrift.Mutation;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.thrift.TException;
 -import org.hamcrest.BaseMatcher;
 -import org.hamcrest.Description;
+ 
+ public class ClientNetworkStopStartTest extends TestBaseImpl
+ {
+     /**
+      * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a>
+      */
+     @Test
 -    public void stopStartThrift() throws IOException, TException
 -    {
 -        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL)).start()))
 -        {
 -            IInvokableInstance node = cluster.get(1);
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
 -            node.nodetoolResult("disablethrift").asserts().success();
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", false);
 -            node.nodetoolResult("enablethrift").asserts().success();
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
 -
 -            // now use it to make sure it still works!
 -            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
value int, PRIMARY KEY (pk))");
 -
 -            ThriftClientUtils.thriftClient(node, thrift -> {
 -                thrift.set_keyspace(KEYSPACE);
 -                Mutation mutation = new Mutation();
 -                ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
 -                Column column = new Column();
 -                
column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
 -                column.setValue(ByteBufferUtil.bytes(0));
 -                column.setTimestamp(System.currentTimeMillis());
 -                csoc.setColumn(column);
 -                mutation.setColumn_or_supercolumn(csoc);
 -
 -                
thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
 -                                                             
Collections.singletonMap("tbl", Arrays.asList(mutation))),
 -                                    
org.apache.cassandra.thrift.ConsistencyLevel.ALL);
 -            });
 -
 -            SimpleQueryResult qr = 
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", 
ConsistencyLevel.ALL);
 -            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 
0).build());
 -        }
 -    }
 -
 -    /**
 -     * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a>
 -     */
 -    @Test
+     public void stopStartNative() throws IOException
+     {
 -        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL)).start()))
++        //TODO why does trunk need GOSSIP for native to work but no other 
branch does?
++        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)).start()))
+         {
+             IInvokableInstance node = cluster.get(1);
+             assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
+             node.nodetoolResult("disablebinary").asserts().success();
+             assertTransportStatus(node, "binary", false);
 -            assertTransportStatus(node, "thrift", true);
+             node.nodetoolResult("enablebinary").asserts().success();
+             assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
+ 
+             // now use it to make sure it still works!
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
value int, PRIMARY KEY (pk))");
+ 
+             try (com.datastax.driver.core.Cluster client = 
com.datastax.driver.core.Cluster.builder().addContactPoints(node.broadcastAddress().getAddress()).build();
+                  Session session = client.connect())
+             {
+                 session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, value) 
VALUES (?, ?)", 0, 0);
+             }
+ 
+             SimpleQueryResult qr = 
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", 
ConsistencyLevel.ALL);
+             AssertUtils.assertRows(qr, QueryResults.builder().row(0, 
0).build());
+         }
+     }
+ 
+     private static void assertTransportStatus(IInvokableInstance node, String 
transport, boolean running)
+     {
+         assertNodetoolStdout(node, running ? "running" : "not running", 
running ? "not running" : null, "status" + transport);
+     }
+ 
+     private static void assertNodetoolStdout(IInvokableInstance node, String 
expectedStatus, String notExpected, String... nodetool)
+     {
 -        // without CASSANDRA-16057 need this hack
 -        PrintStream previousStdout = System.out;
 -        try
 -        {
 -            ByteArrayOutputStream out = new ByteArrayOutputStream();
 -            PrintStream stdout = new PrintStream(out, true);
 -            System.setOut(stdout);
 -
 -            node.nodetoolResult(nodetool).asserts().success();
 -
 -            stdout.flush();
 -            String output = out.toString();
 -            Assert.assertThat(output, new StringContains(expectedStatus));
 -            if (notExpected != null)
 -                Assert.assertThat(output, new StringNotContains(notExpected));
 -        }
 -        finally
 -        {
 -            System.setOut(previousStdout);
 -        }
 -    }
 -
 -    private static final class StringContains extends BaseMatcher<String>
 -    {
 -        private final String expected;
 -
 -        private StringContains(String expected)
 -        {
 -            this.expected = Objects.requireNonNull(expected);
 -        }
 -
 -        public boolean matches(Object o)
 -        {
 -            return o.toString().contains(expected);
 -        }
 -
 -        public void describeTo(Description description)
 -        {
 -            description.appendText("Expected to find '" + expected + "', but 
did not");
 -        }
 -    }
 -
 -    private static final class StringNotContains extends BaseMatcher<String>
 -    {
 -        private final String notExpected;
 -
 -        private StringNotContains(String expected)
 -        {
 -            this.notExpected = Objects.requireNonNull(expected);
 -        }
 -
 -        public boolean matches(Object o)
 -        {
 -            return !o.toString().contains(notExpected);
 -        }
 -
 -        public void describeTo(Description description)
 -        {
 -            description.appendText("Expected not to find '" + notExpected + 
"', but did");
 -        }
++        NodeToolResult result = node.nodetoolResult(nodetool);
++        result.asserts().success().stdoutContains(expectedStatus);
++        if (notExpected != null)
++            result.asserts().stdoutNotContains(notExpected);
+     }
+ }
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
index 731a87e,0000000..0fff01c
mode 100644,000000..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
@@@ -1,199 -1,0 +1,196 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetSocketAddress;
- import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Objects;
++import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import com.datastax.driver.core.Host;
 +import com.datastax.driver.core.Session;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy;
 +import 
org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.Event;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static 
org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
 +import static 
org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.OneNetworkInterface;
 +import static 
org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Down;
 +import static 
org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Remove;
 +import static 
org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Up;
 +import static org.assertj.core.api.Assertions.assertThat;
 +import static org.awaitility.Awaitility.await;
 +
 +@RunWith(Parameterized.class)
 +public class TopologyChangeTest extends TestBaseImpl
 +{
 +    static class EventStateListener implements Host.StateListener
 +    {
 +        enum EventType
 +        {
 +            Add,
 +            Up,
 +            Down,
 +            Remove,
 +        }
 +
 +        static class Event
 +        {
 +            InetSocketAddress host;
 +            EventType type;
 +
 +            Event(EventType type, Host host)
 +            {
 +                this.type = type;
 +                this.host = host.getBroadcastSocketAddress();
 +            }
 +
 +            public Event(EventType type, IInvokableInstance 
iInvokableInstance)
 +            {
 +                this.type = type;
 +                this.host = iInvokableInstance.broadcastAddress();
 +            }
 +
 +
 +            public String toString()
 +            {
 +                return "Event{" +
 +                       "host='" + host + '\'' +
 +                       ", type=" + type +
 +                       '}';
 +            }
 +
 +            public boolean equals(Object o)
 +            {
 +                if (this == o) return true;
 +                if (o == null || getClass() != o.getClass()) return false;
 +                Event event = (Event) o;
 +                return Objects.equals(host, event.host) &&
 +                       type == event.type;
 +            }
 +
 +            public int hashCode()
 +            {
 +                return Objects.hash(host, type);
 +            }
 +        }
 +
-         private List<Event> events = new ArrayList<>();
++        private final List<Event> events = new CopyOnWriteArrayList<>();
 +
 +        public void onAdd(Host host)
 +        {
 +            events.add(new Event(EventType.Add, host));
 +        }
 +
 +        public void onUp(Host host)
 +        {
 +            events.add(new Event(Up, host));
 +        }
 +
 +        public void onDown(Host host)
 +        {
 +            events.add(new Event(EventType.Down, host));
 +        }
 +
 +        public void onRemove(Host host)
 +        {
 +            events.add(new Event(Remove, host));
 +        }
 +
 +        public void onRegister(com.datastax.driver.core.Cluster cluster)
 +        {
 +        }
 +
 +        public void onUnregister(com.datastax.driver.core.Cluster cluster)
 +        {
 +        }
 +    }
 +
 +    @Parameterized.Parameter(0)
 +    public Strategy strategy;
 +
 +    @Parameterized.Parameters(name = "{index}: provision strategy={0}")
 +    public static Collection<Strategy[]> data()
 +    {
 +        return Arrays.asList(new Strategy[][]{ { MultipleNetworkInterfaces },
 +                                               { OneNetworkInterface }
 +        });
 +    }
 +
 +    @Test
 +    public void testDecommission() throws Throwable
 +    {
-         try (Cluster control = 
init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig(
-         config -> {
-             config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL);
-         }).start()))
++        try (Cluster control = 
init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy)
++                                           .withConfig(config -> 
config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start());
++             com.datastax.driver.core.Cluster cluster = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
++             Session session = cluster.connect())
 +        {
-             final com.datastax.driver.core.Cluster cluster = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
-             Session session = cluster.connect();
 +            EventStateListener eventStateListener = new EventStateListener();
 +            session.getCluster().register(eventStateListener);
-             control.get(3).nodetool("disablebinary");
-             control.get(3).nodetool("decommission", "-f");
++
++            
control.get(3).nodetoolResult("disablebinary").asserts().success();
++            control.get(3).nodetoolResult("decommission", 
"-f").asserts().success();
 +            await().atMost(5, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(2, 
cluster.getMetadata().getAllHosts().size()));
-             assertThat(eventStateListener.events).containsExactly(new 
Event(Remove, control.get(3)));
-             session.close();
-             cluster.close();
++            session.getCluster().unregister(eventStateListener);
++            // DOWN UP can also be seen if the jvm is slow and connections 
are closed; to avoid this make sure to use
++            // containsSequence to check that down/remove happen in this order
++            assertThat(eventStateListener.events).containsSequence(new 
Event(Down, control.get(3)), new Event(Remove, control.get(3)));
 +        }
 +    }
 +
 +    @Test
 +    public void testRestartNode() throws Throwable
 +    {
-         try (Cluster control = 
init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig(
-         config -> {
-             config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL);
-         }).start()))
++        try (Cluster control = 
init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy)
++                                           .withConfig(config -> 
config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start());
++             com.datastax.driver.core.Cluster cluster = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
++             Session session = cluster.connect())
 +        {
-             final com.datastax.driver.core.Cluster cluster = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
-             Session session = cluster.connect();
 +            EventStateListener eventStateListener = new EventStateListener();
 +            session.getCluster().register(eventStateListener);
 +
 +            control.get(3).shutdown().get();
 +            await().atMost(5, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(2, 
cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count()));
 +
 +            control.get(3).startup();
 +            await().atMost(30, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(3, 
cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count()));
 +
-             assertThat(eventStateListener.events).containsExactly(new 
Event(Down, control.get(3)),
-                                                                   new 
Event(Up, control.get(3)));
- 
-             session.close();
-             cluster.close();
++            // DOWN UP can also be seen if the jvm is slow and connections 
are closed, but make sure it at least happens once
++            // given the node restarts
++            assertThat(eventStateListener.events).containsSequence(new 
Event(Down, control.get(3)),
++                                                                   new 
Event(Up, control.get(3)));
 +        }
 +    }
 +}
 +
diff --cc test/resources/byteman/stream_failure.btm
index 0000000,e40f7fe..768c7a3
mode 000000,100644..100644
--- a/test/resources/byteman/stream_failure.btm
+++ b/test/resources/byteman/stream_failure.btm
@@@ -1,0 -1,14 +1,14 @@@
+ #
+ # Inject streaming failure
+ #
+ # Before start streaming files in `StreamSession#prepare()` method,
+ # interrupt streaming by throwing RuntimeException.
+ #
+ RULE inject stream failure
+ CLASS org.apache.cassandra.streaming.StreamSession
 -METHOD prepare
 -AT INVOKE maybeCompleted
++METHOD prepareAck
++AT INVOKE startStreamingFiles
+ IF 
org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest$RewriteEnabled.isEnabled()
+ DO
+    throw new java.lang.RuntimeException("Triggering network failure")
 -ENDRULE
++ENDRULE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to