This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2ae1ec5dd2d98178f3ab4b3ed64a87147e713560 Merge: 0e0056c 34dde96 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Oct 12 11:06:42 2020 -0700 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 2 + build.xml | 3 + .../apache/cassandra/service/CassandraDaemon.java | 108 +++++------ .../apache/cassandra/service/StorageService.java | 2 +- .../distributed/impl/AbstractCluster.java | 18 +- .../cassandra/distributed/impl/Instance.java | 41 +++- .../cassandra/distributed/shared/Byteman.java | 207 +++++++++++++++++++++ .../cassandra/distributed/shared/Shared.java | 37 ++++ .../test/BootstrapBinaryDisabledTest.java | 165 ++++++++++++++++ .../test/ClientNetworkStopStartTest.java | 79 ++++++++ .../distributed/test/TopologyChangeTest.java | 45 +++-- test/resources/byteman/stream_failure.btm | 14 ++ 12 files changed, 633 insertions(+), 88 deletions(-) diff --cc CHANGES.txt index bf80c8c,ee70af5..6829dac --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -29,34 -6,15 +29,36 @@@ Merged from 3.11 Merged from 3.0: * Handle unexpected columns due to schema races (CASSANDRA-15899) * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160) + Merged from 2.2: + * Fixed a NullPointerException when calling nodetool enablethrift (CASSANDRA-16127) -3.11.8 +4.0-beta2 + * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939) + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876) + * Remove deprecated HintedHandOffManager (CASSANDRA-15939) + * Prevent repair from overrunning compaction (CASSANDRA-15817) + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053) + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802) + * Fix unicode chars error input (CASSANDRA-15990) + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788) + * Handle errors in StreamSession#prepare (CASSANDRA-15852) + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039) + * Remove COMPACT STORAGE internals (CASSANDRA-13994) + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976) + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425) + * Improve logging for socket connection/disconnection (CASSANDRA-15980) + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928) + * Forbid altering UDTs used in partition keys (CASSANDRA-15933) + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973) + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766) + * Verify sstable components on startup (CASSANDRA-15945) + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937) +Merged from 3.11: * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071) * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) Merged from 3.0: - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) * Fix gossip shutdown order (CASSANDRA-15816) * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432) * Check for endpoint collision with hibernating nodes (CASSANDRA-14599) diff --cc build.xml index e026630,191c1c8..5c9ac2f --- a/build.xml +++ b/build.xml @@@ -582,13 -412,20 +582,14 @@@ <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/> <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/> <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/> - <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/> - - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"> - <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/> - <exclusion groupId="junit" artifactId="junit"/> - </dependency> - <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> - <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2"> - <exclusion groupId="commons-logging" artifactId="commons-logging"/> - </dependency> - <dependency groupId="junit" artifactId="junit" version="4.6" /> + <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/> + <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.26"/> + <dependency groupId="junit" artifactId="junit" version="4.12" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" /> + <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> @@@ -731,19 -542,22 +732,20 @@@ version="${version}"/> <dependency groupId="junit" artifactId="junit"/> <dependency groupId="org.mockito" artifactId="mockito-core" /> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" /> + <dependency groupId="org.reflections" artifactId="reflections" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" /> + <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" /> <dependency groupId="org.apache.rat" artifactId="apache-rat"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/> - <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> - <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> + <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> + <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> <dependency groupId="org.antlr" artifactId="antlr"/> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"> - <exclusion groupId="io.netty" artifactId="netty-buffer"/> - <exclusion groupId="io.netty" artifactId="netty-codec"/> - <exclusion groupId="io.netty" artifactId="netty-handler"/> - <exclusion groupId="io.netty" artifactId="netty-transport"/> - </dependency> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.4" /> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.4" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/> <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/> @@@ -770,7 -572,14 +772,8 @@@ version="${version}"/> <dependency groupId="junit" artifactId="junit"/> <dependency groupId="org.mockito" artifactId="mockito-core" /> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" /> + <dependency groupId="org.reflections" artifactId="reflections" /> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"> - <exclusion groupId="io.netty" artifactId="netty-buffer"/> - <exclusion groupId="io.netty" artifactId="netty-codec"/> - <exclusion groupId="io.netty" artifactId="netty-handler"/> - <exclusion groupId="io.netty" artifactId="netty-transport"/> - </dependency> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> <dependency groupId="io.netty" artifactId="netty-all"/> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index 4b92d69,d8bd165..6d6bc70 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -38,26 -44,19 +44,20 @@@ import com.codahale.metrics.jvm.BufferP import com.codahale.metrics.jvm.FileDescriptorRatioGauge; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; - import com.google.common.annotations.VisibleForTesting; - import com.google.common.util.concurrent.Futures; - import com.google.common.util.concurrent.ListenableFuture; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - -import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; +import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.concurrent.ScheduledExecutors; - import org.apache.cassandra.db.virtual.SystemViewsKeyspace; - import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; - import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace; - import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.net.StartupClusterConnectivityChecker; - import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.schema.Schema; - import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; - import org.apache.cassandra.db.*; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.SizeEstimatesRecorder; + import org.apache.cassandra.db.SystemKeyspace; ++import org.apache.cassandra.db.SystemKeyspaceMigrator40; + import org.apache.cassandra.db.WindowsFailedSnapshotTracker; import org.apache.cassandra.db.commitlog.CommitLog; ++import org.apache.cassandra.db.virtual.SystemViewsKeyspace; ++import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; ++import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.gms.Gossiper; @@@ -65,22 -65,21 +65,33 @@@ import org.apache.cassandra.io.FSError import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.SSTableHeaderFix; import org.apache.cassandra.io.util.FileUtils; ++import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.StorageMetrics; - import org.apache.cassandra.tracing.Tracing; - import org.apache.cassandra.utils.*; -import org.apache.cassandra.schema.LegacySchemaMigrator; ++import org.apache.cassandra.net.StartupClusterConnectivityChecker; ++import org.apache.cassandra.schema.Schema; ++import org.apache.cassandra.schema.SchemaConstants; ++import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.security.ThreadAwareSecurityManager; -import org.apache.cassandra.thrift.ThriftServer; + import org.apache.cassandra.tracing.Tracing; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JMXServerUtils; + import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.MBeanWrapper; + import org.apache.cassandra.utils.Mx4jTool; + import org.apache.cassandra.utils.NativeLibrary; + import org.apache.cassandra.utils.WindowsTimer; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND; - import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE; +import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT; ++import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE; +import static org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_CLASS_PATH; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME; + /** * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon * service, which defines not only a way to activate and deactivate it, but also @@@ -167,24 -166,10 +178,24 @@@ public class CassandraDaemo } } + @VisibleForTesting + public static Runnable SPECULATION_THRESHOLD_UPDATER = + () -> + { + try + { + Keyspace.allExisting().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)); + } + catch (Throwable t) + { + logger.warn("Failed to update speculative retry thresholds.", t); + JVMStabilityInspector.inspectThrowable(t); + } + }; + static final CassandraDaemon instance = new CassandraDaemon(); - private NativeTransportService nativeTransportService; - private volatile Server thriftServer; + private volatile NativeTransportService nativeTransportService; private JMXConnectorServer jmxServer; private final boolean runManaged; @@@ -445,25 -435,20 +456,25 @@@ // due to scheduling errors or race conditions ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES); + // schedule periodic recomputation of speculative retry thresholds + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SPECULATION_THRESHOLD_UPDATER, + DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS), + DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS), + NANOSECONDS); + - initializeNativeTransport(); + initializeClientTransports(); completeSetup(); } - public synchronized void initializeClientTransports() + public void setupVirtualKeyspaces() { - // Thrift - InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); - int rpcPort = DatabaseDescriptor.getRpcPort(); - int listenBacklog = DatabaseDescriptor.getRpcListenBacklog(); - if (thriftServer == null) - thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); + VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance); + VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance); + } - public void initializeNativeTransport() ++ public synchronized void initializeClientTransports() + { // Native transport if (nativeTransportService == null) nativeTransportService = new NativeTransportService(); @@@ -560,34 -530,24 +571,28 @@@ */ public void start() { + StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(), + DatabaseDescriptor.getBlockForPeersInRemoteDatacenters()); + connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter); + - // We only start transports if bootstrap has completed and we're not in survey mode, - // OR if we are in survey mode and streaming has completed but we're not using auth - // OR if we have not joined the ring yet. - if (StorageService.instance.hasJoined()) + // check to see if transports may start else return without starting. This is needed when in survey mode or + // when bootstrap has not completed. + try { - if (StorageService.instance.isSurveyMode()) - { - if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication()) - { - logger.info("Not starting client transports in write_survey mode as it's bootstrapping or " + - "auth is enabled"); - return; - } - } - else - { - if (!SystemKeyspace.bootstrapComplete()) - { - logger.info("Not starting client transports as bootstrap has not completed"); - return; - } - } + validateTransportsCanStart(); } + catch (IllegalStateException isx) + { + // If there are any errors, we just log and return in this case - logger.info(isx.getMessage()); ++ logger.warn(isx.getMessage()); + return; + } + + startClientTransports(); + } + private void startClientTransports() + { String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { @@@ -631,16 -596,14 +635,13 @@@ } @VisibleForTesting - public void destroyNativeTransport() throws InterruptedException + public void destroyClientTransports() { - stopThriftServer(); + stopNativeTransport(); if (nativeTransportService != null) - { nativeTransportService.destroy(); - nativeTransportService = null; - } } - /** * Clean up all resources obtained during the lifetime of the daemon. This * is a hook for JSVC. @@@ -765,10 -726,42 +768,9 @@@ public boolean isNativeTransportRunning() { - return nativeTransportService != null ? nativeTransportService.isRunning() : false; + return nativeTransportService != null && nativeTransportService.isRunning(); } - public void startThriftServer() - { - validateTransportsCanStart(); - - if (thriftServer == null) - throw new IllegalStateException("setup() must be called first for CassandraDaemon"); - thriftServer.start(); - } - - public void stopThriftServer() - { - if (thriftServer != null) - { - thriftServer.stop(); - } - } - - public boolean isThriftServerRunning() - { - return thriftServer != null && thriftServer.isRunning(); - } - - public int getMaxNativeProtocolVersion() - { - return nativeTransportService.getMaxProtocolVersion(); - } - - public void refreshMaxNativeProtocolVersion() - { - if (nativeTransportService != null) - nativeTransportService.refreshMaxNegotiableProtocolVersion(); - } -- /** * A convenience method to stop and destroy the daemon in one shot. */ diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 3fee754,5477e36..361519d --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -35,8 -35,8 +35,9 @@@ import java.util.concurrent.atomic.Atom import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; + import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import com.google.common.collect.Sets; @@@ -68,12 -70,11 +70,13 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.shared.ShutdownException; import org.apache.cassandra.distributed.shared.Versions; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; + import org.reflections.Reflections; +import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort; + /** * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}. * @@@ -186,11 -174,11 +196,11 @@@ public abstract class AbstractCluster< private IInvokableInstance newInstance(int generation) { - ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader); + ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE); if (instanceInitializer != null) instanceInitializer.accept(classLoader, config.num()); - return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader) - .apply(config, classLoader); + return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader) + .apply(config.forVersion(version.major), classLoader); } public IInstanceConfig config() @@@ -769,5 -729,11 +779,11 @@@ .collect(Collectors.toList()); } + private static Set<String> findClassesMarkedForSharedClassLoader() + { + return new Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream() - .map(Class::getName) - .collect(Collectors.toSet()); ++ .map(Class::getName) ++ .collect(Collectors.toSet()); + } } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 83669e2,aa37029..6ad0712 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -152,11 -142,8 +152,12 @@@ public class Instance extends IsolatedE // Set the config at instance creation, possibly before startup() has run on all other instances. // setMessagingVersions below will call runOnInstance which will instantiate // the MessagingService and dependencies preventing later changes to network parameters. - Config.setOverrideLoadConfig(() -> loadConfig(config)); + Config single = loadConfig(config); + Config.setOverrideLoadConfig(() -> single); + + // Enable streaming inbound handler tracking so they can be closed properly without leaking + // the blocking IO thread. + StreamingInboundHandler.trackInboundHandlers(); } @Override @@@ -405,6 -511,6 +406,9 @@@ throw e; } ++ // Start up virtual table support ++ CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces(); ++ Keyspace.setInitialized(); // Replay any CommitLogSegments found on disk @@@ -430,12 -535,9 +434,13 @@@ // -- not sure what that means? SocketFactory.instance.getClass(); registerMockMessaging(cluster); } + registerInboundFilter(cluster); + registerOutboundFilter(cluster); + + JVMStabilityInspector.replaceKiller(new InstanceKiller()); // TODO: this is more than just gossip + StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting()); if (config.has(GOSSIP)) { StorageService.instance.initServer(); @@@ -451,21 -552,18 +456,19 @@@ SystemKeyspace.finishStartup(); + CassandraDaemon.getInstanceForTesting().setupCompleted(); + if (config.has(NATIVE_PROTOCOL)) { - // Start up virtual table support - CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces(); - - CassandraDaemon.getInstanceForTesting().initializeNativeTransport(); - CassandraDaemon.getInstanceForTesting().startNativeTransport(); - StorageService.instance.setRpcReady(true); + CassandraDaemon.getInstanceForTesting().initializeClientTransports(); + CassandraDaemon.getInstanceForTesting().start(); } - if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress())) - throw new IllegalStateException(); - if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort()) - throw new IllegalStateException(); + if (!FBUtilities.getBroadcastAddressAndPort().address.equals(broadcastAddress().getAddress()) || + FBUtilities.getBroadcastAddressAndPort().port != broadcastAddress().getPort()) + throw new IllegalStateException(String.format("%s != %s", FBUtilities.getBroadcastAddressAndPort(), broadcastAddress())); + + ActiveRepairService.instance.start(); } catch (Throwable t) { @@@ -586,20 -682,15 +589,21 @@@ () -> BufferPool.shutdownLocalCleaner(1L, MINUTES), () -> Ref.shutdownReferenceReaper(1L, MINUTES), () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), + () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES), + () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), () -> SSTableReader.shutdownBlocking(1L, MINUTES), - () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES) + () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())), + () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES) ); + error = parallelRun(error, executor, - () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), - (IgnoreThrowingRunnable) MessagingService.instance()::shutdown + CommitLog.instance::shutdownBlocking, - () -> MessagingService.instance().shutdown(1L, MINUTES, false, true) ++ // can only shutdown message once, so if the test shutsdown an instance, then ignore the failure ++ (IgnoreThrowingRunnable) () -> MessagingService.instance().shutdown(1L, MINUTES, false, true) ); error = parallelRun(error, executor, - () -> StageManager.shutdownAndWait(1L, MINUTES), + () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES), + () -> Stage.shutdownAndWait(1L, MINUTES), () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES) ); error = parallelRun(error, executor, diff --cc test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java index 0000000,3ac5028..a7ac605 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java @@@ -1,0 -1,165 +1,165 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import java.io.IOException; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.concurrent.TimeoutException; + + import org.junit.Assert; + import org.junit.Test; + + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.Feature; + import org.apache.cassandra.distributed.api.IInstanceConfig; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.api.LogResult; + import org.apache.cassandra.distributed.api.SimpleQueryResult; + import org.apache.cassandra.distributed.api.TokenSupplier; + import org.apache.cassandra.distributed.shared.Byteman; + import org.apache.cassandra.distributed.shared.NetworkTopology; + import org.apache.cassandra.distributed.shared.Shared; + + /** + * Replaces python dtest bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled + */ + public class BootstrapBinaryDisabledTest extends TestBaseImpl + { + @Test + public void test() throws IOException, TimeoutException + { + Map<String, Object> config = new HashMap<>(); + config.put("authenticator", "org.apache.cassandra.auth.PasswordAuthenticator"); + config.put("authorizer", "org.apache.cassandra.auth.CassandraAuthorizer"); + config.put("role_manager", "org.apache.cassandra.auth.CassandraRoleManager"); + config.put("permissions_validity_in_ms", 0); + config.put("roles_validity_in_ms", 0); + + int originalNodeCount = 1; + int expandedNodeCount = originalNodeCount + 2; + Byteman byteman = Byteman.createFromScripts("test/resources/byteman/stream_failure.btm"); + try (Cluster cluster = init(Cluster.build(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(c -> { + config.forEach(c::set); + c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL); + }) + .withInstanceInitializer((cl, nodeNumber) -> { + switch (nodeNumber) { + case 1: + case 2: + byteman.install(cl); + break; + } + }) + .start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text primary key)"); + populate(cluster.get(1)); + cluster.forEach(c -> c.flush(KEYSPACE)); + + bootstrap(cluster, config, false); + // Test write survey behaviour + bootstrap(cluster, config, true); + } + } + + private static void bootstrap(Cluster cluster, + Map<String, Object> config, + boolean isWriteSurvey) throws TimeoutException + { + IInstanceConfig nodeConfig = cluster.newInstanceConfig(); + nodeConfig.set("auto_bootstrap", true); + config.forEach(nodeConfig::set); + + //TODO can we make this more isolated? + System.setProperty("cassandra.ring_delay_ms", "5000"); + if (isWriteSurvey) + System.setProperty("cassandra.write_survey", "true"); + + RewriteEnabled.enable(); + cluster.bootstrap(nodeConfig).startup(); + IInvokableInstance node = cluster.get(cluster.size()); + assertLogHas(node, "Some data streaming failed"); + assertLogHas(node, isWriteSurvey ? + "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled" : + "Node is not yet bootstrapped completely"); + + node.nodetoolResult("join").asserts() - .failure() - .errorContains("Cannot join the ring until bootstrap completes"); ++ .failure() ++ .errorContains("Cannot join the ring until bootstrap completes"); + + RewriteEnabled.disable(); + node.nodetoolResult("bootstrap", "resume").asserts().success(); + if (isWriteSurvey) + assertLogHas(node, "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled"); + + if (isWriteSurvey) + { + node.nodetoolResult("join").asserts().success(); + assertLogHas(node, "Leaving write survey mode and joining ring at operator request"); + } + + node.logs().watchFor("Starting listening for CQL clients"); + assertBootstrapState(node, "COMPLETED"); + } + + private static void assertBootstrapState(IInvokableInstance node, String expected) + { + SimpleQueryResult qr = node.executeInternalWithResult("SELECT bootstrapped FROM system.local WHERE key='local'"); + Assert.assertTrue("No rows found", qr.hasNext()); + Assert.assertEquals(expected, qr.next().getString("bootstrapped")); + } + + private static void assertLogHas(IInvokableInstance node, String msg) + { + LogResult<List<String>> results = node.logs().grep(msg); + Assert.assertFalse("Unable to find '" + msg + "'", results.getResult().isEmpty()); + } + + private void populate(IInvokableInstance inst) + { + for (int i = 0; i < 10; i++) + inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)", Integer.toString(i)); + } + + @Shared + public static final class RewriteEnabled + { + private static volatile boolean enabled = false; + + public static boolean isEnabled() + { + return enabled; + } + + public static void enable() + { + enabled = true; + } + + public static void disable() + { + enabled = false; + } + } + } diff --cc test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java index 0000000,1d23ac7..1d9029b mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java @@@ -1,0 -1,192 +1,79 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + -import java.io.ByteArrayOutputStream; + import java.io.IOException; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Objects; + -import org.junit.Assert; + import org.junit.Test; + + import com.datastax.driver.core.Session; -import org.apache.cassandra.db.marshal.CompositeType; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.Feature; + import org.apache.cassandra.distributed.api.IInvokableInstance; ++import org.apache.cassandra.distributed.api.NodeToolResult; + import org.apache.cassandra.distributed.api.QueryResults; + import org.apache.cassandra.distributed.api.SimpleQueryResult; + import org.apache.cassandra.distributed.shared.AssertUtils; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.thrift.TException; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; + + public class ClientNetworkStopStartTest extends TestBaseImpl + { + /** + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a> + */ + @Test - public void stopStartThrift() throws IOException, TException - { - try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start())) - { - IInvokableInstance node = cluster.get(1); - assertTransportStatus(node, "binary", true); - assertTransportStatus(node, "thrift", true); - node.nodetoolResult("disablethrift").asserts().success(); - assertTransportStatus(node, "binary", true); - assertTransportStatus(node, "thrift", false); - node.nodetoolResult("enablethrift").asserts().success(); - assertTransportStatus(node, "binary", true); - assertTransportStatus(node, "thrift", true); - - // now use it to make sure it still works! - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))"); - - ThriftClientUtils.thriftClient(node, thrift -> { - thrift.set_keyspace(KEYSPACE); - Mutation mutation = new Mutation(); - ColumnOrSuperColumn csoc = new ColumnOrSuperColumn(); - Column column = new Column(); - column.setName(CompositeType.build(ByteBufferUtil.bytes("value"))); - column.setValue(ByteBufferUtil.bytes(0)); - column.setTimestamp(System.currentTimeMillis()); - csoc.setColumn(column); - mutation.setColumn_or_supercolumn(csoc); - - thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0), - Collections.singletonMap("tbl", Arrays.asList(mutation))), - org.apache.cassandra.thrift.ConsistencyLevel.ALL); - }); - - SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); - AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build()); - } - } - - /** - * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a> - */ - @Test + public void stopStartNative() throws IOException + { - try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start())) ++ //TODO why does trunk need GOSSIP for native to work but no other branch does? ++ try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)).start())) + { + IInvokableInstance node = cluster.get(1); + assertTransportStatus(node, "binary", true); - assertTransportStatus(node, "thrift", true); + node.nodetoolResult("disablebinary").asserts().success(); + assertTransportStatus(node, "binary", false); - assertTransportStatus(node, "thrift", true); + node.nodetoolResult("enablebinary").asserts().success(); + assertTransportStatus(node, "binary", true); - assertTransportStatus(node, "thrift", true); + + // now use it to make sure it still works! + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))"); + + try (com.datastax.driver.core.Cluster client = com.datastax.driver.core.Cluster.builder().addContactPoints(node.broadcastAddress().getAddress()).build(); + Session session = client.connect()) + { + session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, value) VALUES (?, ?)", 0, 0); + } + + SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); + AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build()); + } + } + + private static void assertTransportStatus(IInvokableInstance node, String transport, boolean running) + { + assertNodetoolStdout(node, running ? "running" : "not running", running ? "not running" : null, "status" + transport); + } + + private static void assertNodetoolStdout(IInvokableInstance node, String expectedStatus, String notExpected, String... nodetool) + { - // without CASSANDRA-16057 need this hack - PrintStream previousStdout = System.out; - try - { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - PrintStream stdout = new PrintStream(out, true); - System.setOut(stdout); - - node.nodetoolResult(nodetool).asserts().success(); - - stdout.flush(); - String output = out.toString(); - Assert.assertThat(output, new StringContains(expectedStatus)); - if (notExpected != null) - Assert.assertThat(output, new StringNotContains(notExpected)); - } - finally - { - System.setOut(previousStdout); - } - } - - private static final class StringContains extends BaseMatcher<String> - { - private final String expected; - - private StringContains(String expected) - { - this.expected = Objects.requireNonNull(expected); - } - - public boolean matches(Object o) - { - return o.toString().contains(expected); - } - - public void describeTo(Description description) - { - description.appendText("Expected to find '" + expected + "', but did not"); - } - } - - private static final class StringNotContains extends BaseMatcher<String> - { - private final String notExpected; - - private StringNotContains(String expected) - { - this.notExpected = Objects.requireNonNull(expected); - } - - public boolean matches(Object o) - { - return !o.toString().contains(notExpected); - } - - public void describeTo(Description description) - { - description.appendText("Expected not to find '" + notExpected + "', but did"); - } ++ NodeToolResult result = node.nodetoolResult(nodetool); ++ result.asserts().success().stdoutContains(expectedStatus); ++ if (notExpected != null) ++ result.asserts().stdoutNotContains(notExpected); + } + } diff --cc test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java index 731a87e,0000000..0fff01c mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java @@@ -1,199 -1,0 +1,196 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.net.InetSocketAddress; - import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; ++import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Session; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy; +import org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.Event; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces; +import static org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.OneNetworkInterface; +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Down; +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Remove; +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Up; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@RunWith(Parameterized.class) +public class TopologyChangeTest extends TestBaseImpl +{ + static class EventStateListener implements Host.StateListener + { + enum EventType + { + Add, + Up, + Down, + Remove, + } + + static class Event + { + InetSocketAddress host; + EventType type; + + Event(EventType type, Host host) + { + this.type = type; + this.host = host.getBroadcastSocketAddress(); + } + + public Event(EventType type, IInvokableInstance iInvokableInstance) + { + this.type = type; + this.host = iInvokableInstance.broadcastAddress(); + } + + + public String toString() + { + return "Event{" + + "host='" + host + '\'' + + ", type=" + type + + '}'; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Event event = (Event) o; + return Objects.equals(host, event.host) && + type == event.type; + } + + public int hashCode() + { + return Objects.hash(host, type); + } + } + - private List<Event> events = new ArrayList<>(); ++ private final List<Event> events = new CopyOnWriteArrayList<>(); + + public void onAdd(Host host) + { + events.add(new Event(EventType.Add, host)); + } + + public void onUp(Host host) + { + events.add(new Event(Up, host)); + } + + public void onDown(Host host) + { + events.add(new Event(EventType.Down, host)); + } + + public void onRemove(Host host) + { + events.add(new Event(Remove, host)); + } + + public void onRegister(com.datastax.driver.core.Cluster cluster) + { + } + + public void onUnregister(com.datastax.driver.core.Cluster cluster) + { + } + } + + @Parameterized.Parameter(0) + public Strategy strategy; + + @Parameterized.Parameters(name = "{index}: provision strategy={0}") + public static Collection<Strategy[]> data() + { + return Arrays.asList(new Strategy[][]{ { MultipleNetworkInterfaces }, + { OneNetworkInterface } + }); + } + + @Test + public void testDecommission() throws Throwable + { - try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig( - config -> { - config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL); - }).start())) ++ try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy) ++ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start()); ++ com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); ++ Session session = cluster.connect()) + { - final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); - Session session = cluster.connect(); + EventStateListener eventStateListener = new EventStateListener(); + session.getCluster().register(eventStateListener); - control.get(3).nodetool("disablebinary"); - control.get(3).nodetool("decommission", "-f"); ++ ++ control.get(3).nodetoolResult("disablebinary").asserts().success(); ++ control.get(3).nodetoolResult("decommission", "-f").asserts().success(); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertEquals(2, cluster.getMetadata().getAllHosts().size())); - assertThat(eventStateListener.events).containsExactly(new Event(Remove, control.get(3))); - session.close(); - cluster.close(); ++ session.getCluster().unregister(eventStateListener); ++ // DOWN UP can also be seen if the jvm is slow and connections are closed; to avoid this make sure to use ++ // containsSequence to check that down/remove happen in this order ++ assertThat(eventStateListener.events).containsSequence(new Event(Down, control.get(3)), new Event(Remove, control.get(3))); + } + } + + @Test + public void testRestartNode() throws Throwable + { - try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig( - config -> { - config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL); - }).start())) ++ try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy) ++ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start()); ++ com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); ++ Session session = cluster.connect()) + { - final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build(); - Session session = cluster.connect(); + EventStateListener eventStateListener = new EventStateListener(); + session.getCluster().register(eventStateListener); + + control.get(3).shutdown().get(); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertEquals(2, cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count())); + + control.get(3).startup(); + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertEquals(3, cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count())); + - assertThat(eventStateListener.events).containsExactly(new Event(Down, control.get(3)), - new Event(Up, control.get(3))); - - session.close(); - cluster.close(); ++ // DOWN UP can also be seen if the jvm is slow and connections are closed, but make sure it at least happens once ++ // given the node restarts ++ assertThat(eventStateListener.events).containsSequence(new Event(Down, control.get(3)), ++ new Event(Up, control.get(3))); + } + } +} + diff --cc test/resources/byteman/stream_failure.btm index 0000000,e40f7fe..768c7a3 mode 000000,100644..100644 --- a/test/resources/byteman/stream_failure.btm +++ b/test/resources/byteman/stream_failure.btm @@@ -1,0 -1,14 +1,14 @@@ + # + # Inject streaming failure + # + # Before start streaming files in `StreamSession#prepare()` method, + # interrupt streaming by throwing RuntimeException. + # + RULE inject stream failure + CLASS org.apache.cassandra.streaming.StreamSession -METHOD prepare -AT INVOKE maybeCompleted ++METHOD prepareAck ++AT INVOKE startStreamingFiles + IF org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest$RewriteEnabled.isEnabled() + DO + throw new java.lang.RuntimeException("Triggering network failure") -ENDRULE ++ENDRULE --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org