This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
The following commit(s) were added to refs/heads/master by this push: new a562fd5 Introduce the extracted in-JVM DTest API a562fd5 is described below commit a562fd56b302e0573b2af9371aa948689714dcbc Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Feb 24 12:06:09 2020 +0100 Introduce the extracted in-JVM DTest API Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-15539. --- pom.xml | 99 +++++++++ .../distributed/api/ConsistencyLevel.java | 34 +++ .../apache/cassandra/distributed/api/Feature.java | 24 +++ .../apache/cassandra/distributed/api/ICluster.java | 98 +++++++++ .../cassandra/distributed/api/ICoordinator.java | 38 ++++ .../cassandra/distributed/api/IInstance.java | 72 +++++++ .../cassandra/distributed/api/IInstanceConfig.java | 102 +++++++++ .../distributed/api/IInvokableInstance.java | 67 ++++++ .../distributed/api/IIsolatedExecutor.java | 128 +++++++++++ .../apache/cassandra/distributed/api/IListen.java | 28 +++ .../apache/cassandra/distributed/api/IMessage.java | 37 ++++ .../cassandra/distributed/api/IMessageFilters.java | 100 +++++++++ .../distributed/api/IUpgradeableInstance.java | 29 +++ .../cassandra/distributed/api/LongTokenRange.java | 38 ++++ .../cassandra/distributed/api/NodeToolResult.java | 182 ++++++++++++++++ .../cassandra/distributed/api/QueryResult.java | 139 ++++++++++++ .../org/apache/cassandra/distributed/api/Row.java | 110 ++++++++++ .../cassandra/distributed/api/TokenSupplier.java | 32 +++ .../cassandra/distributed/shared/AssertUtils.java | 130 ++++++++++++ .../cassandra/distributed/shared/Builder.java | 233 +++++++++++++++++++++ .../distributed/shared/DistributedTestBase.java | 57 +++++ .../distributed/shared/InstanceClassLoader.java | 116 ++++++++++ .../distributed/shared/MessageFilters.java | 194 +++++++++++++++++ .../distributed/shared/NetworkTopology.java | 169 +++++++++++++++ .../distributed/shared/ThrowingRunnable.java | 38 ++++ .../cassandra/distributed/shared/Versions.java | 206 ++++++++++++++++++ test/conf/logback-dtest.xml | 77 +++++++ 27 files changed, 2577 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..61fe4b8 --- /dev/null +++ b/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>23</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.cassandra</groupId> + <artifactId>dtest-api</artifactId> + <version>0.0.2-SNAPSHOT</version> + <name>In JVM Test API</name> + <description>In JVM Test API</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <licenses> + <license> + <name>Apache License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>in-jvm-dtest-cassandra-tryout</artifactId> + <version>0.0.1-2.2-1</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + + <plugin> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.8.2</version> + <executions> + <execution> + <id>default-deploy</id> + <phase>deploy</phase> + <goals> + <goal>deploy</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>sign-artifacts</id> + <phase>verify</phase> + <goals> + <goal>sign</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <scm> + <connection>scm:git:https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</connection> + <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</developerConnection> + <url>https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</url> + <tag>0.0.4</tag> + </scm> +</project> + diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java new file mode 100644 index 0000000..3c057f8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public enum ConsistencyLevel { + ANY, + ONE, + TWO, + THREE, + QUORUM, + ALL, + LOCAL_QUORUM, + EACH_QUORUM, + SERIAL, + LOCAL_SERIAL, + LOCAL_ONE, + NODE_LOCAL +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/Feature.java b/src/main/java/org/apache/cassandra/distributed/api/Feature.java new file mode 100644 index 0000000..b4ba036 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/Feature.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public enum Feature +{ + NETWORK, GOSSIP, NATIVE_PROTOCOL +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java new file mode 100644 index 0000000..dffd980 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + +public interface ICluster<I extends IInstance> extends AutoCloseable { + void startup(); + + I bootstrap(IInstanceConfig config); + + I get(int i); + + I get(InetSocketAddress endpoint); + + ICoordinator coordinator(int node); + + void schemaChange(String query); + + void schemaChange(String statement, int instance); + + int size(); + + Stream<I> stream(); + + Stream<I> stream(String dcName); + + Stream<I> stream(String dcName, String rackName); + + IMessageFilters filters(); + + static void setup() throws Throwable { + setupLogging(); + setSystemProperties(); + nativeLibraryWorkaround(); + processReaperWorkaround(); + } + + static void nativeLibraryWorkaround() { + // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask, + // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask, + // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader. + System.setProperty("cassandra.disable_tcactive_openssl", "true"); + System.setProperty("io.netty.transport.noNative", "true"); + } + + static void processReaperWorkaround() throws Throwable { + // Make sure the 'process reaper' thread is initially created under the main classloader, + // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader + // which prevents it from being garbage collected. + new ProcessBuilder().command("true").start().waitFor(); + } + + static void setSystemProperties() { + System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 1000)); + System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); + } + + static void setupLogging() { + try { + File root = Files.createTempDirectory("in-jvm-dtest").toFile(); + root.deleteOnExit(); + String testConfPath = "test/conf/logback-dtest.xml"; + Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml"); + + if (!logConfPath.toFile().exists()) { + Files.copy(new File(testConfPath).toPath(), + logConfPath); + } + + System.setProperty("logback.configurationFile", "file://" + logConfPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java new file mode 100644 index 0000000..3d07a3d --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.Future; + +// The cross-version API requires that a Coordinator can be constructed without any constructor arguments +public interface ICoordinator +{ + default Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays(); + } + QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues); + + Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + IInstance instance(); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java new file mode 100644 index 0000000..0dd4865 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.net.InetSocketAddress; +import java.util.UUID; +import java.util.concurrent.Future; + +// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader) +public interface IInstance extends IIsolatedExecutor +{ + ICoordinator coordinator(); + IListen listen(); + + void schemaChangeInternal(String query); + public Object[][] executeInternal(String query, Object... args); + + IInstanceConfig config(); + InetSocketAddress broadcastAddress(); + UUID schemaVersion(); + + void startup(); + boolean isShutdown(); + Future<Void> shutdown(); + Future<Void> shutdown(boolean graceful); + + int liveMemberCount(); + + NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs); + default NodeToolResult nodetoolResult(String... commandAndArgs) + { + return nodetoolResult(true, commandAndArgs); + } + default int nodetool(String... commandAndArgs) { + return nodetoolResult(commandAndArgs).getRc(); + } + void uncaughtException(Thread t, Throwable e); + + /** + * Return the number of times the instance tried to call {@link System#exit(int)}. + * + * When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1} + * to indicate "unknown". + */ + long killAttempts(); + + // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface + void startup(ICluster cluster); + void receiveMessage(IMessage message); + + int getMessagingVersion(); + void setMessagingVersion(InetSocketAddress addressAndPort, int version); + + void flush(String keyspace); + void forceCompact(String keyspace, String table); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java new file mode 100644 index 0000000..97c1534 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; + +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.shared.Versions; + +public interface IInstanceConfig +{ + IInstanceConfig with(Feature featureFlag); + IInstanceConfig with(Feature... flags); + + int num(); + UUID hostId(); + InetSocketAddress broadcastAddress(); + NetworkTopology networkTopology(); + + String localRack(); + String localDatacenter(); + + /** + * write the specified parameters to the Config object; we do not specify Config as the type to support a Config + * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but + * must use the reflection API to modify the state + */ + void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor); + + /** + * Validates whether the config properties are within range of accepted values. + */ + void validate(); + IInstanceConfig set(String fieldName, Object value); + Object get(String fieldName); + String getString(String fieldName); + int getInt(String fieldName); + boolean has(Feature featureFlag); + + public IInstanceConfig forVersion(Versions.Major major); + + public static class ParameterizedClass + { + public static final String CLASS_NAME = "class_name"; + public static final String PARAMETERS = "parameters"; + + public String class_name; + public Map<String, String> parameters; + + public ParameterizedClass(String class_name, Map<String, String> parameters) + { + this.class_name = class_name; + this.parameters = parameters; + } + + @SuppressWarnings("unchecked") + public ParameterizedClass(Map<String, ?> p) + { + this((String)p.get(CLASS_NAME), + p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null); + } + + @Override + public boolean equals(Object that) + { + return that instanceof ParameterizedClass && equals((ParameterizedClass) that); + } + + public boolean equals(ParameterizedClass that) + { + return Objects.equals(class_name, that.class_name) && Objects.equals(parameters, that.parameters); + } + + @Override + public String toString() + { + return class_name + (parameters == null ? "" : parameters.toString()); + } + } + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java new file mode 100644 index 0000000..1dbc4cc --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.cassandra.distributed.api.IInstance; + +/** + * This version is only supported for a Cluster running the same code as the test environment, and permits + * ergonomic cross-node behaviours, without editing the cross-version API. + * + * A lambda can be written tto be invoked on any or all of the nodes. + * + * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will + * not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given + * any code divergence. + */ +public interface IInvokableInstance extends IInstance +{ + public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); } + public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); } + public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); } + + public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); } + public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); } + public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); } + + public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); } + public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); } + + public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); } + public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); } + + public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); } + public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); } + + public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); } + public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); } + + public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); } + public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); } + + public <E extends Serializable> E transfer(E object); + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java new file mode 100644 index 0000000..d811b17 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Represents a clean way to handoff evaluation of some work to an executor associated + * with a node's lifetime. + * + * There is no transfer of execution to the parallel class hierarchy. + * + * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class + * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself. + * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary. + */ +public interface IIsolatedExecutor +{ + public interface CallableNoExcept<O> extends Callable<O> { public O call(); } + public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { } + public interface SerializableRunnable extends Runnable, Serializable {} + public interface SerializableConsumer<O> extends Consumer<O>, Serializable {} + public interface SerializableSupplier<O> extends Supplier<O>, Serializable {} + public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {} + public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {} + public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {} + public interface TriFunction<I1, I2, I3, O> + { + O apply(I1 i1, I2 i2, I3 i3); + } + public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { } + + Future<Void> shutdown(); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <O> CallableNoExcept<O> sync(CallableNoExcept<O> call); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + CallableNoExcept<Future<?>> async(Runnable run); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + Runnable sync(Runnable run); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I> Function<I, Future<?>> async(Consumer<I> consumer); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I> Consumer<I> sync(Consumer<I> consumer); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I, O> Function<I, Future<O>> async(Function<I, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I, O> Function<I, O> sync(Function<I, O> f); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f); + + /** + * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result + */ + <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f); + + /** + * Convert the execution to one performed synchronously on the IsolatedExecutor + */ + <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f); + +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java new file mode 100644 index 0000000..d21c594 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public interface IListen +{ + interface Cancel { void cancel(); } + + Cancel schema(Runnable onChange); + + Cancel liveMembers(Runnable onChange); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java new file mode 100644 index 0000000..f75861e --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; +import java.net.InetSocketAddress; + +/** + * A cross-version interface for delivering internode messages via message sinks. + * + * Message implementations should be serializable so we could load into instances. + */ +public interface IMessage extends Serializable +{ + int verb(); + byte[] bytes(); + // TODO: need to make this a long + int id(); + int version(); + InetSocketAddress from(); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java new file mode 100644 index 0000000..f2cd6ee --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.function.Predicate; + +public interface IMessageFilters +{ + public interface Filter + { + Filter off(); + Filter on(); + } + + public interface Builder + { + Builder from(int ... nums); + Builder to(int ... nums); + + Builder verbs(int... verbs); + Builder allVerbs(); + + Builder inbound(boolean inbound); + + default Builder inbound() + { + return inbound(true); + } + + default Builder outbound() + { + return inbound(false); + } + + /** + * Every message for which matcher returns `true` will be _dropped_ (assuming all + * other matchers in the chain will return `true` as well). + */ + Builder messagesMatching(Matcher filter); + Filter drop(); + } + + public interface Matcher + { + boolean matches(int from, int to, IMessage message); + + static Matcher of(Predicate<IMessage> fn) { + return (from, to, m) -> fn.test(m); + } + } + + Builder inbound(boolean inbound); + default Builder inbound() { + return inbound(true); + } + default Builder outbound() { + return inbound(false); + } + default Builder verbs(int... verbs) { + return inbound().verbs(verbs); + } + default Builder allVerbs() { + return inbound().allVerbs(); + } + void reset(); + + /** + * Checks if the message should be delivered. This is expected to run on "inbound", or on the reciever of + * the message (instance.config.num == to). + * + * @return {@code true} value returned by the implementation implies that the message was + * not matched by any filters and therefore should be delivered. + */ + boolean permitInbound(int from, int to, IMessage msg); + + /** + * Checks if the message should be delivered. This is expected to run on "outbound", or on the sender of + * the message (instance.config.num == from). + * + * @return {@code true} value returned by the implementation implies that the message was + * not matched by any filters and therefore should be delivered. + */ + boolean permitOutbound(int from, int to, IMessage msg); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java new file mode 100644 index 0000000..da864e0 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.shared.Versions; + +// this lives outside the api package so that we do not have to worry about inter-version compatibility +public interface IUpgradeableInstance extends IInstance +{ + // only to be invoked while the node is shutdown! + public void setVersion(Versions.Version version); +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java new file mode 100644 index 0000000..06327e8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.io.Serializable; + +public final class LongTokenRange implements Serializable +{ + public final long minExclusive; + public final long maxInclusive; + + public LongTokenRange(long minExclusive, long maxInclusive) + { + this.minExclusive = minExclusive; + this.maxInclusive = maxInclusive; + } + + public String toString() + { + return "(" + minExclusive + "," + maxInclusive + "]"; + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java new file mode 100644 index 0000000..773f617 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.AssertUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.management.Notification; + +public class NodeToolResult +{ + private final String[] commandAndArgs; + private final int rc; + private final List<Notification> notifications; + private final Throwable error; + + public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> notifications, Throwable error) + { + this.commandAndArgs = commandAndArgs; + this.rc = rc; + this.notifications = notifications; + this.error = error; + } + + public String[] getCommandAndArgs() + { + return commandAndArgs; + } + + public int getRc() + { + return rc; + } + + public List<Notification> getNotifications() + { + return notifications; + } + + public Throwable getError() + { + return error; + } + + public Asserts asserts() + { + return new Asserts(); + } + + public final class Asserts { + public Asserts success() { + AssertUtils.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc); + return this; + } + + public Asserts failure() { + AssertUtils.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc); + return this; + } + + public Asserts errorContains(String msg) { + AssertUtils.assertNotNull("No exception was found but expected one", error); + AssertUtils.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg)); + return this; + } + + public Asserts notificationContains(String msg) { + AssertUtils.assertNotNull("notifications not defined", notifications); + AssertUtils.assertFalse("notifications not defined", notifications.isEmpty()); + for (Notification n : notifications) { + if (n.getMessage().contains(msg)) { + return this; + } + } + AssertUtils.fail("Unable to locate message " + msg + " in notifications: " + notifications); + return this; // unreachable + } + + public Asserts notificationContains(ProgressEventType type, String msg) { + int userType = type.ordinal(); + AssertUtils.assertNotNull("notifications not defined", notifications); + AssertUtils.assertFalse("notifications not defined", notifications.isEmpty()); + for (Notification n : notifications) { + if (notificationType(n) == userType) { + if (n.getMessage().contains(msg)) { + return this; + } + } + } + AssertUtils.fail("Unable to locate message '" + msg + "' in notifications: " + notifications); + return this; // unreachable + } + } + + private static int notificationType(Notification n) + { + return ((Map<String, Integer>) n.getUserData()).get("type").intValue(); + } + + public String toString() + { + return "NodeToolResult{" + + "commandAndArgs=" + Arrays.toString(commandAndArgs) + + ", rc=" + rc + + ", notifications=[" + notifications.stream().map(n -> ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", ")) + "]" + + ", error=" + error + + '}'; + } + + /** + * Progress event type. + * + * <p> + * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events, + * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}. + * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process. + * </p> + * <p> + * {@link #NOTIFICATION} event type is used to just notify message without progress. + * </p> + */ + public enum ProgressEventType + { + /** + * Fired first when progress starts. + * Happens only once. + */ + START, + + /** + * Fire when progress happens. + * This can be zero or more time after START. + */ + PROGRESS, + + /** + * When observing process completes with error, this is sent once before COMPLETE. + */ + ERROR, + + /** + * When observing process is aborted by user, this is sent once before COMPLETE. + */ + ABORT, + + /** + * When observing process completes successfully, this is sent once before COMPLETE. + */ + SUCCESS, + + /** + * Fire when progress complete. + * This is fired once, after ERROR/ABORT/SUCCESS is fired. + * After this, no more ProgressEvent should be fired for the same event. + */ + COMPLETE, + + /** + * Used when sending message without progress. + */ + NOTIFICATION + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java new file mode 100644 index 0000000..dcdfa14 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Predicate; + +/** + * A table of data representing a complete query result. + * + * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways: + * + * <ul> + * <li>represents a complete result rather than a cursor</li> + * <li>returns a {@link Row} to access the current row of data</li> + * <li>relies on object pooling; {@link #hasNext()} may return the same object just with different data, accessing a + * {@link Row} from a previous {@link #hasNext()} call has undefined behavior.</li> + * <li>includes {@link #filter(Predicate)}, this will do client side filtering since Apache Cassandra is more + * restrictive on server side filtering</li> + * </ul> + * + * <h2>Unsafe patterns</h2> + * + * Below are a few unsafe patterns which may lead to unexpected results + * + * <code>{@code + * while (rs.hasNext()) { + * list.add(rs.next()); + * } + * }</code> + * + * <code>{@code + * rs.forEach(list::add) + * }</code> + * + * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row} + * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact + * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()} + * should be used; this will clone the {@link Row} and return a new object pointing to the same data. + */ +public class QueryResult implements Iterator<Row> +{ + public static final QueryResult EMPTY = new QueryResult(new String[0], null); + + private final String[] names; + private final Object[][] results; + private final Predicate<Row> filter; + private final Row row; + private int offset = -1; + + public QueryResult(String[] names, Object[][] results) + { + this.names = Objects.requireNonNull(names, "names"); + this.results = results; + this.row = new Row(names); + this.filter = ignore -> true; + } + + private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset) + { + this.names = names; + this.results = results; + this.filter = filter; + this.offset = offset; + this.row = new Row(names); + } + + public String[] getNames() + { + return names; + } + + public boolean isEmpty() + { + return results.length == 0; + } + + public int size() + { + return results.length; + } + + public QueryResult filter(Predicate<Row> fn) + { + return new QueryResult(names, results, filter.and(fn), offset); + } + + /** + * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will + * be the full set from the query. + */ + public Object[][] toObjectArrays() + { + return results; + } + + @Override + public boolean hasNext() + { + if (results == null) + return false; + while ((offset += 1) < results.length) + { + row.setResults(results[offset]); + if (filter.test(row)) + { + return true; + } + } + row.setResults(null); + return false; + } + + @Override + public Row next() + { + if (offset < 0 || offset >= results.length) + throw new NoSuchElementException(); + return row; + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java new file mode 100644 index 0000000..1dd05fe --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.*; + +/** + * Data representing a single row in a query result. + * + * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls + * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call; + * to get around this, a call to {@link #copy()} will return a new object pointing to the same row. + */ +public class Row +{ + private final Map<String, Integer> nameIndex; + private Object[] results; // mutable to avoid allocations in loops + + public Row(String[] names) + { + Objects.requireNonNull(names, "names"); + this.nameIndex = new HashMap<>(names.length); + for (int i = 0; i < names.length; i++) { + nameIndex.put(names[i], i); + } + } + + private Row(Map<String, Integer> nameIndex) + { + this.nameIndex = nameIndex; + } + + void setResults(Object[] results) + { + this.results = results; + } + + /** + * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}. + */ + public Row copy() { + Row copy = new Row(nameIndex); + copy.setResults(results); + return copy; + } + + public <T> T get(String name) + { + checkAccess(); + int idx = findIndex(name); + if (idx == -1) + return null; + return (T) results[idx]; + } + + public String getString(String name) + { + return get(name); + } + + public UUID getUUID(String name) + { + return get(name); + } + + public Date getTimestamp(String name) + { + return get(name); + } + + public <T> Set<T> getSet(String name) + { + return get(name); + } + + public String toString() + { + return "Row{" + + "names=" + nameIndex.keySet() + + ", results=" + Arrays.toString(results) + + '}'; + } + + private void checkAccess() + { + if (results == null) + throw new NoSuchElementException(); + } + + private int findIndex(String name) + { + return nameIndex.getOrDefault(name, -1); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java new file mode 100644 index 0000000..a714cd5 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public interface TokenSupplier { + long token(int nodeId); + + static TokenSupplier evenlyDistributedTokens(int numNodes) { + long increment = (Long.MAX_VALUE / numNodes) * 2; + return (int nodeId) -> { + assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy", + nodeId, numNodes); + return Long.MIN_VALUE + 1 + nodeId * increment; + }; + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java new file mode 100644 index 0000000..f914e90 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java @@ -0,0 +1,130 @@ +package org.apache.cassandra.distributed.shared; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public class AssertUtils { + + public static void assertRows(Object[][] actual, Object[]... expected) + { + assertEquals(rowsNotEqualErrorMessage(actual, expected), + expected.length, actual.length); + + for (int i = 0; i < expected.length; i++) + { + Object[] expectedRow = expected[i]; + Object[] actualRow = actual[i]; + assertTrue(rowsNotEqualErrorMessage(actual, expected), + Arrays.equals(expectedRow, actualRow)); + } + } + + public static void assertRow(Object[] actual, Object... expected) + { + assertTrue(rowNotEqualErrorMessage(actual, expected), + Arrays.equals(actual, expected)); + } + + public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected) + { + while (actual.hasNext() && expected.hasNext()) + assertRow(actual.next(), expected.next()); + + assertTrue("Resultsets have different sizes", actual.hasNext() == expected.hasNext()); + } + + public static void assertRows(Iterator<Object[]> actual, Object[]... expected) + { + assertRows(actual, new Iterator<Object[]>() { + + int i = 0; + @Override + public boolean hasNext() { + return i < expected.length; + } + + @Override + public Object[] next() { + return expected[i++]; + } + }); + } + + public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected) + { + return String.format("Expected: %s\nActual:%s\n", + Arrays.toString(expected), + Arrays.toString(actual)); + } + + public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) + { + return String.format("Expected: %s\nActual: %s\n", + rowsToString(expected), + rowsToString(actual)); + } + + public static String rowsToString(Object[][] rows) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + boolean isFirst = true; + for (Object[] row : rows) + { + if (isFirst) + isFirst = false; + else + builder.append(","); + builder.append(Arrays.toString(row)); + } + builder.append("]"); + return builder.toString(); + } + + public static Object[][] toObjectArray(Iterator<Object[]> iter) + { + List<Object[]> res = new ArrayList<>(); + while (iter.hasNext()) + res.add(iter.next()); + + return res.toArray(new Object[res.size()][]); + } + + public static Object[] row(Object... expected) + { + return expected; + } + + public static void assertEquals(String message, long expected, long actual) { + if (expected != actual) + fail(message); + } + + public static void assertNotEquals(String message, long expected, long actual) { + if (expected == actual) + fail(message); + } + + public static void assertNotNull(String message, Object object) { + if (object == null) + fail(message); + } + + public static void assertTrue(String message, boolean condition) { + if (!condition) + fail(message); + } + + public static void assertFalse(String message, boolean condition) { + if (condition) + fail(message); + } + + + public static void fail(String message) { + throw new AssertionError(message); + } + +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java new file mode 100644 index 0000000..6f6adfb --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens; + +public abstract class Builder<I extends IInstance, C extends ICluster> { + + private final int BROADCAST_PORT = 7012; + + public interface Factory<I extends IInstance, C extends ICluster> { + C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader); + } + + private final Factory<I, C> factory; + private int nodeCount; + private int subnet; + private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; + private TokenSupplier tokenSupplier; + private File root; + private Versions.Version version; + private Consumer<IInstanceConfig> configUpdater; + + public Builder(Factory<I, C> factory) { + this.factory = factory; + } + + public C start() throws IOException { + C cluster = createWithoutStarting(); + cluster.startup(); + return cluster; + } + + public C createWithoutStarting() throws IOException { + if (root == null) + root = Files.createTempDirectory("dtests").toFile(); + + if (nodeCount <= 0) + throw new IllegalStateException("Cluster must have at least one node"); + + if (nodeIdTopology == null) { + nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); + } + + root.mkdirs(); + + ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); + + List<IInstanceConfig> configs = new ArrayList<>(); + + // TODO: make token allocation strategy configurable + if (tokenSupplier == null) + tokenSupplier = evenlyDistributedTokens(nodeCount); + + for (int i = 0; i < nodeCount; ++i) { + int nodeNum = i + 1; + configs.add(createInstanceConfig(nodeNum)); + } + + return factory.newCluster(root, version, configs, sharedClassLoader); + } + + public IInstanceConfig newInstanceConfig(C cluster) { + return createInstanceConfig(cluster.size() + 1); + } + + protected IInstanceConfig createInstanceConfig(int nodeNum) { + String ipPrefix = "127.0." + subnet + "."; + String seedIp = ipPrefix + "1"; + String ipAddress = ipPrefix + nodeNum; + long token = tokenSupplier.token(nodeNum); + + NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology); + + IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp); + if (configUpdater != null) + configUpdater.accept(config); + + return config; + } + + protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp); + + public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) { + this.tokenSupplier = tokenSupplier; + return this; + } + + public Builder<I, C> withSubnet(int subnet) { + this.subnet = subnet; + return this; + } + + public Builder<I, C> withNodes(int nodeCount) { + this.nodeCount = nodeCount; + return this; + } + + public Builder<I, C> withDCs(int dcCount) { + return withRacks(dcCount, 1); + } + + public Builder<I, C> withRacks(int dcCount, int racksPerDC) { + if (nodeCount == 0) + throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder"); + + int totalRacks = dcCount * racksPerDC; + int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer + return withRacks(dcCount, racksPerDC, nodesPerRack); + } + + public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) { + if (nodeIdTopology != null) + throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls"); + + nodeIdTopology = new HashMap<>(); + int nodeId = 1; + for (int dc = 1; dc <= dcCount; dc++) { + for (int rack = 1; rack <= racksPerDC; rack++) { + for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++) + nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack))); + } + } + // adjust the node count to match the allocatation + final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack; + if (adjustedNodeCount != nodeCount) { + assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count"; + System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s", + dcCount, racksPerDC, nodesPerRack, adjustedNodeCount)); + nodeCount = adjustedNodeCount; + } + return this; + } + + public Builder<I, C> withDC(String dcName, int nodeCount) { + return withRack(dcName, rackName(1), nodeCount); + } + + public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) { + if (nodeIdTopology == null) { + if (nodeCount > 0) + throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks"); + + nodeIdTopology = new HashMap<>(); + } + for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++) + nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); + + nodeCount += nodesInRack; + return this; + } + + // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount + public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) { + if (nodeIdTopology.isEmpty()) + throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); + + IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> { + if (nodeIdTopology.get(nodeId) == null) + throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId); + }); + + if (nodeCount != nodeIdTopology.size()) { + nodeCount = nodeIdTopology.size(); + System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount)); + } + + this.nodeIdTopology = new HashMap<>(nodeIdTopology); + + return this; + } + + public Builder<I, C> withRoot(File root) { + this.root = root; + return this; + } + + public Builder<I, C> withVersion(Versions.Version version) { + this.version = version; + return this; + } + + public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) { + this.configUpdater = updater; + return this; + } + + static String dcName(int index) + { + return "datacenter" + index; + } + + static String rackName(int index) + { + return "rack" + index; + } +} + + diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java new file mode 100644 index 0000000..d28af2a --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; + +public abstract class DistributedTestBase +{ + public void afterEach() + { + System.runFinalization(); + System.gc(); + } + + public static void beforeClass() throws Throwable { + ICluster.setup(); + } + + public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder(); + + public static String KEYSPACE = "distributed_test_keyspace"; + + public static String withKeyspace(String replaceIn) + { + return String.format(replaceIn, KEYSPACE); + } + + protected static <C extends ICluster<?>> C init(C cluster) + { + return init(cluster, cluster.size()); + } + + protected static <C extends ICluster<?>> C init(C cluster, int replicationFactor) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};"); + return cluster; + } + + +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java new file mode 100644 index 0000000..c37a520 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.function.Predicate; + +public class InstanceClassLoader extends URLClassLoader +{ + private static final Predicate<String> sharePackage = name -> + name.startsWith("org.apache.cassandra.distributed.api.") + || name.startsWith("org.apache.cassandra.distributed.shared.") + || name.startsWith("sun.") + || name.startsWith("oracle.") + || name.startsWith("com.intellij.") + || name.startsWith("com.sun.") + || name.startsWith("com.oracle.") + || name.startsWith("java.") + || name.startsWith("javax.") + || name.startsWith("jdk.") + || name.startsWith("netscape.") + || name.startsWith("org.xml.sax."); + + private volatile boolean isClosed = false; + private final URL[] urls; + private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected + private final int id; + private final ClassLoader sharedClassLoader; + + public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader) + { + super(urls, null); + this.urls = urls; + this.sharedClassLoader = sharedClassLoader; + this.generation = generation; + this.id = id; + } + + public int getClusterGeneration() + { + return generation; + } + + public int getInstanceId() + { + return id; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException + { + if (sharePackage.test(name)) + return sharedClassLoader.loadClass(name); + + return loadClassInternal(name); + } + + Class<?> loadClassInternal(String name) throws ClassNotFoundException + { + if (isClosed) + throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name)); + + synchronized (getClassLoadingLock(name)) + { + // First, check if the class has already been loaded + Class<?> c = findLoadedClass(name); + + if (c == null) + c = findClass(name); + + return c; + } + } + + /** + * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node + */ + public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz) + { + return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName()); + } + + public String toString() + { + return "InstanceClassLoader{" + + "generation=" + generation + + ", id = " + id + + ", urls=" + Arrays.toString(urls) + + '}'; + } + + public void close() throws IOException + { + isClosed = true; + super.close(); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java new file mode 100644 index 0000000..c1731db --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; + +public class MessageFilters implements IMessageFilters +{ + private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>(); + private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>(); + + public boolean permitInbound(int from, int to, IMessage msg) + { + return permit(inboundFilters, from, to, msg); + } + + public boolean permitOutbound(int from, int to, IMessage msg) + { + return permit(outboundFilters, from, to, msg); + } + + private static boolean permit(List<Filter> filters, int from, int to, IMessage msg) + { + for (Filter filter : filters) + { + if (filter.matches(from, to, msg)) + return false; + } + return true; + } + + public static class Filter implements IMessageFilters.Filter + { + final int[] from; + final int[] to; + final int[] verbs; + final Matcher matcher; + final List<Filter> parent; + + Filter(int[] from, int[] to, int[] verbs, Matcher matcher, List<Filter> parent) + { + if (from != null) + { + from = from.clone(); + Arrays.sort(from); + } + if (to != null) + { + to = to.clone(); + Arrays.sort(to); + } + if (verbs != null) + { + verbs = verbs.clone(); + Arrays.sort(verbs); + } + this.from = from; + this.to = to; + this.verbs = verbs; + this.matcher = matcher; + this.parent = Objects.requireNonNull(parent, "parent"); + } + + public int hashCode() + { + return (from == null ? 0 : Arrays.hashCode(from)) + + (to == null ? 0 : Arrays.hashCode(to)) + + (verbs == null ? 0 : Arrays.hashCode(verbs) + + parent.hashCode()); + } + + public boolean equals(Object that) + { + return that instanceof Filter && equals((Filter) that); + } + + public boolean equals(Filter that) + { + return Arrays.equals(from, that.from) + && Arrays.equals(to, that.to) + && Arrays.equals(verbs, that.verbs) + && parent.equals(that.parent); + } + + public Filter off() + { + parent.remove(this); + return this; + } + + public Filter on() + { + parent.add(this); + return this; + } + + public boolean matches(int from, int to, IMessage msg) + { + return (this.from == null || Arrays.binarySearch(this.from, from) >= 0) + && (this.to == null || Arrays.binarySearch(this.to, to) >= 0) + && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0) + && (this.matcher == null || this.matcher.matches(from, to, msg)); + } + } + + public class Builder implements IMessageFilters.Builder + { + int[] from; + int[] to; + int[] verbs; + Matcher matcher; + boolean inbound; + + private Builder(boolean inbound) + { + this.inbound = inbound; + } + + public Builder from(int... nums) + { + from = nums; + return this; + } + + public Builder to(int... nums) + { + to = nums; + return this; + } + + public IMessageFilters.Builder verbs(int... verbs) + { + this.verbs = verbs; + return this; + } + + public IMessageFilters.Builder allVerbs() + { + this.verbs = null; + return this; + } + + public IMessageFilters.Builder inbound(boolean inbound) + { + this.inbound = inbound; + return this; + } + + public IMessageFilters.Builder messagesMatching(Matcher matcher) + { + this.matcher = matcher; + return this; + } + + public IMessageFilters.Filter drop() + { + return new Filter(from, to, verbs, matcher, inbound ? inboundFilters : outboundFilters).on(); + } + } + + public IMessageFilters.Builder inbound(boolean inbound) + { + return new Builder(inbound); + } + + @Override + public void reset() + { + inboundFilters.clear(); + outboundFilters.clear(); + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java new file mode 100644 index 0000000..7bd91d3 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class NetworkTopology +{ + private final Map<InetSocketAddress, DcAndRack> map; + + public static class DcAndRack + { + private final String dc; + private final String rack; + + private DcAndRack(String dc, String rack) + { + this.dc = dc; + this.rack = rack; + } + + @Override + public String toString() + { + return "DcAndRack{" + + "dc='" + dc + '\'' + + ", rack='" + rack + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DcAndRack dcAndRack = (DcAndRack) o; + return Objects.equals(dc, dcAndRack.dc) && + Objects.equals(rack, dcAndRack.rack); + } + + @Override + public int hashCode() + { + return Objects.hash(dc, rack); + } + } + + public static DcAndRack dcAndRack(String dc, String rack) + { + return new DcAndRack(dc, rack); + } + + public static InetSocketAddress addressAndPort(InetAddress address, int port) + { + return new InetSocketAddress(address, port); + } + + public static InetSocketAddress addressAndPort(String address, int port) + { + try + { + return new InetSocketAddress(InetAddress.getByName(address), port); + } + catch (UnknownHostException e) + { + throw new IllegalArgumentException("Unknown address '" + address + '\''); + } + } + + private NetworkTopology() + { + map = new HashMap<>(); + } + + @SuppressWarnings("WeakerAccess") + public NetworkTopology(NetworkTopology networkTopology) + { + map = new HashMap<>(networkTopology.map); + } + + public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology) + { + final NetworkTopology topology = new NetworkTopology(); + + for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++) + { + String broadcastAddress = ipPrefix + nodeId; + + DcAndRack dcAndRack = nodeIdTopology.get(nodeId); + if (dcAndRack == null) + throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap"); + + topology.put(addressAndPort(broadcastAddress, broadcastPort), dcAndRack); + } + return topology; + } + + public DcAndRack put(InetSocketAddress addressAndPort, DcAndRack value) + { + return map.put(addressAndPort, value); + } + + public String localRack(InetSocketAddress key) + { + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.rack; + } + + public String localDC(InetSocketAddress key) + { + DcAndRack p = map.get(key); + if (p == null) + return null; + return p.dc; + } + + public boolean contains(InetSocketAddress key) + { + return map.containsKey(key); + } + + public String toString() + { + return "NetworkTopology{" + map + '}'; + } + + + public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount, + String dc, + String rack) + { + return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack)); + } + + public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount, + IntFunction<DcAndRack> dcAndRackSupplier) + { + + return IntStream.rangeClosed(1, nodeCount).boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + dcAndRackSupplier::apply)); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java new file mode 100644 index 0000000..01f8d29 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +public interface ThrowingRunnable +{ + public void run() throws Throwable; + + public static Runnable toRunnable(ThrowingRunnable runnable) + { + return () -> { + try + { + runnable.run(); + } + catch (Throwable throwable) + { + throw new RuntimeException(throwable); + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java new file mode 100644 index 0000000..51c9e49 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class Versions +{ + private static final Logger logger = LoggerFactory.getLogger(Versions.class); + + public static final String PROPERTY_PREFIX = "cassandra."; + + public static URL[] getClassPath() + { + // In Java 9 the default system ClassLoader got changed from URLClassLoader to AppClassLoader which + // does not extend URLClassLoader nor does it give access to the class path array. + // Java requires the system property 'java.class.path' to be setup with the classpath seperated by : + // so this logic parses that string into the URL[] needed to build later + String cp = System.getProperty("java.class.path"); + assert cp != null && !cp.isEmpty(); + String[] split = cp.split(File.pathSeparator); + assert split.length > 0; + URL[] urls = new URL[split.length]; + try + { + for (int i = 0; i < split.length; i++) + urls[i] = Paths.get(split[i]).toUri().toURL(); + } + catch (MalformedURLException e) + { + throw new RuntimeException(e); + } + return urls; + } + + public enum Major + { + v22("2\\.2\\.([0-9]+)"), + v30("3\\.0\\.([0-9]+)"), + v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"), + v4("4\\.([0-9]+)"); + final Pattern pattern; + Major(String verify) + { + this.pattern = Pattern.compile(verify); + } + + static Major fromFull(String version) + { + if (version.isEmpty()) + throw new IllegalArgumentException(version); + switch (version.charAt(0)) + { + case '2': + if (version.startsWith("2.2")) + return v22; + throw new IllegalArgumentException(version); + case '3': + if (version.startsWith("3.0")) + return v30; + return v3X; + case '4': + return v4; + default: + throw new IllegalArgumentException(version); + } + } + + // verify that the version string is valid for this major version + boolean verify(String version) + { + return pattern.matcher(version).matches(); + } + + // sort two strings of the same major version as this enum instance + int compare(String a, String b) + { + Matcher ma = pattern.matcher(a); + Matcher mb = pattern.matcher(a); + if (!ma.matches()) throw new IllegalArgumentException(a); + if (!mb.matches()) throw new IllegalArgumentException(b); + int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1))); + if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null)) + { + if (ma.group(3) != null && mb.group(3) != null) + { + result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3))); + } + else + { + result = ma.group(3) != null ? 1 : -1; + } + } + // sort descending + return -result; + } + } + + public static class Version + { + public final Major major; + public final String version; + public final URL[] classpath; + + public Version(String version, URL[] classpath) + { + this(Major.fromFull(version), version, classpath); + } + public Version(Major major, String version, URL[] classpath) + { + this.major = major; + this.version = version; + this.classpath = classpath; + } + } + + private final Map<Major, List<Version>> versions; + public Versions(Map<Major, List<Version>> versions) + { + this.versions = versions; + } + + public Version get(String full) + { + return versions.get(Major.fromFull(full)) + .stream() + .filter(v -> full.equals(v.version)) + .findFirst() + .orElseThrow(() -> new RuntimeException("No version " + full + " found")); + } + + public Version getLatest(Major major) + { + return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found")); + } + + public static Versions find() + { + final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build"); + final File sourceDirectory = new File(dtestJarDirectory); + logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath()); + final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar"); + final Map<Major, List<Version>> versions = new HashMap<>(); + for (Major major : Major.values()) + versions.put(major, new ArrayList<>()); + + for (File file : sourceDirectory.listFiles()) + { + Matcher m = pattern.matcher(file.getName()); + if (!m.matches()) + continue; + String version = m.group(1); + Major major = Major.fromFull(version); + versions.get(major).add(new Version(major, version, new URL[] { toURL(file) })); + } + + for (Map.Entry<Major, List<Version>> e : versions.entrySet()) + { + if (e.getValue().isEmpty()) + continue; + Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare)); + System.out.println("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", "))); + } + + return new Versions(versions); + } + + public static URL toURL(File file) + { + try + { + return file.toURI().toURL(); + } + catch (MalformedURLException e) + { + throw new IllegalArgumentException(e); + } + } + +} diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml new file mode 100644 index 0000000..370e1e5 --- /dev/null +++ b/test/conf/logback-dtest.xml @@ -0,0 +1,77 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + + <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>20</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>20MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + </encoder> + <immediateFlush>false</immediateFlush> + </appender> + + <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender"> + <discardingThreshold>0</discardingThreshold> + <maxFlushTime>0</maxFlushTime> + <queueSize>1024</queueSize> + <appender-ref ref="INSTANCEFILE"/> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> + + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <logger name="org.apache.hadoop" level="WARN"/> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEASYNCFILE" /> + <appender-ref ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> + </root> +</configuration> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org