This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to annotated tag 0.0.1-1 in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
commit 10ff11fd8e34e66992d4fb08aa476fd1198d6260 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Mar 12 13:25:25 2020 +0100 Bring in CASSANDRA-15564 changes --- pom.xml | 13 +- .../apache/cassandra/distributed/api/ICluster.java | 69 ++++- .../cassandra/distributed/api/ICoordinator.java | 6 +- .../cassandra/distributed/api/IInstance.java | 16 +- .../cassandra/distributed/api/IInstanceConfig.java | 3 +- .../apache/cassandra/distributed/api/IListen.java | 2 +- .../apache/cassandra/distributed/api/IMessage.java | 5 +- .../cassandra/distributed/api/IMessageFilters.java | 52 +++- .../api/{IMessage.java => LongTokenRange.java} | 28 +- .../cassandra/distributed/api/NodeToolResult.java | 182 +++++++++++++ .../cassandra/distributed/api/QueryResult.java | 139 ++++++++++ .../org/apache/cassandra/distributed/api/Row.java | 110 ++++++++ .../cassandra/distributed/shared/AssertUtils.java | 130 +++++++++ .../distributed/shared/DistributedTestBase.java | 156 +---------- .../distributed/shared/MessageFilters.java | 73 ++++-- .../distributed/shared/MessageFiltersTest.java | 113 -------- .../distributed/shared/NetworkTopology.java | 67 +---- .../cassandra/distributed/shared/Versions.java | 7 +- .../cassandra/distributed/test/BootstrapTest.java | 10 +- .../test/DistributedReadWritePathTest.java | 290 --------------------- .../distributed/test/LargeColumnTest.java | 96 ------- .../distributed/test/MessageFiltersTest.java | 132 ++++++++++ .../distributed/test/NativeProtocolTest.java | 2 + .../distributed/test/SimpleReadWriteTest.java | 68 +---- 24 files changed, 927 insertions(+), 842 deletions(-) diff --git a/pom.xml b/pom.xml index 4b92a62..1757a52 100644 --- a/pom.xml +++ b/pom.xml @@ -1,10 +1,9 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project 
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache</groupId> <artifactId>apache</artifactId> - <version>10</version> + <version>23</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -28,14 +27,20 @@ <dependencies> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.cassandra</groupId> <artifactId>in-jvm-dtest-cassandra-tryout</artifactId> - <version>0.0.7-3.11-SNAPSHOT</version> + <version>0.0.1-2.2-1</version> <scope>test</scope> </dependency> </dependencies> diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java index 6d86e99..dffd980 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java +++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java @@ -18,24 +18,81 @@ package org.apache.cassandra.distributed.api; -import org.apache.cassandra.distributed.shared.NetworkTopology; - +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.stream.Stream; -public interface ICluster<I extends IInstance> extends AutoCloseable -{ +public interface ICluster<I extends IInstance> extends AutoCloseable { void startup(); + I bootstrap(IInstanceConfig config); + I get(int i); - I get(NetworkTopology.AddressAndPort endpoint); + + I get(InetSocketAddress endpoint); + ICoordinator coordinator(int node); + void schemaChange(String query); + void schemaChange(String statement, int instance); int size(); Stream<I> 
stream(); + Stream<I> stream(String dcName); + Stream<I> stream(String dcName, String rackName); + IMessageFilters filters(); -} + + static void setup() throws Throwable { + setupLogging(); + setSystemProperties(); + nativeLibraryWorkaround(); + processReaperWorkaround(); + } + + static void nativeLibraryWorkaround() { + // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask, + // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask, + // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader. + System.setProperty("cassandra.disable_tcactive_openssl", "true"); + System.setProperty("io.netty.transport.noNative", "true"); + } + + static void processReaperWorkaround() throws Throwable { + // Make sure the 'process reaper' thread is initially created under the main classloader, + // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader + // which prevents it from being garbage collected. 
+ new ProcessBuilder().command("true").start().waitFor(); + } + + static void setSystemProperties() { + System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 1000)); + System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); + } + + static void setupLogging() { + try { + File root = Files.createTempDirectory("in-jvm-dtest").toFile(); + root.deleteOnExit(); + String testConfPath = "test/conf/logback-dtest.xml"; + Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml"); + + if (!logConfPath.toFile().exists()) { + Files.copy(new File(testConfPath).toPath(), + logConfPath); + } + + System.setProperty("logback.configurationFile", "file://" + logConfPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java index da17df6..3d07a3d 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java +++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java @@ -25,7 +25,11 @@ import java.util.concurrent.Future; // The cross-version API requires that a Coordinator can be constructed without any constructor arguments public interface ICoordinator { - Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + default Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays(); + } + QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues); Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues); Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... 
boundValues); diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java index dec3549..0dd4865 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java @@ -18,8 +18,7 @@ package org.apache.cassandra.distributed.api; -import org.apache.cassandra.distributed.shared.NetworkTopology; - +import java.net.InetSocketAddress; import java.util.UUID; import java.util.concurrent.Future; @@ -33,7 +32,7 @@ public interface IInstance extends IIsolatedExecutor public Object[][] executeInternal(String query, Object... args); IInstanceConfig config(); - NetworkTopology.AddressAndPort broadcastAddress(); + InetSocketAddress broadcastAddress(); UUID schemaVersion(); void startup(); @@ -43,7 +42,14 @@ public interface IInstance extends IIsolatedExecutor int liveMemberCount(); - int nodetool(String... commandAndArgs); + NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs); + default NodeToolResult nodetoolResult(String... commandAndArgs) + { + return nodetoolResult(true, commandAndArgs); + } + default int nodetool(String... 
commandAndArgs) { + return nodetoolResult(commandAndArgs).getRc(); + } void uncaughtException(Thread t, Throwable e); /** @@ -59,7 +65,7 @@ public interface IInstance extends IIsolatedExecutor void receiveMessage(IMessage message); int getMessagingVersion(); - void setMessagingVersion(NetworkTopology.AddressAndPort addressAndPort, int version); + void setMessagingVersion(InetSocketAddress addressAndPort, int version); void flush(String keyspace); void forceCompact(String keyspace, String table); diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java index 05969f8..97c1534 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java @@ -18,6 +18,7 @@ package org.apache.cassandra.distributed.api; +import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Objects; @@ -34,7 +35,7 @@ public interface IInstanceConfig int num(); UUID hostId(); - NetworkTopology.AddressAndPort broadcastAddress(); + InetSocketAddress broadcastAddress(); NetworkTopology networkTopology(); String localRack(); diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java index c2e8dd6..d21c594 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IListen.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java @@ -20,7 +20,7 @@ package org.apache.cassandra.distributed.api; public interface IListen { - public interface Cancel { void cancel(); } + interface Cancel { void cancel(); } Cancel schema(Runnable onChange); diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java index da536cc..f75861e 100644 --- 
a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java @@ -18,9 +18,8 @@ package org.apache.cassandra.distributed.api; -import org.apache.cassandra.distributed.shared.NetworkTopology; - import java.io.Serializable; +import java.net.InetSocketAddress; /** * A cross-version interface for delivering internode messages via message sinks. @@ -34,5 +33,5 @@ public interface IMessage extends Serializable // TODO: need to make this a long int id(); int version(); - NetworkTopology.AddressAndPort from(); + InetSocketAddress from(); } diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java index 01fe972..f2cd6ee 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java @@ -18,6 +18,8 @@ package org.apache.cassandra.distributed.api; +import java.util.function.Predicate; + public interface IMessageFilters { public interface Filter @@ -31,6 +33,21 @@ public interface IMessageFilters Builder from(int ... nums); Builder to(int ... nums); + Builder verbs(int... verbs); + Builder allVerbs(); + + Builder inbound(boolean inbound); + + default Builder inbound() + { + return inbound(true); + } + + default Builder outbound() + { + return inbound(false); + } + /** * Every message for which matcher returns `true` will be _dropped_ (assuming all * other matchers in the chain will return `true` as well). @@ -42,15 +59,42 @@ public interface IMessageFilters public interface Matcher { boolean matches(int from, int to, IMessage message); + + static Matcher of(Predicate<IMessage> fn) { + return (from, to, m) -> fn.test(m); + } } - Builder verbs(int... 
verbs); - Builder allVerbs(); + Builder inbound(boolean inbound); + default Builder inbound() { + return inbound(true); + } + default Builder outbound() { + return inbound(false); + } + default Builder verbs(int... verbs) { + return inbound().verbs(verbs); + } + default Builder allVerbs() { + return inbound().allVerbs(); + } void reset(); /** - * {@code true} value returned by the implementation implies that the message was + * Checks if the message should be delivered. This is expected to run on "inbound", or on the receiver of + * the message (instance.config.num == to). + * + * @return {@code true} value returned by the implementation implies that the message was + * not matched by any filters and therefore should be delivered. + */ + boolean permitInbound(int from, int to, IMessage msg); + + /** + * Checks if the message should be delivered. This is expected to run on "outbound", or on the sender of + * the message (instance.config.num == from). + * + * @return {@code true} value returned by the implementation implies that the message was * not matched by any filters and therefore should be delivered. */ - boolean permit(int from, int to, IMessage msg); + boolean permitOutbound(int from, int to, IMessage msg); } diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java similarity index 67% copy from src/main/java/org/apache/cassandra/distributed/api/IMessage.java copy to src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java index da536cc..06327e8 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java +++ b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java @@ -18,21 +18,21 @@ package org.apache.cassandra.distributed.api; -import org.apache.cassandra.distributed.shared.NetworkTopology; - import java.io.Serializable; -/** - * A cross-version interface for delivering internode messages via message sinks. 
- * - * Message implementations should be serializable so we could load into instances. - */ -public interface IMessage extends Serializable +public final class LongTokenRange implements Serializable { - int verb(); - byte[] bytes(); - // TODO: need to make this a long - int id(); - int version(); - NetworkTopology.AddressAndPort from(); + public final long minExclusive; + public final long maxInclusive; + + public LongTokenRange(long minExclusive, long maxInclusive) + { + this.minExclusive = minExclusive; + this.maxInclusive = maxInclusive; + } + + public String toString() + { + return "(" + minExclusive + "," + maxInclusive + "]"; + } } diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java new file mode 100644 index 0000000..773f617 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.AssertUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.management.Notification; + +public class NodeToolResult +{ + private final String[] commandAndArgs; + private final int rc; + private final List<Notification> notifications; + private final Throwable error; + + public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> notifications, Throwable error) + { + this.commandAndArgs = commandAndArgs; + this.rc = rc; + this.notifications = notifications; + this.error = error; + } + + public String[] getCommandAndArgs() + { + return commandAndArgs; + } + + public int getRc() + { + return rc; + } + + public List<Notification> getNotifications() + { + return notifications; + } + + public Throwable getError() + { + return error; + } + + public Asserts asserts() + { + return new Asserts(); + } + + public final class Asserts { + public Asserts success() { + AssertUtils.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc); + return this; + } + + public Asserts failure() { + AssertUtils.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc); + return this; + } + + public Asserts errorContains(String msg) { + AssertUtils.assertNotNull("No exception was found but expected one", error); + AssertUtils.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg)); + return this; + } + + public Asserts notificationContains(String msg) { + AssertUtils.assertNotNull("notifications not defined", notifications); + AssertUtils.assertFalse("notifications not defined", notifications.isEmpty()); + for (Notification n : notifications) { + if (n.getMessage().contains(msg)) { + return this; + } + } + AssertUtils.fail("Unable to locate message " + msg + 
" in notifications: " + notifications); + return this; // unreachable + } + + public Asserts notificationContains(ProgressEventType type, String msg) { + int userType = type.ordinal(); + AssertUtils.assertNotNull("notifications not defined", notifications); + AssertUtils.assertFalse("notifications not defined", notifications.isEmpty()); + for (Notification n : notifications) { + if (notificationType(n) == userType) { + if (n.getMessage().contains(msg)) { + return this; + } + } + } + AssertUtils.fail("Unable to locate message '" + msg + "' in notifications: " + notifications); + return this; // unreachable + } + } + + private static int notificationType(Notification n) + { + return ((Map<String, Integer>) n.getUserData()).get("type").intValue(); + } + + public String toString() + { + return "NodeToolResult{" + + "commandAndArgs=" + Arrays.toString(commandAndArgs) + + ", rc=" + rc + + ", notifications=[" + notifications.stream().map(n -> ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", ")) + "]" + + ", error=" + error + + '}'; + } + + /** + * Progress event type. + * + * <p> + * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events, + * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}. + * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process. + * </p> + * <p> + * {@link #NOTIFICATION} event type is used to just notify message without progress. + * </p> + */ + public enum ProgressEventType + { + /** + * Fired first when progress starts. + * Happens only once. + */ + START, + + /** + * Fire when progress happens. + * This can be zero or more time after START. + */ + PROGRESS, + + /** + * When observing process completes with error, this is sent once before COMPLETE. + */ + ERROR, + + /** + * When observing process is aborted by user, this is sent once before COMPLETE. 
+ */ + ABORT, + + /** + * When observing process completes successfully, this is sent once before COMPLETE. + */ + SUCCESS, + + /** + * Fire when progress complete. + * This is fired once, after ERROR/ABORT/SUCCESS is fired. + * After this, no more ProgressEvent should be fired for the same event. + */ + COMPLETE, + + /** + * Used when sending message without progress. + */ + NOTIFICATION + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java new file mode 100644 index 0000000..dcdfa14 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Predicate; + +/** + * A table of data representing a complete query result. 
+ * + * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways: + * + * <ul> + * <li>represents a complete result rather than a cursor</li> + * <li>returns a {@link Row} to access the current row of data</li> + * <li>relies on object pooling; {@link #hasNext()} may return the same object just with different data, accessing a + * {@link Row} from a previous {@link #hasNext()} call has undefined behavior.</li> + * <li>includes {@link #filter(Predicate)}, this will do client side filtering since Apache Cassandra is more + * restrictive on server side filtering</li> + * </ul> + * + * <h2>Unsafe patterns</h2> + * + * Below are a few unsafe patterns which may lead to unexpected results + * + * <code>{@code + * while (rs.hasNext()) { + * list.add(rs.next()); + * } + * }</code> + * + * <code>{@code + * rs.forEach(list::add) + * }</code> + * + * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row} + * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact + * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()} + * should be used; this will clone the {@link Row} and return a new object pointing to the same data. 
+ */ +public class QueryResult implements Iterator<Row> +{ + public static final QueryResult EMPTY = new QueryResult(new String[0], null); + + private final String[] names; + private final Object[][] results; + private final Predicate<Row> filter; + private final Row row; + private int offset = -1; + + public QueryResult(String[] names, Object[][] results) + { + this.names = Objects.requireNonNull(names, "names"); + this.results = results; + this.row = new Row(names); + this.filter = ignore -> true; + } + + private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset) + { + this.names = names; + this.results = results; + this.filter = filter; + this.offset = offset; + this.row = new Row(names); + } + + public String[] getNames() + { + return names; + } + + public boolean isEmpty() + { + return results.length == 0; + } + + public int size() + { + return results.length; + } + + public QueryResult filter(Predicate<Row> fn) + { + return new QueryResult(names, results, filter.and(fn), offset); + } + + /** + * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will + * be the full set from the query. 
+ */ + public Object[][] toObjectArrays() + { + return results; + } + + @Override + public boolean hasNext() + { + if (results == null) + return false; + while ((offset += 1) < results.length) + { + row.setResults(results[offset]); + if (filter.test(row)) + { + return true; + } + } + row.setResults(null); + return false; + } + + @Override + public Row next() + { + if (offset < 0 || offset >= results.length) + throw new NoSuchElementException(); + return row; + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java new file mode 100644 index 0000000..1dd05fe --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.*; + +/** + * Data representing a single row in a query result. 
+ * + * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls + * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call; + * to get around this, a call to {@link #copy()} will return a new object pointing to the same row. + */ +public class Row +{ + private final Map<String, Integer> nameIndex; + private Object[] results; // mutable to avoid allocations in loops + + public Row(String[] names) + { + Objects.requireNonNull(names, "names"); + this.nameIndex = new HashMap<>(names.length); + for (int i = 0; i < names.length; i++) { + nameIndex.put(names[i], i); + } + } + + private Row(Map<String, Integer> nameIndex) + { + this.nameIndex = nameIndex; + } + + void setResults(Object[] results) + { + this.results = results; + } + + /** + * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}. + */ + public Row copy() { + Row copy = new Row(nameIndex); + copy.setResults(results); + return copy; + } + + public <T> T get(String name) + { + checkAccess(); + int idx = findIndex(name); + if (idx == -1) + return null; + return (T) results[idx]; + } + + public String getString(String name) + { + return get(name); + } + + public UUID getUUID(String name) + { + return get(name); + } + + public Date getTimestamp(String name) + { + return get(name); + } + + public <T> Set<T> getSet(String name) + { + return get(name); + } + + public String toString() + { + return "Row{" + + "names=" + nameIndex.keySet() + + ", results=" + Arrays.toString(results) + + '}'; + } + + private void checkAccess() + { + if (results == null) + throw new NoSuchElementException(); + } + + private int findIndex(String name) + { + return nameIndex.getOrDefault(name, -1); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java new file mode 100644 index 
0000000..f914e90 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java @@ -0,0 +1,130 @@ +package org.apache.cassandra.distributed.shared; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public class AssertUtils { + + public static void assertRows(Object[][] actual, Object[]... expected) + { + assertEquals(rowsNotEqualErrorMessage(actual, expected), + expected.length, actual.length); + + for (int i = 0; i < expected.length; i++) + { + Object[] expectedRow = expected[i]; + Object[] actualRow = actual[i]; + assertTrue(rowsNotEqualErrorMessage(actual, expected), + Arrays.equals(expectedRow, actualRow)); + } + } + + public static void assertRow(Object[] actual, Object... expected) + { + assertTrue(rowNotEqualErrorMessage(actual, expected), + Arrays.equals(actual, expected)); + } + + public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected) + { + while (actual.hasNext() && expected.hasNext()) + assertRow(actual.next(), expected.next()); + + assertTrue("Resultsets have different sizes", actual.hasNext() == expected.hasNext()); + } + + public static void assertRows(Iterator<Object[]> actual, Object[]... 
expected) + { + assertRows(actual, new Iterator<Object[]>() { + + int i = 0; + @Override + public boolean hasNext() { + return i < expected.length; + } + + @Override + public Object[] next() { + return expected[i++]; + } + }); + } + + public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected) + { + return String.format("Expected: %s\nActual:%s\n", + Arrays.toString(expected), + Arrays.toString(actual)); + } + + public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) + { + return String.format("Expected: %s\nActual: %s\n", + rowsToString(expected), + rowsToString(actual)); + } + + public static String rowsToString(Object[][] rows) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + boolean isFirst = true; + for (Object[] row : rows) + { + if (isFirst) + isFirst = false; + else + builder.append(","); + builder.append(Arrays.toString(row)); + } + builder.append("]"); + return builder.toString(); + } + + public static Object[][] toObjectArray(Iterator<Object[]> iter) + { + List<Object[]> res = new ArrayList<>(); + while (iter.hasNext()) + res.add(iter.next()); + + return res.toArray(new Object[res.size()][]); + } + + public static Object[] row(Object... 
expected) + { + return expected; + } + + public static void assertEquals(String message, long expected, long actual) { + if (expected != actual) + fail(message); + } + + public static void assertNotEquals(String message, long expected, long actual) { + if (expected == actual) + fail(message); + } + + public static void assertNotNull(String message, Object object) { + if (object == null) + fail(message); + } + + public static void assertTrue(String message, boolean condition) { + if (!condition) + fail(message); + } + + public static void assertFalse(String message, boolean condition) { + if (condition) + fail(message); + } + + + public static void fail(String message) { + throw new AssertionError(message); + } + +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java index 89dc0cd..d28af2a 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java @@ -20,81 +20,22 @@ package org.apache.cassandra.distributed.shared; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; public abstract class DistributedTestBase { - @After public void afterEach() { System.runFinalization(); System.gc(); } - public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder(); - - public static String KEYSPACE = "distributed_test_keyspace"; - - // TODO: move this to Cluster.java? 
- public static void nativeLibraryWorkaround() - { - // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask, - // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask, - // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader. - System.setProperty("cassandra.disable_tcactive_openssl", "true"); - System.setProperty("io.netty.transport.noNative", "true"); - } - - public static void processReaperWorkaround() throws Throwable { - // Make sure the 'process reaper' thread is initially created under the main classloader, - // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader - // which prevents it from being garbage collected. - new ProcessBuilder().command("true").start().waitFor(); + public static void beforeClass() throws Throwable { + ICluster.setup(); } - public static void setupLogging() - { - try - { - File root = Files.createTempDirectory("in-jvm-dtest").toFile(); - root.deleteOnExit(); - String testConfPath = "test/conf/logback-dtest.xml"; - Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml"); - - if (!logConfPath.toFile().exists()) - { - Files.copy(new File(testConfPath).toPath(), - logConfPath); - } - - System.setProperty("logback.configurationFile", "file://" + logConfPath); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } + public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder(); - @BeforeClass - public static void setup() throws Throwable { - setupLogging(); - System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000)); - System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); - nativeLibraryWorkaround(); - processReaperWorkaround(); - } + public static String KEYSPACE = "distributed_test_keyspace"; public static String withKeyspace(String replaceIn) { @@ -112,94 +53,5 @@ public abstract class 
DistributedTestBase return cluster; } - public static void assertRows(Object[][] actual, Object[]... expected) - { - Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected), - expected.length, actual.length); - - for (int i = 0; i < expected.length; i++) - { - Object[] expectedRow = expected[i]; - Object[] actualRow = actual[i]; - Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected), - Arrays.equals(expectedRow, actualRow)); - } - } - - public static void assertRow(Object[] actual, Object... expected) - { - Assert.assertTrue(rowNotEqualErrorMessage(actual, expected), - Arrays.equals(actual, expected)); - } - - public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected) - { - while (actual.hasNext() && expected.hasNext()) - assertRow(actual.next(), expected.next()); - - Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext()); - } - - public static void assertRows(Iterator<Object[]> actual, Object[]... expected) - { - assertRows(actual, new Iterator<Object[]>() { - - int i = 0; - @Override - public boolean hasNext() { - return i < expected.length; - } - - @Override - public Object[] next() { - return expected[i++]; - } - }); - } - - public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected) - { - return String.format("Expected: %s\nActual:%s\n", - Arrays.toString(expected), - Arrays.toString(actual)); - } - - public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) - { - return String.format("Expected: %s\nActual: %s\n", - rowsToString(expected), - rowsToString(actual)); - } - - public static String rowsToString(Object[][] rows) - { - StringBuilder builder = new StringBuilder(); - builder.append("["); - boolean isFirst = true; - for (Object[] row : rows) - { - if (isFirst) - isFirst = false; - else - builder.append(","); - builder.append(Arrays.toString(row)); - } - builder.append("]"); - return builder.toString(); - } - - public static 
Object[][] toObjectArray(Iterator<Object[]> iter) - { - List<Object[]> res = new ArrayList<>(); - while (iter.hasNext()) - res.add(iter.next()); - - return res.toArray(new Object[res.size()][]); - } - - public static Object[] row(Object... expected) - { - return expected; - } } diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java index 57b8f57..c1731db 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java @@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.shared; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.cassandra.distributed.api.IMessage; @@ -27,9 +28,20 @@ import org.apache.cassandra.distributed.api.IMessageFilters; public class MessageFilters implements IMessageFilters { - private final List<Filter> filters = new CopyOnWriteArrayList<>(); + private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>(); + private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>(); - public boolean permit(int from, int to, IMessage msg) + public boolean permitInbound(int from, int to, IMessage msg) + { + return permit(inboundFilters, from, to, msg); + } + + public boolean permitOutbound(int from, int to, IMessage msg) + { + return permit(outboundFilters, from, to, msg); + } + + private static boolean permit(List<Filter> filters, int from, int to, IMessage msg) { for (Filter filter : filters) { @@ -39,14 +51,15 @@ public class MessageFilters implements IMessageFilters return true; } - public class Filter implements IMessageFilters.Filter + public static class Filter implements IMessageFilters.Filter { final int[] from; final int[] to; final int[] verbs; final Matcher matcher; + final List<Filter> parent; - Filter(int[] from, int[] 
to, int[] verbs, Matcher matcher) + Filter(int[] from, int[] to, int[] verbs, Matcher matcher, List<Filter> parent) { if (from != null) { @@ -67,13 +80,15 @@ public class MessageFilters implements IMessageFilters this.to = to; this.verbs = verbs; this.matcher = matcher; + this.parent = Objects.requireNonNull(parent, "parent"); } public int hashCode() { return (from == null ? 0 : Arrays.hashCode(from)) + (to == null ? 0 : Arrays.hashCode(to)) - + (verbs == null ? 0 : Arrays.hashCode(verbs)); + + (verbs == null ? 0 : Arrays.hashCode(verbs) + + parent.hashCode()); } public boolean equals(Object that) @@ -85,18 +100,19 @@ public class MessageFilters implements IMessageFilters { return Arrays.equals(from, that.from) && Arrays.equals(to, that.to) - && Arrays.equals(verbs, that.verbs); + && Arrays.equals(verbs, that.verbs) + && parent.equals(that.parent); } public Filter off() { - filters.remove(this); + parent.remove(this); return this; } public Filter on() { - filters.add(this); + parent.add(this); return this; } @@ -115,10 +131,11 @@ public class MessageFilters implements IMessageFilters int[] to; int[] verbs; Matcher matcher; + boolean inbound; - private Builder(int[] verbs) + private Builder(boolean inbound) { - this.verbs = verbs; + this.inbound = inbound; } public Builder from(int... nums) @@ -133,33 +150,45 @@ public class MessageFilters implements IMessageFilters return this; } - public IMessageFilters.Builder messagesMatching(Matcher matcher) + public IMessageFilters.Builder verbs(int... verbs) { - this.matcher = matcher; + this.verbs = verbs; return this; } - public Filter drop() + public IMessageFilters.Builder allVerbs() { - return new Filter(from, to, verbs, matcher).on(); + this.verbs = null; + return this; } - } + public IMessageFilters.Builder inbound(boolean inbound) + { + this.inbound = inbound; + return this; + } - public Builder verbs(int... 
verbs) - { - return new Builder(verbs); + public IMessageFilters.Builder messagesMatching(Matcher matcher) + { + this.matcher = matcher; + return this; + } + + public IMessageFilters.Filter drop() + { + return new Filter(from, to, verbs, matcher, inbound ? inboundFilters : outboundFilters).on(); + } } - @Override - public Builder allVerbs() + public IMessageFilters.Builder inbound(boolean inbound) { - return new Builder(null); + return new Builder(inbound); } @Override public void reset() { - filters.clear(); + inboundFilters.clear(); + outboundFilters.clear(); } } diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java deleted file mode 100644 index 6eedb16..0000000 --- a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.distributed.shared; - -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.cassandra.distributed.api.IMessage; - -public class MessageFiltersTest -{ - @Test - public void simpleFiltersTest() throws Throwable - { - int VERB1 = 1; - int VERB2 = 2; - int VERB3 = 3; - int i1 = 1; - int i2 = 2; - int i3 = 3; - String MSG1 = "msg1"; - String MSG2 = "msg2"; - - MessageFilters filters = new MessageFilters(); - MessageFilters.Filter filter = filters.allVerbs().from(1).drop(); - - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); - filter.off(); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); - filters.reset(); - - filters.verbs(VERB1).from(1).to(2).drop(); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1))); - - filters.reset(); - AtomicInteger counter = new AtomicInteger(); - filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> { - counter.incrementAndGet(); - return Arrays.equals(msg.bytes(), MSG1.getBytes()); - }).drop(); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertEquals(counter.get(), 1); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2))); - Assert.assertEquals(counter.get(), 2); - - // filter chain gets interrupted because a higher level filter returns no match - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); - Assert.assertEquals(counter.get(), 2); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1))); - Assert.assertEquals(counter.get(), 
2); - filters.reset(); - - filters.allVerbs().from(3, 2).to(2, 1).drop(); - Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1))); - Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1))); - Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); - filters.reset(); - - counter.set(0); - filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> { - counter.incrementAndGet(); - return false; - }).drop(); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertEquals(2, counter.get()); - } - - IMessage msg(int verb, String msg) - { - return new IMessage() - { - public int verb() { return verb; } - public byte[] bytes() { return msg.getBytes(); } - public int id() { return 0; } - public int version() { return 0; } - public NetworkTopology.AddressAndPort from() { return null; } - public int fromPort() - { - return 0; - } - }; - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java index 9a8e8f6..7bd91d3 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java @@ -18,8 +18,8 @@ package org.apache.cassandra.distributed.shared; -import java.io.Serializable; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; @@ -30,7 +30,7 @@ import java.util.stream.IntStream; public class NetworkTopology { - private final Map<AddressAndPort, DcAndRack> map; + private final 
Map<InetSocketAddress, DcAndRack> map; public static class DcAndRack { @@ -69,68 +69,21 @@ public class NetworkTopology } } - public static class AddressAndPort implements Serializable - { - private final InetAddress address; - private final int port; - - public AddressAndPort(InetAddress address, int port) - { - this.address = address; - this.port = port; - } - - public int getPort() - { - return port; - } - - public InetAddress getAddress() - { - return address; - } - - @Override - public String toString() - { - return "AddressAndPort{" + - "address=" + address + - ", port=" + port + - '}'; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AddressAndPort that = (AddressAndPort) o; - return port == that.port && - Objects.equals(address, that.address); - } - - @Override - public int hashCode() - { - return Objects.hash(address, port); - } - } - public static DcAndRack dcAndRack(String dc, String rack) { return new DcAndRack(dc, rack); } - public static AddressAndPort addressAndPort(InetAddress address, int port) + public static InetSocketAddress addressAndPort(InetAddress address, int port) { - return new AddressAndPort(address, port); + return new InetSocketAddress(address, port); } - public static AddressAndPort addressAndPort(String address, int port) + public static InetSocketAddress addressAndPort(String address, int port) { try { - return new AddressAndPort(InetAddress.getByName(address), port); + return new InetSocketAddress(InetAddress.getByName(address), port); } catch (UnknownHostException e) { @@ -166,12 +119,12 @@ public class NetworkTopology return topology; } - public DcAndRack put(AddressAndPort addressAndPort, DcAndRack value) + public DcAndRack put(InetSocketAddress addressAndPort, DcAndRack value) { return map.put(addressAndPort, value); } - public String localRack(NetworkTopology.AddressAndPort key) + public String localRack(InetSocketAddress key) { 
DcAndRack p = map.get(key); if (p == null) @@ -179,7 +132,7 @@ public class NetworkTopology return p.rack; } - public String localDC(NetworkTopology.AddressAndPort key) + public String localDC(InetSocketAddress key) { DcAndRack p = map.get(key); if (p == null) @@ -187,7 +140,7 @@ public class NetworkTopology return p.dc; } - public boolean contains(NetworkTopology.AddressAndPort key) + public boolean contains(InetSocketAddress key) { return map.containsKey(key); } diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java index 30c0c7c..51c9e49 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java @@ -18,6 +18,9 @@ package org.apache.cassandra.distributed.shared; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.net.MalformedURLException; import java.net.URL; @@ -29,6 +32,8 @@ import java.util.stream.Collectors; public class Versions { + private static final Logger logger = LoggerFactory.getLogger(Versions.class); + public static final String PROPERTY_PREFIX = "cassandra."; public static URL[] getClassPath() @@ -159,7 +164,7 @@ public class Versions { final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build"); final File sourceDirectory = new File(dtestJarDirectory); - System.out.println("Looking for dtest jars in " + sourceDirectory.getAbsolutePath()); + logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath()); final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar"); final Map<Major, List<Version>> versions = new HashMap<>(); for (Major major : Major.values()) diff --git a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java 
b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java index e61faa6..79f5191 100644 --- a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java +++ b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java @@ -36,11 +36,9 @@ import org.apache.cassandra.distributed.shared.NetworkTopology; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; -import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE; public class BootstrapTest extends TestBaseImpl { - @Test public void bootstrapTest() throws Throwable { @@ -62,11 +60,6 @@ public class BootstrapTest extends TestBaseImpl config.set("auto_bootstrap", true); cluster.bootstrap(config).startup(); - - cluster.stream().forEach(instance -> { - instance.nodetool("cleanup", KEYSPACE, "tbl"); - }); - withBootstrap = count(cluster); } @@ -80,7 +73,8 @@ public class BootstrapTest extends TestBaseImpl naturally = count(cluster); } - Assert.assertEquals(withBootstrap, naturally); + for (Map.Entry<Integer, Long> e : withBootstrap.entrySet()) + Assert.assertTrue(e.getValue() >= naturally.get(e.getKey())); } public void populate(ICluster cluster) diff --git a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java deleted file mode 100644 index 0ce0e74..0000000 --- a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test; - -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.ICluster; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.cassandra.distributed.api.Feature.NETWORK; - -public class DistributedReadWritePathTest extends TestBaseImpl -{ - @Test - public void coordinatorReadTest() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); - - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - row(1, 1, 1), - row(1, 2, 2), - row(1, 3, 3)); - } - } - - @Test - public void largeMessageTest() throws Throwable - { - int largeMessageThreshold = 1024 * 64; - try (ICluster cluster = init(builder().withNodes(2).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))"); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < largeMessageThreshold ; i++) - builder.append('a'); - String s = builder.toString(); - 
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", - ConsistencyLevel.ALL, - s); - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - row(1, 1, s)); - } - } - - @Test - public void coordinatorWriteTest() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); - - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", - ConsistencyLevel.QUORUM); - - for (int i = 0; i < 3; i++) - { - assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), - row(1, 1, 1)); - } - - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.QUORUM), - row(1, 1, 1)); - } - } - - @Test - public void readRepairTest() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - - assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); - - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.ALL), // ensure node3 in preflist - row(1, 1, 1)); - - // Verify that data got repaired to the third node - assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), - row(1, 1, 1)); - } - } - - @Test - public void writeWithSchemaDisagreement() throws Throwable - { - try (ICluster cluster = 
init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - - Exception thrown = null; - try - { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", - ConsistencyLevel.QUORUM); - } - catch (RuntimeException e) - { - thrown = e; - } - - Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); - Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); - } - } - - @Test - public void readWithSchemaDisagreement() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - - Exception thrown = null; - try - { - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.ALL), - row(1, 1, 1, null)); - } - catch (Exception e) - { - thrown = e; - } - - 
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); - Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); - } - } - - @Test - public void simplePagedReadsTest() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - - int size = 100; - Object[][] results = new Object[size][]; - for (int i = 0; i < size; i++) - { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", - ConsistencyLevel.QUORUM, - i, i); - results[i] = new Object[] { 1, i, i}; - } - - // Make sure paged read returns same results with different page sizes - for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) - { - assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", - ConsistencyLevel.QUORUM, - pageSize), - results); - } - } - } - - @Test - public void pagingWithRepairTest() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - - int size = 100; - Object[][] results = new Object[size][]; - for (int i = 0; i < size; i++) - { - // Make sure that data lands on different nodes and not coordinator - cluster.get(i % 2 == 0 ? 
2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", - i, i); - - results[i] = new Object[] { 1, i, i}; - } - - // Make sure paged read returns same results with different page sizes - for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) - { - assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", - ConsistencyLevel.ALL, - pageSize), - results); - } - - assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"), - results); - } - } - - @Test - public void pagingTests() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).start()); - ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - - for (int i = 0; i < 10; i++) - { - for (int j = 0; j < 10; j++) - { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", - ConsistencyLevel.QUORUM, - i, j, i + i); - singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", - ConsistencyLevel.QUORUM, - i, j, i + i); - } - } - - int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50}; - String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER 
BY ck DESC", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2", - "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2", - "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3", - "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)", - "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2" - }; - for (String statement : statements) - { - for (int pageSize : pageSizes) - { - assertRows(cluster.coordinator(1) - .executeWithPaging(statement, - ConsistencyLevel.QUORUM, pageSize), - singleNode.coordinator(1) - .executeWithPaging(statement, - ConsistencyLevel.QUORUM, Integer.MAX_VALUE)); - } - } - } - } -} - diff --git a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java deleted file mode 100644 index 3c3ee47..0000000 --- a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test; - -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.ICluster; - -import static java.util.concurrent.TimeUnit.SECONDS; - -public class LargeColumnTest extends TestBaseImpl -{ - private static final Logger logger = LoggerFactory.getLogger(LargeColumnTest.class); - - private static String str(int length, Random random, long seed) - { - random.setSeed(seed); - char[] chars = new char[length]; - int i = 0; - int s = 0; - long v = 0; - while (i < length) - { - if (s == 0) - { - v = random.nextLong(); - s = 8; - } - chars[i] = (char) (((v & 127) + 32) & 127); - v >>= 8; - --s; - ++i; - } - return new String(chars); - } - - private void testLargeColumns(int nodes, int columnSize, int rowCount) throws Throwable - { - Random random = new Random(); - long seed = ThreadLocalRandom.current().nextLong(); - logger.info("Using seed {}", seed); - - try (ICluster cluster = init(builder() - .withNodes(nodes) - .withConfig(config -> - config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20) - .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2) - .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3) - .set("write_request_timeout_in_ms", SECONDS.toMillis(30L)) - .set("read_request_timeout_in_ms", SECONDS.toMillis(30L)) - .set("memtable_heap_space_in_mb", 1024) - ) - .start())) - { - cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c text, PRIMARY KEY (k))", KEYSPACE)); - - for (int i = 0; i < rowCount; ++i) - cluster.coordinator(1).execute(String.format("INSERT INTO %s.cf (k, c) VALUES (?, 
?);", KEYSPACE), ConsistencyLevel.ALL, i, str(columnSize, random, seed | i)); - - for (int i = 0; i < rowCount; ++i) - { - Object[][] results = cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = ?;", KEYSPACE), ConsistencyLevel.ALL, i); - Assert.assertTrue(str(columnSize, random, seed | i).equals(results[0][1])); - } - } - } - - @Test - public void test() throws Throwable - { - testLargeColumns(2, 16 << 20, 5); - } -} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java new file mode 100644 index 0000000..2241ed4 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/MessageFiltersTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.shared.MessageFilters; +import org.junit.Assert; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +public class MessageFiltersTest +{ + @Test + public void simpleInboundFiltersTest() + { + simpleFiltersTest(true); + } + + @Test + public void simpleOutboundFiltersTest() + { + simpleFiltersTest(false); + } + + private interface Permit + { + boolean test(int from, int to, IMessage msg); + } + + private void simpleFiltersTest(boolean inbound) + { + int VERB1 = 1; + int VERB2 = 2; + int VERB3 = 3; + int i1 = 1; + int i2 = 2; + int i3 = 3; + String MSG1 = "msg1"; + String MSG2 = "msg2"; + + MessageFilters filters = new MessageFilters(); + Permit permit = inbound ? filters::permitInbound : filters::permitOutbound; + + IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); + filter.off(); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); + filters.reset(); + + filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1))); + + filters.reset(); + AtomicInteger counter = new AtomicInteger(); + filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return Arrays.equals(msg.bytes(), MSG1.getBytes()); + 
}).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 1); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2))); + Assert.assertEquals(counter.get(), 2); + + // filter chain gets interrupted because a higher level filter returns no match + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 2); + Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1))); + Assert.assertEquals(counter.get(), 2); + filters.reset(); + + filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop(); + Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1))); + Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1))); + Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1))); + filters.reset(); + + counter.set(0); + filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return false; + }).drop(); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(2, counter.get()); + } + + IMessage msg(int verb, String msg) + { + return new IMessage() + { + public int verb() { return verb; } + public byte[] bytes() { return msg.getBytes(); } + public int id() { return 0; } + public int version() { return 0; } + public InetSocketAddress from() { return null; } + public int fromPort() + { + return 0; + } + }; + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java index c7e9b26..2bbe2a9 100644 --- 
a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java +++ b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java @@ -33,6 +33,8 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.shared.AssertUtils.*; + public class NativeProtocolTest extends TestBaseImpl { diff --git a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 70790bc..4c4ef0f 100644 --- a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@ -25,6 +25,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.shared.AssertUtils.*; public class SimpleReadWriteTest extends TestBaseImpl { @@ -33,7 +34,7 @@ public class SimpleReadWriteTest extends TestBaseImpl { try (ICluster cluster = init(builder().withNodes(3).start())) { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); @@ -53,7 +54,7 @@ public class SimpleReadWriteTest extends TestBaseImpl { try (ICluster cluster = init(builder().withNodes(3).start())) { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, 
PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", ConsistencyLevel.QUORUM); @@ -75,7 +76,7 @@ public class SimpleReadWriteTest extends TestBaseImpl { try (ICluster cluster = init(builder().withNodes(3).start())) { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); @@ -93,67 +94,6 @@ public class SimpleReadWriteTest extends TestBaseImpl } @Test - public void writeWithSchemaDisagreement() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - - Exception thrown = null; - try - { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", - ConsistencyLevel.QUORUM); - } - catch (RuntimeException e) - { - thrown = e; - } - - Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); - 
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); - } - } - - @Test - public void readWithSchemaDisagreement() throws Throwable - { - try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - - Exception thrown = null; - try - { - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.ALL), - row(1, 1, 1, null)); - } - catch (Exception e) - { - thrown = e; - } - - Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); - Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); - } - } - - @Test public void simplePagedReadsTest() throws Throwable { try (ICluster cluster = init(builder().withNodes(3).start())) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org