This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit eae581a5f07c340594f6af47bb558693ef363611
Author: Benedict Elliott Smith <bened...@apple.com>
AuthorDate: Wed Nov 17 14:34:23 2021 +0000

    [CEP-10] Cluster and Code Simulations: Minor improvements
    
    - Simplify Semaphore
    - Future improvements
    - ScheduledExecutorPlus improvements for simulator compatibility
    - Debug leaks in Ref or BufferPool
    - Support use of TokenMetadata without initialising Cassandra
    - Additional system properties and simulator flags
    - Permit Clock initialisation within separate ClassLoader
    - Introduce BallotGenerator
    
    patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-17008
---
 .../cassandra/auth/CassandraRoleManager.java       |   2 +-
 .../concurrent/ScheduledExecutorPlus.java          |  24 +++
 .../ScheduledThreadPoolExecutorPlus.java           |  27 +++
 .../cassandra/concurrent/SyncFutureTask.java       |   5 +-
 .../config/CassandraRelevantProperties.java        |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |   4 +
 .../cql3/statements/BatchUpdatesCollector.java     |   2 +-
 src/java/org/apache/cassandra/db/Keyspace.java     |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java     |   2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |   2 +-
 .../cassandra/db/ReadExecutionController.java      |   2 +-
 .../db/commitlog/AbstractCommitLogService.java     |   3 +-
 .../cassandra/db/monitoring/MonitorableImpl.java   |   2 +-
 .../cassandra/db/monitoring/MonitoringTask.java    |   2 +-
 .../org/apache/cassandra/gms/FailureDetector.java  |   4 +-
 .../apache/cassandra/hints/HintsDispatcher.java    |   2 +-
 .../apache/cassandra/locator/TokenMetadata.java    |  76 ++++----
 src/java/org/apache/cassandra/metrics/Sampler.java |   2 +-
 .../cassandra/net/AbstractMessageHandler.java      |   2 +-
 .../cassandra/net/InboundMessageHandler.java       |   2 +-
 .../cassandra/net/InboundMessageHandlers.java      |   2 +-
 src/java/org/apache/cassandra/net/Message.java     |   2 +-
 .../apache/cassandra/net/OutboundConnection.java   |   2 +-
 .../org/apache/cassandra/net/RequestCallbacks.java |   2 +-
 .../apache/cassandra/net/ResponseVerbHandler.java  |   2 +-
 .../apache/cassandra/schema/MigrationManager.java  |   4 +
 .../apache/cassandra/schema/SchemaKeyspace.java    |   3 +
 .../cassandra/service/ActiveRepairService.java     |   3 +
 .../org/apache/cassandra/service/StorageProxy.java |  28 ++-
 .../cassandra/service/paxos/BallotGenerator.java   |  75 ++++++++
 .../cassandra/service/paxos/ProposeCallback.java   |   3 +-
 .../cassandra/transport/CQLMessageHandler.java     |   2 +-
 src/java/org/apache/cassandra/utils/Clock.java     |  31 +++-
 .../org/apache/cassandra/utils/MonotonicClock.java |  19 +-
 .../Nemesis.java}                                  |  27 ++-
 src/java/org/apache/cassandra/utils/Simulate.java  |  56 ++++++
 .../cassandra/utils/concurrent/AbstractFuture.java |  26 ++-
 .../cassandra/utils/concurrent/AsyncFuture.java    |   2 +-
 .../cassandra/utils/concurrent/Awaitable.java      | 135 +++++++-------
 .../apache/cassandra/utils/concurrent/Future.java  |  11 ++
 .../cassandra/utils/concurrent/ListenerList.java   |  18 +-
 .../org/apache/cassandra/utils/concurrent/Ref.java |  29 ++-
 .../cassandra/utils/concurrent/Semaphore.java      | 200 ++-------------------
 .../cassandra/utils/concurrent/SyncFuture.java     |   4 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  19 +-
 .../apache/cassandra/utils/memory/HeapPool.java    |   4 +
 .../cassandra/utils/memory/LongBufferPoolTest.java |   4 +-
 .../concurrent/AbstractExecutorPlusTest.java       |   8 +-
 .../cassandra/utils/concurrent/SemaphoreTest.java  |   8 +-
 49 files changed, 515 insertions(+), 382 deletions(-)

diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java 
b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 0e49056..b813b55 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -385,7 +385,7 @@ public class CassandraRoleManager implements IRoleManager
     protected void scheduleSetupTask(final Callable<Void> setupTask)
     {
         // The delay is to give the node a chance to see its peers before 
attempting the operation
-        ScheduledExecutors.optionalTasks.schedule(() -> {
+        ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> {
             isClusterReady = true;
             try
             {
diff --git 
a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java 
b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
index ecf073d..a2b033a 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.utils.Shared;
 
@@ -27,4 +29,26 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 @Shared(scope = SIMULATION)
 public interface ScheduledExecutorPlus extends ExecutorPlus, 
ScheduledExecutorService
 {
+    /**
+     * Schedule an action that is recurring but self-administered.
+     */
+    ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, 
TimeUnit units);
+
+    /**
+     * Schedule an action to run at the given deadline. This method is primarily used by the 
Simulator to modify its
+     * scheduling behaviour with respect to this operation.
+     */
+    ScheduledFuture<?> scheduleAt(Runnable run, long deadline);
+
+    /**
+     * Schedule a timeout action to run at the given deadline. This method is primarily used by the 
Simulator to modify its
+     * scheduling behaviour with respect to this operation.
+     */
+    ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline);
+
+    /**
+     * Schedule a timeout action to run after the given delay. This method is primarily used by the 
Simulator to modify its
+     * scheduling behaviour with respect to this operation.
+     */
+    ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, 
TimeUnit units);
 }
diff --git 
a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java 
b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
index efd284f..0ab09a4 100644
--- 
a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
+++ 
b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
@@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.service.StorageService;
 
+import static com.google.common.primitives.Longs.max;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.ExecutionFailure.propagating;
 import static org.apache.cassandra.concurrent.ExecutionFailure.suppressing;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 /**
  * Like ExecutorPlus, ScheduledThreadPoolExecutorPlus always
@@ -97,6 +100,30 @@ public class ScheduledThreadPoolExecutorPlus extends 
ScheduledThreadPoolExecutor
         return super.scheduleWithFixedDelay(suppressing(task), initialDelay, 
delay, unit);
     }
 
+    @Override
+    public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, 
TimeUnit units)
+    {
+        return schedule(run, delay, units);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAt(Runnable run, long deadline)
+    {
+        return schedule(run, max(0, deadline - nanoTime()), NANOSECONDS);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline)
+    {
+        return scheduleTimeoutWithDelay(run, max(0, deadline - nanoTime()), 
NANOSECONDS);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long 
delay, TimeUnit units)
+    {
+        return schedule(run, delay, units);
+    }
+
     /*======== BEGIN DIRECT COPY OF ThreadPoolExecutorPlus ===============*/
 
     private <T extends Runnable> T addTask(T task)
diff --git a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java 
b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
index 4f4aa67..4885821 100644
--- a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
+++ b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
@@ -60,7 +60,10 @@ public class SyncFutureTask<T> extends SyncFuture<T> 
implements RunnableFuture<T
         try
         {
             if (!setUncancellable())
-                throw new IllegalStateException();
+            {
+                if (isCancelled()) return;
+                else throw new IllegalStateException();
+            }
 
             if (!trySuccess(call.call()))
                 throw new IllegalStateException();
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 43db1c3..807516c 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -228,6 +228,7 @@ public enum CassandraRelevantProperties
     // properties for debugging simulator ASM output
     TEST_SIMULATOR_PRINT_ASM("cassandra.test.simulator.print_asm", "none"),
     TEST_SIMULATOR_PRINT_ASM_TYPES("cassandra.test.simulator.print_asm_types", 
""),
+    TEST_SIMULATOR_LIVENESS_CHECK("cassandra.test.simulator.livenesscheck", 
"true"),
 
     // determinism properties for testing
     
DETERMINISM_SSTABLE_COMPRESSION_DEFAULT("cassandra.sstable_compression_default",
 "true"),
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8bb52d4..8265fd2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -70,6 +70,7 @@ import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
 import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -80,6 +81,7 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.OS_ARCH;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.SUN_ARCH_DATA_MODEL;
 import static org.apache.cassandra.io.util.FileUtils.ONE_GB;
 import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
+import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome;
 
 public class DatabaseDescriptor
 {
@@ -839,6 +841,8 @@ public class DatabaseDescriptor
         }
 
         Paxos.setPaxosVariant(conf.paxos_variant);
+
+        logInitializationOutcome(logger);
     }
 
     @VisibleForTesting
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index cb88bdd..e5136f4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * Utility class to collect updates.
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 795230e..285d08c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -78,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Promise;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * It represents a Keyspace.
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index a30b567..6350082 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.utils.concurrent.Future;
 import static org.apache.cassandra.net.MessagingService.VERSION_30;
 import static org.apache.cassandra.net.MessagingService.VERSION_3014;
 import static org.apache.cassandra.net.MessagingService.VERSION_40;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 public class Mutation implements IMutation
 {
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index f14240b..3bf0d6d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -72,7 +72,7 @@ import org.apache.cassandra.utils.ObjectSizes;
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 import static 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
 
 /**
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java 
b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 5bcd84b..2fbe3ac 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
 
 public class ReadExecutionController implements AutoCloseable
 {
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index be3f8cd..6b5378f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -45,6 +45,7 @@ import static 
org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static 
org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
 import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
 import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 
@@ -151,7 +152,7 @@ public abstract class AbstractCommitLogService
             throw new IllegalArgumentException(String.format("Commit log flush 
interval must be positive: %fms",
                                                              syncIntervalNanos 
* 1e-6));
 
-        SyncRunnable sync = new SyncRunnable(MonotonicClock.preciseTime);
+        SyncRunnable sync = new SyncRunnable(preciseTime);
         executor = executorFactory().infiniteLoop(name, sync, SAFE, 
NON_DAEMON, SYNCHRONIZED);
     }
 
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java 
b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
index a6e7947..31b5404 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.db.monitoring;
 
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 public abstract class MonitorableImpl implements Monitorable
 {
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java 
b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index 52d6160..d681e4b 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.lang.System.getProperty;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 import static 
org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
 
 /**
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 40a2de5..6db41de 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -28,7 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import javax.management.openmbean.*;
-import org.apache.cassandra.io.util.File;
+
 import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ import org.apache.cassandra.utils.MBeanWrapper;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.newFailureDetector;
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
 
 /**
  * This FailureDetector is an implementation of the paper titled
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 2b6d9a3..b627338 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.concurrent.Condition;
 import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.*;
 import static 
org.apache.cassandra.metrics.HintsServiceMetrics.updateDelayMetrics;
 import static org.apache.cassandra.net.Verb.HINT_REQ;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 import static 
org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
 
 /**
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index fb9d43b..202bd6a 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -862,42 +862,8 @@ public class TokenMetadata
         {
             TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, 
keyspaceName);
 
-            // create clone of current state
-            BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone;
-            Set<InetAddressAndPort> leavingEndpointsClone;
-            Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone;
-            TokenMetadata metadata;
-
-            lock.readLock().lock();
-            try
-            {
-
-                if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
-                {
-                    if (logger.isTraceEnabled())
-                        logger.trace("No bootstrapping, leaving or moving 
nodes -> empty pending ranges for {}", keyspaceName);
-                    if (bootstrapTokens.isEmpty() && 
leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
-                    {
-                        if (logger.isTraceEnabled())
-                            logger.trace("No bootstrapping, leaving or moving 
nodes -> empty pending ranges for {}", keyspaceName);
-                        pendingRanges.put(keyspaceName, new 
PendingRangeMaps());
-
-                        return;
-                    }
-                }
+            unsafeCalculatePendingRanges(strategy, keyspaceName);
 
-                bootstrapTokensClone  = new 
BiMultiValMap<>(this.bootstrapTokens);
-                leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
-                movingEndpointsClone = new HashSet<>(this.movingEndpoints);
-                metadata = this.cloneOnlyTokenMap();
-            }
-            finally
-            {
-                lock.readLock().unlock();
-            }
-
-            pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, 
metadata, bootstrapTokensClone,
-                                                                   
leavingEndpointsClone, movingEndpointsClone));
             if (logger.isDebugEnabled())
                 logger.debug("Starting pending range calculation for {}", 
keyspaceName);
 
@@ -910,6 +876,46 @@ public class TokenMetadata
         }
     }
 
+    public void unsafeCalculatePendingRanges(AbstractReplicationStrategy 
strategy, String keyspaceName)
+    {
+        // create clone of current state
+        BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone;
+        Set<InetAddressAndPort> leavingEndpointsClone;
+        Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone;
+        TokenMetadata metadata;
+
+        lock.readLock().lock();
+        try
+        {
+
+            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("No bootstrapping, leaving or moving nodes -> 
empty pending ranges for {}", keyspaceName);
+                if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("No bootstrapping, leaving or moving 
nodes -> empty pending ranges for {}", keyspaceName);
+                    pendingRanges.put(keyspaceName, new PendingRangeMaps());
+
+                    return;
+                }
+            }
+
+            bootstrapTokensClone  = new BiMultiValMap<>(this.bootstrapTokens);
+            leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
+            movingEndpointsClone = new HashSet<>(this.movingEndpoints);
+            metadata = this.cloneOnlyTokenMap();
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+
+        pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, 
metadata, bootstrapTokensClone,
+                                                               
leavingEndpointsClone, movingEndpointsClone));
+    }
+
     /**
      * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, 
String)
      */
diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java 
b/src/java/org/apache/cassandra/metrics/Sampler.java
index 90cc90c..b3d0f21 100644
--- a/src/java/org/apache/cassandra/metrics/Sampler.java
+++ b/src/java/org/apache/cassandra/metrics/Sampler.java
@@ -40,7 +40,7 @@ public abstract class Sampler<T>
     }
 
     @VisibleForTesting
-    MonotonicClock clock = MonotonicClock.approxTime;
+    MonotonicClock clock = MonotonicClock.Global.approxTime;
 
     @VisibleForTesting
     static final ExecutorPlus samplerExecutor = executorFactory()
diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java 
b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
index 1045f28..e2cf68d 100644
--- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.net.ResourceLimits.Limit;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static org.apache.cassandra.net.Crc.InvalidCrc;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * Core logic for handling inbound message deserialization and execution (in 
tandem with {@link FrameDecoder}).
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java 
b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index c1b51be..e12fcec 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * Implementation of {@link AbstractMessageHandler} for processing internode 
messages from peers.
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java 
b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
index a706557..c7b9463 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.metrics.InternodeInboundMetrics;
 import org.apache.cassandra.net.Message.Header;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * An aggregation of {@link InboundMessageHandler}s for all connections from a 
peer.
diff --git a/src/java/org/apache/cassandra/net/Message.java 
b/src/java/org/apache/cassandra/net/Message.java
index 802c79f..8fe8971 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -57,7 +57,7 @@ import static 
org.apache.cassandra.net.MessagingService.VERSION_3014;
 import static org.apache.cassandra.net.MessagingService.VERSION_30;
 import static org.apache.cassandra.net.MessagingService.VERSION_40;
 import static org.apache.cassandra.net.MessagingService.instance;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 import static 
org.apache.cassandra.utils.vint.VIntCoding.computeUnsignedVIntSize;
 import static org.apache.cassandra.utils.vint.VIntCoding.getUnsignedVInt;
 import static org.apache.cassandra.utils.vint.VIntCoding.skipUnsignedVInt;
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java 
b/src/java/org/apache/cassandra/net/OutboundConnection.java
index fbf0c73..c2aecb0 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -68,7 +68,7 @@ import static org.apache.cassandra.net.ResourceLimits.*;
 import static org.apache.cassandra.net.ResourceLimits.Outcome.*;
 import static org.apache.cassandra.net.SocketFactory.*;
 import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 import static org.apache.cassandra.utils.Throwables.isCausedBy;
 import static 
org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
 
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java 
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 6275c15..fa1a03e 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -49,7 +49,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
 
 /**
  * An expiring map of request callbacks.
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 369e5f4..1cee468 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.tracing.Tracing;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 class ResponseVerbHandler implements IVerbHandler
 {
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 6fbfc5d..8e485e0 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory;
 import java.util.function.LongSupplier;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.Simulate;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.concurrent.Stage.MIGRATION;
 import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ;
+import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
 
+@Simulate(with = GLOBAL_CLOCK)
 public class MigrationManager
 {
     private static final Logger logger = 
LoggerFactory.getLogger(MigrationManager.class);
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 6d5e331..90859ce 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -49,6 +49,7 @@ import 
org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
 
 import static java.lang.String.format;
 
@@ -58,6 +59,7 @@ import static java.util.stream.Collectors.toSet;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 import static org.apache.cassandra.schema.SchemaKeyspaceTables.*;
+import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
 
 /**
  * system_schema.* tables and methods for manipulating them.
@@ -294,6 +296,7 @@ public final class SchemaKeyspace
     /**
      * Add entries to system_schema.* for the hardcoded system keyspaces
      */
+    @Simulate(with = GLOBAL_CLOCK)
     static void saveSystemKeyspacesSchema()
     {
         KeyspaceMetadata system = 
Schema.instance.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME);
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 322fd18..7d0a290 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.locator.EndpointsByRange;
 import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.utils.Simulate;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +107,7 @@ import static 
org.apache.cassandra.config.DatabaseDescriptor.*;
 import static org.apache.cassandra.net.Message.out;
 import static org.apache.cassandra.net.Verb.PREPARE_MSG;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 import static 
org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
 
 /**
@@ -122,6 +124,7 @@ import static 
org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownL
  * The creation of a repair session is done through the submitRepairSession 
that
  * returns a future on the completion of that session.
  */
+@Simulate(with = MONITORS)
 public class ActiveRepairService implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener, ActiveRepairServiceMBean
 {
 
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 62bd01a..3883062 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -141,6 +141,7 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import static com.google.common.collect.Iterables.concat;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
 import static org.apache.cassandra.net.Message.out;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
@@ -152,6 +153,8 @@ import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetr
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.*;
 import static 
org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
+import static 
org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallotTimestampMicros;
+import static 
org.apache.cassandra.service.paxos.BallotGenerator.Global.randomBallot;
 import static org.apache.cassandra.service.paxos.PrepareVerbHandler.doPrepare;
 import static org.apache.cassandra.service.paxos.ProposeVerbHandler.doPropose;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -335,7 +338,6 @@ public class StorageProxy implements StorageProxyMBean
                            consistencyForPaxos,
                            consistencyForCommit,
                            consistencyForCommit,
-                           state,
                            queryStartNanoTime,
                            casWriteMetrics,
                            updateProposer);
@@ -430,7 +432,6 @@ public class StorageProxy implements StorageProxyMBean
                                        ConsistencyLevel consistencyForPaxos,
                                        ConsistencyLevel 
consistencyForReplayCommits,
                                        ConsistencyLevel consistencyForCommit,
-                                       ClientState state,
                                        long queryStartNanoTime,
                                        CASClientRequestMetrics casMetrics,
                                        Supplier<Pair<PartitionUpdate, 
RowIterator>> createUpdateProposal)
@@ -457,8 +458,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                     
replicaPlan,
                                                                     
consistencyForPaxos,
                                                                     
consistencyForReplayCommits,
-                                                                    casMetrics,
-                                                                    state);
+                                                                    
casMetrics);
 
                 final UUID ballot = pair.ballot;
                 contentions += pair.contentions;
