This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit eae581a5f07c340594f6af47bb558693ef363611 Author: Benedict Elliott Smith <bened...@apple.com> AuthorDate: Wed Nov 17 14:34:23 2021 +0000 [CEP-10] Cluster and Code Simulations: Minor improvements - Simplify Semaphore - Future improvements - ScheduledExecutorPlus improvements for simulator compatibility - Debug leaks in Ref or BufferPool - Support use of TokenMetadata without initialising Cassandra - Additional system properties and simulator flags - Permit Clock initialisation within separate ClassLoader - Introduce BallotGenerator patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-17008 --- .../cassandra/auth/CassandraRoleManager.java | 2 +- .../concurrent/ScheduledExecutorPlus.java | 24 +++ .../ScheduledThreadPoolExecutorPlus.java | 27 +++ .../cassandra/concurrent/SyncFutureTask.java | 5 +- .../config/CassandraRelevantProperties.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 4 + .../cql3/statements/BatchUpdatesCollector.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 2 +- src/java/org/apache/cassandra/db/Mutation.java | 2 +- src/java/org/apache/cassandra/db/ReadCommand.java | 2 +- .../cassandra/db/ReadExecutionController.java | 2 +- .../db/commitlog/AbstractCommitLogService.java | 3 +- .../cassandra/db/monitoring/MonitorableImpl.java | 2 +- .../cassandra/db/monitoring/MonitoringTask.java | 2 +- .../org/apache/cassandra/gms/FailureDetector.java | 4 +- .../apache/cassandra/hints/HintsDispatcher.java | 2 +- .../apache/cassandra/locator/TokenMetadata.java | 76 ++++---- src/java/org/apache/cassandra/metrics/Sampler.java | 2 +- .../cassandra/net/AbstractMessageHandler.java | 2 +- .../cassandra/net/InboundMessageHandler.java | 2 +- .../cassandra/net/InboundMessageHandlers.java | 2 +- src/java/org/apache/cassandra/net/Message.java | 2 +- .../apache/cassandra/net/OutboundConnection.java | 2 +- .../org/apache/cassandra/net/RequestCallbacks.java | 2 +- .../apache/cassandra/net/ResponseVerbHandler.java | 2 +- 
.../apache/cassandra/schema/MigrationManager.java | 4 + .../apache/cassandra/schema/SchemaKeyspace.java | 3 + .../cassandra/service/ActiveRepairService.java | 3 + .../org/apache/cassandra/service/StorageProxy.java | 28 ++- .../cassandra/service/paxos/BallotGenerator.java | 75 ++++++++ .../cassandra/service/paxos/ProposeCallback.java | 3 +- .../cassandra/transport/CQLMessageHandler.java | 2 +- src/java/org/apache/cassandra/utils/Clock.java | 31 +++- .../org/apache/cassandra/utils/MonotonicClock.java | 19 +- .../Nemesis.java} | 27 ++- src/java/org/apache/cassandra/utils/Simulate.java | 56 ++++++ .../cassandra/utils/concurrent/AbstractFuture.java | 26 ++- .../cassandra/utils/concurrent/AsyncFuture.java | 2 +- .../cassandra/utils/concurrent/Awaitable.java | 135 +++++++------- .../apache/cassandra/utils/concurrent/Future.java | 11 ++ .../cassandra/utils/concurrent/ListenerList.java | 18 +- .../org/apache/cassandra/utils/concurrent/Ref.java | 29 ++- .../cassandra/utils/concurrent/Semaphore.java | 200 ++------------------- .../cassandra/utils/concurrent/SyncFuture.java | 4 +- .../apache/cassandra/utils/memory/BufferPool.java | 19 +- .../apache/cassandra/utils/memory/HeapPool.java | 4 + .../cassandra/utils/memory/LongBufferPoolTest.java | 4 +- .../concurrent/AbstractExecutorPlusTest.java | 8 +- .../cassandra/utils/concurrent/SemaphoreTest.java | 8 +- 49 files changed, 515 insertions(+), 382 deletions(-) diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java index 0e49056..b813b55 100644 --- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java +++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java @@ -385,7 +385,7 @@ public class CassandraRoleManager implements IRoleManager protected void scheduleSetupTask(final Callable<Void> setupTask) { // The delay is to give the node a chance to see its peers before attempting the operation - 
ScheduledExecutors.optionalTasks.schedule(() -> { + ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> { isClusterReady = true; try { diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java index ecf073d..a2b033a 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java +++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java @@ -19,6 +19,8 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.utils.Shared; @@ -27,4 +29,26 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) public interface ScheduledExecutorPlus extends ExecutorPlus, ScheduledExecutorService { + /** + * Schedule an action that is recurring but self-administered. + */ + ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit units); + + /** + * Schedule a timeout action. This method is primarily used by the Simulator to modify its + * scheduling behaviour with respect to this operation. + */ + ScheduledFuture<?> scheduleAt(Runnable run, long deadline); + + /** + * Schedule a timeout action. This method is primarily used by the Simulator to modify its + * scheduling behaviour with respect to this operation. + */ + ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline); + + /** + * Schedule a timeout action. This method is primarily used by the Simulator to modify its + * scheduling behaviour with respect to this operation. 
+ */ + ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit units); } diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java index efd284f..0ab09a4 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java +++ b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java @@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.service.StorageService; +import static com.google.common.primitives.Longs.max; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.concurrent.ExecutionFailure.propagating; import static org.apache.cassandra.concurrent.ExecutionFailure.suppressing; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; /** * Like ExecutorPlus, ScheduledThreadPoolExecutorPlus always @@ -97,6 +100,30 @@ public class ScheduledThreadPoolExecutorPlus extends ScheduledThreadPoolExecutor return super.scheduleWithFixedDelay(suppressing(task), initialDelay, delay, unit); } + @Override + public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit units) + { + return schedule(run, delay, units); + } + + @Override + public ScheduledFuture<?> scheduleAt(Runnable run, long deadline) + { + return schedule(run, max(0, deadline - nanoTime()), NANOSECONDS); + } + + @Override + public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline) + { + return scheduleTimeoutWithDelay(run, max(0, deadline - nanoTime()), NANOSECONDS); + } + + @Override + public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit units) + { + return schedule(run, delay, units); + } + /*======== BEGIN DIRECT COPY OF ThreadPoolExecutorPlus ===============*/ private <T extends Runnable> T addTask(T task) diff --git a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java 
b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java index 4f4aa67..4885821 100644 --- a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java +++ b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java @@ -60,7 +60,10 @@ public class SyncFutureTask<T> extends SyncFuture<T> implements RunnableFuture<T try { if (!setUncancellable()) - throw new IllegalStateException(); + { + if (isCancelled()) return; + else throw new IllegalStateException(); + } if (!trySuccess(call.call())) throw new IllegalStateException(); diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 43db1c3..807516c 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -228,6 +228,7 @@ public enum CassandraRelevantProperties // properties for debugging simulator ASM output TEST_SIMULATOR_PRINT_ASM("cassandra.test.simulator.print_asm", "none"), TEST_SIMULATOR_PRINT_ASM_TYPES("cassandra.test.simulator.print_asm_types", ""), + TEST_SIMULATOR_LIVENESS_CHECK("cassandra.test.simulator.livenesscheck", "true"), // determinism properties for testing DETERMINISM_SSTABLE_COMPRESSION_DEFAULT("cassandra.sstable_compression_default", "true"), diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 8bb52d4..8265fd2 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -70,6 +70,7 @@ import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.CacheService.CacheType; import org.apache.cassandra.service.paxos.Paxos; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import 
org.apache.commons.lang3.ArrayUtils; @@ -80,6 +81,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.OS_ARCH; import static org.apache.cassandra.config.CassandraRelevantProperties.SUN_ARCH_DATA_MODEL; import static org.apache.cassandra.io.util.FileUtils.ONE_GB; import static org.apache.cassandra.io.util.FileUtils.ONE_MB; +import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome; public class DatabaseDescriptor { @@ -839,6 +841,8 @@ public class DatabaseDescriptor } Paxos.setPaxosVariant(conf.paxos_variant); + + logInitializationOutcome(logger); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java index cb88bdd..e5136f4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@ -30,7 +30,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionUpdate; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * Utility class to collect updates. diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 795230e..285d08c 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -78,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Promise; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * It represents a Keyspace. 
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index a30b567..6350082 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -42,7 +42,7 @@ import org.apache.cassandra.utils.concurrent.Future; import static org.apache.cassandra.net.MessagingService.VERSION_30; import static org.apache.cassandra.net.MessagingService.VERSION_3014; import static org.apache.cassandra.net.MessagingService.VERSION_40; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; public class Mutation implements IMutation { diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index f14240b..3bf0d6d 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -72,7 +72,7 @@ import org.apache.cassandra.utils.ObjectSizes; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; /** diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java index 5bcd84b..2fbe3ac 100644 --- a/src/java/org/apache/cassandra/db/ReadExecutionController.java +++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java @@ -28,7 +28,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.concurrent.OpOrder; -import static 
org.apache.cassandra.utils.MonotonicClock.preciseTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; public class ReadExecutionController implements AutoCloseable { diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index be3f8cd..6b5378f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -45,6 +45,7 @@ import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore; import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; @@ -151,7 +152,7 @@ public abstract class AbstractCommitLogService throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", syncIntervalNanos * 1e-6)); - SyncRunnable sync = new SyncRunnable(MonotonicClock.preciseTime); + SyncRunnable sync = new SyncRunnable(preciseTime); executor = executorFactory().infiniteLoop(name, sync, SAFE, NON_DAEMON, SYNCHRONIZED); } diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java index a6e7947..31b5404 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java @@ -18,7 +18,7 @@ package org.apache.cassandra.db.monitoring; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static 
org.apache.cassandra.utils.MonotonicClock.Global.approxTime; public abstract class MonitorableImpl implements Monitorable { diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java index 52d6160..d681e4b 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java @@ -38,7 +38,7 @@ import org.apache.cassandra.utils.NoSpamLogger; import static java.lang.System.getProperty; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; /** diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 40a2de5..6db41de 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -28,7 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import javax.management.openmbean.*; -import org.apache.cassandra.io.util.File; + import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,7 @@ import org.apache.cassandra.utils.MBeanWrapper; import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR; import static org.apache.cassandra.config.DatabaseDescriptor.newFailureDetector; -import static org.apache.cassandra.utils.MonotonicClock.preciseTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; /** * This FailureDetector is an implementation of the paper titled diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 2b6d9a3..b627338 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -39,7 +39,7 @@ import org.apache.cassandra.utils.concurrent.Condition; import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.*; import static org.apache.cassandra.metrics.HintsServiceMetrics.updateDelayMetrics; import static org.apache.cassandra.net.Verb.HINT_REQ; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; /** diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index fb9d43b..202bd6a 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -862,42 +862,8 @@ public class TokenMetadata { TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, keyspaceName); - // create clone of current state - BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone; - Set<InetAddressAndPort> leavingEndpointsClone; - Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone; - TokenMetadata metadata; - - lock.readLock().lock(); - try - { - - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) - { - if (logger.isTraceEnabled()) - logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) - { - if (logger.isTraceEnabled()) - logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - pendingRanges.put(keyspaceName, new PendingRangeMaps()); - - return; - } - } + 
unsafeCalculatePendingRanges(strategy, keyspaceName); - bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens); - leavingEndpointsClone = new HashSet<>(this.leavingEndpoints); - movingEndpointsClone = new HashSet<>(this.movingEndpoints); - metadata = this.cloneOnlyTokenMap(); - } - finally - { - lock.readLock().unlock(); - } - - pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone, - leavingEndpointsClone, movingEndpointsClone)); if (logger.isDebugEnabled()) logger.debug("Starting pending range calculation for {}", keyspaceName); @@ -910,6 +876,46 @@ public class TokenMetadata } } + public void unsafeCalculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) + { + // create clone of current state + BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone; + Set<InetAddressAndPort> leavingEndpointsClone; + Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone; + TokenMetadata metadata; + + lock.readLock().lock(); + try + { + + if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) + { + if (logger.isTraceEnabled()) + logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); + if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) + { + if (logger.isTraceEnabled()) + logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); + pendingRanges.put(keyspaceName, new PendingRangeMaps()); + + return; + } + } + + bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens); + leavingEndpointsClone = new HashSet<>(this.leavingEndpoints); + movingEndpointsClone = new HashSet<>(this.movingEndpoints); + metadata = this.cloneOnlyTokenMap(); + } + finally + { + lock.readLock().unlock(); + } + + pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone, + leavingEndpointsClone, 
movingEndpointsClone)); + } + /** * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String) */ diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java b/src/java/org/apache/cassandra/metrics/Sampler.java index 90cc90c..b3d0f21 100644 --- a/src/java/org/apache/cassandra/metrics/Sampler.java +++ b/src/java/org/apache/cassandra/metrics/Sampler.java @@ -40,7 +40,7 @@ public abstract class Sampler<T> } @VisibleForTesting - MonotonicClock clock = MonotonicClock.approxTime; + MonotonicClock clock = MonotonicClock.Global.approxTime; @VisibleForTesting static final ExecutorPlus samplerExecutor = executorFactory() diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java index 1045f28..e2cf68d 100644 --- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java +++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java @@ -44,7 +44,7 @@ import org.apache.cassandra.net.ResourceLimits.Limit; import static java.lang.Math.max; import static java.lang.Math.min; import static org.apache.cassandra.net.Crc.InvalidCrc; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * Core logic for handling inbound message deserialization and execution (in tandem with {@link FrameDecoder}). 
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java index c1b51be..e12fcec 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java @@ -43,7 +43,7 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.NoSpamLogger; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * Implementation of {@link AbstractMessageHandler} for processing internode messages from peers. diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java index a706557..c7b9463 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java @@ -32,7 +32,7 @@ import org.apache.cassandra.metrics.InternodeInboundMetrics; import org.apache.cassandra.net.Message.Header; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * An aggregation of {@link InboundMessageHandler}s for all connections from a peer. 
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index 802c79f..8fe8971 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -57,7 +57,7 @@ import static org.apache.cassandra.net.MessagingService.VERSION_3014; import static org.apache.cassandra.net.MessagingService.VERSION_30; import static org.apache.cassandra.net.MessagingService.VERSION_40; import static org.apache.cassandra.net.MessagingService.instance; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; import static org.apache.cassandra.utils.vint.VIntCoding.computeUnsignedVIntSize; import static org.apache.cassandra.utils.vint.VIntCoding.getUnsignedVInt; import static org.apache.cassandra.utils.vint.VIntCoding.skipUnsignedVInt; diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index fbf0c73..c2aecb0 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -68,7 +68,7 @@ import static org.apache.cassandra.net.ResourceLimits.*; import static org.apache.cassandra.net.ResourceLimits.Outcome.*; import static org.apache.cassandra.net.SocketFactory.*; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; import static org.apache.cassandra.utils.Throwables.isCausedBy; import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java index 6275c15..fa1a03e 100644 --- a/src/java/org/apache/cassandra/net/RequestCallbacks.java +++ 
b/src/java/org/apache/cassandra/net/RequestCallbacks.java @@ -49,7 +49,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -import static org.apache.cassandra.utils.MonotonicClock.preciseTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; /** * An expiring map of request callbacks. diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 369e5f4..1cee468 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -24,7 +24,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.tracing.Tracing; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; class ResponseVerbHandler implements IVerbHandler { diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index 6fbfc5d..8e485e0 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory; import java.util.function.LongSupplier; import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +44,9 @@ import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.concurrent.Stage.MIGRATION; import static 
org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ; +import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK; +@Simulate(with = GLOBAL_CLOCK) public class MigrationManager { private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 6d5e331..90859ce 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -49,6 +49,7 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Simulate; import static java.lang.String.format; @@ -58,6 +59,7 @@ import static java.util.stream.Collectors.toSet; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; import static org.apache.cassandra.schema.SchemaKeyspaceTables.*; +import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK; /** * system_schema.* tables and methods for manipulating them. 
@@ -294,6 +296,7 @@ public final class SchemaKeyspace /** * Add entries to system_schema.* for the hardcoded system keyspaces */ + @Simulate(with = GLOBAL_CLOCK) static void saveSystemKeyspacesSchema() { KeyspaceMetadata system = Schema.instance.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 322fd18..7d0a290 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -43,6 +43,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.locator.EndpointsByRange; import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +107,7 @@ import static org.apache.cassandra.config.DatabaseDescriptor.*; import static org.apache.cassandra.net.Message.out; import static org.apache.cassandra.net.Verb.PREPARE_MSG; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; /** @@ -122,6 +124,7 @@ import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownL * The creation of a repair session is done through the submitRepairSession that * returns a future on the completion of that session. 
*/ +@Simulate(with = MONITORS) public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean { diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 62bd01a..3883062 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -141,6 +141,7 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static com.google.common.collect.Iterables.concat; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.db.ConsistencyLevel.SERIAL; import static org.apache.cassandra.net.Message.out; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics; @@ -152,6 +153,8 @@ import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetr import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.*; import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; +import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallotTimestampMicros; +import static org.apache.cassandra.service.paxos.BallotGenerator.Global.randomBallot; import static org.apache.cassandra.service.paxos.PrepareVerbHandler.doPrepare; import static org.apache.cassandra.service.paxos.ProposeVerbHandler.doPropose; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -335,7 +338,6 @@ public class StorageProxy implements StorageProxyMBean consistencyForPaxos, consistencyForCommit, consistencyForCommit, - state, queryStartNanoTime, casWriteMetrics, updateProposer); @@ -430,7 +432,6 @@ public class StorageProxy implements StorageProxyMBean 
ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForReplayCommits, ConsistencyLevel consistencyForCommit, - ClientState state, long queryStartNanoTime, CASClientRequestMetrics casMetrics, Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal) @@ -457,8 +458,7 @@ public class StorageProxy implements StorageProxyMBean replicaPlan, consistencyForPaxos, consistencyForReplayCommits, - casMetrics, - state); + casMetrics); final UUID ballot = pair.ballot; contentions += pair.contentions; @@ -525,8 +525,7 @@ public class StorageProxy implements StorageProxyMBean ReplicaPlan.ForPaxosWrite paxosPlan, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, - CASClientRequestMetrics casMetrics, - ClientState state) + CASClientRequestMetrics casMetrics) throws WriteTimeoutException, WriteFailureException { long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS); @@ -540,10 +539,10 @@ public class StorageProxy implements StorageProxyMBean // in progress (#5667). Lastly, we don't want to use a timestamp that is older than the last one assigned by ClientState or operations may appear // out-of-order (#7801). long minTimestampMicrosToUse = summary == null ? Long.MIN_VALUE : 1 + UUIDGen.microsTimestamp(summary.mostRecentInProgressCommit.ballot); - long ballotMicros = state.getTimestampForPaxos(minTimestampMicrosToUse); + long ballotMicros = nextBallotTimestampMicros(minTimestampMicrosToUse); // Note that ballotMicros is not guaranteed to be unique if two proposal are being handled concurrently by the same coordinator. But we still // need ballots to be unique for each proposal so we have to use getRandomTimeUUIDFromMicros. 
- UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(ballotMicros); + UUID ballot = randomBallot(ballotMicros, consistencyForPaxos == SERIAL); // prepare try @@ -1810,7 +1809,6 @@ public class StorageProxy implements StorageProxyMBean consistencyLevel, consistencyForReplayCommitsOrFetch, ConsistencyLevel.ANY, - state, start, casReadMetrics, updateProposer); @@ -2077,12 +2075,12 @@ public class StorageProxy implements StorageProxyMBean } else { - MessagingService.instance().metrics.recordSelfDroppedMessage(verb, MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); + MessagingService.instance().metrics.recordSelfDroppedMessage(verb, MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); } if (!readRejected) - MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); + MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); } catch (Throwable t) { @@ -2375,13 +2373,13 @@ public class StorageProxy implements StorageProxyMBean public DroppableRunnable(Verb verb) { - this.approxCreationTimeNanos = MonotonicClock.approxTime.now(); + this.approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); this.verb = verb; } public final void run() { - long approxCurrentTimeNanos = MonotonicClock.approxTime.now(); + long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now(); long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); if (approxCurrentTimeNanos > expirationTimeNanos) { @@ -2408,7 +2406,7 @@ public class StorageProxy implements StorageProxyMBean */ private static abstract class LocalMutationRunnable implements Runnable { - private final long approxCreationTimeNanos = 
MonotonicClock.approxTime.now(); + private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); private final Replica localReplica; @@ -2420,7 +2418,7 @@ public class StorageProxy implements StorageProxyMBean public final void run() { final Verb verb = verb(); - long nowNanos = MonotonicClock.approxTime.now(); + long nowNanos = MonotonicClock.Global.approxTime.now(); long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); if (nowNanos > expirationTimeNanos) { diff --git a/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java new file mode 100644 index 0000000..d031f38 --- /dev/null +++ b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.paxos; + +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.Shared; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; + +@Shared(scope = SIMULATION) +public interface BallotGenerator +{ + static class Default implements BallotGenerator + { + public UUID randomBallot(long whenInMicros, boolean isSerial) + { + return UUIDGen.getRandomTimeUUIDFromMicros(whenInMicros, isSerial ? 2 : 1); + } + + public UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial) + { + long timestampMicros = ThreadLocalRandom.current().nextLong(fromInMicros, toInMicros); + return randomBallot(timestampMicros, isSerial); + } + + public long nextBallotTimestampMicros(long minTimestamp) + { + return ClientState.getTimestampForPaxos(minTimestamp); + } + + public long prevBallotTimestampMicros() + { + return ClientState.getLastTimestampMicros(); + } + } + + static class Global + { + private static BallotGenerator instance = new Default(); + public static UUID randomBallot(long whenInMicros, boolean isSerial) { return instance.randomBallot(whenInMicros, isSerial); } + public static UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial) { return instance.randomBallot(fromInMicros, toInMicros, isSerial); } + public static long nextBallotTimestampMicros(long minWhenInMicros) { return instance.nextBallotTimestampMicros(minWhenInMicros); } + public static long prevBallotTimestampMicros() { return instance.prevBallotTimestampMicros(); } + + public static void unsafeSet(BallotGenerator newInstance) + { + instance = newInstance; + } + } + + UUID randomBallot(long whenInMicros, boolean isSerial); + UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial); + long nextBallotTimestampMicros(long minWhenInMicros); + long prevBallotTimestampMicros(); +} 
\ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java index dc2f9a7..64eca68 100644 --- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.net.Message; +import org.apache.cassandra.utils.Nemesis; /** * ProposeCallback has two modes of operation, controlled by the failFast parameter. @@ -46,7 +47,7 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean> { private static final Logger logger = LoggerFactory.getLogger(ProposeCallback.class); - private final AtomicInteger accepts = new AtomicInteger(0); + @Nemesis private final AtomicInteger accepts = new AtomicInteger(0); private final int requiredAccepts; private final boolean failFast; diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java index bbb8cb5..09e9996 100644 --- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java +++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java @@ -46,7 +46,7 @@ import org.apache.cassandra.transport.Flusher.FlushItem.Framed; import org.apache.cassandra.transport.messages.ErrorMessage; import org.apache.cassandra.utils.NoSpamLogger; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; /** * Implementation of {@link AbstractMessageHandler} for processing CQL messages which comprise a {@link Message} wrapped diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java index d1a7337..acdfc82 100644 --- a/src/java/org/apache/cassandra/utils/Clock.java +++ b/src/java/org/apache/cassandra/utils/Clock.java @@ -37,10 +37,12 @@ 
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) public interface Clock { - static final Logger logger = LoggerFactory.getLogger(Clock.class); - public static class Global { + // something weird happens with class loading Logger that can cause a deadlock + private static Throwable FAILED_TO_INITIALISE; + private static String INITIALIZE_MESSAGE; + /** * Static singleton object that will be instantiated by default with a system clock * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a @@ -52,19 +54,38 @@ public interface Clock { String classname = CLOCK_GLOBAL.getString(); Clock clock = new Default(); + Throwable errorOutcome = null; + String outcome = null; if (classname != null) { try { - logger.debug("Using custom clock implementation: {}", classname); + outcome = "Using custom clock implementation: " + classname; clock = (Clock) Class.forName(classname).newInstance(); } - catch (Exception e) + catch (Throwable t) { - logger.error("Failed to load clock implementation {}", classname, e); + outcome = "Failed to load clock implementation " + classname; + errorOutcome = t; } } instance = clock; + FAILED_TO_INITIALISE = errorOutcome; + INITIALIZE_MESSAGE = outcome; + } + + public static void logInitializationOutcome(Logger logger) + { + if (FAILED_TO_INITIALISE != null) + { + logger.error(INITIALIZE_MESSAGE, FAILED_TO_INITIALISE); + } + else if (INITIALIZE_MESSAGE != null) + { + logger.debug(INITIALIZE_MESSAGE); + } + FAILED_TO_INITIALISE = null; + INITIALIZE_MESSAGE = null; } /** diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java index e14fd45..d4590c9 100644 --- a/src/java/org/apache/cassandra/utils/MonotonicClock.java +++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java @@ -39,7 +39,7 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; * Wrapper around time related functions that are 
either implemented by using the default JVM calls * or by using a custom implementation for testing purposes. * - * See {@link #preciseTime} for how to use a custom implementation. + * See {@link Global#preciseTime} for how to use a custom implementation. * * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an * implementation for {@link #now()} with the exact same properties of {@link System#nanoTime()}. @@ -49,13 +49,6 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) public interface MonotonicClock { - /** - * Static singleton object that will be instantiated by default with a system clock - * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a - * different implementation instead. - */ - public static final MonotonicClock preciseTime = Defaults.precise(); - public static final MonotonicClock approxTime = Defaults.approx(preciseTime); /** * @see System#nanoTime() @@ -77,10 +70,18 @@ public interface MonotonicClock public boolean isAfter(long instant); public boolean isAfter(long now, long instant); - static class Defaults + public static class Global { private static final Logger logger = LoggerFactory.getLogger(MonotonicClock.class); + /** + * Static singleton object that will be instantiated by default with a system clock + * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a + * different implementation instead. 
+ */ + public static final MonotonicClock preciseTime = precise(); + public static final MonotonicClock approxTime = approx(preciseTime); + private static MonotonicClock precise() { String sclock = CLOCK_MONOTONIC_PRECISE.getString(); diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java b/src/java/org/apache/cassandra/utils/Nemesis.java similarity index 50% copy from src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java copy to src/java/org/apache/cassandra/utils/Nemesis.java index ecf073d..b5110c4 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java +++ b/src/java/org/apache/cassandra/utils/Nemesis.java @@ -16,15 +16,28 @@ * limitations under the License. */ -package org.apache.cassandra.concurrent; +package org.apache.cassandra.utils; -import java.util.concurrent.ScheduledExecutorService; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -import org.apache.cassandra.utils.Shared; +import static org.apache.cassandra.utils.Nemesis.Traffic.HIGH; -import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; - -@Shared(scope = SIMULATION) -public interface ScheduledExecutorPlus extends ExecutorPlus, ScheduledExecutorService +/** + * Annotate fields, particularly important volatile fields, where the system should adversarially schedule + * thread events around memory accesses (read or write). + * + * This can introduce significant simulation overhead, so should be used sparingly. 
+ * + * TODO: Support @Nemesis on methods, to insert nemesis points either before or after invocations of the method + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.FIELD }) +public @interface Nemesis { + enum Traffic { LOW, HIGH } + + Traffic traffic() default HIGH; } diff --git a/src/java/org/apache/cassandra/utils/Simulate.java b/src/java/org/apache/cassandra/utils/Simulate.java new file mode 100644 index 0000000..dd0d230 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Simulate.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +/** + * Enable certain features for a specific method or class. + * + * Note that presently class level annotations are not inherited by inner classes. + * + * TODO: support package level, and apply to all nested classes + */ +public @interface Simulate +{ + enum With + { + /** + * Calls to FBUtilities.timestampMicros() will be guaranteed globally monotonically increasing. + * + * May be annotated at the method or class level. + */ + GLOBAL_CLOCK, + + /** + * synchronized methods and blocks, and wait/notify. + * + * May be annotated at the class level. 
+ */ + MONITORS, + + /** + * Usages of LockSupport. This defaults to ON for all classes, including system classes. + * + * May be annotated at the method or class level. + */ + LOCK_SUPPORT + } + + With[] with() default {}; + With[] without() default {}; +} diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java index 86e3c12..026f9f2 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java @@ -316,6 +316,18 @@ public abstract class AbstractFuture<V> implements Future<V> return this; } + + /** + * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively + * + * See {@link #addListener(GenericFutureListener)} for ordering semantics. + */ + @Override + public <T> Future<T> map(Function<? super V, ? extends T> mapper) + { + return map(mapper, null); + } + /** * Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback} * @@ -435,31 +447,31 @@ public abstract class AbstractFuture<V> implements Future<V> @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return Awaitable.await(this, timeout, unit); + return Defaults.await(this, timeout, unit); } @Override public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException { - return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units); + return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units); } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { - return Awaitable.awaitUninterruptibly(this, timeout, unit); + return Defaults.awaitUninterruptibly(this, timeout, unit); } @Override public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException { - return 
Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline); + return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline); } @Override public boolean awaitUntilUninterruptibly(long nanoTimeDeadline) { - return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline); + return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline); } /** @@ -468,7 +480,7 @@ public abstract class AbstractFuture<V> implements Future<V> @Override public Future<V> awaitUninterruptibly() { - return Awaitable.awaitUninterruptibly(this); + return Defaults.awaitUninterruptibly(this); } /** @@ -477,7 +489,7 @@ public abstract class AbstractFuture<V> implements Future<V> @Override public Future<V> awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException { - return Awaitable.awaitThrowUncheckedOnInterrupt(this); + return Defaults.awaitThrowUncheckedOnInterrupt(this); } public String toString() diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java index b09eeb7..0ef35d5 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java @@ -123,7 +123,7 @@ public class AsyncFuture<V> extends AbstractFuture<V> } /** - * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + * Support {@link com.google.common.util.concurrent.Futures#transform} natively * * See {@link #addListener(GenericFutureListener)} for ordering semantics. 
*/ diff --git a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java index 03aab5f..25bdf02 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java @@ -105,91 +105,96 @@ public interface Awaitable */ Awaitable awaitUninterruptibly(); - public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException + // we must declare the static implementation methods outside of the interface, + // so that they can be loaded by different classloaders during simulation + class Defaults { - return await.awaitUntil(nanoTime() + unit.toNanos(time)); - } - - public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException - { - return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time)); - } - - public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units) - { - return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time)); - } - - public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException - { - try - { - await.await(); - } - catch (InterruptedException e) + public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException { - throw new UncheckedInterruptedException(); + return await.awaitUntil(nanoTime() + unit.toNanos(time)); } - return await; - } - public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException - { - try + public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException { - return await.awaitUntil(nanoTimeDeadline); + return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time)); } - catch 
(InterruptedException e) + + public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units) { - throw new UncheckedInterruptedException(); + return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time)); } - } - /** - * {@link Awaitable#awaitUntilUninterruptibly(long)} - */ - public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline) - { - boolean interrupted = false; - boolean result; - while (true) + public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException { try { - result = await.awaitUntil(nanoTimeDeadline); - break; + await.await(); } catch (InterruptedException e) { - interrupted = true; + throw new UncheckedInterruptedException(); } + return await; } - if (interrupted) - Thread.currentThread().interrupt(); - return result; - } - /** - * {@link Awaitable#awaitUninterruptibly()} - */ - public static <A extends Awaitable> A awaitUninterruptibly(A await) - { - boolean interrupted = false; - while (true) + public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException { try { - await.await(); - break; + return await.awaitUntil(nanoTimeDeadline); } catch (InterruptedException e) { - interrupted = true; + throw new UncheckedInterruptedException(); + } + } + + /** + * {@link Awaitable#awaitUntilUninterruptibly(long)} + */ + public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline) + { + boolean interrupted = false; + boolean result; + while (true) + { + try + { + result = await.awaitUntil(nanoTimeDeadline); + break; + } + catch (InterruptedException e) + { + interrupted = true; + } + } + if (interrupted) + Thread.currentThread().interrupt(); + return result; + } + + /** + * {@link Awaitable#awaitUninterruptibly()} + */ + public static <A extends Awaitable> A awaitUninterruptibly(A await) + { + boolean interrupted = false; + while (true) + { + 
try + { + await.await(); + break; + } + catch (InterruptedException e) + { + interrupted = true; + } } + if (interrupted) + Thread.currentThread().interrupt(); + return await; } - if (interrupted) - Thread.currentThread().interrupt(); - return await; } abstract class AbstractAwaitable implements Awaitable @@ -202,7 +207,7 @@ public interface Awaitable @Override public boolean await(long time, TimeUnit unit) throws InterruptedException { - return Awaitable.await(this, time, unit); + return Defaults.await(this, time, unit); } /** @@ -211,7 +216,7 @@ public interface Awaitable @Override public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException { - return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units); + return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units); } /** @@ -227,7 +232,7 @@ public interface Awaitable */ public Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException { - return Awaitable.awaitThrowUncheckedOnInterrupt(this); + return Defaults.awaitThrowUncheckedOnInterrupt(this); } /** @@ -235,7 +240,7 @@ public interface Awaitable */ public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException { - return Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline); + return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline); } /** @@ -243,7 +248,7 @@ public interface Awaitable */ public boolean awaitUntilUninterruptibly(long nanoTimeDeadline) { - return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline); + return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline); } /** @@ -251,7 +256,7 @@ public interface Awaitable */ public Awaitable awaitUninterruptibly() { - return Awaitable.awaitUninterruptibly(this); + return Defaults.awaitUninterruptibly(this); } } diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java 
b/src/java/org/apache/cassandra/utils/concurrent/Future.java index 69dc83d..fae5d43 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Future.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java @@ -96,6 +96,17 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl return this; } + /** + * waits for completion; in case of failure rethrows the original exception without a new wrapping exception + * so may cause problems for reporting stack traces + */ + default Future<V> syncThrowUncheckedOnInterrupt() + { + awaitThrowUncheckedOnInterrupt(); + rethrowIfFailed(); + return this; + } + @Deprecated @Override default boolean await(long l) throws InterruptedException diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java index 57737ea..40b908b 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java +++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java @@ -38,7 +38,7 @@ import static org.apache.cassandra.utils.concurrent.ListenerList.Notifying.NOTIF /** * Encapsulate one or more items in a linked-list that is immutable whilst shared, forming a prepend-only list (or stack). * Once the list is ready to consume, exclusive ownership is taken by clearing the shared variable containing it, after - * which the list may be invoked using {@link #notify}, which reverses the list before invoking the work it contains. + * which the list may be invoked using {@link #notifyExclusive(ListenerList, Future)}, which reverses the list before invoking the work it contains. 
*/ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>> { @@ -93,7 +93,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>> { while (true) { - notify(listeners, in); + notifyExclusive(listeners, in); if (updater.compareAndSet(in, NOTIFYING, null)) return; @@ -113,17 +113,13 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>> * * @param head must be either a {@link ListenerList} or {@link GenericFutureListener} */ - static <T> void notify(ListenerList<T> head, Future<T> future) + static <T> void notifyExclusive(ListenerList<T> head, Future<T> future) { - Executor notifyExecutor = future.notifyExecutor(); - if (inExecutor(notifyExecutor)) - notifyExecutor = null; - - notify(head, notifyExecutor, future); - } + Executor notifyExecutor; { + Executor exec = future.notifyExecutor(); + notifyExecutor = inExecutor(exec) ? null : exec; + } - private static <T> void notify(ListenerList<T> head, Executor notifyExecutor, Future<T> future) - { head = reverse(head); forEach(head, i -> i.notifySelf(notifyExecutor, future)); } diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java index c077467..f40e08f 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java @@ -47,12 +47,15 @@ import org.apache.cassandra.io.util.SafeMemory; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.Shared; + import org.cliffc.high_scale_lib.NonBlockingHashMap; import static java.util.Collections.emptyList; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE; +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; import static 
org.apache.cassandra.utils.Throwables.maybeFail; import static org.apache.cassandra.utils.Throwables.merge; @@ -91,6 +94,13 @@ public final class Ref<T> implements RefCounted<T> { static final Logger logger = LoggerFactory.getLogger(Ref.class); public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true"); + static OnLeak ON_LEAK; + + @Shared(scope = SIMULATION) + public interface OnLeak + { + void onLeak(Object state); + } final State state; final T referent; @@ -227,6 +237,9 @@ public final class Ref<T> implements RefCounted<T> logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState); if (DEBUG_ENABLED) debug.log(id); + OnLeak onLeak = ON_LEAK; + if (onLeak != null) + onLeak.onLeak(this); } else if (DEBUG_ENABLED) { @@ -235,6 +248,12 @@ public final class Ref<T> implements RefCounted<T> if (fail != null) logger.error("Error when closing {}", globalState, fail); } + + @Override + public String toString() + { + return globalState.toString(); + } } static final class Debug @@ -678,7 +697,10 @@ public final class Ref<T> implements RefCounted<T> { final Set<Tidy> candidates = Collections.newSetFromMap(new IdentityHashMap<>()); for (GlobalState state : globallyExtant) - candidates.add(state.tidy); + { + if (state.tidy != null) + candidates.add(state.tidy); + } removeExpected(candidates); this.candidates.retainAll(candidates); if (!this.candidates.isEmpty()) @@ -706,6 +728,11 @@ public final class Ref<T> implements RefCounted<T> } } + public static void setOnLeak(OnLeak onLeak) + { + ON_LEAK = onLeak; + } + @VisibleForTesting public static void shutdownReferenceReaper(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { diff --git a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java index c3f03a5..01c52c5 100644 --- 
a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java @@ -19,15 +19,10 @@ package org.apache.cassandra.utils.concurrent; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import net.openhft.chronicle.core.util.ThrowingConsumer; import org.apache.cassandra.utils.Intercept; import org.apache.cassandra.utils.Shared; -import static java.lang.System.nanoTime; -import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) @@ -88,7 +83,7 @@ public interface Semaphore @Intercept public static Semaphore newSemaphore(int permits) { - return new UnfairAsync(permits); + return new Standard(permits, false); } /** @@ -99,160 +94,19 @@ public interface Semaphore @Intercept public static Semaphore newFairSemaphore(int permits) { - return new FairJDK(permits); + return new Standard(permits, true); } - /** - * An unfair semaphore, making no guarantees about thread starvation. 
- * - * TODO this Semaphore is potentially inefficient if used with release quantities other than 1 - * (this is unimportant at time of authoring) - */ - public static class UnfairAsync implements Semaphore + public static class Standard extends java.util.concurrent.Semaphore implements Semaphore { - private static final AtomicReferenceFieldUpdater<UnfairAsync, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(UnfairAsync.class, WaitQueue.class, "waiting"); - private static final AtomicIntegerFieldUpdater<UnfairAsync> permitsUpdater = AtomicIntegerFieldUpdater.newUpdater(UnfairAsync.class, "permits"); - private volatile WaitQueue waiting; - private volatile int permits; - - // WARNING: if extending this class, consider simulator interactions - public UnfairAsync(int permits) - { - this.permits = permits; - } - - /** - * {@link Semaphore#drain()} - */ - public int drain() - { - return permitsUpdater.getAndSet(this, 0); - } - - /** - * {@link Semaphore#permits()} - */ - public int permits() - { - return permits; - } - - /** - * {@link Semaphore#release(int)} - */ - public void release(int permits) - { - if (permits < 0) throw new IllegalArgumentException(); - if (permits > 0 && permitsUpdater.getAndAdd(this, permits) == 0) - { - if (waiting != null) - { - if (permits > 1) waiting.signalAll(); - else waiting.signal(); - } - } - } - - /** - * {@link Semaphore#tryAcquire(int)} - */ - public boolean tryAcquire(int acquire) - { - if (acquire < 0) - throw new IllegalArgumentException(); - while (true) - { - int cur = permits; - if (cur < acquire) - return false; - if (permitsUpdater.compareAndSet(this, cur, cur - acquire)) - return true; - } - } - - /** - * {@link Semaphore#tryAcquire(int, long, TimeUnit)} - */ - public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws InterruptedException - { - return tryAcquireUntil(acquire, nanoTime() + unit.toNanos(time)); - } - - /** - * {@link Semaphore#tryAcquireUntil(int, long)} - */ - public boolean 
tryAcquireUntil(int acquire, long nanoTimeDeadline) throws InterruptedException - { - boolean wait = true; - while (true) - { - int cur = permits; - if (cur < acquire) - { - if (!wait) return false; - - WaitQueue.Signal signal = register(); - if (permits < acquire) wait = signal.awaitUntil(nanoTimeDeadline); - else signal.cancel(); - } - else if (permitsUpdater.compareAndSet(this, cur, cur - acquire)) - return true; - } - } - - /** - * {@link Semaphore#acquire(int)} - */ - public void acquire(int acquire) throws InterruptedException + public Standard(int permits) { - acquire(acquire, WaitQueue.Signal::await); + this(permits, false); } - /** - * {@link Semaphore#acquireThrowUncheckedOnInterrupt(int)} - */ - public void acquireThrowUncheckedOnInterrupt(int acquire) + public Standard(int permits, boolean fair) { - acquire(acquire, WaitQueue.Signal::awaitThrowUncheckedOnInterrupt); - } - - private <T extends Throwable> void acquire(int acquire, ThrowingConsumer<WaitQueue.Signal, T> wait) throws T - { - while (true) - { - int cur = permits; - if (cur < acquire) - { - WaitQueue.Signal signal = register(); - if (permits < acquire) wait.accept(signal); - else signal.cancel(); - } - else if (permitsUpdater.compareAndSet(this, cur, cur - acquire)) - return; - } - } - - private WaitQueue.Signal register() - { - if (waiting == null) - waitingUpdater.compareAndSet(this, null, newWaitQueue()); - return waiting.register(); - } - } - - /** - * A fair semaphore, guaranteeing threads are signalled in the order they request permits. - * - * Unlike {@link UnfairAsync} this class is efficient for arbitrarily-sized increments and decrements, - * however it has the normal throughput bottleneck of fairness. 
- */ - public static class FairJDK implements Semaphore - { - final java.util.concurrent.Semaphore wrapped; - - public FairJDK(int permits) - { - wrapped = new java.util.concurrent.Semaphore(permits, true); // checkstyle: permit this instantiation + super(permits, fair); } /** @@ -260,7 +114,7 @@ public interface Semaphore */ public int drain() { - return wrapped.drainPermits(); + return drainPermits(); } /** @@ -268,7 +122,7 @@ public interface Semaphore */ public int permits() { - return wrapped.availablePermits(); + return availablePermits(); } /** @@ -276,31 +130,7 @@ public interface Semaphore */ public int waiting() { - return wrapped.getQueueLength(); - } - - /** - * {@link Semaphore#release(int)} - */ - public void release(int permits) - { - wrapped.release(permits); - } - - /** - * {@link Semaphore#tryAcquire(int)} - */ - public boolean tryAcquire(int permits) - { - return wrapped.tryAcquire(permits); - } - - /** - * {@link Semaphore#tryAcquire(int, long, TimeUnit)} - */ - public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws InterruptedException - { - return wrapped.tryAcquire(acquire, time, unit); + return getQueueLength(); } /** @@ -309,15 +139,7 @@ public interface Semaphore public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) throws InterruptedException { long wait = nanoTimeDeadline - System.nanoTime(); - return wrapped.tryAcquire(acquire, Math.max(0, wait), TimeUnit.NANOSECONDS); - } - - /** - * {@link Semaphore#acquire(int)} - */ - public void acquire(int acquire) throws InterruptedException - { - wrapped.acquire(acquire); + return tryAcquire(acquire, Math.max(0, wait), TimeUnit.NANOSECONDS); } @Override diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java index 43648c0..a7b3473 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java +++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java @@ -90,7 
+90,7 @@ public class SyncFuture<V> extends AbstractFuture<V> } /** - * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively + * Support {@link com.google.common.util.concurrent.Futures#transform} natively * * See {@link #addListener(GenericFutureListener)} for ordering semantics. */ @@ -165,7 +165,7 @@ public class SyncFuture<V> extends AbstractFuture<V> private void notifyListeners() { - ListenerList.notify(listeners, this); + ListenerList.notifyExclusive(listeners, this); listenersUpdater.lazySet(this, null); } } diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index d656616..f302f4f 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -48,6 +48,7 @@ import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.BufferPoolMetrics; import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Shared; import org.apache.cassandra.utils.concurrent.Ref; import static com.google.common.collect.ImmutableList.of; @@ -55,6 +56,7 @@ import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFac import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE; import static org.apache.cassandra.utils.ExecutorUtils.*; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect; /** @@ -119,6 +121,7 @@ public class BufferPool private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); private volatile Debug debug = Debug.NO_OP; + private volatile DebugLeaks debugLeaks = DebugLeaks.NO_OP; protected final String name; 
protected final BufferPoolMetrics metrics; @@ -305,10 +308,19 @@ public class BufferPool void recyclePartial(Chunk chunk); } - public void debug(Debug setDebug) + @Shared(scope = SIMULATION) + public interface DebugLeaks { - assert setDebug != null; - this.debug = setDebug; + public static DebugLeaks NO_OP = () -> {}; + void leak(); + } + + public void debug(Debug newDebug, DebugLeaks newDebugLeaks) + { + if (newDebug != null) + this.debug = newDebug; + if (newDebugLeaks != null) + this.debugLeaks = newDebugLeaks; } interface Recycler @@ -1025,6 +1037,7 @@ public class BufferPool Object obj = localPoolRefQueue.remove(100); if (obj instanceof LocalPoolRef) { + debugLeaks.leak(); ((LocalPoolRef) obj).release(); localPoolReferences.remove(obj); } diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java index 0596aeb..ebe92ac 100644 --- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java +++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java @@ -21,8 +21,11 @@ package org.apache.cassandra.utils.memory; import java.nio.ByteBuffer; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.utils.Shared; import org.apache.cassandra.utils.concurrent.OpOrder; +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; + public class HeapPool extends MemtablePool { private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp(); @@ -58,6 +61,7 @@ public class HeapPool extends MemtablePool public static class Logged extends MemtablePool { + @Shared(scope = SIMULATION) public interface Listener { public void accept(long size, String table); diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index d9e8372..9bb217c 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ 
b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -265,7 +265,7 @@ public class LongBufferPoolTest logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration)); logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold()); Debug debug = new Debug(); - bufferPool.debug(debug); + bufferPool.debug(debug, null); TestEnvironment testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold()); @@ -305,7 +305,7 @@ public class LongBufferPoolTest assertEquals(0, testEnv.executorService.shutdownNow().size()); logger.info("Reverting BufferPool DEBUG config"); - bufferPool.debug(BufferPool.Debug.NO_OP); + bufferPool.debug(BufferPool.Debug.NO_OP, null); testEnv.assertCheckedThreadsSucceeded(); diff --git a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java index eadacd1..52650ad 100644 --- a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java +++ b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java @@ -32,6 +32,8 @@ import org.apache.cassandra.utils.WithResources; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.Semaphore; +import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore; + @Ignore public abstract class AbstractExecutorPlusTest { @@ -100,9 +102,9 @@ public abstract class AbstractExecutorPlusTest SequentialExecutorPlus exec = builder.build(); - Semaphore enter = new Semaphore.UnfairAsync(0); - Semaphore exit = new Semaphore.UnfairAsync(0); - Semaphore runAfter = new Semaphore.UnfairAsync(0); + Semaphore enter = newSemaphore(0); + Semaphore exit = newSemaphore(0); + Semaphore runAfter = newSemaphore(0); SequentialExecutorPlus.AtLeastOnceTrigger trigger; trigger = 
exec.atLeastOnceTrigger(() -> { enter.release(1); exit.acquireThrowUncheckedOnInterrupt(1); }); diff --git a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java index 77ed5ab..c4cb86d 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java @@ -36,7 +36,7 @@ public class SemaphoreTest @Test public void testUnfair() throws InterruptedException { - Semaphore s = new Semaphore.UnfairAsync(2); + Semaphore s = Semaphore.newSemaphore(2); List<Future<Boolean>> fs = start(s); s.release(1); while (s.permits() == 1) Thread.yield(); @@ -54,7 +54,7 @@ public class SemaphoreTest @Test public void testFair() throws InterruptedException, ExecutionException, TimeoutException { - Semaphore s = new Semaphore.FairJDK(2); + Semaphore s = Semaphore.newFairSemaphore(2); List<Future<Boolean>> fs = start(s); s.release(1); fs.get(0).get(1L, TimeUnit.MINUTES); @@ -83,9 +83,9 @@ public class SemaphoreTest try { s.tryAcquireUntil(1, System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1L)); Assert.fail(); } catch (InterruptedException ignore) { } List<Future<Boolean>> fs = new ArrayList<>(); fs.add(exec.submit(() -> s.tryAcquire(1, 1L, TimeUnit.MINUTES))); - while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) s).waiting() == 0) Thread.yield(); + while (s instanceof Semaphore.Standard && ((Semaphore.Standard) s).waiting() == 0) Thread.yield(); fs.add(exec.submit(() -> s.tryAcquireUntil(1, System.nanoTime() + TimeUnit.MINUTES.toNanos(1L)))); - while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) s).waiting() == 1) Thread.yield(); + while (s instanceof Semaphore.Standard && ((Semaphore.Standard) s).waiting() == 1) Thread.yield(); fs.add(exec.submit(() -> { s.acquire(1); return true; } )); return fs; } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org