This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cassandra-2.2 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push: new 50b7094 Add client testing capabilities to in-jvm tests 50b7094 is described below commit 50b7094278241f389d3b0b49b02e893fd4322b12 Author: Doug Rohrer <droh...@apple.com> AuthorDate: Mon Oct 14 13:42:35 2019 -0400 Add client testing capabilities to in-jvm tests Patch by Doug Rohrer, reviewed by Alex Petrov for CASSANDRA-15347. Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com> --- .../apache/cassandra/service/CassandraDaemon.java | 96 +++++++++---- .../org/apache/cassandra/thrift/ThriftServer.java | 5 + .../org/apache/cassandra/transport/Server.java | 41 +++++- .../apache/cassandra/distributed/api/Feature.java | 2 +- .../cassandra/distributed/api/IInstance.java | 2 + .../apache/cassandra/distributed/api/IListen.java | 2 + .../distributed/impl/AbstractCluster.java | 158 +++++++++++++++++---- .../cassandra/distributed/impl/Instance.java | 21 +++ .../distributed/impl/InstanceClassLoader.java | 16 ++- .../cassandra/distributed/impl/InstanceConfig.java | 34 +++-- .../apache/cassandra/distributed/impl/Listen.java | 20 ++- .../apache/cassandra/distributed/impl/RowUtil.java | 17 +++ .../distributed/test/DistributedTestBase.java | 7 + .../distributed/test/NativeProtocolTest.java | 59 ++++++++ .../distributed/test/ResourceLeakTest.java | 16 +++ 15 files changed, 424 insertions(+), 72 deletions(-) diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 1380f43..c0ba38e 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; import javax.management.remote.JMXConnectorServer; @@ -83,6 +82,13 @@ public class CassandraDaemon private static JMXConnectorServer jmxServer = null; private static final Logger logger; + + @VisibleForTesting + public static CassandraDaemon getInstanceForTesting() + { + return instance; + } + static { // Need to register metrics before instrumented appender is created(first access to LoggerFactory). SharedMetricRegistries.getOrCreate("logback-metrics").addListener(new MetricRegistryListener.Base() @@ -357,13 +363,53 @@ public class CassandraDaemon int rpcPort = DatabaseDescriptor.getRpcPort(); int listenBacklog = DatabaseDescriptor.getRpcListenBacklog(); thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); + initializeNativeTransport(); + + completeSetup(); + } + public void initializeNativeTransport() + { // Native transport InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress(); int nativePort = DatabaseDescriptor.getNativeTransportPort(); nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); + } - completeSetup(); + public void startNativeTransport() + { + validateTransportsCanStart(); + + if (nativeServer == null) + throw new IllegalStateException("native transport should be set up before it can be started"); + + nativeServer.start(); + } + + private void validateTransportsCanStart() + { + // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in + // survey mode and streaming has completed but we're not using auth. + // OR if we have not joined the ring yet. + if (StorageService.instance.hasJoined()) + { + if (StorageService.instance.isSurveyMode()) + { + if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication()) + { + throw new IllegalStateException("Not starting client transports in write_survey mode as it's bootstrapping or " + + "auth is enabled"); + } + } + else + { + if (!SystemKeyspace.bootstrapComplete()) + { + throw new IllegalStateException("Node is not yet bootstrapped completely. Use nodetool to check bootstrap" + + " state and resume. For more, see `nodetool help bootstrap`"); + } + } + } } /* @@ -440,28 +486,15 @@ public class CassandraDaemon */ public void start() { - // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in - // survey mode and streaming has completed but we're not using auth. - // OR if we have not joined the ring yet. - if (StorageService.instance.hasJoined()) + try { - if (StorageService.instance.isSurveyMode()) - { - if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication()) - { - logger.info("Not starting client transports in write_survey mode as it's bootstrapping or " + - "auth is enabled"); - return; - } - } - else - { - if (!SystemKeyspace.bootstrapComplete()) - { - logger.info("Not starting client transports as bootstrap has not completed"); - return; - } - } + validateTransportsCanStart(); + } + catch (IllegalStateException isx) + { + // If there are any errors, we just log and return in this case + logger.info(isx.getMessage()); + return; } String nativeFlag = System.getProperty("cassandra.start_native_transport"); @@ -510,6 +543,18 @@ public class CassandraDaemon } } + @VisibleForTesting + public void destroyNativeTransport() throws InterruptedException + { + // In 2.2, just stopping the server works. Future versions require `destroy` to be called + // so we maintain the name for consistency + if (nativeServer != null) + { + nativeServer.stopAndAwaitTermination(); + nativeServer = null; + } + } + /** * Clean up all resources obtained during the lifetime of the daemon. This @@ -648,7 +693,7 @@ public class CassandraDaemon logger.info("No gossip backlog; proceeding"); } - public static void stop(String[] args) + public static void stop(String[] args) throws InterruptedException { instance.deactivate(); } @@ -703,6 +748,9 @@ public class CassandraDaemon */ public void stop(); + @VisibleForTesting + public void stopAndAwaitTermination(); + /** * Returns whether the server is currently running. */ diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java index 44ec524..87dcd3e 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftServer.java +++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java @@ -77,6 +77,11 @@ public class ThriftServer implements CassandraDaemon.Server } } + public void stopAndAwaitTermination() + { + stop(); + } + public boolean isRunning() { return server != null; diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index c91d37d..418f6f7 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -24,10 +24,12 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +126,14 @@ public class Server implements CassandraDaemon.Server public void stop() { if (isRunning.compareAndSet(true, false)) - close(); + close(false); + } + + @VisibleForTesting + public void stopAndAwaitTermination() + { + if (isRunning.compareAndSet(true, false)) + close(true); } public boolean isRunning() @@ -206,15 +215,39 @@ public class Server implements CassandraDaemon.Server private void close() { + close(false); + } + + private void closeAndAwait() + { + close(true); + } + + private void close(boolean awaitTermination) + { // Close opened connections connectionTracker.closeAll(); workerGroup.shutdownGracefully(); - workerGroup = null; - eventExecutorGroup.shutdown(); - eventExecutorGroup = null; + logger.info("Stop listening for CQL clients"); + if (awaitTermination) + { + try + { + workerGroup.awaitTermination(1, TimeUnit.MINUTES); + eventExecutorGroup.awaitTermination(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + logger.error(e.getMessage()); + } + } + + workerGroup = null; + eventExecutorGroup = null; + StorageService.instance.setRpcReady(false); } diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java index a5c9316..b4ba036 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/Feature.java +++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java @@ -20,5 +20,5 @@ package org.apache.cassandra.distributed.api; public enum Feature { - NETWORK, GOSSIP + NETWORK, GOSSIP, NATIVE_PROTOCOL } diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java index 6a9e33a..e6c705c 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java @@ -41,6 +41,8 @@ public interface IInstance extends IIsolatedExecutor Future<Void> shutdown(); Future<Void> shutdown(boolean graceful); + int liveMemberCount(); + // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface void startup(ICluster cluster); void receiveMessage(IMessage message); diff --git a/test/distributed/org/apache/cassandra/distributed/api/IListen.java b/test/distributed/org/apache/cassandra/distributed/api/IListen.java index d3a80da..c2e8dd6 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IListen.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IListen.java @@ -23,4 +23,6 @@ public interface IListen public interface Cancel { void cancel(); } Cancel schema(Runnable onChange); + + Cancel liveMembers(Runnable onChange); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index f603497..0283457 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -43,9 +43,11 @@ import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -129,7 +131,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, private IInvokableInstance newInstance(int generation) { - ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader); + ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader); return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader) .apply(config, classLoader); } @@ -171,6 +173,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, return future; } + public int liveMemberCount() + { + if (!isShutdown && delegate != null) + return delegate().liveMemberCount(); + + throw new IllegalStateException("Cannot get live member count on shutdown instance"); + } + @Override public void receiveMessage(IMessage message) { @@ -250,16 +260,22 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); } - public void forEach(Consumer<? super I> consumer) { instances.forEach(consumer); } - public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit units) + public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); } + public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); } + + public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit) + { + parallelForEach(instances, consumer, timeout, unit); + } + + public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit) { FBUtilities.waitOnFutures(instances.stream() .map(i -> i.async(consumer).apply(i)) .collect(Collectors.toList()), - timeout, units); + timeout, unit); } - public IMessageFilters filters() { return filters; } public MessageFilters.Builder verbs(MessagingService.Verb ... verbs) { return filters.verbs(verbs); } @@ -278,7 +294,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, { // execute the schema change coordinator(1).execute(query, ConsistencyLevel.ALL); - monitor.waitForAgreement(); + monitor.waitForCompletion(); } }).run(); } @@ -301,33 +317,26 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, } } - /** - * Will wait for a schema change AND agreement that occurs after it is created - * (and precedes the invocation to waitForAgreement) - * - * Works by simply checking if all UUIDs agree after any schema version change event, - * so long as the waitForAgreement method has been entered (indicating the change has - * taken place on the coordinator) - * - * This could perhaps be made a little more robust, but this should more than suffice. - */ - public class SchemaChangeMonitor implements AutoCloseable + public abstract class ChangeMonitor implements AutoCloseable { final List<IListen.Cancel> cleanup; - volatile boolean schemaHasChanged; - final SimpleCondition agreement = new SimpleCondition(); + final SimpleCondition completed; + private final long timeOut; + private final TimeUnit timeoutUnit; + volatile boolean changed; - public SchemaChangeMonitor() + public ChangeMonitor(long timeOut, TimeUnit timeoutUnit) { + this.timeOut = timeOut; + this.timeoutUnit = timeoutUnit; this.cleanup = new ArrayList<>(instances.size()); - for (IInstance instance : instances) - cleanup.add(instance.listen().schema(this::signal)); + this.completed = new SimpleCondition(); } - private void signal() + protected void signal() { - if (schemaHasChanged && 1 == instances.stream().map(IInstance::schemaVersion).distinct().count()) - agreement.signalAll(); + if (changed && isCompleted()) + completed.signalAll(); } @Override @@ -337,20 +346,91 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, cancel.cancel(); } - public void waitForAgreement() + public void waitForCompletion() { - schemaHasChanged = true; + startPolling(); + changed = true; signal(); try { - if (!agreement.await(1L, TimeUnit.MINUTES)) + if (!completed.await(timeOut, timeoutUnit)) throw new InterruptedException(); } catch (InterruptedException e) { - throw new IllegalStateException("Schema agreement not reached"); + throw new IllegalStateException(getMonitorTimeoutMessage()); } } + + private void startPolling() + { + for (IInstance instance : instances) + cleanup.add(startPolling(instance)); + } + + protected abstract IListen.Cancel startPolling(IInstance instance); + + protected abstract boolean isCompleted(); + + protected abstract String getMonitorTimeoutMessage(); + } + + + + /** + * Will wait for a schema change AND agreement that occurs after it is created + * (and precedes the invocation to waitForAgreement) + * + * Works by simply checking if all UUIDs agree after any schema version change event, + * so long as the waitForAgreement method has been entered (indicating the change has + * taken place on the coordinator) + * + * This could perhaps be made a little more robust, but this should more than suffice. + */ + public class SchemaChangeMonitor extends ChangeMonitor + { + public SchemaChangeMonitor() + { + super(70, TimeUnit.SECONDS); + } + + protected IListen.Cancel startPolling(IInstance instance) + { + return instance.listen().schema(this::signal); + } + + protected boolean isCompleted() + { + return 1 == instances.stream().map(IInstance::schemaVersion).distinct().count(); + } + + protected String getMonitorTimeoutMessage() + { + return "Schema agreement not reached"; + } + } + + public class AllMembersAliveMonitor extends ChangeMonitor + { + public AllMembersAliveMonitor() + { + super(60, TimeUnit.SECONDS); + } + + protected IListen.Cancel startPolling(IInstance instance) + { + return instance.listen().liveMembers(this::signal); + } + + protected boolean isCompleted() + { + return instances.stream().allMatch(i -> !i.config().has(Feature.GOSSIP) || i.liveMemberCount() == instances.size()); + } + + protected String getMonitorTimeoutMessage() + { + return "Live member count did not converge across all instances"; + } } public void schemaChange(String statement, int instance) @@ -360,7 +440,25 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, void startup() { - parallelForEach(I::startup, 0, null); + try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor()) + { + // Start any instances with auto_bootstrap enabled first, and in series to avoid issues + // with multiple nodes bootstrapping with consistent range movement enabled, + // and then start any instances with it disabled in parallel. + List<I> startSequentially = new ArrayList<>(); + List<I> startParallel = new ArrayList<>(); + for (I instance : instances) + { + if ((boolean) instance.config().get("auto_bootstrap")) + startSequentially.add(instance); + else + startParallel.add(instance); + } + + forEach(startSequentially, I::startup); + parallelForEach(startParallel, I::startup, 0, null); + monitor.waitForCompletion(); + } } protected interface Factory<I extends IInstance, C extends AbstractCluster<I>> diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index d3f2955..c8613f7 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -73,6 +73,7 @@ import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.QueryState; @@ -90,6 +91,7 @@ import org.apache.cassandra.utils.concurrent.Ref; import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; public class Instance extends IsolatedExecutor implements IInvokableInstance { @@ -426,6 +428,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance if (config.has(GOSSIP)) { StorageService.instance.initServer(); + StorageService.instance.removeShutdownHook(); } else { @@ -436,6 +439,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance SystemKeyspace.finishStartup(); + if (config.has(NATIVE_PROTOCOL)) + { + CassandraDaemon.getInstanceForTesting().initializeNativeTransport(); + CassandraDaemon.getInstanceForTesting().startNativeTransport(); + } + if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address)) throw new IllegalStateException(); if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port) @@ -525,6 +534,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance Future<?> future = async((ExecutorService executor) -> { Throwable error = null; + error = parallelRun(error, executor, CassandraDaemon.getInstanceForTesting()::destroyNativeTransport); + if (config.has(GOSSIP) || config.has(NETWORK)) { StorageService.instance.shutdownServer(); @@ -565,6 +576,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance .thenRun(super::shutdown); } + public int liveMemberCount() + { + return sync(() -> { + if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled()) + return 0; + return Gossiper.instance.getLiveMembers().size(); + }).call(); + } + + private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables) { List<Future<Throwable>> results = new ArrayList<>(); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java index ca6d713..fb78902 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java @@ -24,6 +24,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.SigarLibrary; +import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; @@ -65,16 +66,19 @@ public class InstanceClassLoader extends URLClassLoader InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader); } + private volatile boolean isClosed = false; private final URL[] urls; private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected + private final int id; private final ClassLoader sharedClassLoader; - InstanceClassLoader(int generation, URL[] urls, ClassLoader sharedClassLoader) + InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader) { super(urls, null); this.urls = urls; this.sharedClassLoader = sharedClassLoader; this.generation = generation; + this.id = id; } @Override @@ -88,6 +92,9 @@ public class InstanceClassLoader extends URLClassLoader Class<?> loadClassInternal(String name) throws ClassNotFoundException { + if (isClosed) + throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name)); + synchronized (getClassLoadingLock(name)) { // First, check if the class has already been loaded @@ -112,7 +119,14 @@ public class InstanceClassLoader extends URLClassLoader { return "InstanceClassLoader{" + "generation=" + generation + + ", id = " + id + ", urls=" + Arrays.toString(urls) + '}'; } + + public void close() throws IOException + { + isClosed = true; + super.close(); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 88299d6..97e1a18 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -58,18 +58,23 @@ public class InstanceConfig implements IInstanceConfig { if (broadcastAddressAndPort == null) { - try - { - broadcastAddressAndPort = InetAddressAndPort.getByNameOverrideDefaults(getString("broadcast_address"), getInt("storage_port")); - } - catch (UnknownHostException e) - { - throw new IllegalStateException(e); - } + broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port"); } return broadcastAddressAndPort; } + private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp) + { + try + { + return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp)); + } + catch (UnknownHostException e) + { + throw new IllegalStateException(e); + } + } + private InstanceConfig(int num, NetworkTopology networkTopology, String broadcast_address, @@ -97,6 +102,7 @@ public class InstanceConfig implements IInstanceConfig // .set("cdc_directory", cdc_directory) .set("initial_token", initial_token) .set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner") + .set("start_native_transport", true) .set("concurrent_writes", 2) .set("concurrent_counter_writes", 2) // .set("concurrent_materialized_view_writes", 2) @@ -109,6 +115,11 @@ public class InstanceConfig implements IInstanceConfig .set("endpoint_snitch", DistributedTestSnitch.class.getName()) .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(), Collections.singletonMap("seeds", "127.0.0.1"))) + .set("auto_bootstrap", false) + // capacities that are based on `totalMemory` that should be fixed size + .set("index_summary_capacity_in_mb", 50l) + .set("counter_cache_size_in_mb", 50l) + .set("key_cache_size_in_mb", 50l) // legacy parameters .forceSet("commitlog_sync_batch_window_in_ms", 1.0); this.featureFlags = EnumSet.noneOf(Feature.class); @@ -130,6 +141,13 @@ public class InstanceConfig implements IInstanceConfig return this; } + public InstanceConfig with(Feature... flags) + { + for (Feature flag : flags) + featureFlags.add(flag); + return this; + } + public boolean has(Feature featureFlag) { return featureFlags.contains(featureFlag); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java index cf208a1..27ae156 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java @@ -18,15 +18,18 @@ package org.apache.cassandra.distributed.impl; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; + import org.apache.cassandra.distributed.api.IListen; +import org.apache.cassandra.gms.Gossiper; public class Listen implements IListen { final Instance instance; + public Listen(Instance instance) { this.instance = instance; @@ -34,9 +37,18 @@ public class Listen implements IListen public Cancel schema(Runnable onChange) { - final AtomicBoolean cancel = new AtomicBoolean(); + return start(onChange, instance::schemaVersion); + } + + public Cancel liveMembers(Runnable onChange) + { + return start(onChange, instance::liveMemberCount); + } + + protected <T> Cancel start(Runnable onChange, Supplier<T> valueSupplier) { + AtomicBoolean cancel = new AtomicBoolean(false); instance.isolatedExecutor.execute(() -> { - UUID prev = instance.schemaVersion(); + T prev = valueSupplier.get(); while (true) { if (cancel.get()) @@ -44,7 +56,7 @@ public class Listen implements IListen LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); - UUID cur = instance.schemaVersion(); + T cur = valueSupplier.get(); if (!prev.equals(cur)) onChange.run(); prev = cur; diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java index d84c4a9..ded3708 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java @@ -24,6 +24,8 @@ import java.util.List; import com.google.common.collect.Iterators; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.transport.messages.ResultMessage; @@ -64,4 +66,19 @@ public class RowUtil return objectRow; }); } + + public static Iterator<Object[]> toObjects(ResultSet rs) + { + return Iterators.transform(rs.iterator(), (Row row) -> { + final int numColumns = rs.getColumnDefinitions().size(); + Object[] objectRow = new Object[numColumns]; + for (int i = 0; i < numColumns; i++) + { + objectRow[i] = row.getObject(i); + } + return objectRow; + }); + } + + } diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java index 757c17f..745e1ab 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java @@ -28,8 +28,10 @@ import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; +import com.datastax.driver.core.ResultSet; import org.apache.cassandra.distributed.impl.AbstractCluster; import org.apache.cassandra.distributed.impl.IsolatedExecutor; +import org.apache.cassandra.distributed.impl.RowUtil; public class DistributedTestBase { @@ -81,6 +83,11 @@ public class DistributedTestBase return cluster; } + public static void assertRows(ResultSet actual,Object[]... expected) + { + assertRows(RowUtil.toObjects(actual), expected); + } + public static void assertRows(Object[][] actual, Object[]... expected) { Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected), diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java new file mode 100644 index 0000000..15392b1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.impl.RowUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Iterator; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class NativeProtocolTest extends DistributedTestBase +{ + + @Test + public void withClientRequests() throws Throwable + { + try (Cluster ignored = init(Cluster.create(3, + config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)))) + { + final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); + Session session = cluster.connect(); + session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"); + session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);"); + Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL); + final ResultSet resultSet = session.execute(select); + assertRows(resultSet, row(1, 1, 1)); + Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size()); + session.close(); + cluster.close(); + } + } +} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java index 55c700c..09f40e4 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java @@ -44,6 +44,7 @@ import org.apache.cassandra.utils.SigarLibrary; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup. * All objects referencing the InstanceClassLoader need to be garbage collected or @@ -144,6 +145,7 @@ public class ResourceLeakTest extends DistributedTestBase { for (int loop = 0; loop < numTestLoops; loop++) { + System.out.println(String.format("========== Starting loop %03d ========", loop)); try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start()) { if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node @@ -170,6 +172,7 @@ public class ResourceLeakTest extends DistributedTestBase System.runFinalization(); System.gc(); } + System.out.println(String.format("========== Completed loop %03d ========", loop)); } } @@ -198,4 +201,17 @@ public class ResourceLeakTest extends DistributedTestBase } dumpResources("final-gossip-network"); } + + @Test + public void looperNativeTest() throws Throwable + { + doTest(2, config -> config.with(NATIVE_PROTOCOL)); + if (forceCollection) + { + System.runFinalization(); + System.gc(); + Thread.sleep(finalWaitMillis); + } + dumpResources("final-native"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org