@@ -525,8 +525,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                 
ReplicaPlan.ForPaxosWrite paxosPlan,
                                                                 
ConsistencyLevel consistencyForPaxos,
                                                                 
ConsistencyLevel consistencyForCommit,
-                                                                
CASClientRequestMetrics casMetrics,
-                                                                ClientState 
state)
+                                                                
CASClientRequestMetrics casMetrics)
     throws WriteTimeoutException, WriteFailureException
     {
         long timeoutNanos = 
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
@@ -540,10 +539,10 @@ public class StorageProxy implements StorageProxyMBean
             // in progress (#5667). Lastly, we don't want to use a timestamp 
that is older than the last one assigned by ClientState or operations may appear
             // out-of-order (#7801).
             long minTimestampMicrosToUse = summary == null ? Long.MIN_VALUE : 
1 + UUIDGen.microsTimestamp(summary.mostRecentInProgressCommit.ballot);
-            long ballotMicros = 
state.getTimestampForPaxos(minTimestampMicrosToUse);
+            long ballotMicros = 
nextBallotTimestampMicros(minTimestampMicrosToUse);
             // Note that ballotMicros is not guaranteed to be unique if two 
proposal are being handled concurrently by the same coordinator. But we still
             // need ballots to be unique for each proposal so we have to use 
getRandomTimeUUIDFromMicros.
-            UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(ballotMicros);
+            UUID ballot = randomBallot(ballotMicros, consistencyForPaxos == 
SERIAL);
 
             // prepare
             try
@@ -1810,7 +1809,6 @@ public class StorageProxy implements StorageProxyMBean
                         consistencyLevel,
                         consistencyForReplayCommitsOrFetch,
                         ConsistencyLevel.ANY,
-                        state,
                         start,
                         casReadMetrics,
                         updateProposer);
