This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to annotated tag 0.0.1-1
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git

commit 10ff11fd8e34e66992d4fb08aa476fd1198d6260
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Mar 12 13:25:25 2020 +0100

    Bring in CASSANDRA-15564 changes
---
 pom.xml                                            |  13 +-
 .../apache/cassandra/distributed/api/ICluster.java |  69 ++++-
 .../cassandra/distributed/api/ICoordinator.java    |   6 +-
 .../cassandra/distributed/api/IInstance.java       |  16 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   3 +-
 .../apache/cassandra/distributed/api/IListen.java  |   2 +-
 .../apache/cassandra/distributed/api/IMessage.java |   5 +-
 .../cassandra/distributed/api/IMessageFilters.java |  52 +++-
 .../api/{IMessage.java => LongTokenRange.java}     |  28 +-
 .../cassandra/distributed/api/NodeToolResult.java  | 182 +++++++++++++
 .../cassandra/distributed/api/QueryResult.java     | 139 ++++++++++
 .../org/apache/cassandra/distributed/api/Row.java  | 110 ++++++++
 .../cassandra/distributed/shared/AssertUtils.java  | 130 +++++++++
 .../distributed/shared/DistributedTestBase.java    | 156 +----------
 .../distributed/shared/MessageFilters.java         |  73 ++++--
 .../distributed/shared/MessageFiltersTest.java     | 113 --------
 .../distributed/shared/NetworkTopology.java        |  67 +----
 .../cassandra/distributed/shared/Versions.java     |   7 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  10 +-
 .../test/DistributedReadWritePathTest.java         | 290 ---------------------
 .../distributed/test/LargeColumnTest.java          |  96 -------
 .../distributed/test/MessageFiltersTest.java       | 132 ++++++++++
 .../distributed/test/NativeProtocolTest.java       |   2 +
 .../distributed/test/SimpleReadWriteTest.java      |  68 +----
 24 files changed, 927 insertions(+), 842 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4b92a62..1757a52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,10 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
-        <version>10</version>
+        <version>23</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
@@ -28,14 +27,20 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.12</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.cassandra</groupId>
             <artifactId>in-jvm-dtest-cassandra-tryout</artifactId>
-            <version>0.0.7-3.11-SNAPSHOT</version>
+            <version>0.0.1-2.2-1</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java 
b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index 6d86e99..dffd980 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -18,24 +18,81 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.stream.Stream;
 
