This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cassandra-2.2 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push: new 8dcaa12 Allow instance class loaders to be garbage collected for inJVM dtest 8dcaa12 is described below commit 8dcaa12baa97ce870f23ff9045f968f2fa28b2cc Author: Jon Meredith <jmeredit...@gmail.com> AuthorDate: Thu Aug 15 10:12:06 2019 -0600 Allow instance class loaders to be garbage collected for inJVM dtest Backport support for optional network/gossip test features, instance generations and subnet support from trunk. Backport MessageFilter and cluster builder API changes to match trunk (except for the change in namespace for Verbs). Add a test for repeatedly creating/tearing down in-JVM dtest clusters to help find resource leaks. Change IsolatedExecutor to clean up on an executor with zero core threads so that it exits sooner enabling GC on the InstanceClassLoader. Moved classloader close after shutdown to improve logging when the isolated executor is shut down. Update the logback config for dtests to make it obvious which log threads are for instances vs the main logger. Disable native library loading until it can be reinstated (tracked in CASSANDRA-15170). Shut down various executors and threads that were preventing the instance classloader from being unloaded. 
Add test-jvm-dtest-some ant target Patch by Jon Meredith; Reviewed by Alex Petrov and Benedict Elliott Smith for CASSANDRA-15170 --- CHANGES.txt | 1 + build.xml | 22 ++- .../cassandra/concurrent/InfiniteLoopExecutor.java | 2 +- .../cassandra/concurrent/ScheduledExecutors.java | 11 +- .../cassandra/concurrent/SharedExecutorPool.java | 4 +- .../apache/cassandra/concurrent/StageManager.java | 8 +- .../org/apache/cassandra/db/BatchlogManager.java | 11 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 14 +- .../apache/cassandra/db/HintedHandOffManager.java | 9 + .../apache/cassandra/db/commitlog/CommitLog.java | 1 + src/java/org/apache/cassandra/gms/Gossiper.java | 9 + .../cassandra/io/sstable/IndexSummaryManager.java | 11 ++ .../cassandra/io/sstable/format/SSTableReader.java | 5 +- .../org/apache/cassandra/net/MessagingService.java | 7 +- .../cassandra/net/OutboundTcpConnection.java | 2 +- .../apache/cassandra/service/CassandraDaemon.java | 3 +- .../service/PendingRangeCalculatorService.java | 11 +- .../apache/cassandra/service/StorageService.java | 29 ++- .../cassandra/streaming/StreamCoordinator.java | 13 ++ .../cassandra/utils/BackgroundActivityMonitor.java | 12 +- src/java/org/apache/cassandra/utils/CLibrary.java | 9 +- .../org/apache/cassandra/utils/ExecutorUtils.java | 151 ++++++++++++++++ .../utils/NanoTimeToCurrentTimeMillis.java | 14 +- .../org/apache/cassandra/utils/concurrent/Ref.java | 8 +- .../cassandra/utils/memory/MemtablePool.java | 8 +- test/conf/logback-dtest.xml | 20 +- .../org/apache/cassandra/distributed/Cluster.java | 20 +- .../cassandra/distributed/UpgradeableCluster.java | 22 +-- .../api/{IMessageFilters.java => Feature.java} | 28 +-- .../cassandra/distributed/api/IInstance.java | 2 + .../cassandra/distributed/api/IInstanceConfig.java | 1 + .../cassandra/distributed/api/IMessageFilters.java | 6 +- .../distributed/impl/AbstractCluster.java | 138 +++++++++----- .../cassandra/distributed/impl/Instance.java | 165 ++++++++++++++--- 
.../distributed/impl/InstanceClassLoader.java | 9 +- .../cassandra/distributed/impl/InstanceConfig.java | 28 ++- .../distributed/impl/IsolatedExecutor.java | 47 ++++- .../cassandra/distributed/impl/MessageFilters.java | 31 +--- .../distributed/test/DistributedTestBase.java | 28 +++ .../distributed/test/ResourceLeakTest.java | 201 +++++++++++++++++++++ .../cassandra/concurrent/SEPExecutorTest.java | 3 +- 41 files changed, 893 insertions(+), 231 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index eabdcaa..caea0f4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ * Fixing invalid CQL in security documentation (CASSANDRA-15020) * Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012) * Multi-version in-JVM dtests (CASSANDRA-14937) + * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170) 2.2.14 diff --git a/build.xml b/build.xml index d522b59..b42c3ed 100644 --- a/build.xml +++ b/build.xml @@ -1818,6 +1818,10 @@ <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/> <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/> <jvmarg value="-Dcassandra.skip_sync=true" /> + <jvmarg value="-XX:MaxMetaspaceSize=256M" /> + <jvmarg value="-XX:SoftRefLRUPolicyMSPerMB=0" /> + <jvmarg value="-XX:+HeapDumpOnOutOfMemoryError" /> + <jvmarg value="-XX:HeapDumpPath=build/test/oom.hprof" /> </testmacro> </target> @@ -1851,10 +1855,24 @@ </jar> </target> - <target name="test-jvm-dtest" depends="build-test" description="Execute unit tests"> + <target name="test-jvm-dtest" depends="build-test" description="Execute in-jvm dtests"> <testmacro inputdir="${test.distributed.src}" timeout="${test.distributed.timeout}" forkmode="once" showoutput="true" filter="**/test/*Test.java"> <jvmarg value="-Dlogback.configurationFile=test/conf/logback-dtest.xml"/> - <jvmarg value="-Dcassandra.ring_delay_ms=1000"/> + <jvmarg value="-Dcassandra.ring_delay_ms=10000"/> + <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/> + <jvmarg 
value="-Djava.io.tmpdir=${tmp.dir}"/> + <jvmarg value="-Dcassandra.skip_sync=true" /> + </testmacro> + </target> + + <!-- Use this with an FQDN for test class, and a csv list of methods like this: + ant test-jvm-dtest-some -Dtest.name=org.apache.cassandra.distributed.test.ResourceLeakTest -Dtest.methods=looperTest + --> + <target name="test-jvm-dtest-some" depends="build-test" description="Execute some in-jvm dtests"> + <testmacro inputdir="${test.distributed.src}" timeout="${test.distributed.timeout}" forkmode="once" showoutput="true"> + <test name="${test.name}" methods="${test.methods}" outfile="build/test/output/TEST-${test.name}-${test.methods}"/> + <jvmarg value="-Dlogback.configurationFile=test/conf/logback-dtest.xml"/> + <jvmarg value="-Dcassandra.ring_delay_ms=10000"/> <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/> <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/> <jvmarg value="-Dcassandra.skip_sync=true" /> diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java index 473edb7..af41513 100644 --- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java @@ -76,7 +76,7 @@ public class InfiniteLoopExecutor return this; } - public void shutdown() + public void shutdownNow() { isShutdown = true; thread.interrupt(); diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java index 489f58e..13d27a8 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java +++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java @@ -19,9 +19,12 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.google.common.annotations.VisibleForTesting; +import 
org.apache.cassandra.utils.ExecutorUtils; + /** * Centralized location for shared executors */ @@ -43,12 +46,8 @@ public class ScheduledExecutors public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks"); @VisibleForTesting - public static void shutdownAndWait() throws InterruptedException + public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - ExecutorService[] executors = new ExecutorService[] { scheduledTasks, nonPeriodicTasks, optionalTasks }; - for (ExecutorService executor : executors) - executor.shutdownNow(); - for (ExecutorService executor : executors) - executor.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, scheduledTasks, nonPeriodicTasks, optionalTasks); } } diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index d355d77..50cc5a3 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -114,7 +114,7 @@ public class SharedExecutorPool return executor; } - public void shutdown() throws InterruptedException + public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException { shuttingDown = true; for (SEPExecutor executor : executors) @@ -122,7 +122,7 @@ public class SharedExecutorPool terminateWorkers(); - long until = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L); + long until = System.nanoTime() + unit.toNanos(timeout); for (SEPExecutor executor : executors) executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS); } diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 5e0a667..01bfb7c 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ 
b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.config.DatabaseDescriptor.*; @@ -121,12 +122,9 @@ public class StageManager }; @VisibleForTesting - public static void shutdownAndWait() throws InterruptedException + public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - for (Stage stage : Stage.values()) - StageManager.stages.get(stage).shutdown(); - for (Stage stage : Stage.values()) - StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, StageManager.stages.values()); } /** diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 9a2d1f6..40f8ce0 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -19,13 +19,11 @@ package org.apache.cassandra.db; import java.io.DataInputStream; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; @@ -54,7 +52,9 @@ import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; import 
org.apache.cassandra.utils.WrappedRunnable; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; @@ -74,7 +74,7 @@ public class BatchlogManager implements BatchlogManagerMBean public void start() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + MBeanWrapper mbs = MBeanWrapper.instance; try { mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); @@ -95,10 +95,9 @@ public class BatchlogManager implements BatchlogManagerMBean batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS); } - public static void shutdown() throws InterruptedException + public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - batchlogTasks.shutdown(); - batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownAndWait(timeout, unit, batchlogTasks); } public int countAllBatches() diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d26cd61..01330a6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -88,6 +88,8 @@ import org.apache.cassandra.utils.memory.MemtableAllocator; import com.clearspring.analytics.stream.Counter; +import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; +import static org.apache.cassandra.utils.ExecutorUtils.shutdown; import static org.apache.cassandra.utils.Throwables.maybeFail; public class ColumnFamilyStore implements ColumnFamilyStoreMBean @@ -192,24 +194,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public volatile long sampleLatencyNanos; private final ScheduledFuture<?> latencyCalculator; - public static void shutdownFlushExecutor() throws InterruptedException - { - flushExecutor.shutdown(); - flushExecutor.awaitTermination(60, TimeUnit.SECONDS); - } - public static void 
shutdownPostFlushExecutor() throws InterruptedException { postFlushExecutor.shutdown(); postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); } - public static void shutdownReclaimExecutor() throws InterruptedException + public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - reclaimExecutor.shutdown(); - reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, postFlushExecutor, flushExecutor); } + public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 95af9ba..7a570d2 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -68,6 +68,9 @@ import java.util.List; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; +import static org.apache.cassandra.utils.ExecutorUtils.shutdown; + /** * The hint schema looks like this: * @@ -619,4 +622,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } + @VisibleForTesting + public void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException + { + shutdown(executor, hintDeliveryExecutor); + awaitTermination(timeout, units, executor, hintDeliveryExecutor); + } } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 2f0179d..6dd519a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -382,6 +382,7 @@ public class CommitLog implements CommitLogMBean /** * Shuts down the threads used by the commit log, blocking until completion. 
+ * TODO this should accept a timeout, and throw TimeoutException */ public void shutdownBlocking() throws InterruptedException { diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 831c252..bd4fe13 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; @@ -47,6 +48,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; +import static org.apache.cassandra.utils.ExecutorUtils.shutdown; /** * This module is responsible for Gossiping information for the local endpoint. 
This abstraction @@ -1547,4 +1550,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return System.currentTimeMillis() + Gossiper.aVeryLongTime; } + @VisibleForTesting + public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + stop(); + ExecutorUtils.shutdownAndWait(timeout, unit, executor); + } } diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 3ebbb6e..9317132 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; @@ -43,11 +44,15 @@ import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; +import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; +import static org.apache.cassandra.utils.ExecutorUtils.shutdown; + /** * Manages the fixed-size memory pool for index summaries, periodically resizing them * in order to give more memory to hot sstables and less memory to cold sstables. 
@@ -257,4 +262,10 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean { return CompactionManager.instance.runIndexSummaryRedistribution(redistribution); } + + @VisibleForTesting + public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + ExecutorUtils.shutdownAndWait(timeout, unit, executor); + } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 81af9f0..16fa6c9 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -2326,10 +2326,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } - public static void shutdownBlocking() throws InterruptedException + public static void shutdownBlocking(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - syncExecutor.shutdownNow(); - syncExecutor.awaitTermination(0, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor); resetTidying(); } } diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index e7ce964..e42b91b 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -775,6 +775,8 @@ public final class MessagingService implements MessagingServiceMBean // see https://issues.apache.org/jira/browse/CASSANDRA-10545 handleIOException(e); } + + connectionManagers.values().forEach(OutboundTcpConnectionPool::close); } catch (IOException e) { @@ -1063,7 +1065,10 @@ public final class MessagingService implements MessagingServiceMBean { // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20 // see https://bugs.openjdk.java.net/browse/JDK-8050499 - if (!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS 
X".equals(System.getProperty("os.name"))) + if ((!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS X".equals(System.getProperty("os.name"))) && + !"Thread signal failed".equals(e.getMessage()) && // handle shutdown for in-JVM dtests + !"Bad file descriptor".equals(e.getMessage()) && + !"No such file or directory".equals(e.getMessage())) throw e; } diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 4cfe019..e8346d8 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -392,7 +392,7 @@ public class OutboundTcpConnection extends Thread long start = System.nanoTime(); long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout()); - while (System.nanoTime() - start < timeout) + while (System.nanoTime() - start < timeout && !isStopped) { targetVersion = MessagingService.instance().getVersion(poolReference.endPoint()); try diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 8f6c9c2..1380f43 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -601,7 +601,8 @@ public class CassandraDaemon } } - private void waitForGossipToSettle() + @VisibleForTesting + public static void waitForGossipToSettle() { int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1); if (forceAfter == 0) diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index e82b0bb..a7ee333 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -23,15 +23,21 @@ import 
org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.utils.ExecutorUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; +import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination; +import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow; + public class PendingRangeCalculatorService { public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService(); @@ -112,9 +118,8 @@ public class PendingRangeCalculatorService } @VisibleForTesting - public void shutdownExecutor() throws InterruptedException + public void shutdownExecutor(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - executor.shutdown(); - executor.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, executor); } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c5c1ca6..0a9a8da 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.sql.Time; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; @@ -79,6 +80,8 @@ import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport; +import static 
java.util.concurrent.TimeUnit.MINUTES; + /** * This abstraction contains the token/identifier of this node * on the identifier space. This token gets gossiped around. @@ -659,7 +662,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // wait for miscellaneous tasks like sstable and commitlog segment deletion ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES)) logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); } }, "StorageServiceShutdownHook"); @@ -1365,9 +1368,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return bgMonitor.getSeverity(endpoint); } - public void shutdownBGMonitor() + public void shutdownBGMonitorAndWait(long timeout, TimeUnit units) throws TimeoutException, InterruptedException { - bgMonitor.shutdown(); + bgMonitor.shutdownAndWait(timeout, units); } /** @@ -4067,7 +4070,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE remainingCFs--; } - BatchlogManager.shutdown(); + try + { + /* not clear this is reasonable time, but propagated from prior embedded behaviour */ + BatchlogManager.shutdownAndWait(1L, MINUTES); + } + catch (TimeoutException t) + { + logger.error("Batchlog manager timed out shutting down", t); + } // Interrupt on going compaction and shutdown to prevent further compaction CompactionManager.instance.forceShutdown(); @@ -4093,7 +4104,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // wait for miscellaneous tasks like sstable and commitlog segment deletion ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES)) logger.warn("Failed to wait for non 
periodic tasks to shutdown"); ColumnFamilyStore.shutdownPostFlushExecutor(); @@ -4551,4 +4562,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB)); } + @VisibleForTesting + public void shutdownServer() + { + if (drainOnShutdown != null) + { + Runtime.getRuntime().removeShutdownHook(drainOnShutdown); + } + } } diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 603366d..e0948c9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -19,11 +19,17 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; /** @@ -290,4 +296,11 @@ public class StreamCoordinator return sessionInfos.values(); } } + + @VisibleForTesting + public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor); + } + } diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java index ab81c20..711c5dd 100644 --- a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java +++ b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java @@ -28,6 +28,7 @@ import java.net.InetAddress; import 
java.util.StringTokenizer; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.gms.ApplicationState; @@ -143,16 +144,9 @@ public class BackgroundActivityMonitor return 0.0; } - public void shutdown() + public void shutdownAndWait(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException { - reportThread.shutdown(); - try - { - reportThread.awaitTermination(1L, TimeUnit.MINUTES); - } catch (InterruptedException e) - { - throw new IllegalStateException(); - } + ExecutorUtils.shutdownAndWait(timeout, unit, reportThread); } public class BackgroundActivityReporter implements Runnable diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java index b6598ec..e3bec4f 100644 --- a/src/java/org/apache/cassandra/utils/CLibrary.java +++ b/src/java/org/apache/cassandra/utils/CLibrary.java @@ -60,7 +60,14 @@ public final class CLibrary { try { - Native.register("c"); + if (Boolean.getBoolean("cassandra.disable_clibrary")) + { + jnaAvailable = false; + } + else + { + Native.register("c"); + } } catch (NoClassDefFoundError e) { diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java b/src/java/org/apache/cassandra/utils/ExecutorUtils.java new file mode 100644 index 0000000..21933a3 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cassandra.concurrent.InfiniteLoopExecutor; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class ExecutorUtils +{ + + public static Runnable runWithThreadName(Runnable runnable, String threadName) + { + return () -> { + String oldThreadName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(threadName); + runnable.run(); + } + finally + { + Thread.currentThread().setName(oldThreadName); + } + }; + } + + public static void shutdownNow(Iterable<?> executors) + { + shutdown(true, executors); + } + + public static void shutdown(Iterable<?> executors) + { + shutdown(false, executors); + } + + public static void shutdown(boolean interrupt, Iterable<?> executors) + { + for (Object executor : executors) + { + if (executor instanceof ExecutorService) + { + if (interrupt) ((ExecutorService) executor).shutdownNow(); + else ((ExecutorService) executor).shutdown(); + } + else if (executor instanceof InfiniteLoopExecutor) + ((InfiniteLoopExecutor) executor).shutdownNow(); + else if (executor instanceof Thread) + ((Thread) executor).interrupt(); + else if (executor != null) + throw new IllegalArgumentException(executor.toString()); + } + } + + public static void shutdown(ExecutorService ... 
executors) + { + shutdown(Arrays.asList(executors)); + } + + public static void shutdownNow(ExecutorService ... executors) + { + shutdownNow(Arrays.asList(executors)); + } + + public static void awaitTermination(long timeout, TimeUnit unit, ExecutorService ... executors) throws InterruptedException, TimeoutException + { + awaitTermination(timeout, unit, Arrays.asList(executors)); + } + + public static void awaitTermination(long timeout, TimeUnit unit, Collection<?> executors) throws InterruptedException, TimeoutException + { + long deadline = System.nanoTime() + unit.toNanos(timeout); + awaitTerminationUntil(deadline, executors); + } + + public static void awaitTerminationUntil(long deadline, Collection<?> executors) throws InterruptedException, TimeoutException + { + for (Object executor : executors) + { + long wait = deadline - System.nanoTime(); + if (executor instanceof ExecutorService) + { + if (wait <= 0 || !((ExecutorService)executor).awaitTermination(wait, NANOSECONDS)) + throw new TimeoutException(executor + " did not terminate on time"); + } + else if (executor instanceof InfiniteLoopExecutor) + { + if (wait <= 0 || !((InfiniteLoopExecutor)executor).awaitTermination(wait, NANOSECONDS)) + throw new TimeoutException(executor + " did not terminate on time"); + } + else if (executor instanceof Thread) + { + Thread t = (Thread) executor; + if (wait <= 0) + throw new TimeoutException(executor + " did not terminate on time"); + t.join((wait + 999999) / 1000000L, (int) (wait % 1000000L)); + if (t.isAlive()) + throw new TimeoutException(executor + " did not terminate on time"); + } + else if (executor != null) + { + throw new IllegalArgumentException(executor.toString()); + } + } + } + + public static void shutdownAndWait(long timeout, TimeUnit unit, Collection<?> executors) throws TimeoutException, InterruptedException + { + shutdown(executors); + awaitTermination(timeout, unit, executors); + } + + public static void shutdownNowAndWait(long timeout, TimeUnit 
unit, Collection<?> executors) throws TimeoutException, InterruptedException + { + shutdownNow(executors); + awaitTermination(timeout, unit, executors); + } + + public static void shutdownAndWait(long timeout, TimeUnit unit, Object ... executors) throws TimeoutException, InterruptedException + { + shutdownAndWait(timeout, unit, Arrays.asList(executors)); + } + + public static void shutdownNowAndWait(long timeout, TimeUnit unit, Object ... executors) throws TimeoutException, InterruptedException + { + shutdownNowAndWait(timeout, unit, Arrays.asList(executors)); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java index f124383..9d42acb 100644 --- a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java +++ b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java @@ -39,6 +39,8 @@ public class NanoTimeToCurrentTimeMillis @VisibleForTesting public static final Object TIMESTAMP_UPDATE = new Object(); + private static final Thread updater; + /* * System.currentTimeMillis() is 25 nanoseconds. This is 2 nanoseconds (maybe) according to JMH. * Faster than calling both currentTimeMillis() and nanoTime(). 
@@ -57,7 +59,7 @@ public class NanoTimeToCurrentTimeMillis static { //Pick up updates from NTP periodically - Thread t = new Thread("NanoTimeToCurrentTimeMillis updater") + updater = new Thread("NanoTimeToCurrentTimeMillis updater") { @Override public void run() @@ -82,7 +84,13 @@ public class NanoTimeToCurrentTimeMillis } } }; - t.setDaemon(true); - t.start(); + updater.setDaemon(true); + updater.start(); + } + + public static void shutdown(long millis) throws InterruptedException + { + updater.interrupt(); + updater.join(millis); } } diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java index e1cc7ff..c009032 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java @@ -22,6 +22,7 @@ package org.apache.cassandra.utils.concurrent; import java.lang.ref.PhantomReference; import java.lang.ref.ReferenceQueue; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Set; @@ -35,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable; -import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.utils.ExecutorUtils; import static org.apache.cassandra.utils.Throwables.maybeFail; import static org.apache.cassandra.utils.Throwables.merge; @@ -343,9 +344,8 @@ public final class Ref<T> implements RefCounted<T> } @VisibleForTesting - public static void shutdownReferenceReaper() throws InterruptedException + public static void shutdownReferenceReaper(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - EXEC.shutdown(); - EXEC.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC); } } diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java 
b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index b4efaa6..9c4824a 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -19,10 +19,12 @@ package org.apache.cassandra.utils.memory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -62,12 +64,12 @@ public abstract class MemtablePool public abstract boolean needToCopyOnHeap(); @VisibleForTesting - public void shutdown() throws InterruptedException + public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - cleaner.shutdown(); - cleaner.awaitTermination(60, TimeUnit.SECONDS); + ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner); } + public abstract MemtableAllocator newAllocator(); /** diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml index b8899a3..4282fee 100644 --- a/test/conf/logback-dtest.xml +++ b/test/conf/logback-dtest.xml @@ -23,7 +23,7 @@ <!-- Shutdown hook ensures that async appender flushes --> <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> - <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> @@ -38,18 +38,18 @@ <encoder> <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> - <immediateFlush>false</immediateFlush> </encoder> + <immediateFlush>false</immediateFlush> </appender> - <appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender"> + <appender 
name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <maxFlushTime>0</maxFlushTime> <queueSize>1024</queueSize> - <appender-ref ref="FILE"/> + <appender-ref ref="INSTANCEFILE"/> </appender> - <appender name="STDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> </encoder> @@ -58,7 +58,7 @@ </filter> </appender> - <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> </encoder> @@ -67,7 +67,7 @@ </filter> </appender> - <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> </encoder> @@ -79,8 +79,8 @@ <logger name="org.apache.hadoop" level="WARN"/> <root level="DEBUG"> - <appender-ref ref="ASYNCFILE" /> - <appender-ref ref="STDERR" /> - <appender-ref ref="STDOUT" /> + <appender-ref ref="INSTANCEASYNCFILE" /> + <appender-ref ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> </root> </configuration> diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java index c7f7675..95862b6 100644 --- a/test/distributed/org/apache/cassandra/distributed/Cluster.java +++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java @@ -20,8 +20,8 @@ package org.apache.cassandra.distributed; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.List; +import java.util.function.Consumer; 
import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.impl.AbstractCluster; @@ -40,18 +40,24 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu super(root, version, configs, sharedClassLoader); } - protected IInvokableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config) + protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config) { - return new Wrapper(version, config); + return new Wrapper(generation, version, config); } - public static Cluster create(int nodeCount) throws Throwable + public static Builder<IInvokableInstance, Cluster> build(int nodeCount) + { + return new Builder<>(nodeCount, Cluster::new); + } + + public static Cluster create(int nodeCount, Consumer<InstanceConfig> configUpdater) throws IOException { - return create(nodeCount, Cluster::new); + return build(nodeCount).withConfig(configUpdater).start(); } - public static Cluster create(int nodeCount, File root) + + public static Cluster create(int nodeCount) throws Throwable { - return create(nodeCount, Versions.CURRENT, root, Cluster::new); + return build(nodeCount).start(); } } diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java index 0c8e63a..232ef0b 100644 --- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java @@ -43,28 +43,24 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im super(root, version, configs, sharedClassLoader); } - protected IUpgradeableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config) + protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config) { - return new Wrapper(version, config); + return new 
Wrapper(generation, version, config); } - public static UpgradeableCluster create(int nodeCount) throws Throwable - { - return create(nodeCount, UpgradeableCluster::new); - } - public static UpgradeableCluster create(int nodeCount, File root) + public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int nodeCount) { - return create(nodeCount, Versions.CURRENT, root, UpgradeableCluster::new); + return new Builder<>(nodeCount, UpgradeableCluster::new); } - public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws IOException + public static UpgradeableCluster create(int nodeCount) throws Throwable { - return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), UpgradeableCluster::new); + return build(nodeCount).start(); } - public static UpgradeableCluster create(int nodeCount, Versions.Version version, File root) + + public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable { - return create(nodeCount, version, root, UpgradeableCluster::new); + return build(nodeCount).withVersion(version).start(); } - } diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java similarity index 56% copy from test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java copy to test/distributed/org/apache/cassandra/distributed/api/Feature.java index b5fde84..a5c9316 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java +++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java @@ -18,31 +18,7 @@ package org.apache.cassandra.distributed.api; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.MessagingService; - -import java.util.function.BiConsumer; - -public interface IMessageFilters +public enum Feature { - public interface Filter - { - Filter restore(); - Filter drop(); - } - - public 
interface Builder - { - Builder from(int ... nums); - Builder to(int ... nums); - Filter ready(); - Filter drop(); - } - - Builder verbs(MessagingService.Verb... verbs); - Builder allVerbs(); - void reset(); - - // internal - BiConsumer<InetAddressAndPort, IMessage> filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered); + NETWORK, GOSSIP } diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java index 3834093..d5382b4 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java @@ -37,7 +37,9 @@ public interface IInstance extends IIsolatedExecutor UUID schemaVersion(); void startup(); + boolean isShutdown(); Future<Void> shutdown(); + Future<Void> shutdown(boolean graceful); // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface void startup(ICluster cluster); diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java index 6741b3f..3e5a18f 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java @@ -38,4 +38,5 @@ public interface IInstanceConfig Object get(String fieldName); String getString(String fieldName); int getInt(String fieldName); + boolean has(Feature featureFlag); } diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java index b5fde84..f7c8094 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java @@ -18,11 +18,8 @@ package 
org.apache.cassandra.distributed.api; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; -import java.util.function.BiConsumer; - public interface IMessageFilters { public interface Filter @@ -44,5 +41,6 @@ public interface IMessageFilters void reset(); // internal - BiConsumer<InetAddressAndPort, IMessage> filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered); + boolean permit(IInstance from, IInstance to, int verb); + } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index c27d9bf..19fb7e5 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -30,11 +30,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -91,6 +89,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, // to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener) // before we instantiate any for a new instance private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class); + private static final AtomicInteger generation = new AtomicInteger(); private final File root; private final ClassLoader sharedClassLoader; @@ -104,6 +103,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, protected class Wrapper extends DelegatingInvokableInstance implements 
IUpgradeableInstance { + private final int generation; private final InstanceConfig config; private volatile IInvokableInstance delegate; private volatile Versions.Version version; @@ -112,21 +112,22 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, protected IInvokableInstance delegate() { if (delegate == null) - delegate = newInstance(); + delegate = newInstance(generation); return delegate; } - public Wrapper(Versions.Version version, InstanceConfig config) + public Wrapper(int generation, Versions.Version version, InstanceConfig config) { + this.generation = generation; this.config = config; this.version = version; // we ensure there is always a non-null delegate, so that the executor may be used while the node is offline - this.delegate = newInstance(); + this.delegate = newInstance(generation); } - private IInvokableInstance newInstance() + private IInvokableInstance newInstance(int generation) { - ClassLoader classLoader = new InstanceClassLoader(config.num(), version.classpath, sharedClassLoader); + ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader); return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader) .apply(config, classLoader); } @@ -136,6 +137,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, return config; } + public boolean isShutdown() + { + return isShutdown; + } + @Override public synchronized void startup() { @@ -149,10 +155,16 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, @Override public synchronized Future<Void> shutdown() { + return shutdown(true); + } + + @Override + public synchronized Future<Void> shutdown(boolean graceful) + { if (isShutdown) throw new IllegalStateException(); isShutdown = true; - Future<Void> future = delegate.shutdown(); + Future<Void> future = delegate.shutdown(graceful); delegate = null; return 
future; } @@ -187,19 +199,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, this.sharedClassLoader = sharedClassLoader; this.instances = new ArrayList<>(); this.instanceMap = new HashMap<>(); + int generation = AbstractCluster.generation.incrementAndGet(); for (InstanceConfig config : configs) { - I instance = newInstanceWrapper(version, config); + I instance = newInstanceWrapper(generation, version, config); instances.add(instance); // we use the config().broadcastAddressAndPort() here because we have not initialised the Instance I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance); if (null != prev) throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddressAndPort() + " vs " + prev.broadcastAddressAndPort()); } - this.filters = new MessageFilters(this); + this.filters = new MessageFilters(); } - protected abstract I newInstanceWrapper(Versions.Version version, InstanceConfig config); + protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config); /** * WARNING: we index from 1 here, for consistency with inet address! 
@@ -257,9 +270,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, { for (IInstance reportTo: instances) { + if (reportTo.isShutdown()) + continue; + for (IInstance reportFrom: instances) { - if (reportFrom == reportTo) + if (reportFrom == reportTo || reportFrom.isShutdown()) continue; int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion()); @@ -335,46 +351,83 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader); } - protected static <I extends IInstance, C extends AbstractCluster<I>> C - create(int nodeCount, Factory<I, C> factory) throws Throwable + public static class Builder<I extends IInstance, C extends AbstractCluster<I>> { - return create(nodeCount, Files.createTempDirectory("dtests").toFile(), factory); - } + private final int nodeCount; + private final Factory<I, C> factory; + private int subnet; + private File root; + private Versions.Version version; + private Consumer<InstanceConfig> configUpdater; + public Builder(int nodeCount, Factory<I, C> factory) + { + this.nodeCount = nodeCount; + this.factory = factory; + } - protected static <I extends IInstance, C extends AbstractCluster<I>> C - create(int nodeCount, File root, Factory<I, C> factory) - { - return create(nodeCount, Versions.CURRENT, root, factory); - } + public Builder<I, C> withSubnet(int subnet) + { + this.subnet = subnet; + return this; + } - protected static <I extends IInstance, C extends AbstractCluster<I>> C - create(int nodeCount, Versions.Version version, Factory<I, C> factory) throws IOException - { - return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), factory); - } + public Builder<I, C> withRoot(File root) + { + this.root = root; + return this; + } - protected static <I extends IInstance, C extends AbstractCluster<I>> C - create(int nodeCount, 
Versions.Version version, File root, Factory<I, C> factory) - { - root.mkdirs(); - setupLogging(root); + public Builder<I, C> withVersion(Versions.Version version) + { + this.version = version; + return this; + } - ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); + public Builder<I, C> withConfig(Consumer<InstanceConfig> updater) + { + this.configUpdater = updater; + return this; + } - List<InstanceConfig> configs = new ArrayList<>(); - long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount); - for (int i = 0 ; i < nodeCount ; ++i) + public C createWithoutStarting() throws IOException { - InstanceConfig config = InstanceConfig.generate(i + 1, root, String.valueOf(token)); - configs.add(config); - token += increment; + File root = this.root; + Versions.Version version = this.version; + + if (root == null) + root = Files.createTempDirectory("dtests").toFile(); + if (version == null) + version = Versions.CURRENT; + + root.mkdirs(); + setupLogging(root); + + ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); + + List<InstanceConfig> configs = new ArrayList<>(); + long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount); + for (int i = 0; i < nodeCount; ++i) + { + InstanceConfig config = InstanceConfig.generate(i + 1, subnet, root, String.valueOf(token)); + if (configUpdater != null) + configUpdater.accept(config); + configs.add(config); + token += increment; + } + + C cluster = factory.newCluster(root, version, configs, sharedClassLoader); + return cluster; } - C cluster = factory.newCluster(root, version, configs, sharedClassLoader); - cluster.startup(); - return cluster; + public C start() throws IOException + { + C cluster = createWithoutStarting(); + cluster.startup(); + return cluster; + } } + private static void setupLogging(File root) { try @@ -398,6 +451,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, public void close() { 
FBUtilities.waitOnFutures(instances.stream() + .filter(i -> !i.isShutdown()) .map(IInstance::shutdown) .collect(Collectors.toList()), 1L, TimeUnit.MINUTES); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index dce03ca..29426cb 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -35,9 +35,6 @@ import java.util.concurrent.Future; import java.util.function.BiConsumer; import java.util.function.Function; -import org.slf4j.LoggerFactory; - -import ch.qos.logback.classic.LoggerContext; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.SharedExecutorPool; import org.apache.cassandra.concurrent.StageManager; @@ -48,7 +45,9 @@ import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.db.BatchlogManager; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.HintedHandOffManager; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.SystemKeyspace; @@ -58,12 +57,14 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IListen; import org.apache.cassandra.distributed.api.IMessage; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import 
org.apache.cassandra.io.sstable.IndexSummaryManager; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; @@ -76,11 +77,17 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamCoordinator; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Ref; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + public class Instance extends IsolatedExecutor implements IInvokableInstance { public final IInstanceConfig config; @@ -93,6 +100,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance this.config = config; InstanceIDDefiner.setInstanceId(config.num()); FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address); + // Set the config at instance creation, possibly before startup() has run on all other instances. + // setMessagingVersions below will call runOnInstance which will instantiate + // the MessagingService and dependencies preventing later changes to network parameters. 
+ Config.setOverrideLoadConfig(() -> loadConfig(config)); } public IInstanceConfig config() @@ -140,6 +151,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance throw new UnsupportedOperationException(); } + public boolean isShutdown() + { + throw new UnsupportedOperationException(); + } + @Override public void schemaChangeInternal(String query) { @@ -166,7 +182,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance private void registerMockMessaging(ICluster cluster) { BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message); - BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = cluster.filters().filter(deliverToInstance); + BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> { + if (cluster.filters().permit(this, cluster.get(to), message.verb())) + deliverToInstance.accept(to, message); + }; Map<InetAddress, InetAddressAndPort> addressAndPortMap = new HashMap<>(); cluster.stream().forEach(instance -> { @@ -182,6 +201,26 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get)); } + // unnecessary if registerMockMessaging used + private void registerFilter(ICluster cluster) + { + IInstance instance = this; + MessagingService.instance().addMessageSink(new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress) + { + // Port is not passed in, so take a best guess at the destination port from this instance + IInstance to = cluster.get(InetAddressAndPort.getByAddressOverrideDefaults(toAddress, instance.config().broadcastAddressAndPort().port)); + return cluster.filters().permit(instance, to, message.verb.ordinal()); + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return true; + } + }); + } + private class 
MessageDeliverySink implements IMessageSink { private final BiConsumer<InetAddressAndPort, IMessage> deliver; @@ -200,6 +239,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance assert from.equals(lookupAddressAndPort.apply(messageOut.from)); InetAddressAndPort toFull = lookupAddressAndPort.apply(to); int version = MessagingService.instance().getVersion(to); + + out.writeInt(MessagingService.PROTOCOL_MAGIC); + out.writeInt(id); + long timestamp = System.currentTimeMillis(); + out.writeInt((int) timestamp); messageOut.serialize(out, version); deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from)); } @@ -217,14 +261,45 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } } - public void receiveMessage(IMessage message) + public void receiveMessage(IMessage imessage) { sync(() -> { - try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message.bytes()))) + // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage + try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(imessage.bytes()))) { - MessageIn<?> messageIn = MessageIn.read(in, message.version(), message.id()); - Runnable deliver = new MessageDeliveryTask(messageIn, message.id(), System.currentTimeMillis(), false); - deliver.run(); + int version = imessage.version(); + + MessagingService.validateMagic(input.readInt()); + int id; + if (version < MessagingService.VERSION_20) + id = Integer.parseInt(input.readUTF()); + else + id = input.readInt(); + assert imessage.id() == id; + + long timestamp = System.currentTimeMillis(); + boolean isCrossNodeTimestamp = false; + + // make sure to readInt, even if cross_node_to is not enabled + int partial = input.readInt(); + if (DatabaseDescriptor.hasCrossNodeTimeout()) + { + long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + isCrossNodeTimestamp = (timestamp != 
crossNodeTimestamp); + timestamp = crossNodeTimestamp; + } + + MessageIn message = MessageIn.read(input, version, id); + if (message == null) + { + // callback expired; nothing to do + return; + } + if (version <= MessagingService.current_version) + { + MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp); + } + // else ignore message } catch (Throwable t) { @@ -251,7 +326,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance { mkdirs(); - Config.setOverrideLoadConfig(() -> loadConfig(config)); DatabaseDescriptor.setDaemonInitialized(); DatabaseDescriptor.createAllDirectories(); @@ -281,8 +355,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance throw new RuntimeException(e); } - initializeRing(cluster); - registerMockMessaging(cluster); + if (config.has(NETWORK)) + { + registerFilter(cluster); + MessagingService.instance().listen(); + } + else + { + // Even though we don't use MessagingService, access the static SocketFactory + // instance here so that we start the static event loop state +// -- not sure what that means? 
SocketFactory.instance.getClass(); + registerMockMessaging(cluster); + } + + // TODO: this is more than just gossip + if (config.has(GOSSIP)) + { + StorageService.instance.initServer(); + } + else + { + initializeRing(cluster); + } SystemKeyspace.finishStartup(); @@ -366,32 +460,47 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance public Future<Void> shutdown() { + return shutdown(true); + } + + public Future<Void> shutdown(boolean graceful) + { Future<?> future = async((ExecutorService executor) -> { Throwable error = null; + + if (config.has(GOSSIP) || config.has(NETWORK)) + { + StorageService.instance.shutdownServer(); + + error = parallelRun(error, executor, + () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L)) + ); + } + error = parallelRun(error, executor, - Gossiper.instance::stop, - CompactionManager.instance::forceShutdown, - CommitLog.instance::shutdownBlocking, - ColumnFamilyStore::shutdownFlushExecutor, - ColumnFamilyStore::shutdownPostFlushExecutor, - ColumnFamilyStore::shutdownReclaimExecutor, - PendingRangeCalculatorService.instance::shutdownExecutor, - StorageService.instance::shutdownBGMonitor, - Ref::shutdownReferenceReaper, - Memtable.MEMORY_POOL::shutdown, - ScheduledExecutors::shutdownAndWait, - SSTableReader::shutdownBlocking + () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES), + CompactionManager.instance::forceShutdown, + () -> BatchlogManager.shutdownAndWait(1L, MINUTES), + () -> HintedHandOffManager.instance.shutdownAndWait(1L, MINUTES), + () -> StreamCoordinator.shutdownAndWait(1L, MINUTES), + () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES), + () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), + () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES), + () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES), + () -> Ref.shutdownReferenceReaper(1L, MINUTES), + () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), + () -> 
SSTableReader.shutdownBlocking(1L, MINUTES), + () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES) ); error = parallelRun(error, executor, + CommitLog.instance::shutdownBlocking, MessagingService.instance()::shutdown ); error = parallelRun(error, executor, - StageManager::shutdownAndWait, - SharedExecutorPool.SHARED::shutdown + () -> StageManager.shutdownAndWait(1L, MINUTES), + () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES) ); - LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); - loggerContext.stop(); Throwables.maybeFail(error); }).apply(isolatedExecutor); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java index 6fd5c7e..363a1df 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java @@ -48,6 +48,7 @@ public class InstanceClassLoader extends URLClassLoader name.startsWith("org.apache.cassandra.distributed.api.") || name.startsWith("sun.") || name.startsWith("oracle.") + || name.startsWith("com.intellij.") || name.startsWith("com.sun.") || name.startsWith("com.oracle.") || name.startsWith("java.") @@ -63,16 +64,16 @@ public class InstanceClassLoader extends URLClassLoader InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader); } - private final int id; private final URL[] urls; + private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected private final ClassLoader sharedClassLoader; - InstanceClassLoader(int id, URL[] urls, ClassLoader sharedClassLoader) + InstanceClassLoader(int generation, URL[] urls, ClassLoader sharedClassLoader) { super(urls, null); - this.id = id; this.urls = urls; this.sharedClassLoader = sharedClassLoader; + this.generation = generation; } @Override @@ 
-109,7 +110,7 @@ public class InstanceClassLoader extends URLClassLoader public String toString() { return "InstanceClassLoader{" + - "id=" + id + + "generation=" + generation + ", urls=" + Arrays.toString(urls) + '}'; } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index ab15fed..efe9a0f 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; @@ -30,6 +31,7 @@ import java.lang.reflect.Field; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -45,6 +47,8 @@ public class InstanceConfig implements IInstanceConfig public UUID hostId() { return hostId; } private final Map<String, Object> params = new TreeMap<>(); + private EnumSet featureFlags; + private volatile InetAddressAndPort broadcastAddressAndPort; @Override @@ -103,6 +107,7 @@ public class InstanceConfig implements IInstanceConfig Collections.singletonMap("seeds", "127.0.0.1"))) // legacy parameters .forceSet("commitlog_sync_batch_window_in_ms", 1.0); + this.featureFlags = EnumSet.noneOf(Feature.class); } private InstanceConfig(InstanceConfig copy) @@ -110,6 +115,18 @@ public class InstanceConfig implements IInstanceConfig this.num = copy.num; this.params.putAll(copy.params); this.hostId = copy.hostId; + this.featureFlags = copy.featureFlags; + } + + public 
InstanceConfig with(Feature featureFlag) + { + featureFlags.add(featureFlag); + return this; + } + + public boolean has(Feature featureFlag) + { + return featureFlags.contains(featureFlag); } public InstanceConfig set(String fieldName, Object value) @@ -200,13 +217,14 @@ public class InstanceConfig implements IInstanceConfig return (String)params.get(name); } - public static InstanceConfig generate(int nodeNum, File root, String token) + public static InstanceConfig generate(int nodeNum, int subnet, File root, String token) { + String ipPrefix = "127.0." + subnet + "."; return new InstanceConfig(nodeNum, - "127.0.0." + nodeNum, - "127.0.0." + nodeNum, - "127.0.0." + nodeNum, - "127.0.0." + nodeNum, + ipPrefix + nodeNum, + ipPrefix + nodeNum, + ipPrefix + nodeNum, + ipPrefix + nodeNum, String.format("%s/node%d/saved_caches", root, nodeNum), new String[] { String.format("%s/node%d/data", root, nodeNum) }, String.format("%s/node%d/commitlog", root, nodeNum), diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java index d82c9e4..1d26c5d 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java @@ -27,28 +27,36 @@ import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URLClassLoader; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; import 
java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.LoggerContext; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.utils.ExecutorUtils; public class IsolatedExecutor implements IIsolatedExecutor { final ExecutorService isolatedExecutor; + private final String name; private final ClassLoader classLoader; private final Method deserializeOnInstance; IsolatedExecutor(String name, ClassLoader classLoader) { + this.name = name; this.isolatedExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("isolatedExecutor", Thread.NORM_PRIORITY, classLoader, new ThreadGroup(name))); this.classLoader = classLoader; this.deserializeOnInstance = lookupDeserializeOneObject(classLoader); @@ -57,9 +65,40 @@ public class IsolatedExecutor implements IIsolatedExecutor public Future<Void> shutdown() { isolatedExecutor.shutdown(); - ThrowingRunnable.toRunnable(((URLClassLoader) classLoader)::close).run(); - return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(() -> isolatedExecutor.awaitTermination(60, TimeUnit.SECONDS)), - Executors.newSingleThreadExecutor()); + + /* Use a thread pool with a core pool size of zero to terminate the thread as soon as possible + ** so the instance class loader can be garbage collected. Uses a custom thread factory + ** rather than NamedThreadFactory to avoid calling FastThreadLocal.removeAll() in 3.0 and up + ** as it was observed crashing during test failures and made it harder to find the real cause. 
+ */ + ThreadFactory threadFactory = (Runnable r) -> { + Thread t = new Thread(r, name + "_shutdown"); + t.setDaemon(true); + return t; + }; + ExecutorService shutdownExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), threadFactory); + return shutdownExecutor.submit(() -> { + try + { + ExecutorUtils.awaitTermination(60, TimeUnit.SECONDS, isolatedExecutor); + + // Shutdown logging last - this is not ideal as the logging subsystem is initialized + // outside of this class, however doing it this way provides access to the full + // logging system while termination is taking place. + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + loggerContext.stop(); + + // Close the instance class loader after shutting down the isolatedExecutor and logging + // in case error handling triggers loading additional classes + ((URLClassLoader) classLoader).close(); + } + finally + { + shutdownExecutor.shutdownNow(); + } + return null; + }); } public <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call) { return () -> isolatedExecutor.submit(call); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java index a72c7a5..c1607f8 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java @@ -32,33 +32,20 @@ import org.apache.cassandra.net.MessagingService; public class MessageFilters implements IMessageFilters { - private final ICluster cluster; private final Set<Filter> filters = new CopyOnWriteArraySet<>(); - public MessageFilters(AbstractCluster cluster) + public boolean permit(IInstance from, IInstance to, int verb) { - this.cluster = cluster; - } + if (from == null || to == null) + return false; // cannot deliver + int fromNum = from.config().num(); + int toNum =
to.config().num(); - public BiConsumer<InetAddressAndPort, IMessage> filter(BiConsumer<InetAddressAndPort, IMessage> applyIfNotFiltered) - { - return (toAddress, message) -> - { - IInstance from = cluster.get(message.from()); - IInstance to = cluster.get(toAddress); - if (from == null || to == null) - return; // cannot deliver - int fromNum = from.config().num(); - int toNum = to.config().num(); - int verb = message.verb(); - for (Filter filter : filters) - { - if (filter.matches(fromNum, toNum, verb)) - return; - } + for (Filter filter : filters) + if (filter.matches(fromNum, toNum, verb)) + return false; - applyIfNotFiltered.accept(toAddress, message); - }; + return true; } public class Filter implements IMessageFilters.Filter diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java index 18ca17f..757c17f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java @@ -29,6 +29,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.apache.cassandra.distributed.impl.AbstractCluster; +import org.apache.cassandra.distributed.impl.IsolatedExecutor; public class DistributedTestBase { @@ -41,10 +42,37 @@ public class DistributedTestBase public static String KEYSPACE = "distributed_test_keyspace"; + public static void nativeLibraryWorkaround() + { + // Disable the C library for in-JVM dtests otherwise it holds a gcroot against the InstanceClassLoader + System.setProperty("cassandra.disable_clibrary", "true"); + + // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask, + // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask, + // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader. 
+ System.setProperty("cassandra.disable_tcactive_openssl", "true"); + System.setProperty("io.netty.transport.noNative", "true"); + } + + public static void processReaperWorkaround() + { + // Make sure the 'process reaper' thread is initially created under the main classloader, + // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader + // which prevents it from being garbage collected. + IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new ProcessBuilder().command("true").start().waitFor()).run(); + } + @BeforeClass public static void setup() { System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); + nativeLibraryWorkaround(); + processReaperWorkaround(); + } + + static String withKeyspace(String replaceIn) + { + return String.format(replaceIn, KEYSPACE); } protected static <C extends AbstractCluster<?>> C init(C cluster) diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java new file mode 100644 index 0000000..55c700c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.sql.Date; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.List; +import java.util.function.Consumer; +import javax.management.MBeanServer; + +import org.junit.Ignore; +import org.junit.Test; + +import com.sun.management.HotSpotDiagnosticMXBean; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.impl.InstanceConfig; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.SigarLibrary; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +/* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup. + * All objects referencing the InstanceClassLoader need to be garbage collected or + * the JVM runs out of metaspace. This test also calls out to lsof to check which + * file handles are still opened. + * + * This is intended to be a burn type test where it is run outside of the test suites + * when a problem is detected (like OutOfMetaspace exceptions). + * + * Currently this test demonstrates that the InstanceClassLoader is cleaned up (load up + * the final hprof and check that the class loaders are not reachable from a GC root), + * but it shows that the file handles for Data/Index files are being leaked. 
+ */ +@Ignore +public class ResourceLeakTest extends DistributedTestBase +{ + // Parameters to adjust while hunting for leaks + final int numTestLoops = 1; // Set this value high to crash on leaks, or low when tracking down an issue. + final boolean dumpEveryLoop = false; // Dump heap & possibly files every loop + final boolean dumpFileHandles = false; // Call lsof whenever dumping resources + final boolean forceCollection = false; // Whether to explicitly force finalization/gc for smaller heap dumps + final long finalWaitMillis = 0l; // Number of millis to wait before final resource dump to give gc a chance + + static final SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); + static final String when = format.format(Date.from(Instant.now())); + + static String outputFilename(String base, String description, String extension) + { + Path p = FileSystems.getDefault().getPath("build", "test", + String.join("-", when, base, description) + extension); + return p.toString(); + } + + /** + * Retrieves the process ID or <code>null</code> if the process ID cannot be retrieved. + * @return the process ID or <code>null</code> if the process ID cannot be retrieved. + * + * (Duplicated from HeapUtils to avoid refactoring older releases where this test is useful). + */ + private static Long getProcessId() + { + // Once Java 9 is ready the process API should provide a better way to get the process ID. + long pid = SigarLibrary.instance.getPid(); + + if (pid >= 0) + return Long.valueOf(pid); + + return getProcessIdFromJvmName(); + } + + /** + * Retrieves the process ID from the JVM name. + * @return the process ID or <code>null</code> if the process ID cannot be retrieved. 
+ */ + private static Long getProcessIdFromJvmName() + { + // the JVM name in Oracle JVMs is: '<pid>@<hostname>' but this might not be the case on all JVMs + String jvmName = ManagementFactory.getRuntimeMXBean().getName(); + try + { + return Long.parseLong(jvmName.split("@")[0]); + } + catch (NumberFormatException e) + { + // ignore + } + return null; + } + + static void dumpHeap(String description, boolean live) throws IOException + { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + HotSpotDiagnosticMXBean mxBean = ManagementFactory.newPlatformMXBeanProxy( + server, "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class); + mxBean.dumpHeap(outputFilename("heap", description, ".hprof"), live); + } + + static void dumpOpenFiles(String description) throws IOException, InterruptedException + { + long pid = getProcessId(); + ProcessBuilder map = new ProcessBuilder("/usr/sbin/lsof", "-p", Long.toString(pid)); + File output = new File(outputFilename("lsof", description, ".txt")); + map.redirectOutput(output); + map.redirectErrorStream(true); + map.start().waitFor(); + } + + void dumpResources(String description) throws IOException, InterruptedException + { + dumpHeap(description, false); + if (dumpFileHandles) + { + dumpOpenFiles(description); + } + } + + void doTest(int numClusterNodes, Consumer<InstanceConfig> updater) throws Throwable + { + for (int loop = 0; loop < numTestLoops; loop++) + { + try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start()) + { + if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node + cluster.get(1).runOnInstance(() -> CassandraDaemon.waitForGossipToSettle()); + + init(cluster); + String tableName = "tbl" + loop; + cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." 
+ tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL); + cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush())); + if (dumpEveryLoop) + { + dumpResources(String.format("loop%03d", loop)); + } + } + catch (Throwable tr) + { + System.out.println("Dumping resources for exception: " + tr.getMessage()); + tr.printStackTrace(); + dumpResources("exception"); + } + if (forceCollection) + { + System.runFinalization(); + System.gc(); + } + } + } + + @Test + public void looperTest() throws Throwable + { + doTest(1, config -> {}); + if (forceCollection) + { + System.runFinalization(); + System.gc(); + Thread.sleep(finalWaitMillis); + } + dumpResources("final"); + } + + @Test + public void looperGossipNetworkTest() throws Throwable + { + doTest(2, config -> config.with(GOSSIP).with(NETWORK)); + if (forceCollection) + { + System.runFinalization(); + System.gc(); + Thread.sleep(finalWaitMillis); + } + dumpResources("final-gossip-network"); + } +} diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index 7bb4a51..0d61ad8 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.io.PrintStream; import java.util.Arrays; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; @@ -62,7 +63,7 @@ public class SEPExecutorTest } // shutdown does not guarantee that threads are actually dead once it exits, only that they will stop promptly afterwards - sharedPool.shutdown(); + sharedPool.shutdownAndWait(1L, TimeUnit.MINUTES); for (Thread thread : Thread.getAllStackTraces().keySet()) { if (thread.getName().contains(MAGIC)) --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org