@@ -2077,12 +2075,12 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    
MessagingService.instance().metrics.recordSelfDroppedMessage(verb, 
MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+                    
MessagingService.instance().metrics.recordSelfDroppedMessage(verb, 
MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
                     
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
                 }
 
                 if (!readRejected)
-                    
MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(),
 MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+                    
MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(),
 MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
             }
             catch (Throwable t)
             {
@@ -2375,13 +2373,13 @@ public class StorageProxy implements StorageProxyMBean
 
         public DroppableRunnable(Verb verb)
         {
-            this.approxCreationTimeNanos = MonotonicClock.approxTime.now();
+            this.approxCreationTimeNanos = 
MonotonicClock.Global.approxTime.now();
             this.verb = verb;
         }
 
         public final void run()
         {
-            long approxCurrentTimeNanos = MonotonicClock.approxTime.now();
+            long approxCurrentTimeNanos = 
MonotonicClock.Global.approxTime.now();
             long expirationTimeNanos = 
verb.expiresAtNanos(approxCreationTimeNanos);
             if (approxCurrentTimeNanos > expirationTimeNanos)
             {
@@ -2408,7 +2406,7 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static abstract class LocalMutationRunnable implements Runnable
     {
-        private final long approxCreationTimeNanos = 
MonotonicClock.approxTime.now();
+        private final long approxCreationTimeNanos = 
MonotonicClock.Global.approxTime.now();
 
         private final Replica localReplica;
 
@@ -2420,7 +2418,7 @@ public class StorageProxy implements StorageProxyMBean
         public final void run()
         {
             final Verb verb = verb();
-            long nowNanos = MonotonicClock.approxTime.now();
+            long nowNanos = MonotonicClock.Global.approxTime.now();
             long expirationTimeNanos = 
verb.expiresAtNanos(approxCreationTimeNanos);
             if (nowNanos > expirationTimeNanos)
             {
diff --git a/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java 
b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java
new file mode 100644
index 0000000..d031f38
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.Shared;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
+public interface BallotGenerator
+{
+    static class Default implements BallotGenerator
+    {
+        public UUID randomBallot(long whenInMicros, boolean isSerial)
+        {
+            return UUIDGen.getRandomTimeUUIDFromMicros(whenInMicros, isSerial 
? 2 : 1);
+        }
+
+        public UUID randomBallot(long fromInMicros, long toInMicros, boolean 
isSerial)
+        {
+            long timestampMicros = 
ThreadLocalRandom.current().nextLong(fromInMicros, toInMicros);
+            return randomBallot(timestampMicros, isSerial);
+        }
+
+        public long nextBallotTimestampMicros(long minTimestamp)
+        {
+            return ClientState.getTimestampForPaxos(minTimestamp);
+        }
+
+        public long prevBallotTimestampMicros()
+        {
+            return ClientState.getLastTimestampMicros();
+        }
+    }
+
+    static class Global
+    {
+        private static BallotGenerator instance = new Default();
+        public static UUID randomBallot(long whenInMicros, boolean isSerial) { 
return instance.randomBallot(whenInMicros, isSerial); }
+        public static UUID randomBallot(long fromInMicros, long toInMicros, 
boolean isSerial) { return instance.randomBallot(fromInMicros, toInMicros, 
isSerial); }
+        public static long nextBallotTimestampMicros(long minWhenInMicros) { 
return instance.nextBallotTimestampMicros(minWhenInMicros); }
+        public static long prevBallotTimestampMicros() { return 
instance.prevBallotTimestampMicros(); }
+
+        public static void unsafeSet(BallotGenerator newInstance)
+        {
+            instance = newInstance;
+        }
+    }
+
+    UUID randomBallot(long whenInMicros, boolean isSerial);
+    UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial);
+    long nextBallotTimestampMicros(long minWhenInMicros);
+    long prevBallotTimestampMicros();
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java 
b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index dc2f9a7..64eca68 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.Nemesis;
 
 /**
  * ProposeCallback has two modes of operation, controlled by the failFast 
parameter.
@@ -46,7 +47,7 @@ public class ProposeCallback extends 
AbstractPaxosCallback<Boolean>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ProposeCallback.class);
 
-    private final AtomicInteger accepts = new AtomicInteger(0);
+    @Nemesis private final AtomicInteger accepts = new AtomicInteger(0);
     private final int requiredAccepts;
     private final boolean failFast;
 
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java 
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index bbb8cb5..09e9996 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -46,7 +46,7 @@ import 
org.apache.cassandra.transport.Flusher.FlushItem.Framed;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.utils.NoSpamLogger;
 
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 /**
  * Implementation of {@link AbstractMessageHandler} for processing CQL 
messages which comprise a {@link Message} wrapped
diff --git a/src/java/org/apache/cassandra/utils/Clock.java 
b/src/java/org/apache/cassandra/utils/Clock.java
index d1a7337..acdfc82 100644
--- a/src/java/org/apache/cassandra/utils/Clock.java
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -37,10 +37,12 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 @Shared(scope = SIMULATION)
 public interface Clock
 {
-    static final Logger logger = LoggerFactory.getLogger(Clock.class);
-
     public static class Global
     {
+        // deliberately no static Logger here: class loading Logger can cause a deadlock, so the outcome is recorded below and logged later via logInitializationOutcome
+        private static Throwable FAILED_TO_INITIALISE;
+        private static String INITIALIZE_MESSAGE;
+
         /**
          * Static singleton object that will be instantiated by default with a 
system clock
          * implementation. Set <code>cassandra.clock</code> system property to 
a FQCN to use a
@@ -52,19 +54,38 @@ public interface Clock
         {
             String classname = CLOCK_GLOBAL.getString();
             Clock clock = new Default();
+            Throwable errorOutcome = null;
+            String outcome = null;
             if (classname != null)
             {
                 try
                 {
-                    logger.debug("Using custom clock implementation: {}", 
classname);
+                    outcome = "Using custom clock implementation: " + 
classname;
                     clock = (Clock) Class.forName(classname).newInstance();
                 }
-                catch (Exception e)
+                catch (Throwable t)
                 {
-                    logger.error("Failed to load clock implementation {}", 
classname, e);
+                    outcome = "Failed to load clock implementation " + 
classname;
+                    errorOutcome = t;
                 }
             }
             instance = clock;
+            FAILED_TO_INITIALISE = errorOutcome;
+            INITIALIZE_MESSAGE = outcome;
+        }
+
+        public static void logInitializationOutcome(Logger logger)
+        {
+            if (FAILED_TO_INITIALISE != null)
+            {
+                logger.error(INITIALIZE_MESSAGE, FAILED_TO_INITIALISE);
+            }
+            else if (INITIALIZE_MESSAGE != null)
+            {
+                logger.debug(INITIALIZE_MESSAGE);
+            }
+            FAILED_TO_INITIALISE = null;
+            INITIALIZE_MESSAGE = null;
         }
 
         /**
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java 
b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index e14fd45..d4590c9 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -39,7 +39,7 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
  * Wrapper around time related functions that are either implemented by using 
the default JVM calls
  * or by using a custom implementation for testing purposes.
  *
- * See {@link #preciseTime} for how to use a custom implementation.
+ * See {@link Global#preciseTime} for how to use a custom implementation.
  *
  * Please note that {@link java.time.Clock} wasn't used, as it would not be 
possible to provide an
  * implementation for {@link #now()} with the exact same properties of {@link 
System#nanoTime()}.
@@ -49,13 +49,6 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 @Shared(scope = SIMULATION)
 public interface MonotonicClock
 {
-    /**
-     * Static singleton object that will be instantiated by default with a 
system clock
-     * implementation. Set <code>cassandra.clock</code> system property to a 
FQCN to use a
-     * different implementation instead.
-     */
-    public static final MonotonicClock preciseTime = Defaults.precise();
-    public static final MonotonicClock approxTime = 
Defaults.approx(preciseTime);
 
     /**
      * @see System#nanoTime()
@@ -77,10 +70,18 @@ public interface MonotonicClock
     public boolean isAfter(long instant);
     public boolean isAfter(long now, long instant);
 
-    static class Defaults
+    public static class Global
     {
         private static final Logger logger = 
LoggerFactory.getLogger(MonotonicClock.class);
 
+        /**
+         * Static singleton object that will be instantiated by default with a 
system clock
+         * implementation. Set <code>cassandra.clock</code> system property to 
a FQCN to use a
+         * different implementation instead.
+         */
+        public static final MonotonicClock preciseTime = precise();
+        public static final MonotonicClock approxTime = approx(preciseTime);
+
         private static MonotonicClock precise()
         {
             String sclock = CLOCK_MONOTONIC_PRECISE.getString();
diff --git 
a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java 
b/src/java/org/apache/cassandra/utils/Nemesis.java
similarity index 50%
copy from src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
copy to src/java/org/apache/cassandra/utils/Nemesis.java
index ecf073d..b5110c4 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
+++ b/src/java/org/apache/cassandra/utils/Nemesis.java
@@ -16,15 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.concurrent;
+package org.apache.cassandra.utils;
 
-import java.util.concurrent.ScheduledExecutorService;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-import org.apache.cassandra.utils.Shared;
+import static org.apache.cassandra.utils.Nemesis.Traffic.HIGH;
 
-import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
-
-@Shared(scope = SIMULATION)
-public interface ScheduledExecutorPlus extends ExecutorPlus, 
ScheduledExecutorService
+/**
+ * Annotate fields, particularly important volatile fields, where the system 
should adversarially schedule
+ * thread events around memory accesses (read or write).
+ *
+ * This can introduce significant simulation overhead, so should be used 
sparingly.
+ *
+ * TODO: Support @Nemesis on methods, to insert nemesis points either before 
or after invocations of the method
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.FIELD })
+public @interface Nemesis
 {
+    enum Traffic { LOW, HIGH }
+
+    Traffic traffic() default HIGH;
 }
diff --git a/src/java/org/apache/cassandra/utils/Simulate.java 
b/src/java/org/apache/cassandra/utils/Simulate.java
new file mode 100644
index 0000000..dd0d230
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Simulate.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Enable certain features for a specific method or class.
+ *
+ * Note that presently class level annotations are not inherited by inner 
classes.
+ *
+ * TODO: support package level, and apply to all nested classes
+ */
+public @interface Simulate
+{
+    enum With
+    {
+        /**
+         * Calls to FBUtilities.timestampMicros() will be guaranteed globally 
monotonically increasing.
+         *
+         * May be annotated at the method or class level.
+         */
+        GLOBAL_CLOCK,
+
+        /**
+         * synchronized methods and blocks, and wait/notify.
+         *
+         * May be annotated at the class level.
+         */
+        MONITORS,
+
+        /**
+         * Usages of LockSupport. This defaults to ON for all classes, 
including system classes.
+         *
+         * May be annotated at the method or class level.
+         */
+        LOCK_SUPPORT
+    }
+
+    With[] with() default {};
+    With[] without() default {};
+}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java 
b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index 86e3c12..026f9f2 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -316,6 +316,18 @@ public abstract class AbstractFuture<V> implements 
Future<V>
         return this;
     }
 
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return map(mapper, null);
+    }
+
     /**
      * Support more fluid version of {@link 
com.google.common.util.concurrent.Futures#addCallback}
      *
@@ -435,31 +447,31 @@ public abstract class AbstractFuture<V> implements 
Future<V>
     @Override
     public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException
     {
-        return Awaitable.await(this, timeout, unit);
+        return Defaults.await(this, timeout, unit);
     }
 
     @Override
     public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) 
throws UncheckedInterruptedException
     {
-        return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units);
+        return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
     }
 
     @Override
     public boolean awaitUninterruptibly(long timeout, TimeUnit unit)
     {
-        return Awaitable.awaitUninterruptibly(this, timeout, unit);
+        return Defaults.awaitUninterruptibly(this, timeout, unit);
     }
 
     @Override
     public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) 
throws UncheckedInterruptedException
     {
-        return Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, 
nanoTimeDeadline);
+        return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, 
nanoTimeDeadline);
     }
 
     @Override
     public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
     {
-        return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline);
+        return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
     }
 
     /**
@@ -468,7 +480,7 @@ public abstract class AbstractFuture<V> implements Future<V>
     @Override
     public Future<V> awaitUninterruptibly()
     {
-        return Awaitable.awaitUninterruptibly(this);
+        return Defaults.awaitUninterruptibly(this);
     }
 
     /**
@@ -477,7 +489,7 @@ public abstract class AbstractFuture<V> implements Future<V>
     @Override
     public Future<V> awaitThrowUncheckedOnInterrupt() throws 
UncheckedInterruptedException
     {
-        return Awaitable.awaitThrowUncheckedOnInterrupt(this);
+        return Defaults.awaitThrowUncheckedOnInterrupt(this);
     }
 
     public String toString()
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java 
b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index b09eeb7..0ef35d5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -123,7 +123,7 @@ public class AsyncFuture<V> extends AbstractFuture<V>
     }
 
     /**
-     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
+     * Support {@link com.google.common.util.concurrent.Futures#transform} 
natively
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java 
b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
index 03aab5f..25bdf02 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
@@ -105,91 +105,96 @@ public interface Awaitable
      */
     Awaitable awaitUninterruptibly();
 
-    public static boolean await(Awaitable await, long time, TimeUnit unit) 
throws InterruptedException
+    // we must declare the static implementation methods in a nested class rather than directly on the interface,
+    // so that they can be loaded by different classloaders during simulation
+    class Defaults
     {
-        return await.awaitUntil(nanoTime() + unit.toNanos(time));
-    }
-
-    public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long 
time, TimeUnit units) throws UncheckedInterruptedException
-    {
-        return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + 
units.toNanos(time));
-    }
-
-    public static boolean awaitUninterruptibly(Awaitable await, long time, 
TimeUnit units)
-    {
-        return awaitUntilUninterruptibly(await, nanoTime() + 
units.toNanos(time));
-    }
-
-    public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A 
await) throws UncheckedInterruptedException
-    {
-        try
-        {
-            await.await();
-        }
-        catch (InterruptedException e)
+        public static boolean await(Awaitable await, long time, TimeUnit unit) 
throws InterruptedException
         {
-            throw new UncheckedInterruptedException();
+            return await.awaitUntil(nanoTime() + unit.toNanos(time));
         }
-        return await;
-    }
 
-    public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, 
long nanoTimeDeadline) throws UncheckedInterruptedException
-    {
-        try
+        public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, 
long time, TimeUnit units) throws UncheckedInterruptedException
         {
-            return await.awaitUntil(nanoTimeDeadline);
+            return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + 
units.toNanos(time));
         }
