This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c2cfebf44f93af061131d73e4dcbf2a9ff582fe8
Merge: f2c9b4c 1f72cc6
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Fri Mar 27 19:04:38 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 build.xml                                          |   5 +
 src/java/org/apache/cassandra/net/MessageOut.java  |   1 +
 .../org/apache/cassandra/net/MessagingService.java |   2 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  11 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   4 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  32 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  32 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../cassandra/distributed/api/ICoordinator.java    |  36 --
 .../cassandra/distributed/api/IInstance.java       |  57 ----
 .../cassandra/distributed/api/IInstanceConfig.java |  60 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 -------
 .../apache/cassandra/distributed/api/IListen.java  |  28 --
 .../apache/cassandra/distributed/api/IMessage.java |  37 --
 .../cassandra/distributed/api/IMessageFilters.java |  56 ----
 .../distributed/impl/AbstractCluster.java          | 373 +++++----------------
 .../cassandra/distributed/impl/Coordinator.java    |  56 ++--
 .../impl/DelegatingInvokableInstance.java          |  11 +-
 .../distributed/impl/DistributedTestSnitch.java    |  31 +-
 .../distributed/impl/IInvokableInstance.java       |  67 ----
 .../distributed/impl/IUpgradeableInstance.java     |   1 +
 .../cassandra/distributed/impl/Instance.java       | 272 ++++++++++++---
 .../distributed/impl/InstanceClassLoader.java      | 130 -------
 .../cassandra/distributed/impl/InstanceConfig.java |  98 +++---
 .../cassandra/distributed/impl/InstanceKiller.java |  50 +++
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../apache/cassandra/distributed/impl/Listen.java  |   1 -
 .../cassandra/distributed/impl/MessageFilters.java | 168 ----------
 .../impl/{Message.java => MessageImpl.java}        |  27 +-
 .../distributed/impl/NetworkTopology.java          | 137 --------
 .../apache/cassandra/distributed/impl/RowUtil.java |   1 +
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 190 -----------
 .../mock/nodetool/InternalNodeProbe.java           |  33 +-
 .../mock/nodetool/InternalNodeProbeFactory.java    |  11 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 +--
 .../test/DistributedReadWritePathTest.java         | 300 -----------------
 .../distributed/test/DistributedTestBase.java      | 166 ---------
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../cassandra/distributed/test/GossipTest.java     |   4 +-
 .../distributed/test/MessageFiltersTest.java       |  85 +++--
 .../distributed/test/MessageForwardingTest.java    |  10 +-
 .../distributed/test/NativeProtocolTest.java       |  49 +--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   2 +-
 .../distributed/test/ResourceLeakTest.java         |  13 +-
 .../SharedClusterTestBase.java}                    |  38 ++-
 .../distributed/test/SimpleReadWriteTest.java      | 276 +++++++++++++++
 .../cassandra/distributed/test/TestBaseImpl.java   |  47 +++
 .../upgrade/CompactStorage2to3UpgradeTest.java     |  33 +-
 .../upgrade/MixedModeReadRepairTest.java           |   8 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  57 ++--
 .../distributed/upgrade/UpgradeTestBase.java       |  21 +-
 .../apache/cassandra/LogbackStatusListener.java    |   2 +-
 54 files changed, 1170 insertions(+), 2221 deletions(-)
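
For orientation: this merge replaces the in-tree dtest API classes (Feature, ICoordinator, IInstance, IMessageFilters, Versions, ...) with the shared in-jvm dtest API artifact, and rebases the tests onto the new TestBaseImpl and SharedClusterTestBase bases. A minimal sketch of the builder-style usage the merged tests rely on (mirroring GossipTest and SharedClusterTestBase below; keyspace and table setup omitted, so the query shown is illustrative only):

    try (Cluster cluster = Cluster.build(3)
                                  .withConfig(config -> config.with(NETWORK).with(GOSSIP))
                                  .start())
    {
        // run queries through a coordinator at a chosen consistency level,
        // or directly on one instance via executeInternal
        cluster.coordinator(1).execute("SELECT * FROM ks.tbl WHERE pk = ?", ConsistencyLevel.ALL, 1);
    }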

diff --cc build.xml
index a40cb7d,ed9c1a2..2527883
--- a/build.xml
+++ b/build.xml
@@@ -520,7 -534,19 +524,8 @@@
                  artifactId="cassandra-parent"
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
+         <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.pig" artifactId="pig">
 -          <exclusion groupId="xmlenc" artifactId="xmlenc"/>
 -          <exclusion groupId="tomcat" artifactId="jasper-runtime"/>
 -          <exclusion groupId="tomcat" artifactId="jasper-compiler"/>
 -          <exclusion groupId="org.eclipse.jdt" artifactId="core"/>
 -          <exclusion groupId="net.sf.kosmosfs" artifactId="kfs"/>
 -          <exclusion groupId="hsqldb" artifactId="hsqldb"/>
 -          <exclusion groupId="antlr" artifactId="antlr"/>
 -        </dependency>
 -        <!-- TODO CASSANDRA-9543
          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
 -        -->
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
diff --cc src/java/org/apache/cassandra/net/MessageOut.java
index ce190cb,1e291c2..09ff63b
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@@ -30,6 -30,6 +30,7 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.TypeSizes;
  import org.apache.cassandra.io.IVersionedSerializer;
++import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.FBUtilities;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 55dbee1,05c8af8..82c06da
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -527,257 -553,19 +553,19 @@@ public abstract class AbstractCluster<
          }
      }
  
-     protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
-     {
-         C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
-     }
- 
-     public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
+     private void uncaughtExceptions(Thread thread, Throwable error)
      {
-         private final Factory<I, C> factory;
-         private int nodeCount;
-         private int subnet;
-         private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
-         private TokenSupplier tokenSupplier;
-         private File root;
-         private Versions.Version version = Versions.CURRENT;
-         private Consumer<InstanceConfig> configUpdater;
- 
-         public Builder(Factory<I, C> factory)
-         {
-             this.factory = factory;
-         }
- 
-         public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
-         {
-             this.tokenSupplier = tokenSupplier;
-             return this;
-         }
- 
-         public Builder<I, C> withSubnet(int subnet)
-         {
-             this.subnet = subnet;
-             return this;
-         }
- 
-         public Builder<I, C> withNodes(int nodeCount)
-         {
-             this.nodeCount = nodeCount;
-             return this;
-         }
- 
-         public Builder<I, C> withDCs(int dcCount)
-         {
-             return withRacks(dcCount, 1);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC)
-         {
-             if (nodeCount == 0)
-                 throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
- 
-             int totalRacks = dcCount * racksPerDC;
-             int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
-             return withRacks(dcCount, racksPerDC, nodesPerRack);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
-         {
-             if (nodeIdTopology != null)
-                 throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
- 
-             nodeIdTopology = new HashMap<>();
-             int nodeId = 1;
-             for (int dc = 1; dc <= dcCount; dc++)
-             {
-                 for (int rack = 1; rack <= racksPerDC; rack++)
-                 {
-                     for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
-                         nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
-                 }
-             }
-             // adjust the node count to match the allocation
-             final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
-             if (adjustedNodeCount != nodeCount)
-             {
-                 assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
-                 logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
-                             dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
-                 nodeCount = adjustedNodeCount;
-             }
-             return this;
-         }
- 
-         public Builder<I, C> withDC(String dcName, int nodeCount)
+         if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
          {
-             return withRack(dcName, rackName(1), nodeCount);
-         }
- 
-         public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
-         {
-             if (nodeIdTopology == null)
-             {
-                 if (nodeCount > 0)
-                     throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
- 
-                 nodeIdTopology = new HashMap<>();
-             }
-             for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
-                 nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
- 
-             nodeCount += nodesInRack;
-             return this;
-         }
- 
-         // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
-         public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
-         {
-             if (nodeIdTopology.isEmpty())
-                 throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
- 
-             IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
-                 if (nodeIdTopology.get(nodeId) == null)
-                     throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
-             });
- 
-             if (nodeCount != nodeIdTopology.size())
-             {
-                 nodeCount = nodeIdTopology.size();
-                 logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-             }
- 
-             this.nodeIdTopology = new HashMap<>(nodeIdTopology);
- 
-             return this;
-         }
- 
-         public Builder<I, C> withRoot(File root)
-         {
-             this.root = root;
-             return this;
-         }
- 
-         public Builder<I, C> withVersion(Versions.Version version)
-         {
-             this.version = version;
-             return this;
-         }
- 
-         public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
-         {
-             this.configUpdater = updater;
-             return this;
-         }
- 
-         public C createWithoutStarting() throws IOException
-         {
-             if (root == null)
-                 root = Files.createTempDirectory("dtests").toFile();
- 
-             if (nodeCount <= 0)
-                 throw new IllegalStateException("Cluster must have at least one node");
- 
-             if (nodeIdTopology == null)
-             {
-                 nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
-                                           .collect(Collectors.toMap(nodeId -> nodeId,
-                                                                     nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
-             }
- 
-             root.mkdirs();
-             setupLogging(root);
- 
-             ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
- 
-             List<InstanceConfig> configs = new ArrayList<>();
- 
-             if (tokenSupplier == null)
-                 tokenSupplier = evenlyDistributedTokens(nodeCount);
- 
-             for (int i = 0; i < nodeCount; ++i)
-             {
-                 int nodeNum = i + 1;
-                 configs.add(createInstanceConfig(nodeNum));
-             }
- 
-             return factory.newCluster(root, version, configs, sharedClassLoader);
-         }
- 
-         public InstanceConfig newInstanceConfig(C cluster)
-         {
-             return createInstanceConfig(cluster.size() + 1);
-         }
- 
-         private InstanceConfig createInstanceConfig(int nodeNum)
-         {
-             String ipPrefix = "127.0." + subnet + ".";
-             String seedIp = ipPrefix + "1";
-             String ipAddress = ipPrefix + nodeNum;
-             long token = tokenSupplier.token(nodeNum);
- 
-             NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
- 
-             InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
-             if (configUpdater != null)
-                 configUpdater.accept(config);
- 
-             return config;
-         }
- 
-         public C start() throws IOException
-         {
-             C cluster = createWithoutStarting();
-             cluster.startup();
-             return cluster;
-         }
-     }
- 
-     public static TokenSupplier evenlyDistributedTokens(int numNodes)
-     {
-         long increment = (Long.MAX_VALUE / numNodes) * 2;
-         return (int nodeId) -> {
-             assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
-                                                       nodeId, numNodes);
-             return Long.MIN_VALUE + 1 + nodeId * increment;
-         };
-     }
- 
-     public static interface TokenSupplier
-     {
-         public long token(int nodeId);
-     }
- 
-     static String dcName(int index)
-     {
-         return "datacenter" + index;
-     }
- 
-     static String rackName(int index)
-     {
-         return "rack" + index;
-     }
- 
-     private static void setupLogging(File root)
-     {
-         try
-         {
-             String testConfPath = "test/conf/logback-dtest.xml";
-             Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
- 
-             if (!logConfPath.toFile().exists())
-             {
-                 Files.copy(new File(testConfPath).toPath(),
-                            logConfPath);
-             }
- 
-             System.setProperty("logback.configurationFile", "file://" + logConfPath);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
+             Thread.UncaughtExceptionHandler handler = previousHandler;
+             if (null != handler)
+                 handler.uncaughtException(thread, error);
+             return;
          }
+         InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
+         get(cl.getInstanceId()).uncaughtException(thread, error);
      }
--
++    
      @Override
      public void close()
      {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 69c2e44,91a2aaf..e2ebef0
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -31,11 -31,15 +31,13 @@@ import org.apache.cassandra.cql3.QueryO
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.statements.SelectStatement;
- import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
  import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.api.QueryResult;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
 -import org.apache.cassandra.service.pager.Pageable;
  import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.service.pager.QueryPagers;
  import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
@@@ -71,19 -75,23 +73,23 @@@ public class Coordinator implements ICo
          }).call();
      }
  
-     private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+     protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
+     {
+         return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
+     }
+ 
+     private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
      {
-         ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 -        ClientState clientState = ClientState.forInternalCalls();
 +        ClientState clientState = makeFakeClientState();
          CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
          List<ByteBuffer> boundBBValues = new ArrayList<>();
+         ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
          for (Object boundValue : boundValues)
-         {
              boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
-         }
  
+         prepared.validate(QueryState.forInternalCalls().getClientState());
          ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
-                                              QueryOptions.create(consistencyLevel,
+                                              QueryOptions.create(toCassandraCL(consistencyLevel),
                                                                   boundBBValues,
                                                                   false,
                                                                   Integer.MAX_VALUE,
@@@ -109,38 -129,40 +127,38 @@@
              throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
  
          return instance.sync(() -> {
-             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 +            ClientState clientState = makeFakeClientState();
+             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 -            CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
 +            CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
              List<ByteBuffer> boundBBValues = new ArrayList<>();
              for (Object boundValue : boundValues)
              {
                  boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
              }
  
--            prepared.validate(QueryState.forInternalCalls().getClientState());
++            prepared.validate(clientState);
              assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging";
  
 -            ClientState clientState = QueryState.forInternalCalls().getClientState();
              SelectStatement selectStatement = (SelectStatement) prepared;
 -            QueryOptions queryOptions = QueryOptions.create(toCassandraCL(consistencyLevel),
 -                                                            boundBBValues,
 -                                                            false,
 -                                                            pageSize,
 -                                                            null,
 -                                                            null,
 -                                                            Server.CURRENT_VERSION);
 -            Pageable pageable = selectStatement.getPageableCommand(queryOptions);
 +
-             QueryPager pager = selectStatement.getQuery(QueryOptions.create(consistencyLevel,
++            QueryPager pager = selectStatement.getQuery(QueryOptions.create(toCassandraCL(consistencyLevel),
 +                                                                            boundBBValues,
 +                                                                            false,
 +                                                                            pageSize,
 +                                                                            null,
 +                                                                            null,
 +                                                                            Server.CURRENT_VERSION),
 +                                                        FBUtilities.nowInSeconds())
 +                                     .getPager(null, Server.CURRENT_VERSION);
  
              // Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all
              // of the results lazily.
 -            QueryPager pager = QueryPagers.pager(pageable, toCassandraCL(consistencyLevel), clientState, null);
 -            Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names,
 -                                                        UntypedResultSet.create(selectStatement,
 -                                                                                pager,
 -                                                                                pageSize).iterator());
 -
 -            // We have to make sure iterator is not running on main thread.
              return new Iterator<Object[]>() {
-                 Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, consistencyLevel, clientState, pager,  pageSize));
++                Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, toCassandraCL(consistencyLevel), clientState, pager,  pageSize));
 +
                  public boolean hasNext()
                  {
 +                    // We have to make sure iterator is not running on main thread.
                      return instance.sync(() -> iter.hasNext()).call();
                  }
  
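(Aside: the hunk above keeps paging lazy; the pager is built once and each page is pulled back on the instance's isolated executor. A condensed, illustrative sketch of that wrapper pattern, not verbatim from the patch:)

    Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement,
                                                                        toCassandraCL(consistencyLevel),
                                                                        clientState, pager, pageSize));
    return new Iterator<Object[]>()
    {
        // every fetch is forced through instance.sync(...), so pages are
        // never materialised on the caller's thread
        public boolean hasNext() { return instance.sync(() -> iter.hasNext()).call(); }
        public Object[] next()   { return instance.sync(() -> iter.next()).call(); }
    };
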
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 5a4dcf4,1c19bca..e8c45d8
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -34,7 -39,10 +37,11 @@@ import java.util.concurrent.TimeoutExce
  import java.util.function.BiConsumer;
  import java.util.function.Function;
  
+ import javax.management.ListenerNotFoundException;
+ import javax.management.Notification;
+ import javax.management.NotificationListener;
+ 
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
  import org.apache.cassandra.concurrent.StageManager;
@@@ -63,13 -77,11 +74,15 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.index.SecondaryIndexManager;
+ import org.apache.cassandra.io.IVersionedSerializer;
  import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
- import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.io.util.DataOutputPlus;
+ import org.apache.cassandra.net.CompactEndpointSerializationHelper;
  import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
@@@ -247,7 -283,57 +286,57 @@@ public class Instance extends IsolatedE
              long timestamp = System.currentTimeMillis();
              out.writeInt((int) timestamp);
              messageOut.serialize(out, version);
-             return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+             return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
+     {
+         try (DataOutputBuffer out = new DataOutputBuffer(1024))
+         {
+             // Serialize header
+             int version = MessagingService.instance().getVersion(to.getAddress());
+ 
+             out.writeInt(MessagingService.PROTOCOL_MAGIC);
+             out.writeInt(id);
+             long timestamp = System.currentTimeMillis();
+             out.writeInt((int) timestamp);
+ 
+             // Serialize the message itself
+             IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb);
+             CompactEndpointSerializationHelper.serialize(from.getAddress(), out);
+ 
 -            out.writeInt(messageIn.verb.ordinal());
++            out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(messageIn.verb, version).ordinal());
+             out.writeInt(messageIn.parameters.size());
+             for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet())
+             {
+                 out.writeUTF(entry.getKey());
+                 out.writeInt(entry.getValue().length);
+                 out.write(entry.getValue());
+             }
+ 
+             if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance)
+             {
+                 try (DataOutputBuffer dob = new DataOutputBuffer())
+                 {
+                     serializer.serialize(messageIn.payload, dob, version);
+ 
+                     int size = dob.getLength();
+                     out.writeInt(size);
+                     out.write(dob.getData(), 0, size);
+                 }
+             }
+             else
+             {
+                 out.writeInt(0);
+             }
+ 
+ 
+             return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
          }
          catch (IOException e)
          {
@@@ -332,6 -420,6 +423,8 @@@
              int partial = input.readInt();
  
              return Pair.create(MessageIn.read(input, version, id), partial);
++            //long currentTime = ApproximateTime.currentTimeMillis();
++            //return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime));
          }
          catch (IOException e)
          {
@@@ -536,19 -623,15 +630,19 @@@
  
              for (int i = 0; i < tokens.size(); i++)
              {
-                 InetAddressAndPort ep = hosts.get(i);
+                 InetSocketAddress ep = hosts.get(i);
 -                Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostIds.get(i), 1);
 -                Gossiper.instance.injectApplicationState(ep.getAddress(),
 -                        ApplicationState.TOKENS,
 -                        new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
 -                storageService.onChange(ep.getAddress(),
 -                        ApplicationState.STATUS,
 -                        new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
 -                Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
 +                UUID hostId = hostIds.get(i);
 +                Token token = tokens.get(i);
 +                Gossiper.runInGossipStageBlocking(() -> {
-                     Gossiper.instance.initializeNodeUnsafe(ep.address, hostId, 1);
-                     Gossiper.instance.injectApplicationState(ep.address,
++                    Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
++                    Gossiper.instance.injectApplicationState(ep.getAddress(),
 +                                                             ApplicationState.TOKENS,
 +                                                             new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                     storageService.onChange(ep.address,
++                    storageService.onChange(ep.getAddress(),
 +                                            ApplicationState.STATUS,
 +                                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                     Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
++                    Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
 +                });
                  int messagingVersion = cluster.get(ep).isShutdown()
                                         ? MessagingService.current_version
                                         : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
@@@ -570,11 -653,9 +664,12 @@@
          return shutdown(true);
      }
  
+     @Override
      public Future<Void> shutdown(boolean graceful)
      {
 +        if (!graceful)
 +            MessagingService.instance().shutdown(false);
 +
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
  
diff --cc test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index b76c455,625b4aa..f3eb327
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@@ -23,9 -23,10 +23,11 @@@ import java.lang.management.ManagementF
  import java.util.Iterator;
  import java.util.Map;
  
+ import javax.management.ListenerNotFoundException;
+ 
  import com.google.common.collect.Multimap;
  
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.db.ColumnFamilyStoreMBean;
  import org.apache.cassandra.db.HintedHandOffManager;
  import org.apache.cassandra.db.Keyspace;
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index c2e9e4f,0000000..83c62c8
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -1,115 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.LockSupport;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
- public class GossipTest extends DistributedTestBase
++public class GossipTest extends TestBaseImpl
 +{
 +
 +    @Test
 +    public void nodeDownDuringMove() throws Throwable
 +    {
 +        int liveCount = 1;
 +        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default
 +        System.setProperty("cassandra.consistent.rangemovement", "false");
 +        System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
 +        try (Cluster cluster = Cluster.build(2 + liveCount)
 +                                      .withConfig(config -> config.with(NETWORK).with(GOSSIP))
 +                                      .createWithoutStarting())
 +        {
 +            int fail = liveCount + 1;
 +            int late = fail + 1;
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +                cluster.get(i).startup();
 +            cluster.get(fail).startup();
 +            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
 +                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).call();
 +
-             InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
++            InetAddress failAddress = cluster.get(fail).broadcastAddress().getAddress();
 +            // wait for NORMAL state
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || ep.getApplicationState(ApplicationState.STATUS) == null
 +                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            // set ourselves to MOVING, and wait for it to propagate
 +            cluster.get(fail).runOnInstance(() -> {
 +
 +                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
 +                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
 +            });
 +
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || (ep.getApplicationState(ApplicationState.STATUS) == null
 +                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            cluster.get(fail).shutdown(false).get();
 +            cluster.get(late).startup();
 +            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
 +                EndpointState ep;
 +                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
 +                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +            }).accept(failAddress);
 +
 +            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
 +                StorageService.instance.getTokenMetadata().getTokens(failAddress)
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).apply(failAddress);
 +
 +            Assert.assertEquals(expectTokens, tokens);
 +        }
 +    }
 +    
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 07e7428,062f401..f4398da
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -37,10 -37,27 +37,26 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessagingService;
  
- public class MessageFiltersTest extends DistributedTestBase
+ public class MessageFiltersTest extends TestBaseImpl
  {
 -
      @Test
-     public void simpleFiltersTest() throws Throwable
+     public void simpleInboundFiltersTest()
+     {
+         simpleFiltersTest(true);
+     }
+ 
+     @Test
+     public void simpleOutboundFiltersTest()
+     {
+         simpleFiltersTest(false);
+     }
+ 
+     private interface Permit
+     {
+         boolean test(int from, int to, IMessage msg);
+     }
+ 
+     private static void simpleFiltersTest(boolean inbound)
      {
          int VERB1 = MessagingService.Verb.READ.ordinal();
          int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
@@@ -52,21 -69,22 +68,22 @@@
          String MSG2 = "msg2";
  
          MessageFilters filters = new MessageFilters();
-         MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
 -        Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
++        Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg);
  
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+         Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
          filter.off();
-         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
          filters.reset();
  
-         filters.verbs(VERB1).from(1).to(2).drop();
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-         Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+         filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
  
          filters.reset();
          AtomicInteger counter = new AtomicInteger();
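
(For readers new to the filter API exercised above, a minimal usage sketch under the merged inbound/outbound semantics; the cluster construction is assumed, and the verb constant mirrors the test:)

    // drop READ verbs sent from node 1 to node 2, on the receiving side
    IMessageFilters.Filter filter = cluster.filters()
                                           .verbs(MessagingService.Verb.READ.ordinal())
                                           .inbound(true)
                                           .from(1).to(2)
                                           .drop();
    // ... exercise traffic that should observe the dropped messages ...
    filter.off();              // disable just this filter
    cluster.filters().reset(); // or clear every registered filter
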
diff --cc test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
index 091e5f0,0000000..c502af2
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
@@@ -1,36 -1,0 +1,52 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
- package org.apache.cassandra.distributed.api;
++package org.apache.cassandra.distributed.test;
 +
- import org.apache.cassandra.locator.InetAddressAndPort;
++import java.io.IOException;
 +
- import java.util.stream.Stream;
++import org.junit.After;
++import org.junit.AfterClass;
++import org.junit.BeforeClass;
 +
- public interface ICluster
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ICluster;
++
++public class SharedClusterTestBase extends TestBaseImpl
 +{
++    protected static ICluster cluster;
++
++    @BeforeClass
++    public static void before() throws IOException
++    {
++        cluster = init(Cluster.build().withNodes(3).start());
++    }
 +
-     IInstance get(int i);
-     IInstance get(InetAddressAndPort endpoint);
-     int size();
-     Stream<? extends IInstance> stream();
-     Stream<? extends IInstance> stream(String dcName);
-     Stream<? extends IInstance> stream(String dcName, String rackName);
-     IMessageFilters filters();
++    @AfterClass
++    public static void after() throws Exception
++    {
++        cluster.close();
++    }
 +
++    @After
++    public void afterEach()
++    {
++        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
++        init(cluster);
++    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 0000000,0000000..f1f8674
new file mode 100644
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -1,0 -1,0 +1,276 @@@
++package org.apache.cassandra.distributed.test;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
++
++import static org.junit.Assert.assertEquals;
++
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
++
++// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
++public class SimpleReadWriteTest extends SharedClusterTestBase
++{
++    @Test
++    public void coordinatorReadTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
++                                                  ConsistencyLevel.ALL,
++                                                  1),
++                   row(1, 1, 1),
++                   row(1, 2, 2),
++                   row(1, 3, 3));
++    }
++
++    @Test
++    public void largeMessageTest() throws Throwable
++    {
++        int largeMessageThreshold = 1024 * 64;
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
++        StringBuilder builder = new StringBuilder();
++        for (int i = 0; i < largeMessageThreshold; i++)
++            builder.append('a');
++        String s = builder.toString();
++        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
++                                       ConsistencyLevel.ALL,
++                                       s);
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
++                                                  ConsistencyLevel.ALL,
++                                                  1),
++                   row(1, 1, s));
++    }
++
++    @Test
++    public void coordinatorWriteTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
++                                       ConsistencyLevel.QUORUM);
++
++        for (int i = 0; i < 3; i++)
++        {
++            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
++                       row(1, 1, 1));
++        }
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                  ConsistencyLevel.QUORUM),
++                   row(1, 1, 1));
++    }
++
++    @Test
++    public void readRepairTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++
++        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                  ConsistencyLevel.ALL), // ensure node3 in preflist
++                   row(1, 1, 1));
++
++        // Verify that data got repaired to the third node
++        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
++                   row(1, 1, 1));
++    }
++
++    @Test
++    public void writeWithSchemaDisagreement() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++
++        // Introduce schema disagreement
++        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++
++        Exception thrown = null;
++        try
++        {
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
++                                           ConsistencyLevel.QUORUM);
++        }
++        catch (RuntimeException e)
++        {
++            thrown = e;
++        }
++
++        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
++        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
++    }
++
++    @Test
++    public void readWithSchemaDisagreement() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++
++        // Introduce schema disagreement
++        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++
++        Exception thrown = null;
++        try
++        {
++            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                      ConsistencyLevel.ALL),
++                       row(1, 1, 1, null));
++        }
++        catch (Exception e)
++        {
++            thrown = e;
++        }
++
++        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
++        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
++    }
++
++    @Test
++    public void simplePagedReadsTest() throws Throwable
++    {
++
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        int size = 100;
++        Object[][] results = new Object[size][];
++        for (int i = 0; i < size; i++)
++        {
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                           ConsistencyLevel.QUORUM,
++                                           i, i);
++            results[i] = new Object[]{ 1, i, i };
++        }
++
++        // Make sure paged read returns same results with different page sizes
++        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
++        {
++            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
++                                                                ConsistencyLevel.QUORUM,
++                                                                pageSize),
++                       results);
++        }
++    }
++
++    @Test
++    public void pagingWithRepairTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        int size = 100;
++        Object[][] results = new Object[size][];
++        for (int i = 0; i < size; i++)
++        {
++            // Make sure that data lands on different nodes and not coordinator
++            cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                            i, i);
++
++            results[i] = new Object[]{ 1, i, i };
++        }
++
++        // Make sure paged read returns same results with different page sizes
++        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
++        {
++            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
++                                                                ConsistencyLevel.ALL,
++                                                                pageSize),
++                       results);
++        }
++
++        assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
++                   results);
++    }
++
++    @Test
++    public void pagingTests() throws Throwable
++    {
++        try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
++        {
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++            for (int i = 0; i < 10; i++)
++            {
++                for (int j = 0; j < 10; j++)
++                {
++                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                   ConsistencyLevel.QUORUM,
++                                                   i, j, i + i);
++                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                      ConsistencyLevel.QUORUM,
++                                                      i, j, i + i);
++                }
++            }
++
++            int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 };
++            String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
++            };
++            for (String statement : statements)
++            {
++                for (int pageSize : pageSizes)
++                {
++                    assertRows(cluster.coordinator(1)
++                                      .executeWithPaging(statement,
++                                                         ConsistencyLevel.QUORUM, pageSize),
++                               singleNode.coordinator(1)
++                                         .executeWithPaging(statement,
++                                                            ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
++                }
++            }
++        }
++    }
++
++    @Test
++    public void metricsCountQueriesTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++        for (int i = 0; i < 100; i++)
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
++
++        long readCount1 = readCount((IInvokableInstance) cluster.get(1));
++        long readCount2 = readCount((IInvokableInstance) cluster.get(2));
++        for (int i = 0; i < 100; i++)
++            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
++
++        readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
++        readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
++        assertEquals(readCount1, readCount2);
++        assertEquals(100, readCount1);
++    }
++
++    private long readCount(IInvokableInstance instance)
++    {
++        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
++    }
++}
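The readCount helper above is the general cross-classloader pattern of the in-JVM dtest API: replica-side state is read by shipping a serializable lambda onto the node with callOnInstance, since the test and the instance live in different classloaders. A minimal sketch of the same pattern for the write path, assuming the table and KEYSPACE from the test above (this helper is illustrative, not part of the patch; writeLatency mirrors the readLatency metric used by readCount):

    // Hypothetical companion to readCount(): number of locally recorded writes.
    private long writeCount(IInvokableInstance instance)
    {
        // The lambda is serialized and runs inside the instance's own classloader.
        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
                                                     .getColumnFamilyStore("tbl")
                                                     .metric.writeLatency.latency.getCount());
    }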
diff --cc test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 0000000,a89a352..0e0561a
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@@ -1,0 -1,49 +1,47 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import org.junit.After;
+ import org.junit.BeforeClass;
+ 
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.shared.Builder;
+ import org.apache.cassandra.distributed.shared.DistributedTestBase;
+ 
+ public class TestBaseImpl extends DistributedTestBase
+ {
 -    protected static final TestBaseImpl impl = new TestBaseImpl();
 -
+     @After
+     public void afterEach() {
+         super.afterEach();
+     }
+ 
+     @BeforeClass
+     public static void beforeClass() throws Throwable {
+         ICluster.setup();
+     }
+ 
+     @Override
+     public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
+         // This is definitely not the smartest solution, but given the complexity of the alternatives and the low
+         // risk, we can just rely on the fact that this code is going to work across _all_ versions.
+         return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build();
+     }
+ }
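For orientation, tests extending TestBaseImpl end up creating clusters through this hook; a hedged sketch of the intended call site, assuming Cluster.build(int)/start() and the shared init(...) helper behave as elsewhere in this patch (the test body itself is illustrative, not part of the patch):

    // Hypothetical usage from a subclass of TestBaseImpl
    @Test
    public void builderSmokeTest() throws Throwable
    {
        try (org.apache.cassandra.distributed.Cluster cluster =
                 init(org.apache.cassandra.distributed.Cluster.build(2).start()))
        {
            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".t (pk int PRIMARY KEY)");
        }
    }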
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
index 5c45d52,0000000..f138861
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
@@@ -1,102 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.upgrade;
 +
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICoordinator;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
++import org.apache.cassandra.distributed.shared.DistributedTestBase;
++import org.apache.cassandra.distributed.shared.Versions;
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +
 +public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
 +{
 +    @Test
 +    public void multiColumn() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .setup(cluster -> {
 +            assert cluster.size() == 3;
 +            int rf = cluster.size() - 1;
 +            assert rf == 2;
 +            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + 
"};");
 +            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 
text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
 +            ICoordinator coordinator = cluster.coordinator(1);
 +            // these shouldn't be replicated by the 3rd node
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 
3, '3')", ConsistencyLevel.ALL);
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 
9, '9')", ConsistencyLevel.ALL);
-             for (int i=0; i<cluster.size(); i++)
++            for (int i = 0; i < cluster.size(); i++)
 +            {
-                 int nodeNum = i+1;
++                int nodeNum = i + 1;
 +                System.out.println(String.format("****** node %s: %s", 
nodeNum, cluster.get(nodeNum).config()));
 +            }
- 
 +        })
 +        .runAfterNodeUpgrade(((cluster, node) -> {
 +            if (node != 2)
 +                return;
 +
 +            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
 +            Object[][] expected = {
-                 DistributedTestBase.row(9, 9, "9"),
-                 DistributedTestBase.row(3, 3, "3")
++                row(9, 9, "9"),
++                row(3, 3, "3")
 +            };
-             DistributedTestBase.assertRows(rows, expected);
- 
++            assertRows(rows, expected);
 +        })).run();
 +    }
 +
 +    @Test
 +    public void singleColumn() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .setup(cluster -> {
 +            assert cluster.size() == 3;
 +            int rf = cluster.size() - 1;
 +            assert rf == 2;
 +            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + 
"};");
 +            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY 
KEY (pk)) WITH COMPACT STORAGE");
 +            ICoordinator coordinator = cluster.coordinator(1);
 +            // these shouldn't be replicated by the 3rd node
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", 
ConsistencyLevel.ALL);
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", 
ConsistencyLevel.ALL);
-             for (int i=0; i<cluster.size(); i++)
++            for (int i = 0; i < cluster.size(); i++)
 +            {
-                 int nodeNum = i+1;
++                int nodeNum = i + 1;
 +                System.out.println(String.format("****** node %s: %s", 
nodeNum, cluster.get(nodeNum).config()));
 +            }
- 
 +        })
 +        .runAfterNodeUpgrade(((cluster, node) -> {
 +
 +            if (node < 2)
 +                return;
 +
 +            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
 +            Object[][] expected = {
-                 DistributedTestBase.row(9, 9),
-                 DistributedTestBase.row(3, 3)
++                row(9, 9),
++                row(3, 3)
 +            };
-             DistributedTestBase.assertRows(rows, expected);
- 
++            assertRows(rows, expected);
 +        })).run();
 +    }
- }
++}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index 31f4b84,e69e38a..b98829d
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@@ -20,11 -20,10 +20,11 @@@ package org.apache.cassandra.distribute
  
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.shared.DistributedTestBase;
+ import org.apache.cassandra.distributed.shared.Versions;
  
- import static org.apache.cassandra.distributed.impl.Versions.find;
+ import static org.apache.cassandra.distributed.shared.Versions.find;
  
  public class MixedModeReadRepairTest extends UpgradeTestBase
  {
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 5a927fc,93ae78e..81e580d
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@@ -18,16 -18,13 +18,16 @@@
  
  package org.apache.cassandra.distributed.upgrade;
  
 +import java.util.Iterator;
 +
 +import com.google.common.collect.Iterators;
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.shared.Versions;
  
- import static junit.framework.Assert.assertEquals;
+ import junit.framework.Assert;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
  
  public class UpgradeTest extends UpgradeTestBase
  {
@@@ -36,55 -33,22 +36,54 @@@
      public void upgradeTest() throws Throwable
      {
          new TestCase()
-             .upgrade(Versions.Major.v22, Versions.Major.v30)
-             .setup((cluster) -> {
-                 cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -        .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
++        .upgrade(Versions.Major.v22, Versions.Major.v30)
+         .setup((cluster) -> {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
-                 cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-                 cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-                 cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-             })
-             .runAfterClusterUpgrade((cluster) -> {
-                 DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
-                                                                               ConsistencyLevel.ALL,
-                                                                               1),
-                                                DistributedTestBase.row(1, 1, 1),
-                                                DistributedTestBase.row(1, 2, 2),
-                                                DistributedTestBase.row(1, 3, 3));
-             }).run();
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+             cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+         })
+         .runAfterClusterUpgrade((cluster) -> {
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                      ConsistencyLevel.ALL,
 -                                                      1),
 -                       row(1, 1, 1),
 -                       row(1, 2, 2),
 -                       row(1, 3, 3));
++                                                      ConsistencyLevel.ALL,
++                                                      1),
++                       row(1, 1, 1),
++                       row(1, 2, 2),
++                       row(1, 3, 3));
+         }).run();
      }
  
 +    @Test
 +    public void mixedModePagingTest() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .nodes(2)
 +        .nodesToUpgrade(2)
 +        .setup((cluster) -> {
-             cluster.schemaChange("ALTER KEYSPACE " + 
DistributedTestBase.KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 
'replication_factor': 1}");
-             cluster.schemaChange("CREATE TABLE " + 
DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, 
ck)) with compact storage");
++            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
 +            for (int i = 0; i < 100; i++)
 +                for (int j = 0; j < 200; j++)
-                     cluster.coordinator(2).execute("INSERT INTO " + 
DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", 
ConsistencyLevel.ALL, i, j);
-             cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE));
++                    cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE 
+ ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
++            cluster.forEach((i) -> i.flush(KEYSPACE));
 +            for (int i = 0; i < 100; i++)
 +                for (int j = 10; j < 30; j++)
-                     cluster.coordinator(2).execute("DELETE FROM " + 
DistributedTestBase.KEYSPACE + ".tbl where pk=? and ck=?", 
ConsistencyLevel.ALL, i, j);
-             cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE));
++                    cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE 
+ ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
++            cluster.forEach((i) -> i.flush(KEYSPACE));
 +        })
 +        .runAfterClusterUpgrade((cluster) -> {
 +            for (int i = 0; i < 100; i++)
 +            {
 +                for (int pageSize = 10; pageSize < 100; pageSize++)
 +                {
-                     Iterator<Object[]> res = 
cluster.coordinator(1).executeWithPaging("SELECT * FROM " + 
DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
++                    Iterator<Object[]> res = 
cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl 
WHERE pk = ?",
 +                                                                              
        ConsistencyLevel.ALL,
 +                                                                              
        pageSize, i);
-                     assertEquals(180, Iterators.size(res));
++                    Assert.assertEquals(180, Iterators.size(res));
 +                }
 +            }
 +        }).run();
 +    }
- 
- }
+ }
diff --cc test/unit/org/apache/cassandra/LogbackStatusListener.java
index d16058b,0000000..1f95bd4
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/LogbackStatusListener.java
+++ b/test/unit/org/apache/cassandra/LogbackStatusListener.java
@@@ -1,538 -1,0 +1,538 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.io.PrintStream;
 +import java.io.UnsupportedEncodingException;
 +import java.util.Locale;
 +
 +import org.slf4j.ILoggerFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import ch.qos.logback.classic.Level;
 +import ch.qos.logback.classic.LoggerContext;
 +import ch.qos.logback.classic.spi.LoggerContextListener;
 +import ch.qos.logback.core.status.Status;
 +import ch.qos.logback.core.status.StatusListener;
- import org.apache.cassandra.distributed.impl.InstanceClassLoader;
++import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 +
 +/*
 + * Listen for logback readiness and then redirect stdout/stderr to logback
 + */
 +public class LogbackStatusListener implements StatusListener, LoggerContextListener
 +{
 +
 +    public static final PrintStream originalOut = System.out;
 +    public static final PrintStream originalErr = System.err;
 +
 +    private volatile boolean hadPreInstallError = false;
 +    private volatile boolean haveInstalled = false;
 +    private volatile boolean haveRegisteredListener = false;
 +
 +    private PrintStream replacementOut;
 +    private PrintStream replacementErr;
 +
 +    @Override
 +    public void addStatusEvent(Status s)
 +    {
 +        if (!haveInstalled && (s.getLevel() != 0 || s.getEffectiveLevel() != 0))
 +        {
 +            // if we encounter an error during setup, we're not sure what state we're in, so we just don't switch
 +            // we should log this fact, though, so that we know that we're not necessarily capturing stdout
 +            LoggerFactory.getLogger(LogbackStatusListener.class)
 +                         .warn("Encountered non-info status in logger setup; aborting stdout capture: '" + s.getMessage() + '\'');
 +            hadPreInstallError = true;
 +        }
 +
 +        if (hadPreInstallError)
 +            return;
 +
 +        if (s.getMessage().startsWith("Registering current configuration as safe fallback point"))
 +        {
 +            onStart(null);
 +        }
 +
 +        if (haveInstalled && !haveRegisteredListener)
 +        {
 +            // we register ourselves as a listener after the fact, because we enable ourselves before the LoggerFactory
 +            // is properly initialised, hence before it can accept any LoggerContextListener registrations
 +            tryRegisterListener();
 +        }
 +
 +        if (s.getMessage().equals("Logback context being closed via shutdown hook"))
 +        {
 +            onStop(null);
 +        }
 +    }
 +
 +    private static PrintStream wrapLogger(Logger logger, PrintStream original, String encodingProperty, boolean error) throws Exception
 +    {
 +        final String encoding = System.getProperty(encodingProperty);
 +        OutputStream os = new ToLoggerOutputStream(logger, encoding, error);
 +        return encoding != null ? new WrappedPrintStream(os, true, encoding, original)
 +                                : new WrappedPrintStream(os, true, original);
 +    }
 +
 +    private static class ToLoggerOutputStream extends ByteArrayOutputStream
 +    {
 +        final Logger logger;
 +        final String encoding;
 +        final boolean error;
 +
 +        private ToLoggerOutputStream(Logger logger, String encoding, boolean error)
 +        {
 +            this.logger = logger;
 +            this.encoding = encoding;
 +            this.error = error;
 +        }
 +
 +        @Override
 +        public void flush() throws IOException
 +        {
 +            try
 +            {
 +                //Filter out stupid PrintStream empty flushes
 +                if (size() == 0) return;
 +
 +                //Filter out newlines, log framework provides its own
 +                if (size() == 1)
 +                {
 +                    byte[] bytes = toByteArray();
 +                    if (bytes[0] == 0xA)
 +                        return;
 +                }
 +
 +                //Filter out Windows newline
 +                if (size() == 2)
 +                {
 +                    byte[] bytes = toByteArray();
 +                    if (bytes[0] == 0xD && bytes[1] == 0xA)
 +                        return;
 +                }
 +
 +                String statement;
 +                if (encoding != null)
 +                    statement = new String(toByteArray(), encoding);
 +                else
 +                    statement = new String(toByteArray());
 +
 +                if (error)
 +                    logger.error(statement);
 +                else
 +                    logger.info(statement);
 +            }
 +            finally
 +            {
 +                reset();
 +            }
 +        }
 +    }
 +
 +    private static class WrappedPrintStream extends PrintStream
 +    {
 +        private long asyncAppenderThreadId = Long.MIN_VALUE;
 +        private final PrintStream original;
 +
 +        public WrappedPrintStream(OutputStream out, boolean autoFlush, PrintStream original)
 +        {
 +            super(out, autoFlush);
 +            this.original = original;
 +        }
 +
 +        public WrappedPrintStream(OutputStream out, boolean autoFlush, String encoding, PrintStream original) throws UnsupportedEncodingException
 +        {
 +            super(out, autoFlush, encoding);
 +            this.original = original;
 +        }
 +
 +        /*
 +         * The long and the short of it is that we don't want to serve logback a fake System.out/err.
 +         * ConsoleAppender is replaced so it always goes to the real System.out/err, but logback itself
 +         * will at times try to log to System.out/err when it has issues.
 +         *
 +         * Now here is the problem. There is a deadlock if a thread logs to System.out, blocks on the async
 +         * appender queue, and the async appender thread then tries to log to System.out directly as part of
 +         * some internal logback issue.
 +         *
 +         * So to prevent this we have to check exhaustively, before locking in the PrintStream, and forward
 +         * straight to the real System.out/err whenever the current thread is the async appender.
 +         */
 +        private boolean isAsyncAppender()
 +        {
 +            //Set the thread id based on the name, the first time we see the appender thread
 +            Thread currentThread = Thread.currentThread();
 +            long currentThreadId = currentThread.getId();
 +            if (asyncAppenderThreadId == Long.MIN_VALUE &&
 +                currentThread.getName().equals("AsyncAppender-Worker-ASYNC") &&
 +                !InstanceClassLoader.wasLoadedByAnInstanceClassLoader(currentThread.getClass()))
 +            {
 +                asyncAppenderThreadId = currentThreadId;
 +            }
 +            return currentThreadId == asyncAppenderThreadId;
 +        }
 +
 +        @Override
 +        public void flush()
 +        {
 +            if (isAsyncAppender())
 +                original.flush();
 +            else
 +                super.flush();
 +        }
 +
 +        @Override
 +        public void close()
 +        {
 +            if (isAsyncAppender())
 +                original.close();
 +            else
 +                super.flush(); // flush only: closing the logger-backed stream would stop capturing later writes
 +        }
 +
 +        @Override
 +        public void write(int b)
 +        {
 +            if (isAsyncAppender())
 +                original.write(b);
 +            else
 +                super.write(b);
 +        }
 +
 +        @Override
 +        public void write(byte[] buf, int off, int len)
 +        {
 +            if (isAsyncAppender())
 +                original.write(buf, off, len);
 +            else
 +                super.write(buf, off, len);
 +        }
 +
 +        @Override
 +        public void print(boolean b)
 +        {
 +            if (isAsyncAppender())
 +                original.print(b);
 +            else
 +                super.print(b);
 +        }
 +
 +        @Override
 +        public void print(char c)
 +        {
 +            if (isAsyncAppender())
 +                original.print(c);
 +            else
 +                super.print(c);
 +        }
 +
 +        @Override
 +        public void print(int i)
 +        {
 +            if (isAsyncAppender())
 +                original.print(i);
 +            else
 +                super.print(i);
 +        }
 +
 +        @Override
 +        public void print(long l)
 +        {
 +            if (isAsyncAppender())
 +                original.print(l);
 +            else
 +                super.print(l);
 +        }
 +
 +        @Override
 +        public void print(float f)
 +        {
 +            if (isAsyncAppender())
 +                original.print(f);
 +            else
 +                super.print(f);
 +        }
 +
 +        @Override
 +        public void print(double d)
 +        {
 +            if (isAsyncAppender())
 +                original.print(d);
 +            else
 +                super.print(d);
 +        }
 +
 +        @Override
 +        public void print(char[] s)
 +        {
 +            if (isAsyncAppender())
 +                original.print(s);
 +            else
 +                super.print(s);
 +        }
 +
 +        @Override
 +        public void print(String s)
 +        {
 +            if (isAsyncAppender())
 +                original.print(s);
 +            else
 +                super.print(s);
 +        }
 +
 +        @Override
 +        public void print(Object obj)
 +        {
 +            if (isAsyncAppender())
 +                original.print(obj);
 +            else
 +                super.print(obj);
 +        }
 +
 +        @Override
 +        public void println()
 +        {
 +            if (isAsyncAppender())
 +                original.println();
 +            else
 +                super.println();
 +        }
 +
 +        @Override
 +        public void println(boolean v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(char v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(int v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(long v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(float v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(double v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(char[] v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(String v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(Object v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public PrintStream printf(String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.printf(format, args);
 +            else
 +                return super.printf(format, args);
 +        }
 +
 +        @Override
 +        public PrintStream printf(Locale l, String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.printf(l, format, args);
 +            else
 +                return super.printf(l, format, args);
 +        }
 +
 +        @Override
 +        public PrintStream format(String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.format(format, args);
 +            else
 +                return super.format(format, args);
 +        }
 +
 +        @Override
 +        public PrintStream format(Locale l, String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.format(l, format, args);
 +            else
 +                return super.format(l, format, args);
 +        }
 +
 +        @Override
 +        public PrintStream append(CharSequence csq)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(csq);
 +            else
 +                return super.append(csq);
 +        }
 +
 +        @Override
 +        public PrintStream append(CharSequence csq, int start, int end)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(csq, start, end);
 +            else
 +                return super.append(csq, start, end);
 +        }
 +
 +        @Override
 +        public PrintStream append(char c)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(c);
 +            else
 +                return super.append(c);
 +        }
 +    }
 +
 +    public boolean isResetResistant()
 +    {
 +        return false;
 +    }
 +
 +    public synchronized void onStart(LoggerContext loggerContext)
 +    {
 +        if (!hadPreInstallError && !haveInstalled)
 +        {
 +            if (InstanceClassLoader.wasLoadedByAnInstanceClassLoader(getClass())
 +                || System.out.getClass().getName().contains("LogbackStatusListener"))
 +            {
 +                // don't operate if we're a dtest node, or if we're not the first to swap System.out for some other reason
 +                hadPreInstallError = true;
 +                return;
 +            }
 +            try
 +            {
 +                Logger stdoutLogger = LoggerFactory.getLogger("stdout");
 +                Logger stderrLogger = LoggerFactory.getLogger("stderr");
 +
 +                replacementOut = wrapLogger(stdoutLogger, originalOut, "sun.stdout.encoding", false);
 +                System.setOut(replacementOut);
 +                replacementErr = wrapLogger(stderrLogger, originalErr, "sun.stderr.encoding", true);
 +                System.setErr(replacementErr);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            haveInstalled = true;
 +        }
 +    }
 +
 +    public synchronized void onReset(LoggerContext loggerContext)
 +    {
 +        onStop(loggerContext);
 +    }
 +
 +    public synchronized void onStop(LoggerContext loggerContext)
 +    {
 +        if (haveInstalled)
 +        {
 +            if (replacementOut != null) replacementOut.flush();
 +            if (replacementErr != null) replacementErr.flush();
 +            System.setErr(originalErr);
 +            System.setOut(originalOut);
 +            // deregister before resetting the flag, otherwise this branch is unreachable
 +            if (haveRegisteredListener)
 +            {
 +                ((LoggerContext) LoggerFactory.getILoggerFactory()).removeListener(this);
 +            }
 +            hadPreInstallError = false;
 +            haveInstalled = false;
 +            haveRegisteredListener = false;
 +        }
 +    }
 +
 +    public void onLevelChange(ch.qos.logback.classic.Logger logger, Level level)
 +    {
 +    }
 +
 +    private synchronized void tryRegisterListener()
 +    {
 +        if (haveInstalled && !haveRegisteredListener)
 +        {
 +            ILoggerFactory factory = LoggerFactory.getILoggerFactory();
 +            if (factory instanceof LoggerContext)
 +            {
 +                ((LoggerContext) factory).addListener(this);
 +                haveRegisteredListener = true;
 +            }
 +        }
 +    }
 +}
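For all its delegation boilerplate, the listener's core lifecycle is small: once logback reports readiness, onStart() swaps System.out/err for logger-backed streams, and onStop() restores the originals. A condensed sketch of that idea in isolation, using only classes already imported above (buffer handling is simplified; the real ToLoggerOutputStream also honours configured encodings):

    // Sketch only: capture System.out into the "stdout" logger, then restore.
    PrintStream saved = System.out;
    Logger stdout = LoggerFactory.getLogger("stdout");
    System.setOut(new PrintStream(new ByteArrayOutputStream()
    {
        @Override
        public void flush() throws IOException
        {
            byte[] bytes = toByteArray();
            reset();
            // skip empty flushes and the bare newline that println() appends,
            // as the real ToLoggerOutputStream above does
            if (bytes.length == 0 || (bytes.length == 1 && bytes[0] == '\n'))
                return;
            stdout.info(new String(bytes)); // hand buffered output to the logger
        }
    }, true)); // autoFlush=true so println() triggers flush()
    System.out.println("now routed to the 'stdout' logger");
    System.setOut(saved); // restore, as onStop() does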


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