-public interface ICluster<I extends IInstance> extends AutoCloseable
-{
+public interface ICluster<I extends IInstance> extends AutoCloseable {
     void startup();
+
     I bootstrap(IInstanceConfig config);
+
     I get(int i);
-    I get(NetworkTopology.AddressAndPort endpoint);
+
+    I get(InetSocketAddress endpoint);
+
     ICoordinator coordinator(int node);
+
     void schemaChange(String query);
+
     void schemaChange(String statement, int instance);
 
     int size();
 
     Stream<I> stream();
+
     Stream<I> stream(String dcName);
+
     Stream<I> stream(String dcName, String rackName);
+
     IMessageFilters filters();
-}
+
+    static void setup() throws Throwable {
+        setupLogging();
+        setSystemProperties();
+        nativeLibraryWorkaround();
+        processReaperWorkaround();
+    }
+
+    static void nativeLibraryWorkaround() {
+        // Disable the Netty tcnative library otherwise the 
io.netty.internal.tcnative.CertificateCallbackTask,
+        // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, 
SSLPrivateKeyMethodSignTask,
+        // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the 
InstanceClassLoader.
+        System.setProperty("cassandra.disable_tcactive_openssl", "true");
+        System.setProperty("io.netty.transport.noNative", "true");
+    }
+
+    static void processReaperWorkaround() throws Throwable {
+        // Make sure the 'process reaper' thread is initially created under 
the main classloader,
+        // otherwise it gets created with the contextClassLoader pointing to 
an InstanceClassLoader
+        // which prevents it from being garbage collected.
+        new ProcessBuilder().command("true").start().waitFor();
+    }
+
+    static void setSystemProperties() {
+        System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 
1000));
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", 
"true");
+    }
+
+    static void setupLogging() {
+        try {
+            File root = Files.createTempDirectory("in-jvm-dtest").toFile();
+            root.deleteOnExit();
+            String testConfPath = "test/conf/logback-dtest.xml";
+            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+
+            if (!logConfPath.toFile().exists()) {
+                Files.copy(new File(testConfPath).toPath(),
+                           logConfPath);
+            }
+
+            System.setProperty("logback.configurationFile", "file://" + 
logConfPath);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java 
b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
index da17df6..3d07a3d 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -25,7 +25,11 @@ import java.util.concurrent.Future;
 // The cross-version API requires that a Coordinator can be constructed 
without any constructor arguments
 public interface ICoordinator
 {
-    Object[][] execute(String query, ConsistencyLevel consistencyLevel, 
Object... boundValues);
+    default Object[][] execute(String query, ConsistencyLevel 
consistencyLevel, Object... boundValues)
+    {
+        return executeWithResult(query, consistencyLevel, 
boundValues).toObjectArrays();
+    }
+    QueryResult executeWithResult(String query, ConsistencyLevel 
consistencyLevel, Object... boundValues);
     Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel 
consistencyLevel, int pageSize, Object... boundValues);
 
     Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, 
ConsistencyLevel consistencyLevel, Object... boundValues);
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java 
b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
index dec3549..0dd4865 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -18,8 +18,7 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
+import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
@@ -33,7 +32,7 @@ public interface IInstance extends IIsolatedExecutor
     public Object[][] executeInternal(String query, Object... args);
 
     IInstanceConfig config();
-    NetworkTopology.AddressAndPort broadcastAddress();
+    InetSocketAddress broadcastAddress();
     UUID schemaVersion();
 
     void startup();
@@ -43,7 +42,14 @@ public interface IInstance extends IIsolatedExecutor
 
     int liveMemberCount();
 
-    int nodetool(String... commandAndArgs);
+    NodeToolResult nodetoolResult(boolean withNotifications, String... 
commandAndArgs);
+    default NodeToolResult nodetoolResult(String... commandAndArgs)
+    {
+        return nodetoolResult(true, commandAndArgs);
+    }
+    default int nodetool(String... commandAndArgs) {
+        return nodetoolResult(commandAndArgs).getRc();
+    }
     void uncaughtException(Thread t, Throwable e);
 
     /**
@@ -59,7 +65,7 @@ public interface IInstance extends IIsolatedExecutor
     void receiveMessage(IMessage message);
 
     int getMessagingVersion();
-    void setMessagingVersion(NetworkTopology.AddressAndPort addressAndPort, 
int version);
+    void setMessagingVersion(InetSocketAddress addressAndPort, int version);
 
     void flush(String keyspace);
     void forceCompact(String keyspace, String table);
diff --git 
a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java 
b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
index 05969f8..97c1534 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.api;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -34,7 +35,7 @@ public interface IInstanceConfig
 
     int num();
     UUID hostId();
-    NetworkTopology.AddressAndPort broadcastAddress();
+    InetSocketAddress broadcastAddress();
     NetworkTopology networkTopology();
 
     String localRack();
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java 
b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
index c2e8dd6..d21c594 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IListen.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.distributed.api;
 
 public interface IListen
 {
-    public interface Cancel { void cancel(); }
+    interface Cancel { void cancel(); }
 
     Cancel schema(Runnable onChange);
 
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java 
b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
index da536cc..f75861e 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
@@ -18,9 +18,8 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 
 /**
  * A cross-version interface for delivering internode messages via message 
sinks.
@@ -34,5 +33,5 @@ public interface IMessage extends Serializable
     // TODO: need to make this a long
     int id();
     int version();
-    NetworkTopology.AddressAndPort from();
+    InetSocketAddress from();
 }
diff --git 
a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java 
b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
index 01fe972..f2cd6ee 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.distributed.api;
 
+import java.util.function.Predicate;
+
 public interface IMessageFilters
 {
     public interface Filter
@@ -31,6 +33,21 @@ public interface IMessageFilters
         Builder from(int ... nums);
         Builder to(int ... nums);
 
+        Builder verbs(int... verbs);
+        Builder allVerbs();
+
+        Builder inbound(boolean inbound);
+
+        default Builder inbound()
+        {
+            return inbound(true);
+        }
+
+        default Builder outbound()
+        {
+            return inbound(false);
+        }
+
         /**
          * Every message for which matcher returns `true` will be _dropped_ 
(assuming all
          * other matchers in the chain will return `true` as well).
@@ -42,15 +59,42 @@ public interface IMessageFilters
     public interface Matcher
     {
         boolean matches(int from, int to, IMessage message);
+
+        static Matcher of(Predicate<IMessage> fn) {
+            return (from, to, m) -> fn.test(m);
+        }
     }
 
-    Builder verbs(int... verbs);
-    Builder allVerbs();
+    Builder inbound(boolean inbound);
+    default Builder inbound() {
+        return inbound(true);
+    }
+    default Builder outbound() {
+        return inbound(false);
+    }
+    default Builder verbs(int... verbs) {
+        return inbound().verbs(verbs);
+    }
+    default Builder allVerbs() {
+        return inbound().allVerbs();
+    }
     void reset();
 
     /**
-     * {@code true} value returned by the implementation implies that the 
message was
+     * Checks if the message should be delivered.  This is expected to run on "inbound", or on the receiver of
+     * the message (instance.config.num == to).
+     *
+     * @return {@code true} value returned by the implementation implies that 
the message was
+     * not matched by any filters and therefore should be delivered.
+     */
+    boolean permitInbound(int from, int to, IMessage msg);
+
+    /**
+     * Checks if the message should be delivered.  This is expected to run on 
"outbound", or on the sender of
+     * the message (instance.config.num == from).
+     *
+     * @return {@code true} value returned by the implementation implies that 
the message was
      * not matched by any filters and therefore should be delivered.
      */
-    boolean permit(int from, int to, IMessage msg);
+    boolean permitOutbound(int from, int to, IMessage msg);
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java 
b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java
similarity index 67%
copy from src/main/java/org/apache/cassandra/distributed/api/IMessage.java
copy to src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java
index da536cc..06327e8 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java
@@ -18,21 +18,21 @@
 
 package org.apache.cassandra.distributed.api;
 
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
 import java.io.Serializable;
 
-/**
- * A cross-version interface for delivering internode messages via message 
sinks.
- *
- * Message implementations should be serializable so we could load into 
instances.
- */
-public interface IMessage extends Serializable
+public final class LongTokenRange implements Serializable
 {
-    int verb();
-    byte[] bytes();
-    // TODO: need to make this a long
-    int id();
-    int version();
-    NetworkTopology.AddressAndPort from();
+    public final long minExclusive;
+    public final long maxInclusive;
+
+    public LongTokenRange(long minExclusive, long maxInclusive)
+    {
+        this.minExclusive = minExclusive;
+        this.maxInclusive = maxInclusive;
+    }
+
+    public String toString()
+    {
+        return "(" + minExclusive + "," + maxInclusive + "]";
+    }
 }
diff --git 
a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java 
b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
new file mode 100644
index 0000000..773f617
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.AssertUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.management.Notification;
+
+public class NodeToolResult
+{
+    private final String[] commandAndArgs;
+    private final int rc;
+    private final List<Notification> notifications;
+    private final Throwable error;
+
+    public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> 
notifications, Throwable error)
+    {
+        this.commandAndArgs = commandAndArgs;
+        this.rc = rc;
+        this.notifications = notifications;
+        this.error = error;
+    }
+
+    public String[] getCommandAndArgs()
+    {
+        return commandAndArgs;
+    }
+
+    public int getRc()
+    {
+        return rc;
+    }
+
+    public List<Notification> getNotifications()
+    {
+        return notifications;
+    }
+
+    public Throwable getError()
+    {
+        return error;
+    }
+
+    public Asserts asserts()
+    {
+        return new Asserts();
+    }
+
+    public final class Asserts {
+        public Asserts success() {
+            AssertUtils.assertEquals("nodetool command " + commandAndArgs[0] + 
" was not successful", 0, rc);
+            return this;
+        }
+
+        public Asserts failure() {
+            AssertUtils.assertNotEquals("nodetool command " + 
commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+            return this;
+        }
+
+        public Asserts errorContains(String msg) {
+            AssertUtils.assertNotNull("No exception was found but expected 
one", error);
+            AssertUtils.assertTrue("Error message '" + error.getMessage() + "' 
does not contain '" + msg + "'", error.getMessage().contains(msg));
+            return this;
+        }
+
+        public Asserts notificationContains(String msg) {
+            AssertUtils.assertNotNull("notifications not defined", 
notifications);
+            AssertUtils.assertFalse("notifications not defined", 
notifications.isEmpty());
+            for (Notification n : notifications) {
+                if (n.getMessage().contains(msg)) {
+                    return this;
+                }
+            }
+            AssertUtils.fail("Unable to locate message " + msg + " in 
notifications: " + notifications);
+            return this; // unreachable
+        }
+
+        public Asserts notificationContains(ProgressEventType type, String 
msg) {
+            int userType = type.ordinal();
+            AssertUtils.assertNotNull("notifications not defined", 
notifications);
+            AssertUtils.assertFalse("notifications not defined", 
notifications.isEmpty());
+            for (Notification n : notifications) {
+                if (notificationType(n) == userType) {
+                    if (n.getMessage().contains(msg)) {
+                        return this;
+                    }
+                }
+            }
+            AssertUtils.fail("Unable to locate message '" + msg + "' in 
notifications: " + notifications);
+            return this; // unreachable
+        }
+    }
+
+    private static int notificationType(Notification n)
+    {
+        return ((Map<String, Integer>) n.getUserData()).get("type").intValue();
+    }
+
+    public String toString()
+    {
+        return "NodeToolResult{" +
+               "commandAndArgs=" + Arrays.toString(commandAndArgs) +
+               ", rc=" + rc +
+               ", notifications=[" + notifications.stream().map(n -> 
ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", ")) + "]" +
+               ", error=" + error +
+               '}';
+    }
+
+    /**
+     * Progress event type.
+     *
+     * <p>
+     * Progress starts by emitting {@link #START}, followed by emitting zero 
or more {@link #PROGRESS} events,
+     * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link 
#SUCCESS}.
+     * Progress indicates its completion by emitting {@link #COMPLETE} at the 
end of process.
+     * </p>
+     * <p>
+     * {@link #NOTIFICATION} event type is used to just notify message without 
progress.
+     * </p>
+     */
+    public enum ProgressEventType
+    {
+        /**
+         * Fired first when progress starts.
+         * Happens only once.
+         */
+        START,
+
+        /**
+         * Fire when progress happens.
+         * This can be zero or more times after START.
+         */
+        PROGRESS,
+
+        /**
+         * When observing process completes with error, this is sent once 
before COMPLETE.
+         */
+        ERROR,
+
+        /**
+         * When observing process is aborted by user, this is sent once before 
COMPLETE.
+         */
+        ABORT,
+
+        /**
+         * When observing process completes successfully, this is sent once 
before COMPLETE.
+         */
+        SUCCESS,
+
+        /**
+         * Fired when progress completes.
+         * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+         * After this, no more ProgressEvent should be fired for the same 
event.
+         */
+        COMPLETE,
+
+        /**
+         * Used when sending message without progress.
+         */
+        NOTIFICATION
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java 
b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
new file mode 100644
index 0000000..dcdfa14
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * A table of data representing a complete query result.
+ *
+ * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in 
several key ways:
+ *
+ * <ul>
+ *     <li>represents a complete result rather than a cursor</li>
+ *     <li>returns a {@link Row} to access the current row of data</li>
+ *     <li>relies on object pooling; {@link #hasNext()} may return the same 
object just with different data, accessing a
+ *     {@link Row} from a previous {@link #hasNext()} call has undefined 
behavior.</li>
+ *     <li>includes {@link #filter(Predicate)}, this will do client side 
filtering since Apache Cassandra is more
+ *     restrictive on server side filtering</li>
+ * </ul>
+ *
+ * <h2>Unsafe patterns</h2>
+ *
+ * Below are a few unsafe patterns which may lead to unexpected results
+ *
+ * <code>{@code
+ * while (rs.hasNext()) {
+ *   list.add(rs.next());
+ * }
+ * }</code>
+ *
+ * <code>{@code
+ * rs.forEach(list::add)
+ * }</code>
+ *
+ * Both cases have the same issue; reference to a row from a previous call to 
{@link #hasNext()}.  Since the same {@link Row}
+ * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * points to newer data.  If this behavior is not desirable and access is 
needed between calls, then {@link Row#copy()}
+ * should be used; this will clone the {@link Row} and return a new object 
pointing to the same data.
+ */
+public class QueryResult implements Iterator<Row>
+{
+    public static final QueryResult EMPTY = new QueryResult(new String[0], 
null);
+
+    private final String[] names;
+    private final Object[][] results;
+    private final Predicate<Row> filter;
+    private final Row row;
+    private int offset = -1;
+
+    public QueryResult(String[] names, Object[][] results)
+    {
+        this.names = Objects.requireNonNull(names, "names");
+        this.results = results;
+        this.row = new Row(names);
+        this.filter = ignore -> true;
+    }
+
+    private QueryResult(String[] names, Object[][] results, Predicate<Row> 
filter, int offset)
+    {
+        this.names = names;
+        this.results = results;
+        this.filter = filter;
+        this.offset = offset;
+        this.row = new Row(names);
+    }
+
+    public String[] getNames()
+    {
+        return names;
+    }
+
+    public boolean isEmpty()
+    {
+        return results.length == 0;
+    }
+
+    public int size()
+    {
+        return results.length;
+    }
+
+    public QueryResult filter(Predicate<Row> fn)
+    {
+        return new QueryResult(names, results, filter.and(fn), offset);
+    }
+
+    /**
+     * Get all rows as a 2d array.  Any calls to {@link #filter(Predicate)} 
will be ignored and the array returned will
+     * be the full set from the query.
+     */
+    public Object[][] toObjectArrays()
+    {
+        return results;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (results == null)
+            return false;
+        while ((offset += 1) < results.length)
+        {
+            row.setResults(results[offset]);
+            if (filter.test(row))
+            {
+                return true;
+            }
+        }
+        row.setResults(null);
+        return false;
+    }
+
+    @Override
+    public Row next()
+    {
+        if (offset < 0 || offset >= results.length)
+            throw new NoSuchElementException();
+        return row;
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java 
b/src/main/java/org/apache/cassandra/distributed/api/Row.java
new file mode 100644
index 0000000..1dd05fe
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.*;
+
+/**
+ * Data representing a single row in a query result.
+ *
+ * This class is mutable from the parent {@link QueryResult} and can have the 
row it points to changed between calls
+ * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold 
reference to this class after that call;
+ * to get around this, a call to {@link #copy()} will return a new object 
pointing to the same row.
+ */
+public class Row
+{
+    private final Map<String, Integer> nameIndex;
+    private Object[] results; // mutable to avoid allocations in loops
+
+    public Row(String[] names)
+    {
+        Objects.requireNonNull(names, "names");
+        this.nameIndex = new HashMap<>(names.length);
+        for (int i = 0; i < names.length; i++) {
+            nameIndex.put(names[i], i);
+        }
+    }
+
+    private Row(Map<String, Integer> nameIndex)
+    {
+        this.nameIndex = nameIndex;
+    }
+
+    void setResults(Object[] results)
+    {
+        this.results = results;
+    }
+
+    /**
+     * Creates a copy of the current row; can be used past calls to {@link 
QueryResult#hasNext()}.
+     */
+    public Row copy() {
+        Row copy = new Row(nameIndex);
+        copy.setResults(results);
+        return copy;
+    }
+
+    public <T> T get(String name)
+    {
+        checkAccess();
+        int idx = findIndex(name);
+        if (idx == -1)
+            return null;
+        return (T) results[idx];
+    }
+
+    public String getString(String name)
+    {
+        return get(name);
+    }
+
+    public UUID getUUID(String name)
+    {
+        return get(name);
+    }
+
+    public Date getTimestamp(String name)
+    {
+        return get(name);
+    }
+
+    public <T> Set<T> getSet(String name)
+    {
+        return get(name);
+    }
+
+    public String toString()
+    {
+        return "Row{" +
+               "names=" + nameIndex.keySet() +
+               ", results=" + Arrays.toString(results) +
+               '}';
+    }
+
+    private void checkAccess()
+    {
+        if (results == null)
+            throw new NoSuchElementException();
+    }
+
+    private int findIndex(String name)
+    {
+        return nameIndex.getOrDefault(name, -1);
+    }
+}
diff --git 
a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java 
b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
new file mode 100644
index 0000000..f914e90
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
@@ -0,0 +1,130 @@
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+public class AssertUtils {
+
+    public static void assertRows(Object[][] actual, Object[]... expected)
+    {
+        assertEquals(rowsNotEqualErrorMessage(actual, expected),
+                     expected.length, actual.length);
+
+        for (int i = 0; i < expected.length; i++)
+        {
+            Object[] expectedRow = expected[i];
+            Object[] actualRow = actual[i];
+            assertTrue(rowsNotEqualErrorMessage(actual, expected),
+                       Arrays.equals(expectedRow, actualRow));
+        }
+    }
+
+    public static void assertRow(Object[] actual, Object... expected)
+    {
+        assertTrue(rowNotEqualErrorMessage(actual, expected),
+                   Arrays.equals(actual, expected));
+    }
+
+    public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
+    {
+        while (actual.hasNext() && expected.hasNext())
+            assertRow(actual.next(), expected.next());
+
+        assertTrue("Resultsets have different sizes", actual.hasNext() == expected.hasNext());
+    }
+
+    public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
+    {
+        assertRows(actual, new Iterator<Object[]>() {
+
+            int i = 0;
+            @Override
+            public boolean hasNext() {
+                return i < expected.length;
+            }
+
+            @Override
+            public Object[] next() {
+                return expected[i++];
+            }
+        });
+    }
+
+    public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
+    {
+        return String.format("Expected: %s\nActual:%s\n",
+                             Arrays.toString(expected),
+                             Arrays.toString(actual));
+    }
+
+    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+    {
+        return String.format("Expected: %s\nActual: %s\n",
+                             rowsToString(expected),
+                             rowsToString(actual));
+    }
+
+    public static String rowsToString(Object[][] rows)
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        boolean isFirst = true;
+        for (Object[] row : rows)
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                builder.append(",");
+            builder.append(Arrays.toString(row));
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    public static Object[][] toObjectArray(Iterator<Object[]> iter)
+    {
+        List<Object[]> res = new ArrayList<>();
+        while (iter.hasNext())
+            res.add(iter.next());
+
+        return res.toArray(new Object[res.size()][]);
+    }
+
+    public static Object[] row(Object... expected)
+    {
+        return expected;
+    }
+
+    public static void assertEquals(String message, long expected, long actual) {
+        if (expected != actual)
+            fail(message);
+    }
+
+    public static void assertNotEquals(String message, long expected, long actual) {
+        if (expected == actual)
+            fail(message);
+    }
+
+    public static void assertNotNull(String message, Object object) {
+        if (object == null)
+            fail(message);
+    }
+
+    public static void assertTrue(String message, boolean condition) {
+        if (!condition)
+            fail(message);
+    }
+
+    public static void assertFalse(String message, boolean condition) {
+        if (condition)
+            fail(message);
+    }
+
+
+    public static void fail(String message) {
+        throw new AssertionError(message);
+    }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
index 89dc0cd..d28af2a 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
@@ -20,81 +20,22 @@ package org.apache.cassandra.distributed.shared;
 
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
 
 public abstract class DistributedTestBase
 {
-    @After
     public void afterEach()
     {
         System.runFinalization();
         System.gc();
     }
 
-    public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
-
-    public static String KEYSPACE = "distributed_test_keyspace";
-
-    // TODO: move this to Cluster.java?
-    public static void nativeLibraryWorkaround()
-    {
-        // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
-        // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
-        // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
-        System.setProperty("cassandra.disable_tcactive_openssl", "true");
-        System.setProperty("io.netty.transport.noNative", "true");
-    }
-
-    public static void processReaperWorkaround() throws Throwable {
-        // Make sure the 'process reaper' thread is initially created under the main classloader,
-        // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
-        // which prevents it from being garbage collected.
-        new ProcessBuilder().command("true").start().waitFor();
+    public static void beforeClass() throws Throwable {
+        ICluster.setup();
     }
 
-    public static void setupLogging()
-    {
-        try
-        {
-            File root = Files.createTempDirectory("in-jvm-dtest").toFile();
-            root.deleteOnExit();
-            String testConfPath = "test/conf/logback-dtest.xml";
-            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
-
-            if (!logConfPath.toFile().exists())
-            {
-                Files.copy(new File(testConfPath).toPath(),
-                           logConfPath);
-            }
-
-            System.setProperty("logback.configurationFile", "file://" + logConfPath);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
+    public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
 
-    @BeforeClass
-    public static void setup() throws Throwable {
-        setupLogging();
-        System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
-        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
-        nativeLibraryWorkaround();
-        processReaperWorkaround();
-    }
+    public static String KEYSPACE = "distributed_test_keyspace";
 
     public static String withKeyspace(String replaceIn)
     {
@@ -112,94 +53,5 @@ public abstract class DistributedTestBase
         return cluster;
     }
 
-    public static void assertRows(Object[][] actual, Object[]... expected)
-    {
-        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
-                            expected.length, actual.length);
-
-        for (int i = 0; i < expected.length; i++)
-        {
-            Object[] expectedRow = expected[i];
-            Object[] actualRow = actual[i];
-            Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
-                              Arrays.equals(expectedRow, actualRow));
-        }
-    }
-
-    public static void assertRow(Object[] actual, Object... expected)
-    {
-        Assert.assertTrue(rowNotEqualErrorMessage(actual, expected),
-                          Arrays.equals(actual, expected));
-    }
-
-    public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
-    {
-        while (actual.hasNext() && expected.hasNext())
-            assertRow(actual.next(), expected.next());
-
-        Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext());
-    }
-
-    public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
-    {
-        assertRows(actual, new Iterator<Object[]>() {
-
-            int i = 0;
-            @Override
-            public boolean hasNext() {
-                return i < expected.length;
-            }
-
-            @Override
-            public Object[] next() {
-                return expected[i++];
-            }
-        });
-    }
-
-    public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
-    {
-        return String.format("Expected: %s\nActual:%s\n",
-                             Arrays.toString(expected),
-                             Arrays.toString(actual));
-    }
-
-    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
-    {
-        return String.format("Expected: %s\nActual: %s\n",
-                             rowsToString(expected),
-                             rowsToString(actual));
-    }
-
-    public static String rowsToString(Object[][] rows)
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append("[");
-        boolean isFirst = true;
-        for (Object[] row : rows)
-        {
-            if (isFirst)
-                isFirst = false;
-            else
-                builder.append(",");
-            builder.append(Arrays.toString(row));
-        }
-        builder.append("]");
-        return builder.toString();
-    }
-
-    public static Object[][] toObjectArray(Iterator<Object[]> iter)
-    {
-        List<Object[]> res = new ArrayList<>();
-        while (iter.hasNext())
-            res.add(iter.next());
-
-        return res.toArray(new Object[res.size()][]);
-    }
-
-    public static Object[] row(Object... expected)
-    {
-        return expected;
-    }
 
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
index 57b8f57..c1731db 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.shared;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.cassandra.distributed.api.IMessage;
@@ -27,9 +28,20 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
 
 public class MessageFilters implements IMessageFilters
 {
-    private final List<Filter> filters = new CopyOnWriteArrayList<>();
+    private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>();
+    private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>();
 
-    public boolean permit(int from, int to, IMessage msg)
+    public boolean permitInbound(int from, int to, IMessage msg)
+    {
+        return permit(inboundFilters, from, to, msg);
+    }
+
+    public boolean permitOutbound(int from, int to, IMessage msg)
+    {
+        return permit(outboundFilters, from, to, msg);
+    }
+
+    private static boolean permit(List<Filter> filters, int from, int to, IMessage msg)
     {
         for (Filter filter : filters)
         {
@@ -39,14 +51,15 @@ public class MessageFilters implements IMessageFilters
         return true;
     }
 
-    public class Filter implements IMessageFilters.Filter
+    public static class Filter implements IMessageFilters.Filter
     {
         final int[] from;
         final int[] to;
         final int[] verbs;
         final Matcher matcher;
+        final List<Filter> parent;
 
-        Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
+        Filter(int[] from, int[] to, int[] verbs, Matcher matcher, List<Filter> parent)
         {
             if (from != null)
             {
@@ -67,13 +80,15 @@ public class MessageFilters implements IMessageFilters
             this.to = to;
             this.verbs = verbs;
             this.matcher = matcher;
+            this.parent = Objects.requireNonNull(parent, "parent");
         }
 
         public int hashCode()
         {
             return (from == null ? 0 : Arrays.hashCode(from))
                    + (to == null ? 0 : Arrays.hashCode(to))
-                   + (verbs == null ? 0 : Arrays.hashCode(verbs));
+                    + (verbs == null ? 0 : Arrays.hashCode(verbs)
+                    + parent.hashCode());
         }
 
         public boolean equals(Object that)
@@ -85,18 +100,19 @@ public class MessageFilters implements IMessageFilters
         {
             return Arrays.equals(from, that.from)
                    && Arrays.equals(to, that.to)
-                   && Arrays.equals(verbs, that.verbs);
+                    && Arrays.equals(verbs, that.verbs)
+                    && parent.equals(that.parent);
         }
 
         public Filter off()
         {
-            filters.remove(this);
+            parent.remove(this);
             return this;
         }
 
         public Filter on()
         {
-            filters.add(this);
+            parent.add(this);
             return this;
         }
 
@@ -115,10 +131,11 @@ public class MessageFilters implements IMessageFilters
         int[] to;
         int[] verbs;
         Matcher matcher;
+        boolean inbound;
 
-        private Builder(int[] verbs)
+        private Builder(boolean inbound)
         {
-            this.verbs = verbs;
+            this.inbound = inbound;
         }
 
         public Builder from(int... nums)
@@ -133,33 +150,45 @@ public class MessageFilters implements IMessageFilters
             return this;
         }
 
-        public IMessageFilters.Builder messagesMatching(Matcher matcher)
+        public IMessageFilters.Builder verbs(int... verbs)
         {
-            this.matcher = matcher;
+            this.verbs = verbs;
             return this;
         }
 
-        public Filter drop()
+        public IMessageFilters.Builder allVerbs()
         {
-            return new Filter(from, to, verbs, matcher).on();
+            this.verbs = null;
+            return this;
         }
-    }
 
+        public IMessageFilters.Builder inbound(boolean inbound)
+        {
+            this.inbound = inbound;
+            return this;
+        }
 
-    public Builder verbs(int... verbs)
-    {
-        return new Builder(verbs);
+        public IMessageFilters.Builder messagesMatching(Matcher matcher)
+        {
+            this.matcher = matcher;
+            return this;
+        }
+
+        public IMessageFilters.Filter drop()
+        {
+            return new Filter(from, to, verbs, matcher, inbound ? inboundFilters : outboundFilters).on();
+        }
     }
 
-    @Override
-    public Builder allVerbs()
+    public IMessageFilters.Builder inbound(boolean inbound)
     {
-        return new Builder(null);
+        return new Builder(inbound);
     }
 
     @Override
     public void reset()
     {
-        filters.clear();
+        inboundFilters.clear();
+        outboundFilters.clear();
     }
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
deleted file mode 100644
index 6eedb16..0000000
--- a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.shared;
-
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.api.IMessage;
-
-public class MessageFiltersTest
-{
-    @Test
-    public void simpleFiltersTest() throws Throwable
-    {
-        int VERB1 = 1;
-        int VERB2 = 2;
-        int VERB3 = 3;
-        int i1 = 1;
-        int i2 = 2;
-        int i3 = 3;
-        String MSG1 = "msg1";
-        String MSG2 = "msg2";
-
-        MessageFilters filters = new MessageFilters();
-        MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
-
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        filter.off();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        filters.reset();
-
-        filters.verbs(VERB1).from(1).to(2).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
-
-        filters.reset();
-        AtomicInteger counter = new AtomicInteger();
-        filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
-            counter.incrementAndGet();
-            return Arrays.equals(msg.bytes(), MSG1.getBytes());
-        }).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertEquals(counter.get(), 1);
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
-        Assert.assertEquals(counter.get(), 2);
-
-        // filter chain gets interrupted because a higher level filter returns no match
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertEquals(counter.get(), 2);
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
-        Assert.assertEquals(counter.get(), 2);
-        filters.reset();
-
-        filters.allVerbs().from(3, 2).to(2, 1).drop();
-        Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
-        filters.reset();
-
-        counter.set(0);
-        filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
-            counter.incrementAndGet();
-            return false;
-        }).drop();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertEquals(2, counter.get());
-    }
-
-    IMessage msg(int verb, String msg)
-    {
-        return new IMessage()
-        {
-            public int verb() { return verb; }
-            public byte[] bytes() { return msg.getBytes(); }
-            public int id() { return 0; }
-            public int version() { return 0;  }
-            public NetworkTopology.AddressAndPort from() { return null; }
-            public int fromPort()
-            {
-                return 0;
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
index 9a8e8f6..7bd91d3 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
@@ -18,8 +18,8 @@
 
 package org.apache.cassandra.distributed.shared;
 
-import java.io.Serializable;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +30,7 @@ import java.util.stream.IntStream;
 
 public class NetworkTopology
 {
-    private final Map<AddressAndPort, DcAndRack> map;
+    private final Map<InetSocketAddress, DcAndRack> map;
 
     public static class DcAndRack
     {
@@ -69,68 +69,21 @@ public class NetworkTopology
         }
     }
 
-    public static class AddressAndPort implements Serializable
-    {
-        private final InetAddress address;
-        private final int port;
-
-        public AddressAndPort(InetAddress address, int port)
-        {
-            this.address = address;
-            this.port = port;
-        }
-
-        public int getPort()
-        {
-            return port;
-        }
-
-        public InetAddress getAddress()
-        {
-            return address;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "AddressAndPort{" +
-                   "address=" + address +
-                   ", port=" + port +
-                   '}';
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            AddressAndPort that = (AddressAndPort) o;
-            return port == that.port &&
-                   Objects.equals(address, that.address);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(address, port);
-        }
-    }
-
     public static DcAndRack dcAndRack(String dc, String rack)
     {
         return new DcAndRack(dc, rack);
     }
 
-    public static AddressAndPort addressAndPort(InetAddress address, int port)
+    public static InetSocketAddress addressAndPort(InetAddress address, int port)
     {
-        return new AddressAndPort(address, port);
+        return new InetSocketAddress(address, port);
     }
 
-    public static AddressAndPort addressAndPort(String address, int port)
+    public static InetSocketAddress addressAndPort(String address, int port)
     {
         try
         {
-            return new AddressAndPort(InetAddress.getByName(address), port);
+            return new InetSocketAddress(InetAddress.getByName(address), port);
         }
         catch (UnknownHostException e)
         {
@@ -166,12 +119,12 @@ public class NetworkTopology
         return topology;
     }
 
-    public DcAndRack put(AddressAndPort addressAndPort, DcAndRack value)
+    public DcAndRack put(InetSocketAddress addressAndPort, DcAndRack value)
     {
         return map.put(addressAndPort, value);
     }
 
-    public String localRack(NetworkTopology.AddressAndPort key)
+    public String localRack(InetSocketAddress key)
     {
         DcAndRack p = map.get(key);
         if (p == null)
@@ -179,7 +132,7 @@ public class NetworkTopology
         return p.rack;
     }
 
-    public String localDC(NetworkTopology.AddressAndPort key)
+    public String localDC(InetSocketAddress key)
     {
         DcAndRack p = map.get(key);
         if (p == null)
@@ -187,7 +140,7 @@ public class NetworkTopology
         return p.dc;
     }
 
-    public boolean contains(NetworkTopology.AddressAndPort key)
+    public boolean contains(InetSocketAddress key)
     {
         return map.containsKey(key);
     }
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
index 30c0c7c..51c9e49 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.distributed.shared;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -29,6 +32,8 @@ import java.util.stream.Collectors;
 
 public class Versions
 {
+    private static final Logger logger = LoggerFactory.getLogger(Versions.class);
+
     public static final String PROPERTY_PREFIX = "cassandra.";
 
     public static URL[] getClassPath()
@@ -159,7 +164,7 @@ public class Versions
     {
         final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
         final File sourceDirectory = new File(dtestJarDirectory);
-        System.out.println("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
+        logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
         final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
         final Map<Major, List<Version>> versions = new HashMap<>();
         for (Major major : Major.values())
diff --git a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
index e61faa6..79f5191 100644
--- a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
+++ b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -36,11 +36,9 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
 
 public class BootstrapTest extends TestBaseImpl
 {
-
     @Test
     public void bootstrapTest() throws Throwable
     {
@@ -62,11 +60,6 @@ public class BootstrapTest extends TestBaseImpl
             config.set("auto_bootstrap", true);
 
             cluster.bootstrap(config).startup();
-
-            cluster.stream().forEach(instance -> {
-                instance.nodetool("cleanup", KEYSPACE, "tbl");
-            });
-
             withBootstrap = count(cluster);
         }
 
@@ -80,7 +73,8 @@ public class BootstrapTest extends TestBaseImpl
             naturally = count(cluster);
         }
 
-        Assert.assertEquals(withBootstrap, naturally);
+        for (Map.Entry<Integer, Long> e : withBootstrap.entrySet())
+            Assert.assertTrue(e.getValue() >= naturally.get(e.getKey()));
     }
 
     public void populate(ICluster cluster)
diff --git a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
deleted file mode 100644
index 0ce0e74..0000000
--- a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICluster;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-public class DistributedReadWritePathTest extends TestBaseImpl
-{
-    @Test
-    public void coordinatorReadTest() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                      ConsistencyLevel.ALL,
-                                                      1),
-                       row(1, 1, 1),
-                       row(1, 2, 2),
-                       row(1, 3, 3));
-        }
-    }
-
-    @Test
-    public void largeMessageTest() throws Throwable
-    {
-        int largeMessageThreshold = 1024 * 64;
-        try (ICluster cluster = init(builder().withNodes(2).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
-            StringBuilder builder = new StringBuilder();
-            for (int i = 0; i < largeMessageThreshold ; i++)
-                builder.append('a');
-            String s = builder.toString();
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
-                                           ConsistencyLevel.ALL,
-                                           s);
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                      ConsistencyLevel.ALL,
-                                                      1),
-                       row(1, 1, s));
-        }
-    }
-
-    @Test
-    public void coordinatorWriteTest() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
-                                           ConsistencyLevel.QUORUM);
-
-            for (int i = 0; i < 3; i++)
-            {
-                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
-                           row(1, 1, 1));
-            }
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                      ConsistencyLevel.QUORUM),
-                       row(1, 1, 1));
-        }
-    }
-
-    @Test
-    public void readRepairTest() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-
-            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                      ConsistencyLevel.ALL), // ensure node3 in preflist
-                       row(1, 1, 1));
-
-            // Verify that data got repaired to the third node
-            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1"),
-                       row(1, 1, 1));
-        }
-    }
-
-    @Test
-    public void writeWithSchemaDisagreement() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).withConfig(config 
-> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 
int", 1);
-
-            Exception thrown = null;
-            try
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                               ConsistencyLevel.QUORUM);
-            }
-            catch (RuntimeException e)
-            {
-                thrown = e;
-            }
-
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.2"));
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.3"));
-        }
-    }
-
-    @Test
-    public void readWithSchemaDisagreement() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).withConfig(config 
-> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 
int", 1);
-
-            Exception thrown = null;
-            try
-            {
-                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
-                                                          
ConsistencyLevel.ALL),
-                           row(1, 1, 1, null));
-            }
-            catch (Exception e)
-            {
-                thrown = e;
-            }
-
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.2"));
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.3"));
-        }
-    }
-
-    @Test
-    public void simplePagedReadsTest() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 100;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                               ConsistencyLevel.QUORUM,
-                                               i, i);
-                results[i] = new Object[] { 1, i, i};
-            }
-
-            // Make sure paged read returns same results with different page 
sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
-            {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * 
FROM " + KEYSPACE + ".tbl",
-                                                                    
ConsistencyLevel.QUORUM,
-                                                                    pageSize),
-                           results);
-            }
-        }
-    }
-
-    @Test
-    public void pagingWithRepairTest() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 100;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                // Make sure that data lands on different nodes and not 
coordinator
-                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " 
+ KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                                i, i);
-
-                results[i] = new Object[] { 1, i, i};
-            }
-
-            // Make sure paged read returns same results with different page 
sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
-            {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * 
FROM " + KEYSPACE + ".tbl",
-                                                                    
ConsistencyLevel.ALL,
-                                                                    pageSize),
-                           results);
-            }
-
-            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + 
KEYSPACE + ".tbl"),
-                       results);
-        }
-    }
-
-    @Test
-    public void pagingTests() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).start());
-             ICluster singleNode = 
init(builder().withNodes(1).withSubnet(1).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
-            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk 
int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            for (int i = 0; i < 10; i++)
-            {
-                for (int j = 0; j < 10; j++)
-                {
-                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                   ConsistencyLevel.QUORUM,
-                                                   i, j, i + i);
-                    singleNode.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                      ConsistencyLevel.QUORUM,
-                                                      i, j, i + i);
-                }
-            }
-
-            int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
-            String[] statements = new String [] {"SELECT * FROM " + KEYSPACE  
+ ".tbl WHERE pk = 1 AND ck > 5",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck 
>= 5",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 AND ck <= 10",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 LIMIT 3",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck 
>= 5 LIMIT 2",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 AND ck <= 10 LIMIT 2",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 ORDER BY ck DESC",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck 
>= 5 ORDER BY ck DESC",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 AND ck <= 10 ORDER BY ck DESC",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 ORDER BY ck DESC LIMIT 3",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck 
>= 5 ORDER BY ck DESC LIMIT 2",
-                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 
5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
-                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl LIMIT 3",
-                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN 
(3,5,8,10)",
-                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN 
(3,5,8,10) LIMIT 2"
-            };
-            for (String statement : statements)
-            {
-                for (int pageSize : pageSizes)
-                {
-                    assertRows(cluster.coordinator(1)
-                                       .executeWithPaging(statement,
-                                                          
ConsistencyLevel.QUORUM,  pageSize),
-                               singleNode.coordinator(1)
-                                       .executeWithPaging(statement,
-                                                          
ConsistencyLevel.QUORUM,  Integer.MAX_VALUE));
-                }
-            }
-        }
-    }
-}
-
diff --git a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
deleted file mode 100644
index 3c3ee47..0000000
--- a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICluster;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-public class LargeColumnTest extends TestBaseImpl
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(LargeColumnTest.class);
-
-    private static String str(int length, Random random, long seed)
-    {
-        random.setSeed(seed);
-        char[] chars = new char[length];
-        int i = 0;
-        int s = 0;
-        long v = 0;
-        while (i < length)
-        {
-            if (s == 0)
-            {
-                v = random.nextLong();
-                s = 8;
-            }
-            chars[i] = (char) (((v & 127) + 32) & 127);
-            v >>= 8;
-            --s;
-            ++i;
-        }
-        return new String(chars);
-    }
-
-    private void testLargeColumns(int nodes, int columnSize, int rowCount) 
throws Throwable
-    {
-        Random random = new Random();
-        long seed = ThreadLocalRandom.current().nextLong();
-        logger.info("Using seed {}", seed);
-
-        try (ICluster cluster = init(builder()
-                                     .withNodes(nodes)
-                                     .withConfig(config ->
-                                                 
config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20)
-                                                       
.set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", 
columnSize * 2)
-                                                       
.set("internode_application_send_queue_reserve_global_capacity_in_bytes", 
columnSize * 3)
-                                                       
.set("write_request_timeout_in_ms", SECONDS.toMillis(30L))
-                                                       
.set("read_request_timeout_in_ms", SECONDS.toMillis(30L))
-                                                       
.set("memtable_heap_space_in_mb", 1024)
-                                     )
-                                     .start()))
-        {
-            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c 
text, PRIMARY KEY (k))", KEYSPACE));
-
-            for (int i = 0; i < rowCount; ++i)
-                cluster.coordinator(1).execute(String.format("INSERT INTO 
%s.cf (k, c) VALUES (?, ?);", KEYSPACE), ConsistencyLevel.ALL, i, 
str(columnSize, random, seed | i));
-
-            for (int i = 0; i < rowCount; ++i)
-            {
-                Object[][] results = 
cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = 
?;", KEYSPACE), ConsistencyLevel.ALL, i);
-                Assert.assertTrue(str(columnSize, random, seed | 
i).equals(results[0][1]));
-            }
-        }
-    }
-
-    @Test
-    public void test() throws Throwable
-    {
-        testLargeColumns(2, 16 << 20, 5);
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java
new file mode 100644
index 0000000..2241ed4
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.MessageFilters;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MessageFiltersTest
+{
+    @Test
+    public void simpleInboundFiltersTest()
+    {
+        simpleFiltersTest(true);
+    }
+
+    @Test
+    public void simpleOutboundFiltersTest()
+    {
+        simpleFiltersTest(false);
+    }
+
+    private interface Permit
+    {
+        boolean test(int from, int to, IMessage msg);
+    }
+
+    private void simpleFiltersTest(boolean inbound)
+    {
+        int VERB1 = 1;
+        int VERB2 = 2;
+        int VERB3 = 3;
+        int i1 = 1;
+        int i2 = 2;
+        int i3 = 3;
+        String MSG1 = "msg1";
+        String MSG2 = "msg2";
+
+        MessageFilters filters = new MessageFilters();
+        Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
+
+        IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+        filter.off();
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        filters.reset();
+
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
+
+        filters.reset();
+        AtomicInteger counter = new AtomicInteger();
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
+            counter.incrementAndGet();
+            return Arrays.equals(msg.bytes(), MSG1.getBytes());
+        }).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertEquals(counter.get(), 1);
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
+        Assert.assertEquals(counter.get(), 2);
+
+        // filter chain gets interrupted because a higher level filter returns no match
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertEquals(counter.get(), 2);
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
+        Assert.assertEquals(counter.get(), 2);
+        filters.reset();
+
+        filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
+        Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+        filters.reset();
+
+        counter.set(0);
+        filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
+            counter.incrementAndGet();
+            return false;
+        }).drop();
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertEquals(2, counter.get());
+    }
+
+    IMessage msg(int verb, String msg)
+    {
+        return new IMessage()
+        {
+            public int verb() { return verb; }
+            public byte[] bytes() { return msg.getBytes(); }
+            public int id() { return 0; }
+            public int version() { return 0;  }
+            public InetSocketAddress from() { return null; }
+            public int fromPort()
+            {
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index c7e9b26..2bbe2a9 100644
--- a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -33,6 +33,8 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
 public class NativeProtocolTest extends TestBaseImpl
 {
 
diff --git a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 70790bc..4c4ef0f 100644
--- a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
 public class SimpleReadWriteTest extends TestBaseImpl
 {
@@ -33,7 +34,7 @@ public class SimpleReadWriteTest extends TestBaseImpl
     {
         try (ICluster cluster = init(builder().withNodes(3).start()))
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 
             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1)");
             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 2, 2)");
@@ -53,7 +54,7 @@ public class SimpleReadWriteTest extends TestBaseImpl
     {
         try (ICluster cluster = init(builder().withNodes(3).start()))
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 
             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1)",
                                           ConsistencyLevel.QUORUM);
@@ -75,7 +76,7 @@ public class SimpleReadWriteTest extends TestBaseImpl
     {
         try (ICluster cluster = init(builder().withNodes(3).start()))
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 
             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1)");
             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1)");
@@ -93,67 +94,6 @@ public class SimpleReadWriteTest extends TestBaseImpl
     }
 
     @Test
-    public void writeWithSchemaDisagreement() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).withConfig(config 
-> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 
int", 1);
-
-            Exception thrown = null;
-            try
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                              ConsistencyLevel.QUORUM);
-            }
-            catch (RuntimeException e)
-            {
-                thrown = e;
-            }
-
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.2"));
-            
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 
127.0.0.3"));
-        }
-    }
-
-    @Test
-    public void readWithSchemaDisagreement() throws Throwable
-    {
-        try (ICluster cluster = init(builder().withNodes(3).withConfig(config 
-> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (1, 1, 1)");
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 
int", 1);
-
-            Exception thrown = null;
-            try
-            {
-                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
-                                                         ConsistencyLevel.ALL),
-                           row(1, 1, 1, null));
-            }
-            catch (Exception e)
-            {
-                thrown = e;
-            }
-
-            Assert.assertTrue(thrown.getMessage(), 
thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
-            Assert.assertTrue(thrown.getMessage(), 
thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
-        }
-    }
-
-    @Test
     public void simplePagedReadsTest() throws Throwable
     {
         try (ICluster cluster = init(builder().withNodes(3).start()))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to