-        catch (InterruptedException e)
+
+        public static boolean awaitUninterruptibly(Awaitable await, long time, 
TimeUnit units)
         {
-            throw new UncheckedInterruptedException();
+            return awaitUntilUninterruptibly(await, nanoTime() + 
units.toNanos(time));
         }
-    }
 
-    /**
-     * {@link Awaitable#awaitUntilUninterruptibly(long)}
-     */
-    public static boolean awaitUntilUninterruptibly(Awaitable await, long 
nanoTimeDeadline)
-    {
-        boolean interrupted = false;
-        boolean result;
-        while (true)
+        public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A 
await) throws UncheckedInterruptedException
         {
             try
             {
-                result = await.awaitUntil(nanoTimeDeadline);
-                break;
+                await.await();
             }
             catch (InterruptedException e)
             {
-                interrupted = true;
+                throw new UncheckedInterruptedException();
             }
+            return await;
         }
-        if (interrupted)
-            Thread.currentThread().interrupt();
-        return result;
-    }
 
-    /**
-     * {@link Awaitable#awaitUninterruptibly()}
-     */
-    public static <A extends Awaitable> A awaitUninterruptibly(A await)
-    {
-        boolean interrupted = false;
-        while (true)
+        public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable 
await, long nanoTimeDeadline) throws UncheckedInterruptedException
         {
             try
             {
-                await.await();
-                break;
+                return await.awaitUntil(nanoTimeDeadline);
             }
             catch (InterruptedException e)
             {
-                interrupted = true;
+                throw new UncheckedInterruptedException();
+            }
+        }
+
+        /**
+         * {@link Awaitable#awaitUntilUninterruptibly(long)}
+         */
+        public static boolean awaitUntilUninterruptibly(Awaitable await, long 
nanoTimeDeadline)
+        {
+            boolean interrupted = false;
+            boolean result;
+            while (true)
+            {
+                try
+                {
+                    result = await.awaitUntil(nanoTimeDeadline);
+                    break;
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
+            }
+            if (interrupted)
+                Thread.currentThread().interrupt();
+            return result;
+        }
+
+        /**
+         * {@link Awaitable#awaitUninterruptibly()}
+         */
+        public static <A extends Awaitable> A awaitUninterruptibly(A await)
+        {
+            boolean interrupted = false;
+            while (true)
+            {
+                try
+                {
+                    await.await();
+                    break;
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
             }
+            if (interrupted)
+                Thread.currentThread().interrupt();
+            return await;
         }
-        if (interrupted)
-            Thread.currentThread().interrupt();
-        return await;
     }
 
     abstract class AbstractAwaitable implements Awaitable
@@ -202,7 +207,7 @@ public interface Awaitable
         @Override
         public boolean await(long time, TimeUnit unit) throws 
InterruptedException
         {
-            return Awaitable.await(this, time, unit);
+            return Defaults.await(this, time, unit);
         }
 
         /**
@@ -211,7 +216,7 @@ public interface Awaitable
         @Override
         public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit 
units) throws UncheckedInterruptedException
         {
-            return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units);
+            return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
         }
 
         /**
@@ -227,7 +232,7 @@ public interface Awaitable
          */
         public Awaitable awaitThrowUncheckedOnInterrupt() throws 
UncheckedInterruptedException
         {
-            return Awaitable.awaitThrowUncheckedOnInterrupt(this);
+            return Defaults.awaitThrowUncheckedOnInterrupt(this);
         }
 
         /**
@@ -235,7 +240,7 @@ public interface Awaitable
          */
         public boolean awaitUntilThrowUncheckedOnInterrupt(long 
nanoTimeDeadline) throws UncheckedInterruptedException
         {
-            return Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, 
nanoTimeDeadline);
+            return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, 
nanoTimeDeadline);
         }
 
         /**
@@ -243,7 +248,7 @@ public interface Awaitable
          */
         public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
         {
-            return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline);
+            return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
         }
 
         /**
@@ -251,7 +256,7 @@ public interface Awaitable
          */
         public Awaitable awaitUninterruptibly()
         {
-            return Awaitable.awaitUninterruptibly(this);
+            return Defaults.awaitUninterruptibly(this);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index 69dc83d..fae5d43 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -96,6 +96,17 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
         return this;
     }
 
