This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to annotated tag 0.0.1-1 in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
commit 10a3729e259c47101da2c53505177f5159e603da Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Feb 24 12:06:09 2020 +0100 Introduce the extracted in-JVM DTest API Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-15539. --- pom.xml | 94 +++++++ .../distributed/api/ConsistencyLevel.java | 34 +++ .../apache/cassandra/distributed/api/Feature.java | 24 ++ .../apache/cassandra/distributed/api/ICluster.java | 41 +++ .../cassandra/distributed/api/ICoordinator.java | 34 +++ .../cassandra/distributed/api/IInstance.java | 66 +++++ .../cassandra/distributed/api/IInstanceConfig.java | 101 +++++++ .../distributed/api/IInvokableInstance.java | 67 +++++ .../distributed/api/IIsolatedExecutor.java | 128 +++++++++ .../apache/cassandra/distributed/api/IListen.java | 28 ++ .../apache/cassandra/distributed/api/IMessage.java | 38 +++ .../cassandra/distributed/api/IMessageFilters.java | 56 ++++ .../distributed/api/IUpgradeableInstance.java | 29 +++ .../cassandra/distributed/api/TokenSupplier.java | 32 +++ .../cassandra/distributed/shared/Builder.java | 233 +++++++++++++++++ .../distributed/shared/DistributedTestBase.java | 205 +++++++++++++++ .../distributed/shared/InstanceClassLoader.java | 116 +++++++++ .../distributed/shared/MessageFilters.java | 165 ++++++++++++ .../distributed/shared/MessageFiltersTest.java | 113 ++++++++ .../distributed/shared/NetworkTopology.java | 216 +++++++++++++++ .../distributed/shared/ThrowingRunnable.java | 38 +++ .../cassandra/distributed/shared/Versions.java | 201 ++++++++++++++ .../cassandra/distributed/test/BootstrapTest.java | 104 ++++++++ .../test/DistributedReadWritePathTest.java | 290 +++++++++++++++++++++ .../distributed/test/GossipSettlesTest.java | 43 +++ .../distributed/test/LargeColumnTest.java | 96 +++++++ .../distributed/test/NativeProtocolTest.java | 79 ++++++ .../distributed/test/NetworkTopologyTest.java | 98 +++++++ .../distributed/test/SimpleReadWriteTest.java | 270 +++++++++++++++++++ 
test/conf/logback-dtest.xml | 77 ++++++ 30 files changed, 3116 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4b92a62 --- /dev/null +++ b/pom.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>10</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.cassandra</groupId> + <artifactId>dtest-api</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>In JVM Test API</name> + <description>In JVM Test API</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <licenses> + <license> + <name>Apache License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>in-jvm-dtest-cassandra-tryout</artifactId> + <version>0.0.7-3.11-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + + <plugin> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.8.2</version> + <executions> + <execution> + <id>default-deploy</id> + <phase>deploy</phase> + <goals> + <goal>deploy</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-gpg-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>sign-artifacts</id> + <phase>verify</phase> + <goals> + <goal>sign</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <scm> + <connection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</connection> + <developerConnection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</developerConnection> + <url>git@github.com:apache/cassandra-in-jvm-dtests.git</url> + <tag>HEAD</tag> + </scm> +</project> + diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java new file mode 100644 index 0000000..3c057f8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.api; + +public enum ConsistencyLevel { + ANY, + ONE, + TWO, + THREE, + QUORUM, + ALL, + LOCAL_QUORUM, + EACH_QUORUM, + SERIAL, + LOCAL_SERIAL, + LOCAL_ONE, + NODE_LOCAL +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/Feature.java b/src/main/java/org/apache/cassandra/distributed/api/Feature.java new file mode 100644 index 0000000..b4ba036 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/Feature.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public enum Feature +{ + NETWORK, GOSSIP, NATIVE_PROTOCOL +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java new file mode 100644 index 0000000..6d86e99 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.NetworkTopology; + +import java.util.stream.Stream; + +public interface ICluster<I extends IInstance> extends AutoCloseable +{ + void startup(); + I bootstrap(IInstanceConfig config); + I get(int i); + I get(NetworkTopology.AddressAndPort endpoint); + ICoordinator coordinator(int node); + void schemaChange(String query); + void schemaChange(String statement, int instance); + + int size(); + + Stream<I> stream(); + Stream<I> stream(String dcName); + Stream<I> stream(String dcName, String rackName); + IMessageFilters filters(); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java new file mode 100644 index 0000000..da17df6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.Future; + +// The cross-version API requires that a Coordinator can be constructed without any constructor arguments +public interface ICoordinator +{ + Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues); + + Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + IInstance instance(); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java new file mode 100644 index 0000000..dec3549 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.NetworkTopology; + +import java.util.UUID; +import java.util.concurrent.Future; + +// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader) +public interface IInstance extends IIsolatedExecutor +{ + ICoordinator coordinator(); + IListen listen(); + + void schemaChangeInternal(String query); + public Object[][] executeInternal(String query, Object... args); + + IInstanceConfig config(); + NetworkTopology.AddressAndPort broadcastAddress(); + UUID schemaVersion(); + + void startup(); + boolean isShutdown(); + Future<Void> shutdown(); + Future<Void> shutdown(boolean graceful); + + int liveMemberCount(); + + int nodetool(String... commandAndArgs); + void uncaughtException(Thread t, Throwable e); + + /** + * Return the number of times the instance tried to call {@link System#exit(int)}. + * + * When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1} + * to indicate "unknown". 
+ */ + long killAttempts(); + + // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface + void startup(ICluster cluster); + void receiveMessage(IMessage message); + + int getMessagingVersion(); + void setMessagingVersion(NetworkTopology.AddressAndPort addressAndPort, int version); + + void flush(String keyspace); + void forceCompact(String keyspace, String table); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java new file mode 100644 index 0000000..05969f8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; + +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.shared.Versions; + +public interface IInstanceConfig +{ + IInstanceConfig with(Feature featureFlag); + IInstanceConfig with(Feature... 
flags); + + int num(); + UUID hostId(); + NetworkTopology.AddressAndPort broadcastAddress(); + NetworkTopology networkTopology(); + + String localRack(); + String localDatacenter(); + + /** + * write the specified parameters to the Config object; we do not specify Config as the type to support a Config + * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but + * must use the reflection API to modify the state + */ + void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor); + + /** + * Validates whether the config properties are within range of accepted values. + */ + void validate(); + IInstanceConfig set(String fieldName, Object value); + Object get(String fieldName); + String getString(String fieldName); + int getInt(String fieldName); + boolean has(Feature featureFlag); + + public IInstanceConfig forVersion(Versions.Major major); + + public static class ParameterizedClass + { + public static final String CLASS_NAME = "class_name"; + public static final String PARAMETERS = "parameters"; + + public String class_name; + public Map<String, String> parameters; + + public ParameterizedClass(String class_name, Map<String, String> parameters) + { + this.class_name = class_name; + this.parameters = parameters; + } + + @SuppressWarnings("unchecked") + public ParameterizedClass(Map<String, ?> p) + { + this((String)p.get(CLASS_NAME), + p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null); + } + + @Override + public boolean equals(Object that) + { + return that instanceof ParameterizedClass && equals((ParameterizedClass) that); + } + + public boolean equals(ParameterizedClass that) + { + return Objects.equals(class_name, that.class_name) && Objects.equals(parameters, that.parameters); + } + + @Override + public String toString() + { + return class_name + (parameters == null ? 
"" : parameters.toString()); + } + } + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java new file mode 100644 index 0000000..1dbc4cc --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.cassandra.distributed.api.IInstance; + +/** + * This version is only supported for a Cluster running the same code as the test environment, and permits + * ergonomic cross-node behaviours, without editing the cross-version API. + * + * A lambda can be written to be invoked on any or all of the nodes. + * + * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will + * not be the same in the alternate version. 
Even were it not, there would likely be a runtime linkage error given + * any code divergence. + */ +public interface IInvokableInstance extends IInstance +{ + public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); } + public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); } + public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); } + + public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); } + public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); } + public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); } + + public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); } + public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); } + + public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); } + public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); } + + public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); } + public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); } + + public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); } + public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); } + + public default <I1, I2, I3, O> TriFunction<I1, I2, I3, 
Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); } + public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); } + + public <E extends Serializable> E transfer(E object); + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java new file mode 100644 index 0000000..d811b17 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Represents a clean way to handoff evaluation of some work to an executor associated + * with a node's lifetime. + * + * There is no transfer of execution to the parallel class hierarchy. 
+ * + * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class + * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself. + * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary. + */ +public interface IIsolatedExecutor +{ + public interface CallableNoExcept<O> extends Callable<O> { public O call(); } + public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { } + public interface SerializableRunnable extends Runnable, Serializable {} + public interface SerializableConsumer<O> extends Consumer<O>, Serializable {} + public interface SerializableSupplier<O> extends Supplier<O>, Serializable {} + public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {} + public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {} + public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {} + public interface TriFunction<I1, I2, I3, O> + { + O apply(I1 i1, I2 i2, I3 i3); + } + public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { } + + Future<Void> shutdown(); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <O> CallableNoExcept<O> sync(CallableNoExcept<O> call); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + CallableNoExcept<Future<?>> async(Runnable run); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + Runnable sync(Runnable run); + + /** + 
* Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I> Function<I, Future<?>> async(Consumer<I> consumer); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I> Consumer<I> sync(Consumer<I> consumer); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I, O> Function<I, Future<O>> async(Function<I, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I, O> Function<I, O> sync(Function<I, O> f); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f); + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java new file mode 100644 index 
0000000..c2e8dd6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public interface IListen +{ + public interface Cancel { void cancel(); } + + Cancel schema(Runnable onChange); + + Cancel liveMembers(Runnable onChange); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java new file mode 100644 index 0000000..da536cc --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.NetworkTopology; + +import java.io.Serializable; + +/** + * A cross-version interface for delivering internode messages via message sinks. + * + * Message implementations should be serializable so we could load into instances. + */ +public interface IMessage extends Serializable +{ + int verb(); + byte[] bytes(); + // TODO: need to make this a long + int id(); + int version(); + NetworkTopology.AddressAndPort from(); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java new file mode 100644 index 0000000..01fe972 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public interface IMessageFilters +{ + public interface Filter + { + Filter off(); + Filter on(); + } + + public interface Builder + { + Builder from(int ... nums); + Builder to(int ... nums); + + /** + * Every message for which matcher returns `true` will be _dropped_ (assuming all + * other matchers in the chain will return `true` as well). + */ + Builder messagesMatching(Matcher filter); + Filter drop(); + } + + public interface Matcher + { + boolean matches(int from, int to, IMessage message); + } + + Builder verbs(int... verbs); + Builder allVerbs(); + void reset(); + + /** + * {@code true} value returned by the implementation implies that the message was + * not matched by any filters and therefore should be delivered. + */ + boolean permit(int from, int to, IMessage msg); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java new file mode 100644 index 0000000..da864e0 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.shared.Versions; + +// this lives outside the api package so that we do not have to worry about inter-version compatibility +public interface IUpgradeableInstance extends IInstance +{ + // only to be invoked while the node is shutdown! + public void setVersion(Versions.Version version); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java new file mode 100644 index 0000000..a714cd5 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.api; + +public interface TokenSupplier { + long token(int nodeId); + + static TokenSupplier evenlyDistributedTokens(int numNodes) { + long increment = (Long.MAX_VALUE / numNodes) * 2; + return (int nodeId) -> { + assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy", + nodeId, numNodes); + return Long.MIN_VALUE + 1 + nodeId * increment; + }; + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java new file mode 100644 index 0000000..6f6adfb --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens; + +public abstract class Builder<I extends IInstance, C extends ICluster> { + + private final int BROADCAST_PORT = 7012; + + public interface Factory<I extends IInstance, C extends ICluster> { + C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader); + } + + private final Factory<I, C> factory; + private int nodeCount; + private int subnet; + private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; + private TokenSupplier tokenSupplier; + private File root; + private Versions.Version version; + private Consumer<IInstanceConfig> configUpdater; + + public Builder(Factory<I, C> factory) { + this.factory = factory; + } + + public C start() throws IOException { + C cluster = createWithoutStarting(); + cluster.startup(); + return cluster; + } + + public C createWithoutStarting() throws IOException { + if (root == null) + root = Files.createTempDirectory("dtests").toFile(); + + if (nodeCount <= 0) + throw new IllegalStateException("Cluster must have at least one node"); + + if (nodeIdTopology == null) { + nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); + } + + root.mkdirs(); + + ClassLoader sharedClassLoader = 
Thread.currentThread().getContextClassLoader(); + + List<IInstanceConfig> configs = new ArrayList<>(); + + // TODO: make token allocation strategy configurable + if (tokenSupplier == null) + tokenSupplier = evenlyDistributedTokens(nodeCount); + + for (int i = 0; i < nodeCount; ++i) { + int nodeNum = i + 1; + configs.add(createInstanceConfig(nodeNum)); + } + + return factory.newCluster(root, version, configs, sharedClassLoader); + } + + public IInstanceConfig newInstanceConfig(C cluster) { + return createInstanceConfig(cluster.size() + 1); + } + + protected IInstanceConfig createInstanceConfig(int nodeNum) { + String ipPrefix = "127.0." + subnet + "."; + String seedIp = ipPrefix + "1"; + String ipAddress = ipPrefix + nodeNum; + long token = tokenSupplier.token(nodeNum); + + NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology); + + IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp); + if (configUpdater != null) + configUpdater.accept(config); + + return config; + } + + protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp); + + public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) { + this.tokenSupplier = tokenSupplier; + return this; + } + + public Builder<I, C> withSubnet(int subnet) { + this.subnet = subnet; + return this; + } + + public Builder<I, C> withNodes(int nodeCount) { + this.nodeCount = nodeCount; + return this; + } + + public Builder<I, C> withDCs(int dcCount) { + return withRacks(dcCount, 1); + } + + public Builder<I, C> withRacks(int dcCount, int racksPerDC) { + if (nodeCount == 0) + throw new IllegalStateException("Node count will be calculated. 
Do not supply total node count in the builder"); + + int totalRacks = dcCount * racksPerDC; + int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer + return withRacks(dcCount, racksPerDC, nodesPerRack); + } + + public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) { + if (nodeIdTopology != null) + throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls"); + + nodeIdTopology = new HashMap<>(); + int nodeId = 1; + for (int dc = 1; dc <= dcCount; dc++) { + for (int rack = 1; rack <= racksPerDC; rack++) { + for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++) + nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack))); + } + } + // adjust the node count to match the allocatation + final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack; + if (adjustedNodeCount != nodeCount) { + assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count"; + System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s", + dcCount, racksPerDC, nodesPerRack, adjustedNodeCount)); + nodeCount = adjustedNodeCount; + } + return this; + } + + public Builder<I, C> withDC(String dcName, int nodeCount) { + return withRack(dcName, rackName(1), nodeCount); + } + + public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) { + if (nodeIdTopology == null) { + if (nodeCount > 0) + throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks"); + + nodeIdTopology = new HashMap<>(); + } + for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++) + nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); + + nodeCount += nodesInRack; + return this; + } + + // Map of node ids to dc and rack - must be contiguous 
with an entry nodeId 1 to nodeCount + public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) { + if (nodeIdTopology.isEmpty()) + throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); + + IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> { + if (nodeIdTopology.get(nodeId) == null) + throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId); + }); + + if (nodeCount != nodeIdTopology.size()) { + nodeCount = nodeIdTopology.size(); + System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount)); + } + + this.nodeIdTopology = new HashMap<>(nodeIdTopology); + + return this; + } + + public Builder<I, C> withRoot(File root) { + this.root = root; + return this; + } + + public Builder<I, C> withVersion(Versions.Version version) { + this.version = version; + return this; + } + + public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) { + this.configUpdater = updater; + return this; + } + + static String dcName(int index) + { + return "datacenter" + index; + } + + static String rackName(int index) + { + return "rack" + index; + } +} + + diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java new file mode 100644 index 0000000..89dc0cd --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public abstract class DistributedTestBase +{ + @After + public void afterEach() + { + System.runFinalization(); + System.gc(); + } + + public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder(); + + public static String KEYSPACE = "distributed_test_keyspace"; + + // TODO: move this to Cluster.java? + public static void nativeLibraryWorkaround() + { + // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask, + // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask, + // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader. 
+ System.setProperty("cassandra.disable_tcactive_openssl", "true"); + System.setProperty("io.netty.transport.noNative", "true"); + } + + public static void processReaperWorkaround() throws Throwable { + // Make sure the 'process reaper' thread is initially created under the main classloader, + // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader + // which prevents it from being garbage collected. + new ProcessBuilder().command("true").start().waitFor(); + } + + public static void setupLogging() + { + try + { + File root = Files.createTempDirectory("in-jvm-dtest").toFile(); + root.deleteOnExit(); + String testConfPath = "test/conf/logback-dtest.xml"; + Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml"); + + if (!logConfPath.toFile().exists()) + { + Files.copy(new File(testConfPath).toPath(), + logConfPath); + } + + System.setProperty("logback.configurationFile", "file://" + logConfPath); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @BeforeClass + public static void setup() throws Throwable { + setupLogging(); + System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000)); + System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); + nativeLibraryWorkaround(); + processReaperWorkaround(); + } + + public static String withKeyspace(String replaceIn) + { + return String.format(replaceIn, KEYSPACE); + } + + protected static <C extends ICluster<?>> C init(C cluster) + { + return init(cluster, cluster.size()); + } + + protected static <C extends ICluster<?>> C init(C cluster, int replicationFactor) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};"); + return cluster; + } + + public static void assertRows(Object[][] actual, Object[]... 
expected) + { + Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected), + expected.length, actual.length); + + for (int i = 0; i < expected.length; i++) + { + Object[] expectedRow = expected[i]; + Object[] actualRow = actual[i]; + Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected), + Arrays.equals(expectedRow, actualRow)); + } + } + + public static void assertRow(Object[] actual, Object... expected) + { + Assert.assertTrue(rowNotEqualErrorMessage(actual, expected), + Arrays.equals(actual, expected)); + } + + public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected) + { + while (actual.hasNext() && expected.hasNext()) + assertRow(actual.next(), expected.next()); + + Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext()); + } + + public static void assertRows(Iterator<Object[]> actual, Object[]... expected) + { + assertRows(actual, new Iterator<Object[]>() { + + int i = 0; + @Override + public boolean hasNext() { + return i < expected.length; + } + + @Override + public Object[] next() { + return expected[i++]; + } + }); + } + + public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected) + { + return String.format("Expected: %s\nActual:%s\n", + Arrays.toString(expected), + Arrays.toString(actual)); + } + + public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) + { + return String.format("Expected: %s\nActual: %s\n", + rowsToString(expected), + rowsToString(actual)); + } + + public static String rowsToString(Object[][] rows) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + boolean isFirst = true; + for (Object[] row : rows) + { + if (isFirst) + isFirst = false; + else + builder.append(","); + builder.append(Arrays.toString(row)); + } + builder.append("]"); + return builder.toString(); + } + + public static Object[][] toObjectArray(Iterator<Object[]> iter) + { + List<Object[]> res = new ArrayList<>(); + while 
(iter.hasNext()) + res.add(iter.next()); + + return res.toArray(new Object[res.size()][]); + } + + public static Object[] row(Object... expected) + { + return expected; + } + +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java new file mode 100644 index 0000000..c37a520 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.function.Predicate; + +public class InstanceClassLoader extends URLClassLoader +{ + private static final Predicate<String> sharePackage = name -> + name.startsWith("org.apache.cassandra.distributed.api.") + || name.startsWith("org.apache.cassandra.distributed.shared.") + || name.startsWith("sun.") + || name.startsWith("oracle.") + || name.startsWith("com.intellij.") + || name.startsWith("com.sun.") + || name.startsWith("com.oracle.") + || name.startsWith("java.") + || name.startsWith("javax.") + || name.startsWith("jdk.") + || name.startsWith("netscape.") + || name.startsWith("org.xml.sax."); + + private volatile boolean isClosed = false; + private final URL[] urls; + private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected + private final int id; + private final ClassLoader sharedClassLoader; + + public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader) + { + super(urls, null); + this.urls = urls; + this.sharedClassLoader = sharedClassLoader; + this.generation = generation; + this.id = id; + } + + public int getClusterGeneration() + { + return generation; + } + + public int getInstanceId() + { + return id; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException + { + if (sharePackage.test(name)) + return sharedClassLoader.loadClass(name); + + return loadClassInternal(name); + } + + Class<?> loadClassInternal(String name) throws ClassNotFoundException + { + if (isClosed) + throw new IllegalStateException(String.format("Can't load %s. 
Instance class loader is already closed.", name)); + + synchronized (getClassLoadingLock(name)) + { + // First, check if the class has already been loaded + Class<?> c = findLoadedClass(name); + + if (c == null) + c = findClass(name); + + return c; + } + } + + /** + * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node + */ + public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz) + { + return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName()); + } + + public String toString() + { + return "InstanceClassLoader{" + + "generation=" + generation + + ", id = " + id + + ", urls=" + Arrays.toString(urls) + + '}'; + } + + public void close() throws IOException + { + isClosed = true; + super.close(); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java new file mode 100644 index 0000000..57b8f57 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; + +public class MessageFilters implements IMessageFilters +{ + private final List<Filter> filters = new CopyOnWriteArrayList<>(); + + public boolean permit(int from, int to, IMessage msg) + { + for (Filter filter : filters) + { + if (filter.matches(from, to, msg)) + return false; + } + return true; + } + + public class Filter implements IMessageFilters.Filter + { + final int[] from; + final int[] to; + final int[] verbs; + final Matcher matcher; + + Filter(int[] from, int[] to, int[] verbs, Matcher matcher) + { + if (from != null) + { + from = from.clone(); + Arrays.sort(from); + } + if (to != null) + { + to = to.clone(); + Arrays.sort(to); + } + if (verbs != null) + { + verbs = verbs.clone(); + Arrays.sort(verbs); + } + this.from = from; + this.to = to; + this.verbs = verbs; + this.matcher = matcher; + } + + public int hashCode() + { + return (from == null ? 0 : Arrays.hashCode(from)) + + (to == null ? 0 : Arrays.hashCode(to)) + + (verbs == null ? 
0 : Arrays.hashCode(verbs)); + } + + public boolean equals(Object that) + { + return that instanceof Filter && equals((Filter) that); + } + + public boolean equals(Filter that) + { + return Arrays.equals(from, that.from) + && Arrays.equals(to, that.to) + && Arrays.equals(verbs, that.verbs); + } + + public Filter off() + { + filters.remove(this); + return this; + } + + public Filter on() + { + filters.add(this); + return this; + } + + public boolean matches(int from, int to, IMessage msg) + { + return (this.from == null || Arrays.binarySearch(this.from, from) >= 0) + && (this.to == null || Arrays.binarySearch(this.to, to) >= 0) + && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0) + && (this.matcher == null || this.matcher.matches(from, to, msg)); + } + } + + public class Builder implements IMessageFilters.Builder + { + int[] from; + int[] to; + int[] verbs; + Matcher matcher; + + private Builder(int[] verbs) + { + this.verbs = verbs; + } + + public Builder from(int... nums) + { + from = nums; + return this; + } + + public Builder to(int... nums) + { + to = nums; + return this; + } + + public IMessageFilters.Builder messagesMatching(Matcher matcher) + { + this.matcher = matcher; + return this; + } + + public Filter drop() + { + return new Filter(from, to, verbs, matcher).on(); + } + } + + + public Builder verbs(int... verbs) + { + return new Builder(verbs); + } + + @Override + public Builder allVerbs() + { + return new Builder(null); + } + + @Override + public void reset() + { + filters.clear(); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java new file mode 100644 index 0000000..6eedb16 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.api.IMessage; + +public class MessageFiltersTest +{ + @Test + public void simpleFiltersTest() throws Throwable + { + int VERB1 = 1; + int VERB2 = 2; + int VERB3 = 3; + int i1 = 1; + int i2 = 2; + int i3 = 3; + String MSG1 = "msg1"; + String MSG2 = "msg2"; + + MessageFilters filters = new MessageFilters(); + MessageFilters.Filter filter = filters.allVerbs().from(1).drop(); + + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1))); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1))); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + filter.off(); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + filters.reset(); + + filters.verbs(VERB1).from(1).to(2).drop(); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1))); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1))); + + filters.reset(); + AtomicInteger 
counter = new AtomicInteger(); + filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return Arrays.equals(msg.bytes(), MSG1.getBytes()); + }).drop(); + Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 1); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2))); + Assert.assertEquals(counter.get(), 2); + + // filter chain gets interrupted because a higher level filter returns no match + Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertEquals(counter.get(), 2); + Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1))); + Assert.assertEquals(counter.get(), 2); + filters.reset(); + + filters.allVerbs().from(3, 2).to(2, 1).drop(); + Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1))); + Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); + filters.reset(); + + counter.set(0); + filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> { + counter.incrementAndGet(); + return false; + }).drop(); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1))); + Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertEquals(2, counter.get()); + } + + IMessage msg(int verb, String msg) + { + return new IMessage() + { + public int verb() { return verb; } + public byte[] bytes() { return msg.getBytes(); } + public int id() { return 0; } + public int version() { return 0; } + public NetworkTopology.AddressAndPort from() { return null; } + public int fromPort() + { + return 0; + } + }; + } +} \ No newline at end of file diff --git 
a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java new file mode 100644 index 0000000..9a8e8f6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class NetworkTopology +{ + private final Map<AddressAndPort, DcAndRack> map; + + public static class DcAndRack + { + private final String dc; + private final String rack; + + private DcAndRack(String dc, String rack) + { + this.dc = dc; + this.rack = rack; + } + + @Override + public String toString() + { + return "DcAndRack{" + + "dc='" + dc + '\'' + + ", rack='" + rack + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DcAndRack dcAndRack = (DcAndRack) o; + return Objects.equals(dc, dcAndRack.dc) && + Objects.equals(rack, dcAndRack.rack); + } + + @Override + public int hashCode() + { + return Objects.hash(dc, rack); + } + } + + public static class AddressAndPort implements Serializable + { + private final InetAddress address; + private final int port; + + public AddressAndPort(InetAddress address, int port) + { + this.address = address; + this.port = port; + } + + public int getPort() + { + return port; + } + + public InetAddress getAddress() + { + return address; + } + + @Override + public String toString() + { + return "AddressAndPort{" + + "address=" + address + + ", port=" + port + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AddressAndPort that = (AddressAndPort) o; + return port == that.port && + Objects.equals(address, that.address); + } + + @Override + public int hashCode() + { + return Objects.hash(address, port); + } + } + + public static DcAndRack dcAndRack(String dc, String rack) 
+ { + return new DcAndRack(dc, rack); + } + + public static AddressAndPort addressAndPort(InetAddress address, int port) + { + return new AddressAndPort(address, port); + } + + public static AddressAndPort addressAndPort(String address, int port) + { + try + { + return new AddressAndPort(InetAddress.getByName(address), port); + } + catch (UnknownHostException e) + { + throw new IllegalArgumentException("Unknown address '" + address + '\''); + } + } + + private NetworkTopology() + { + map = new HashMap<>(); + } + + @SuppressWarnings("WeakerAccess") + public NetworkTopology(NetworkTopology networkTopology) + { + map = new HashMap<>(networkTopology.map); + } + + public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology) + { + final NetworkTopology topology = new NetworkTopology(); + + for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++) + { + String broadcastAddress = ipPrefix + nodeId; + + DcAndRack dcAndRack = nodeIdTopology.get(nodeId); + if (dcAndRack == null) + throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap"); + + topology.put(addressAndPort(broadcastAddress, broadcastPort), dcAndRack); + } + return topology; + } + + public DcAndRack put(AddressAndPort addressAndPort, DcAndRack value) + { + return map.put(addressAndPort, value); + } + + public String localRack(NetworkTopology.AddressAndPort key) + { + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.rack; + } + + public String localDC(NetworkTopology.AddressAndPort key) + { + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.dc; + } + + public boolean contains(NetworkTopology.AddressAndPort key) + { + return map.containsKey(key); + } + + public String toString() + { + return "NetworkTopology{" + map + '}'; + } + + + public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount, + String dc, + String rack) + { + return networkTopology(nodeCount, 
(nodeid) -> NetworkTopology.dcAndRack(dc, rack)); + } + + public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount, + IntFunction<DcAndRack> dcAndRackSupplier) + { + + return IntStream.rangeClosed(1, nodeCount).boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + dcAndRackSupplier::apply)); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java new file mode 100644 index 0000000..01f8d29 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +public interface ThrowingRunnable +{ + public void run() throws Throwable; + + public static Runnable toRunnable(ThrowingRunnable runnable) + { + return () -> { + try + { + runnable.run(); + } + catch (Throwable throwable) + { + throw new RuntimeException(throwable); + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java new file mode 100644 index 0000000..30c0c7c --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class Versions +{ + public static final String PROPERTY_PREFIX = "cassandra."; + + public static URL[] getClassPath() + { + // In Java 9 the default system ClassLoader got changed from URLClassLoader to AppClassLoader which + // does not extend URLClassLoader nor does it give access to the class path array. + // Java requires the system property 'java.class.path' to be setup with the classpath seperated by : + // so this logic parses that string into the URL[] needed to build later + String cp = System.getProperty("java.class.path"); + assert cp != null && !cp.isEmpty(); + String[] split = cp.split(File.pathSeparator); + assert split.length > 0; + URL[] urls = new URL[split.length]; + try + { + for (int i = 0; i < split.length; i++) + urls[i] = Paths.get(split[i]).toUri().toURL(); + } + catch (MalformedURLException e) + { + throw new RuntimeException(e); + } + return urls; + } + + public enum Major + { + v22("2\\.2\\.([0-9]+)"), + v30("3\\.0\\.([0-9]+)"), + v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"), + v4("4\\.([0-9]+)"); + final Pattern pattern; + Major(String verify) + { + this.pattern = Pattern.compile(verify); + } + + static Major fromFull(String version) + { + if (version.isEmpty()) + throw new IllegalArgumentException(version); + switch (version.charAt(0)) + { + case '2': + if (version.startsWith("2.2")) + return v22; + throw new IllegalArgumentException(version); + case '3': + if (version.startsWith("3.0")) + return v30; + return v3X; + case '4': + return v4; + default: + throw new IllegalArgumentException(version); + } + } + + // verify that the version string is valid for this major version + boolean verify(String version) + { + return 
pattern.matcher(version).matches();
+        }
+
+        // sort two strings of the same major version as this enum instance
+        int compare(String a, String b)
+        {
+            Matcher ma = pattern.matcher(a);
+            Matcher mb = pattern.matcher(b); // must match b; matching a twice made every comparison return 0
+            if (!ma.matches()) throw new IllegalArgumentException(a);
+            if (!mb.matches()) throw new IllegalArgumentException(b);
+            int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
+            if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null)) // v3X only: group 3 is the optional trailing number
+            {
+                if (ma.group(3) != null && mb.group(3) != null)
+                {
+                    result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
+                }
+                else
+                {
+                    result = ma.group(3) != null ? 1 : -1;
+                }
+            }
+            // sort descending
+            return -result;
+        }
+    }
+
+    public static class Version
+    {
+        public final Major major;
+        public final String version;
+        public final URL[] classpath;
+
+        public Version(String version, URL[] classpath)
+        {
+            this(Major.fromFull(version), version, classpath);
+        }
+        public Version(Major major, String version, URL[] classpath)
+        {
+            this.major = major;
+            this.version = version;
+            this.classpath = classpath;
+        }
+    }
+
+    private final Map<Major, List<Version>> versions;
+    public Versions(Map<Major, List<Version>> versions)
+    {
+        this.versions = versions;
+    }
+
+    public Version get(String full)
+    {
+        return versions.get(Major.fromFull(full))
+                       .stream()
+                       .filter(v -> full.equals(v.version))
+                       .findFirst()
+                       .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
+    }
+
+    public Version getLatest(Major major)
+    {
+        return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
+    }
+
+    public static Versions find()
+    {
+        final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
+        final File sourceDirectory = new File(dtestJarDirectory);
+        System.out.println("Looking for dtest jars in " +
sourceDirectory.getAbsolutePath());
+        final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
+        final Map<Major, List<Version>> versions = new HashMap<>();
+        for (Major major : Major.values())
+            versions.put(major, new ArrayList<>());
+
+        for (File file : Optional.ofNullable(sourceDirectory.listFiles()).orElse(new File[0])) // listFiles() is null if the directory is missing or unreadable
+        {
+            Matcher m = pattern.matcher(file.getName());
+            if (!m.matches())
+                continue;
+            String version = m.group(1); // group 1 is the named group "fullversion"
+            Major major = Major.fromFull(version);
+            versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
+        }
+
+        for (Map.Entry<Major, List<Version>> e : versions.entrySet())
+        {
+            if (e.getValue().isEmpty())
+                continue;
+            Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
+            System.out.println("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
+        }
+
+        return new Versions(versions);
+    }
+
+    public static URL toURL(File file)
+    {
+        try
+        {
+            return file.toURI().toURL();
+        }
+        catch (MalformedURLException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
new file mode 100644
index 0000000..e61faa6
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.Builder; +import org.apache.cassandra.distributed.shared.NetworkTopology; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE; + +public class BootstrapTest extends TestBaseImpl +{ + + @Test + public void bootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)); + + Map<Integer, Long> withBootstrap = null; + Map<Integer, Long> naturally = null; + try (ICluster<IInvokableInstance> cluster = 
builder.withNodes(originalNodeCount).start()) + { + populate(cluster); + + IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .newInstanceConfig(cluster); + config.set("auto_bootstrap", true); + + cluster.bootstrap(config).startup(); + + cluster.stream().forEach(instance -> { + instance.nodetool("cleanup", KEYSPACE, "tbl"); + }); + + withBootstrap = count(cluster); + } + + builder = builder.withNodes(expandedNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withConfig(config -> config.with(NETWORK, GOSSIP)); + + try (ICluster cluster = builder.start()) + { + populate(cluster); + naturally = count(cluster); + } + + Assert.assertEquals(withBootstrap, naturally); + } + + public void populate(ICluster cluster) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + for (int i = 0; i < 1000; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + ConsistencyLevel.QUORUM, + i, i, i); + } + + public Map<Integer, Long> count(ICluster cluster) + { + return IntStream.rangeClosed(1, cluster.size()) + .boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java new file mode 100644 index 0000000..0ce0e74 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class DistributedReadWritePathTest extends TestBaseImpl +{ + @Test + public void coordinatorReadTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + 1), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + } + } + + @Test + public void largeMessageTest() throws Throwable + { + int largeMessageThreshold = 1024 * 64; + try (ICluster cluster = 
init(builder().withNodes(2).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))"); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < largeMessageThreshold ; i++) + builder.append('a'); + String s = builder.toString(); + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", + ConsistencyLevel.ALL, + s); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + 1), + row(1, 1, s)); + } + } + + @Test + public void coordinatorWriteTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", + ConsistencyLevel.QUORUM); + + for (int i = 0; i < 3; i++) + { + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + } + } + + @Test + public void readRepairTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), // ensure node3 in preflist + 
row(1, 1, 1)); + + // Verify that data got repaired to the third node + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + } + + @Test + public void writeWithSchemaDisagreement() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", + ConsistencyLevel.QUORUM); + } + catch (RuntimeException e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); + } + } + + @Test + public void readWithSchemaDisagreement() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + 
".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1, null)); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); + } + } + + @Test + public void simplePagedReadsTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + int size = 100; + Object[][] results = new Object[size][]; + for (int i = 0; i < size; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, i); + results[i] = new Object[] { 1, i, i}; + } + + // Make sure paged read returns same results with different page sizes + for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) + { + assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", + ConsistencyLevel.QUORUM, + pageSize), + results); + } + } + } + + @Test + public void pagingWithRepairTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + int size = 100; + Object[][] results = new Object[size][]; + for (int i = 0; i < size; i++) + { + // Make sure that data lands on different nodes and not coordinator + cluster.get(i % 2 == 0 ? 
2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + i, i); + + results[i] = new Object[] { 1, i, i}; + } + + // Make sure paged read returns same results with different page sizes + for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) + { + assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", + ConsistencyLevel.ALL, + pageSize), + results); + } + + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"), + results); + } + } + + @Test + public void pagingTests() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start()); + ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + for (int i = 0; i < 10; i++) + { + for (int j = 0; j < 10; j++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, j, i + i); + singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, j, i + i); + } + } + + int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50}; + String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER 
BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2" + }; + for (String statement : statements) + { + for (int pageSize : pageSizes) + { + assertRows(cluster.coordinator(1) + .executeWithPaging(statement, + ConsistencyLevel.QUORUM, pageSize), + singleNode.coordinator(1) + .executeWithPaging(statement, + ConsistencyLevel.QUORUM, Integer.MAX_VALUE)); + } + } + } + } +} + diff --git a/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java new file mode 100644 index 0000000..e3d3c68 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ICluster; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class GossipSettlesTest extends TestBaseImpl +{ + + @Test + public void testGossipSettles() throws Throwable + { + /* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */ + try (ICluster cluster = builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withSubnet(1) + .start()) + { + } + } + +} diff --git a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java new file mode 100644 index 0000000..3c3ee47 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class LargeColumnTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(LargeColumnTest.class); + + private static String str(int length, Random random, long seed) + { + random.setSeed(seed); + char[] chars = new char[length]; + int i = 0; + int s = 0; + long v = 0; + while (i < length) + { + if (s == 0) + { + v = random.nextLong(); + s = 8; + } + chars[i] = (char) (((v & 127) + 32) & 127); + v >>= 8; + --s; + ++i; + } + return new String(chars); + } + + private void testLargeColumns(int nodes, int columnSize, int rowCount) throws Throwable + { + Random random = new Random(); + long seed = ThreadLocalRandom.current().nextLong(); + logger.info("Using seed {}", seed); + + try (ICluster cluster = init(builder() + .withNodes(nodes) + .withConfig(config -> + config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20) + .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2) + .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3) + .set("write_request_timeout_in_ms", SECONDS.toMillis(30L)) + .set("read_request_timeout_in_ms", SECONDS.toMillis(30L)) + .set("memtable_heap_space_in_mb", 1024) + ) + .start())) + { + cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c text, PRIMARY KEY (k))", KEYSPACE)); + + for (int i = 0; i < rowCount; ++i) + cluster.coordinator(1).execute(String.format("INSERT INTO %s.cf (k, c) VALUES (?, ?);", KEYSPACE), ConsistencyLevel.ALL, i, str(columnSize, random, seed | i)); + + for (int i = 0; i < 
rowCount; ++i) + { + Object[][] results = cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = ?;", KEYSPACE), ConsistencyLevel.ALL, i); + Assert.assertTrue(str(columnSize, random, seed | i).equals(results[0][1])); + } + } + } + + @Test + public void test() throws Throwable + { + testLargeColumns(2, 16 << 20, 5); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java new file mode 100644 index 0000000..c7e9b26 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import org.apache.cassandra.distributed.impl.RowUtil; +import org.junit.Assert; +import org.junit.Test; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.distributed.api.ICluster; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class NativeProtocolTest extends TestBaseImpl +{ + + @Test + public void withClientRequests() throws Throwable + { + try (ICluster ignored = init(builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .start())) + { + + try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); + Session session = cluster.connect()) + { + session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"); + session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);"); + Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL); + final ResultSet resultSet = session.execute(select); + assertRows(RowUtil.toObjects(resultSet), row(1, 1, 1)); + Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size()); + } + } + } + + @Test + public void withCounters() throws Throwable + { + try (ICluster ignored = init(builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .start())) + { + final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); + Session session = cluster.connect(); + 
session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck counter, PRIMARY KEY (pk));"); + session.execute("UPDATE " + KEYSPACE + ".tbl set ck = ck + 10 where pk = 1;"); + Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL); + final ResultSet resultSet = session.execute(select); + assertRows(RowUtil.toObjects(resultSet), row(1, 10L)); + Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size()); + session.close(); + cluster.close(); + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java new file mode 100644 index 0000000..53154e3 --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.NetworkTopology; + +public class NetworkTopologyTest extends TestBaseImpl +{ + @Test + public void namedDcTest() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = builder() + .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0"))) + .withRack("elsewhere", "firstrack", 1) + .withRack("elsewhere", "secondrack", 2) + .withDC("nearthere", 4) + .start()) + { + Assert.assertEquals(1, cluster.stream("somewhere").count()); + Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count()); + Assert.assertEquals(2, cluster.stream("elsewhere", "secondrack").count()); + Assert.assertEquals(3, cluster.stream("elsewhere").count()); + Assert.assertEquals(4, cluster.stream("nearthere").count()); + + Set<IInstance> expect = cluster.stream().collect(Collectors.toSet()); + Set<IInstance> result = Stream.concat(Stream.concat(cluster.stream("somewhere"), + cluster.stream("elsewhere")), + cluster.stream("nearthere")).collect(Collectors.toSet()); + Assert.assertEquals(expect, result); + } + } + + @Test + public void automaticNamedDcTest() throws Throwable + + { + try (ICluster cluster = builder() + .withRacks(2, 1, 3) + .start()) + { + Assert.assertEquals(6, cluster.stream().count()); + Assert.assertEquals(3, cluster.stream("datacenter1").count()); + Assert.assertEquals(3, cluster.stream("datacenter2", "rack1").count()); + } + } + + @Test(expected = IllegalStateException.class) + public void noCountsAfterNamingDCsTest() + { + builder().withDC("nameddc", 1) + .withDCs(1); 
+ } + + @Test(expected = IllegalStateException.class) + public void mustProvideNodeCountBeforeWithDCsTest() + { + builder().withDCs(1); + } + + @Test(expected = IllegalStateException.class) + public void noEmptyNodeIdTopologyTest() + { + builder().withNodeIdTopology(Collections.emptyMap()); + } + + @Test(expected = IllegalStateException.class) + public void noHolesInNodeIdTopologyTest() + { + builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack"))); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java new file mode 100644 index 0000000..70790bc --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; + +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class SimpleReadWriteTest extends TestBaseImpl +{ + @Test + public void coordinatorReadTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + 1), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + } + } + + @Test + public void coordinatorWriteTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", + ConsistencyLevel.QUORUM); + + for (int i = 0; i < 3; i++) + { + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + } + } + + @Test + public void readRepairTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), // ensure node3 in preflist + row(1, 1, 1)); + + // Verify that data got repaired to the third node + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + } + + @Test + public void writeWithSchemaDisagreement() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", + ConsistencyLevel.QUORUM); + } + catch (RuntimeException e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); + Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); + } + } + + @Test + public void readWithSchemaDisagreement() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> 
config.with(NETWORK)).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1, null)); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")); + Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3")); + } + } + + @Test + public void simplePagedReadsTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + int size = 100; + Object[][] results = new Object[size][]; + for (int i = 0; i < size; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, i); + results[i] = new Object[] { 1, i, i}; + } + + // Make sure paged read returns same results with different page sizes + for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) + { + assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", + ConsistencyLevel.QUORUM, + pageSize), + results); + } + } + } + + @Test + public void pagingWithRepairTest() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start())) + { + cluster.schemaChange("CREATE 
TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + int size = 100; + Object[][] results = new Object[size][]; + for (int i = 0; i < size; i++) + { + // Make sure that data lands on different nodes and not coordinator + cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + i, i); + + results[i] = new Object[] { 1, i, i}; + } + + // Make sure paged read returns same results with different page sizes + for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50}) + { + assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", + ConsistencyLevel.ALL, + pageSize), + results); + } + + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"), + results); + } + } + + @Test + public void pagingTests() throws Throwable + { + try (ICluster cluster = init(builder().withNodes(3).start()); + ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + for (int i = 0; i < 10; i++) + { + for (int j = 0; j < 10; j++) + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, j, i + i); + singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", + ConsistencyLevel.QUORUM, + i, j, i + i); + } + } + + int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50}; + String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3", + "SELECT * FROM " + KEYSPACE + 
".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2", + "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)", + "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2" + }; + for (String statement : statements) + { + for (int pageSize : pageSizes) + { + assertRows(cluster.coordinator(1) + .executeWithPaging(statement, + ConsistencyLevel.QUORUM, pageSize), + singleNode.coordinator(1) + .executeWithPaging(statement, + ConsistencyLevel.QUORUM, Integer.MAX_VALUE)); + } + } + + } + } +} diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml new file mode 100644 index 0000000..370e1e5 --- /dev/null +++ b/test/conf/logback-dtest.xml @@ -0,0 +1,77 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds"> <!-- Logging setup with one rolling file appender (wrapped in an async appender) and two console appenders. -->
+  <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> <!-- Supplies the ${instance_id} value used in the file and stdout patterns below. -->
+
+  <!-- Shutdown hook ensures that async appender flushes -->
+  <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+  <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- Per-suite log file; path is built from ${cassandra.testtag} and ${suitename}. -->
+
+    <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern> <!-- %i is the archive index; the .gz suffix requests compressed archives. -->
+      <minIndex>1</minIndex>
+      <maxIndex>20</maxIndex> <!-- Archive indices run from 1 to 20. -->
+    </rollingPolicy>
+
+    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>20MB</maxFileSize> <!-- Roll over once the active file reaches 20MB. -->
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+    </encoder>
+    <immediateFlush>false</immediateFlush> <!-- Output is not flushed after every event. -->
+  </appender>
+
+  <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender"> <!-- Queues events in front of INSTANCEFILE. -->
+    <discardingThreshold>0</discardingThreshold> <!-- NOTE(review): per the logback manual 0 keeps all events instead of dropping low levels as the queue fills; confirm. -->
+    <maxFlushTime>0</maxFlushTime> <!-- NOTE(review): presumably 0 means wait without a timeout when the appender stops; confirm against the logback manual. -->
+    <queueSize>1024</queueSize>
+    <appender-ref ref="INSTANCEFILE"/>
+  </appender>
+
+  <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>WARN</level> <!-- Only WARN and above reach stderr. -->
+    </filter>
+  </appender>
+
+  <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>DEBUG</level> <!-- NOTE(review): WARN and above pass this filter as well as the stderr one, so they appear on both consoles; confirm that is intended. -->
+    </filter>
+  </appender>
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
+  <root level="DEBUG"> <!-- Every logger without its own level logs at DEBUG to all three appenders. -->
+    <appender-ref ref="INSTANCEASYNCFILE" />
+    <appender-ref ref="INSTANCESTDERR" />
+    <appender-ref ref="INSTANCESTDOUT" />
+  </root>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org