+    /**
+     * waits for completion; in case of failure rethrows the original exception without a new wrapping exception
+     * so may cause problems for reporting stack traces
+     */
+    default Future<V> syncThrowUncheckedOnInterrupt()
+    {
+        awaitThrowUncheckedOnInterrupt();
+        rethrowIfFailed();
+        return this;
+    }
+
     @Deprecated
     @Override
     default boolean await(long l) throws InterruptedException
diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
index 57737ea..40b908b 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
@@ -38,7 +38,7 @@ import static org.apache.cassandra.utils.concurrent.ListenerList.Notifying.NOTIF
 /**
  * Encapsulate one or more items in a linked-list that is immutable whilst shared, forming a prepend-only list (or stack).
  * Once the list is ready to consume, exclusive ownership is taken by clearing the shared variable containing it, after
- * which the list may be invoked using {@link #notify}, which reverses the list before invoking the work it contains.
+ * which the list may be invoked using {@link #notifyExclusive(ListenerList, Future)}, which reverses the list before invoking the work it contains.
  */
 abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
 {
@@ -93,7 +93,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
             {
                 while (true)
                 {
-                    notify(listeners, in);
+                    notifyExclusive(listeners, in);
                     if (updater.compareAndSet(in, NOTIFYING, null))
                         return;
 
@@ -113,17 +113,13 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
      *
      * @param head must be either a {@link ListenerList} or {@link GenericFutureListener}
      */
-    static <T> void notify(ListenerList<T> head, Future<T> future)
+    static <T> void notifyExclusive(ListenerList<T> head, Future<T> future)
     {
-        Executor notifyExecutor = future.notifyExecutor();
-        if (inExecutor(notifyExecutor))
-            notifyExecutor = null;
-
-        notify(head, notifyExecutor, future);
-    }
+        Executor notifyExecutor; {
+            Executor exec = future.notifyExecutor();
+            notifyExecutor = inExecutor(exec) ? null : exec;
+        }
 
-    private static <T> void notify(ListenerList<T> head, Executor 
notifyExecutor, Future<T> future)
-    {
         head = reverse(head);
         forEach(head, i -> i.notifySelf(notifyExecutor, future));
     }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index c077467..f40e08f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -47,12 +47,15 @@ import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Shared;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static java.util.Collections.emptyList;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -91,6 +94,13 @@ public final class Ref<T> implements RefCounted<T>
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
     public static final boolean DEBUG_ENABLED = 
System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+    static OnLeak ON_LEAK;
+
+    @Shared(scope = SIMULATION)
+    public interface OnLeak
+    {
+        void onLeak(Object state);
+    }
 
     final State state;
     final T referent;
@@ -227,6 +237,9 @@ public final class Ref<T> implements RefCounted<T>
                 logger.error("LEAK DETECTED: a reference ({}) to {} was not 
released before the reference was garbage collected", id, globalState);
                 if (DEBUG_ENABLED)
                     debug.log(id);
+                OnLeak onLeak = ON_LEAK;
+                if (onLeak != null)
+                    onLeak.onLeak(this);
             }
             else if (DEBUG_ENABLED)
             {
@@ -235,6 +248,12 @@ public final class Ref<T> implements RefCounted<T>
             if (fail != null)
                 logger.error("Error when closing {}", globalState, fail);
         }
+
+        @Override
+        public String toString()
+        {
+            return globalState.toString();
+        }
     }
 
     static final class Debug
@@ -678,7 +697,10 @@ public final class Ref<T> implements RefCounted<T>
         {
             final Set<Tidy> candidates = Collections.newSetFromMap(new 
IdentityHashMap<>());
             for (GlobalState state : globallyExtant)
-                candidates.add(state.tidy);
+            {
+                if (state.tidy != null)
+                    candidates.add(state.tidy);
+            }
             removeExpected(candidates);
             this.candidates.retainAll(candidates);
             if (!this.candidates.isEmpty())
@@ -706,6 +728,11 @@ public final class Ref<T> implements RefCounted<T>
         }
     }
 
+    public static void setOnLeak(OnLeak onLeak)
+    {
+        ON_LEAK = onLeak;
+    }
+
     @VisibleForTesting
     public static void shutdownReferenceReaper(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException
     {
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
index c3f03a5..01c52c5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
@@ -19,15 +19,10 @@
 package org.apache.cassandra.utils.concurrent;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import net.openhft.chronicle.core.util.ThrowingConsumer;
 import org.apache.cassandra.utils.Intercept;
 import org.apache.cassandra.utils.Shared;
 
-import static java.lang.System.nanoTime;
-import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 @Shared(scope = SIMULATION)
@@ -88,7 +83,7 @@ public interface Semaphore
     @Intercept
     public static Semaphore newSemaphore(int permits)
     {
-        return new UnfairAsync(permits);
+        return new Standard(permits, false);
     }
 
     /**
@@ -99,160 +94,19 @@ public interface Semaphore
     @Intercept
     public static Semaphore newFairSemaphore(int permits)
     {
-        return new FairJDK(permits);
+        return new Standard(permits, true);
     }
 
-    /**
-     * An unfair semaphore, making no guarantees about thread starvation.
-     *
-     * TODO this Semaphore is potentially inefficient if used with release 
quantities other than 1
-     *      (this is unimportant at time of authoring)
-     */
-    public static class UnfairAsync implements Semaphore
+    public static class Standard extends java.util.concurrent.Semaphore 
implements Semaphore
     {
-        private static final AtomicReferenceFieldUpdater<UnfairAsync, 
WaitQueue> waitingUpdater = 
AtomicReferenceFieldUpdater.newUpdater(UnfairAsync.class, WaitQueue.class, 
"waiting");
-        private static final AtomicIntegerFieldUpdater<UnfairAsync> 
permitsUpdater = AtomicIntegerFieldUpdater.newUpdater(UnfairAsync.class, 
"permits");
-        private volatile WaitQueue waiting;
-        private volatile int permits;
-
-        // WARNING: if extending this class, consider simulator interactions
-        public UnfairAsync(int permits)
-        {
-            this.permits = permits;
-        }
-
-        /**
-         * {@link Semaphore#drain()}
-         */
-        public int drain()
-        {
-            return permitsUpdater.getAndSet(this, 0);
-        }
-
-        /**
-         * {@link Semaphore#permits()}
-         */
-        public int permits()
-        {
-            return permits;
-        }
-
-        /**
-         * {@link Semaphore#release(int)}
-         */
-        public void release(int permits)
-        {
-            if (permits < 0) throw new IllegalArgumentException();
-            if (permits > 0 && permitsUpdater.getAndAdd(this, permits) == 0)
-            {
-                if (waiting != null)
-                {
-                    if (permits > 1) waiting.signalAll();
-                    else waiting.signal();
-                }
-            }
-        }
-
-        /**
-         * {@link Semaphore#tryAcquire(int)}
-         */
-        public boolean tryAcquire(int acquire)
-        {
-            if (acquire < 0)
-                throw new IllegalArgumentException();
-            while (true)
-            {
-                int cur = permits;
-                if (cur < acquire)
-                    return false;
-                if (permitsUpdater.compareAndSet(this, cur, cur - acquire))
-                    return true;
-            }
-        }
-
-        /**
-         * {@link Semaphore#tryAcquire(int, long, TimeUnit)}
-         */
-        public boolean tryAcquire(int acquire, long time, TimeUnit unit) 
throws InterruptedException
-        {
-            return tryAcquireUntil(acquire, nanoTime() + unit.toNanos(time));
-        }
-
-        /**
-         * {@link Semaphore#tryAcquireUntil(int, long)}
-         */
-        public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) 
throws InterruptedException
-        {
-            boolean wait = true;
-            while (true)
-            {
-                int cur = permits;
-                if (cur < acquire)
-                {
-                    if (!wait) return false;
-
-                    WaitQueue.Signal signal = register();
-                    if (permits < acquire) wait = 
signal.awaitUntil(nanoTimeDeadline);
-                    else signal.cancel();
-                }
-                else if (permitsUpdater.compareAndSet(this, cur, cur - 
acquire))
-                    return true;
-            }
-        }
-
-        /**
-         * {@link Semaphore#acquire(int)}
-         */
-        public void acquire(int acquire) throws InterruptedException
+        public Standard(int permits)
         {
-            acquire(acquire, WaitQueue.Signal::await);
+            this(permits, false);
         }
 
-        /**
-         * {@link Semaphore#acquireThrowUncheckedOnInterrupt(int)}
-         */
-        public void acquireThrowUncheckedOnInterrupt(int acquire)
+        public Standard(int permits, boolean fair)
         {
-            acquire(acquire, WaitQueue.Signal::awaitThrowUncheckedOnInterrupt);
-        }
-
-        private <T extends Throwable> void acquire(int acquire, 
ThrowingConsumer<WaitQueue.Signal, T> wait) throws T
-        {
-            while (true)
-            {
-                int cur = permits;
-                if (cur < acquire)
-                {
-                    WaitQueue.Signal signal = register();
-                    if (permits < acquire) wait.accept(signal);
-                    else signal.cancel();
-                }
-                else if (permitsUpdater.compareAndSet(this, cur, cur - 
acquire))
-                    return;
-            }
-        }
-
-        private WaitQueue.Signal register()
-        {
-            if (waiting == null)
-                waitingUpdater.compareAndSet(this, null, newWaitQueue());
-            return waiting.register();
-        }
-    }
-
-    /**
-     * A fair semaphore, guaranteeing threads are signalled in the order they 
request permits.
-     *
-     * Unlike {@link UnfairAsync} this class is efficient for 
arbitrarily-sized increments and decrements,
-     * however it has the normal throughput bottleneck of fairness.
-     */
-    public static class FairJDK implements Semaphore
-    {
-        final java.util.concurrent.Semaphore wrapped;
-
-        public FairJDK(int permits)
-        {
-            wrapped = new java.util.concurrent.Semaphore(permits, true); // 
checkstyle: permit this instantiation
+            super(permits, fair);
         }
 
         /**
@@ -260,7 +114,7 @@ public interface Semaphore
          */
         public int drain()
         {
-            return wrapped.drainPermits();
+            return drainPermits();
         }
 
         /**
@@ -268,7 +122,7 @@ public interface Semaphore
          */
         public int permits()
         {
-            return wrapped.availablePermits();
+            return availablePermits();
         }
 
         /**
@@ -276,31 +130,7 @@ public interface Semaphore
          */
         public int waiting()
         {
-            return wrapped.getQueueLength();
-        }
-
-        /**
-         * {@link Semaphore#release(int)}
-         */
-        public void release(int permits)
-        {
-            wrapped.release(permits);
-        }
-
-        /**
-         * {@link Semaphore#tryAcquire(int)}
-         */
-        public boolean tryAcquire(int permits)
-        {
-            return wrapped.tryAcquire(permits);
-        }
-
-        /**
-         * {@link Semaphore#tryAcquire(int, long, TimeUnit)}
-         */
-        public boolean tryAcquire(int acquire, long time, TimeUnit unit) 
throws InterruptedException
-        {
-            return wrapped.tryAcquire(acquire, time, unit);
+            return getQueueLength();
         }
 
         /**
@@ -309,15 +139,7 @@ public interface Semaphore
         public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) 
throws InterruptedException
         {
             long wait = nanoTimeDeadline - System.nanoTime();
-            return wrapped.tryAcquire(acquire, Math.max(0, wait), 
TimeUnit.NANOSECONDS);
-        }
-
-        /**
-         * {@link Semaphore#acquire(int)}
-         */
-        public void acquire(int acquire) throws InterruptedException
-        {
-            wrapped.acquire(acquire);
+            return tryAcquire(acquire, Math.max(0, wait), 
TimeUnit.NANOSECONDS);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 43648c0..a7b3473 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -90,7 +90,7 @@ public class SyncFuture<V> extends AbstractFuture<V>
     }
 
     /**
-     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     * Support {@link com.google.common.util.concurrent.Futures#transform} natively
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
@@ -165,7 +165,7 @@ public class SyncFuture<V> extends AbstractFuture<V>
 
     private void notifyListeners()
     {
-        ListenerList.notify(listeners, this);
+        ListenerList.notifyExclusive(listeners, this);
         listenersUpdater.lazySet(this, null);
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index d656616..f302f4f 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.BufferPoolMetrics;
 import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 import static com.google.common.collect.ImmutableList.of;
@@ -55,6 +56,7 @@ import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFac
 import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
 import static org.apache.cassandra.utils.ExecutorUtils.*;
 import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
 
 /**
@@ -119,6 +121,7 @@ public class BufferPool
     private static final ByteBuffer EMPTY_BUFFER = 
ByteBuffer.allocateDirect(0);
 
     private volatile Debug debug = Debug.NO_OP;
+    private volatile DebugLeaks debugLeaks = DebugLeaks.NO_OP;
 
     protected final String name;
     protected final BufferPoolMetrics metrics;
@@ -305,10 +308,19 @@ public class BufferPool
         void recyclePartial(Chunk chunk);
     }
 
-    public void debug(Debug setDebug)
+    @Shared(scope = SIMULATION)
+    public interface DebugLeaks
     {
-        assert setDebug != null;
-        this.debug = setDebug;
+        public static DebugLeaks NO_OP = () -> {};
+        void leak();
+    }
+
+    public void debug(Debug newDebug, DebugLeaks newDebugLeaks)
+    {
+        if (newDebug != null)
+            this.debug = newDebug;
+        if (newDebugLeaks != null)
+            this.debugLeaks = newDebugLeaks;
     }
 
     interface Recycler
@@ -1025,6 +1037,7 @@ public class BufferPool
         Object obj = localPoolRefQueue.remove(100);
         if (obj instanceof LocalPoolRef)
         {
+            debugLeaks.leak();
             ((LocalPoolRef) obj).release();
             localPoolReferences.remove(obj);
         }
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 0596aeb..ebe92ac 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.utils.memory;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 public class HeapPool extends MemtablePool
 {
     private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp();
@@ -58,6 +61,7 @@ public class HeapPool extends MemtablePool
 
     public static class Logged extends MemtablePool
     {
+        @Shared(scope = SIMULATION)
         public interface Listener
         {
             public void accept(long size, String table);
diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index d9e8372..9bb217c 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -265,7 +265,7 @@ public class LongBufferPoolTest
         logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new 
Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration));
         logger.info("Testing BufferPool with memoryUsageThreshold={} and 
enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold());
         Debug debug = new Debug();
-        bufferPool.debug(debug);
+        bufferPool.debug(debug, null);
 
         TestEnvironment testEnv = new TestEnvironment(threadCount, duration, 
bufferPool.memoryUsageThreshold());
 
@@ -305,7 +305,7 @@ public class LongBufferPoolTest
         assertEquals(0, testEnv.executorService.shutdownNow().size());
 
         logger.info("Reverting BufferPool DEBUG config");
-        bufferPool.debug(BufferPool.Debug.NO_OP);
+        bufferPool.debug(BufferPool.Debug.NO_OP, null);
 
         testEnv.assertCheckedThreadsSucceeded();
 
diff --git a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
index eadacd1..52650ad 100644
--- a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.utils.WithResources;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Semaphore;
 
+import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
+
 @Ignore
 public abstract class AbstractExecutorPlusTest
 {
@@ -100,9 +102,9 @@ public abstract class AbstractExecutorPlusTest
 
         SequentialExecutorPlus exec = builder.build();
 
-        Semaphore enter = new Semaphore.UnfairAsync(0);
-        Semaphore exit = new Semaphore.UnfairAsync(0);
-        Semaphore runAfter = new Semaphore.UnfairAsync(0);
+        Semaphore enter = newSemaphore(0);
+        Semaphore exit = newSemaphore(0);
+        Semaphore runAfter = newSemaphore(0);
         SequentialExecutorPlus.AtLeastOnceTrigger trigger;
         trigger = exec.atLeastOnceTrigger(() -> { enter.release(1); 
exit.acquireThrowUncheckedOnInterrupt(1); });
 
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
index 77ed5ab..c4cb86d 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
@@ -36,7 +36,7 @@ public class SemaphoreTest
     @Test
     public void testUnfair() throws InterruptedException
     {
-        Semaphore s = new Semaphore.UnfairAsync(2);
+        Semaphore s = Semaphore.newSemaphore(2);
         List<Future<Boolean>> fs = start(s);
         s.release(1);
         while (s.permits() == 1) Thread.yield();
@@ -54,7 +54,7 @@ public class SemaphoreTest
     @Test
     public void testFair() throws InterruptedException, ExecutionException, 
TimeoutException
     {
-        Semaphore s = new Semaphore.FairJDK(2);
+        Semaphore s = Semaphore.newFairSemaphore(2);
         List<Future<Boolean>> fs = start(s);
         s.release(1);
         fs.get(0).get(1L, TimeUnit.MINUTES);
@@ -83,9 +83,9 @@ public class SemaphoreTest
             try { s.tryAcquireUntil(1, System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(1L)); Assert.fail(); } catch 
(InterruptedException ignore) { }
             List<Future<Boolean>> fs = new ArrayList<>();
             fs.add(exec.submit(() -> s.tryAcquire(1, 1L, TimeUnit.MINUTES)));
-            while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) 
s).waiting() == 0) Thread.yield();
+            while (s instanceof Semaphore.Standard && ((Semaphore.Standard) 
s).waiting() == 0) Thread.yield();
             fs.add(exec.submit(() -> s.tryAcquireUntil(1, System.nanoTime() + 
TimeUnit.MINUTES.toNanos(1L))));
-            while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) 
s).waiting() == 1) Thread.yield();
+            while (s instanceof Semaphore.Standard && ((Semaphore.Standard) 
s).waiting() == 1) Thread.yield();
             fs.add(exec.submit(() -> { s.acquire(1); return true; } ));
             return fs;
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to