This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6f59f68c1bab37ed1c77794d7382bfdc09c405f3 Author: David Capwell <dcapw...@apple.com> AuthorDate: Wed Oct 4 15:13:37 2023 -0700 Get simulator working (again) Co-authored-by: Ariel Weisberg <aweisb...@apple.com> Co-authored-by: Benedict Elliott Smith <bened...@apache.org> --- .build/checkstyle_suppressions.xml | 1 - build.xml | 18 ++++ .../cassandra/concurrent/InfiniteLoopExecutor.java | 5 + .../config/CassandraRelevantProperties.java | 1 + .../db/memtable/AbstractAllocatorMemtable.java | 6 ++ .../apache/cassandra/journal/ActiveSegment.java | 9 ++ src/java/org/apache/cassandra/journal/Flusher.java | 29 ++++-- src/java/org/apache/cassandra/journal/Journal.java | 3 + src/java/org/apache/cassandra/journal/Params.java | 2 +- .../cassandra/metrics/AccordStateCacheMetrics.java | 6 +- .../service/accord/AccordConfigurationService.java | 61 +++++++++--- .../service/accord/AccordFastPathCoordinator.java | 2 +- .../cassandra/service/accord/AccordJournal.java | 4 +- .../cassandra/service/accord/AccordService.java | 33 +++++-- .../cassandra/utils/concurrent/Semaphore.java | 4 +- .../cassandra/distributed/impl/Instance.java | 3 + .../distributed/impl/IsolatedExecutor.java | 2 +- .../cassandra/simulator/asm/ClassTransformer.java | 4 + .../simulator/asm/GlobalMethodTransformer.java | 3 +- .../cassandra/simulator/asm/InterceptAgent.java | 106 ++++++++++++++++++++- .../cassandra/simulator/asm/InterceptClasses.java | 2 + .../simulator/asm/MonitorMethodTransformer.java | 3 +- .../cassandra/simulator/asm/StringHashcode.java | 44 +++++++++ .../apache/cassandra/simulator/ActionSchedule.java | 17 ++-- .../cassandra/simulator/ClusterSimulation.java | 5 + .../cassandra/simulator/SimulationRunner.java | 12 +++ .../simulator/cluster/KeyspaceActions.java | 1 + .../apache/cassandra/simulator/debug/Record.java | 4 +- .../cassandra/simulator/debug/SelfReconcile.java | 2 + .../AbstractPairOfSequencesPaxosSimulation.java | 11 +-- .../simulator/paxos/AccordClusterSimulation.java | 5 +- 
.../simulator/paxos/AccordSimulationRunner.java | 27 ++++++ .../paxos/PairOfSequencesAccordSimulation.java | 6 +- .../simulator/paxos/PaxosSimulationRunner.java | 2 + .../simulator/systems/InterceptingMonitors.java | 3 - .../simulator/test/SimulationTestBase.java | 1 + .../org/apache/cassandra/journal/TestParams.java | 2 +- 37 files changed, 386 insertions(+), 63 deletions(-) diff --git a/.build/checkstyle_suppressions.xml b/.build/checkstyle_suppressions.xml index ed4d1443f7..230c808c14 100644 --- a/.build/checkstyle_suppressions.xml +++ b/.build/checkstyle_suppressions.xml @@ -21,5 +21,4 @@ "https://checkstyle.org/dtds/suppressions_1_1.dtd"> <suppressions> - <suppress checks="RegexpSinglelineJava" files="Semaphore\.java"/> </suppressions> diff --git a/build.xml b/build.xml index 53b273478f..e0bbe27b17 100644 --- a/build.xml +++ b/build.xml @@ -223,6 +223,24 @@ <condition property="is.java.default"><equals arg1="${ant.java.version}" arg2="${java.default}"/></condition> <echo unless:true="${is.java.default}" message="Non default JDK version used: ${ant.java.version}"/> + <condition property="arch_x86"> + <equals arg1="${os.arch}" arg2="x86" /> + </condition> + <!-- On non-X86 JDK 8 (such as M1 Mac) the smallest allowed Xss is 384k; so need a larger value + when on these platforms. --> + <condition property="jvm_xss" value="-Xss256k" else="-Xss384k"> + <isset property="arch_x86" /> + </condition> + <condition property="java.version.11"> + <!-- This includes every JDK other than 8; so JDK 9 is flagged as JDK 11, and JDK 17 is as well... 
at the moment this is desired behavior + and may be relooked at once JDK 8 support is dropped --> + <not><isset property="java.version.8"/></not> + </condition> + <fail message="Unsupported JDK version used: ${ant.java.version}"><condition><not><or> + <isset property="java.version.8"/> + <isset property="java.version.11"/> + </or></not></condition></fail> + <condition property="arch_x86"> <equals arg1="${os.arch}" arg2="x86" /> </condition> diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java index 51c5f9f69e..97410a9138 100644 --- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java @@ -52,6 +52,11 @@ public class InfiniteLoopExecutor implements Interruptible @Shared(scope = Shared.Scope.SIMULATION) public enum SimulatorSafe { SAFE, UNSAFE } + /** + * Does this loop always block on some external work provision that is going to be simulator-controlled, or does + * it loop periodically? If the latter, it may prevent simulation making progress between phases, and should be + * marked as a DAEMON process. + */ @Shared(scope = Shared.Scope.SIMULATION) public enum Daemon { DAEMON, NON_DAEMON } diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 3cbfab0307..c6cfe9e3db 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -564,6 +564,7 @@ public enum CassandraRelevantProperties * faster. Note that this is disabled for unit tests but if an individual test requires schema to be flushed, it * can be also done manually for that particular case: {@code flush(SchemaConstants.SCHEMA_KEYSPACE_NAME);}. 
*/ TEST_FLUSH_LOCAL_SCHEMA_CHANGES("cassandra.test.flush_local_schema_changes", "true"), + TEST_HISTORY_VALIDATOR_LOGGING_ENABLED("cassandra.test.history_validator.logging.enabled", "false"), TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar"), TEST_INVALID_LEGACY_SSTABLE_ROOT("invalid-legacy-sstable-root"), TEST_JVM_DTEST_DISABLE_SSL("cassandra.test.disable_ssl"), diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java index b431d360ed..2dbe41374f 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java @@ -220,6 +220,12 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm if (current instanceof AbstractAllocatorMemtable) ((AbstractAllocatorMemtable) current).flushIfPeriodExpired(); } + + @Override + public String toString() + { + return "Scheduled Flush of " + owner; + } }; ScheduledExecutors.scheduledTasks.scheduleSelfRecurring(runnable, period, TimeUnit.MILLISECONDS); } diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index 22a3aba766..f16126c157 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -33,6 +33,9 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.WaitQueue; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; + +@Simulate(with=MONITORS) final class ActiveSegment<K, V> extends Segment<K, V> { final FileChannel channel; @@ -247,6 +250,12 @@ final class ActiveSegment<K, V> extends Segment<K, V> * Flush logic; closing and component flushing */ + boolean shouldFlush() + { + int allocatePosition = this.allocatePosition.get(); + return 
lastFlushedOffset < allocatePosition; + } + /** * Possibly force a disk flush for this segment file. * TODO FIXME: calls from outside Flusher + callbacks diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index a0ae320a9d..c4c6d75348 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -28,6 +28,7 @@ import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.Interruptible.TerminateException; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.Semaphore; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -44,6 +45,9 @@ import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; +import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK; +import static org.apache.cassandra.utils.Simulate.With.LOCK_SUPPORT; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore; import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; @@ -95,6 +99,7 @@ final class Flusher<K, V> flushExecutor.shutdown(); } + @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT}) private class FlushRunnable implements Interruptible.Task { private final MonotonicClock clock; @@ -150,9 +155,17 @@ final class Flusher<K, V> if (state == SHUTTING_DOWN) return; - long wakeUpAt = startedRunAt + flushPeriodNanos(); - if (wakeUpAt > now) - haveWork.tryAcquireUntil(1, wakeUpAt); + long flushPeriodNanos = flushPeriodNanos(); + if (flushPeriodNanos <= 0) + { + 
haveWork.acquire(1); + } + else + { + long wakeUpAt = startedRunAt + flushPeriodNanos; + if (wakeUpAt > now) + haveWork.tryAcquireUntil(1, wakeUpAt); + } } private void doFlush() @@ -167,6 +180,9 @@ final class Flusher<K, V> for (ActiveSegment<K, V> segment : segmentsToFlush) { + if (!segment.shouldFlush()) + break; + syncedSegment = segment.descriptor.timestamp; syncedOffset = segment.flush(); @@ -201,8 +217,9 @@ final class Flusher<K, V> flushCount++; flushDuration += (finishedFlushAt - startedFlushAt); - long lag = finishedFlushAt - (startedFlushAt + flushPeriodNanos()); - if (lag <= 0) + long flushPeriodNanos = flushPeriodNanos(); + long lag = finishedFlushAt - (startedFlushAt + flushPeriodNanos); + if (flushPeriodNanos <= 0 || lag <= 0) return; lagCount++; @@ -348,7 +365,7 @@ final class Flusher<K, V> private long flushPeriodNanos() { - return 1_000_000L * params.flushPeriod(); + return 1_000_000L * params.flushPeriodMillis(); } private long periodicFlushLagBlockNanos() diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index bb1ada27f7..844f660796 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -50,6 +50,7 @@ import org.apache.cassandra.journal.Segments.ReferencedSegments; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.WaitQueue; import static java.lang.String.format; @@ -61,6 +62,7 @@ import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static 
org.apache.cassandra.utils.Simulate.With.MONITORS; import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; /** @@ -77,6 +79,7 @@ import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue; * @param <K> the type of keys used to address the records; must be fixed-size and byte-order comparable */ +@Simulate(with=MONITORS) public class Journal<K, V> implements Shutdownable { private static final Logger logger = LoggerFactory.getLogger(Journal.class); diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java index f462f450ac..46b382ea27 100644 --- a/src/java/org/apache/cassandra/journal/Params.java +++ b/src/java/org/apache/cassandra/journal/Params.java @@ -41,7 +41,7 @@ public interface Params /** * @return milliseconds between journal flushes */ - int flushPeriod(); + int flushPeriodMillis(); /** * @return milliseconds to block writes for while waiting for a slow disk flush to complete diff --git a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java index 26b220123e..a090a05a34 100644 --- a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java @@ -32,7 +32,7 @@ public class AccordStateCacheMetrics extends CacheAccessMetrics public final Histogram objectSize; - private final Map<Class<?>, CacheAccessMetrics> instanceMetrics = new ConcurrentHashMap<>(2); + private final Map<String, CacheAccessMetrics> instanceMetrics = new ConcurrentHashMap<>(2); private final String type; @@ -45,6 +45,8 @@ public class AccordStateCacheMetrics extends CacheAccessMetrics public CacheAccessMetrics forInstance(Class<?> klass) { - return instanceMetrics.computeIfAbsent(klass, k -> new CacheAccessMetrics(new DefaultNameFactory(CACHE, String.format("%s-%s", type, k.getSimpleName())))); + // cannot make Class<?> hashCode 
deterministic, as cannot rewrite - so cannot safely use as Map key if want deterministic simulation + // (or we need to create extra hoops to catch this specific case in method rewriting) + return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new CacheAccessMetrics(new DefaultNameFactory(CACHE, String.format("%s-%s", type, k)))); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index ad20fea043..31565f8423 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -20,13 +20,12 @@ package org.apache.cassandra.service.accord; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import accord.impl.AbstractConfigurationService; import accord.local.Node; @@ -36,6 +35,7 @@ import accord.utils.Invariants; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.IFailureDetector; @@ -44,19 +44,23 @@ import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.accord.AccordKeyspace.EpochDiskState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.utils.Simulate; import 
org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; +import static org.apache.cassandra.utils.Simulate.With.MONITORS; + // TODO: listen to FailureDetector and rearrange fast path accordingly -public class AccordConfigurationService extends AbstractConfigurationService<AccordConfigurationService.EpochState, AccordConfigurationService.EpochHistory> implements ChangeListener, AccordEndpointMapper, AccordSyncPropagator.Listener +@Simulate(with=MONITORS) +public class AccordConfigurationService extends AbstractConfigurationService<AccordConfigurationService.EpochState, AccordConfigurationService.EpochHistory> implements ChangeListener, AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable { - private static final Logger logger = LoggerFactory.getLogger(AccordConfigurationService.class); private final AccordSyncPropagator syncPropagator; private EpochDiskState diskState = EpochDiskState.EMPTY; - private enum State { INITIALIZED, LOADING, STARTED } + private enum State { INITIALIZED, LOADING, STARTED, SHUTDOWN } private State state = State.INITIALIZED; private volatile EndpointMapping mapping = EndpointMapping.EMPTY; @@ -150,6 +154,35 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc receiveRedundant(redundant, epoch); })); state = State.STARTED; + ClusterMetadataService.instance().log().addListener(this); + } + + @Override + public synchronized boolean isTerminated() + { + return state == State.SHUTDOWN; + } + + @Override + public synchronized void shutdown() + { + if (isTerminated()) + return; + ClusterMetadataService.instance().log().removeListener(this); + state = State.SHUTDOWN; + } + + @Override + public Object shutdownNow() + { + shutdown(); + return null; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException + { + return isTerminated(); } @Override @@ -262,7 +295,7 @@ public class AccordConfigurationService 
extends AbstractConfigurationService<Acc } @Override - protected void receiveRemoteSyncCompletePreListenerNotify(Node.Id node, long epoch) + protected synchronized void receiveRemoteSyncCompletePreListenerNotify(Node.Id node, long epoch) { if (state == State.STARTED) diskState = AccordKeyspace.markRemoteTopologySync(node, epoch, diskState); @@ -271,7 +304,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc @Override public synchronized void reportEpochClosed(Ranges ranges, long epoch) { - Invariants.checkState(state == State.STARTED); + checkStarted(); Topology topology = getTopologyForEpoch(epoch); syncPropagator.reportClosed(epoch, topology.nodes(), ranges); } @@ -279,7 +312,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc @Override public synchronized void reportEpochRedundant(Ranges ranges, long epoch) { - Invariants.checkState(state == State.STARTED); + checkStarted(); // TODO (expected): ensure we aren't fetching a truncated epoch; otherwise this should be non-null Topology topology = getTopologyForEpoch(epoch); syncPropagator.reportRedundant(epoch, topology.nodes(), ranges); @@ -300,18 +333,24 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc } @Override - protected void truncateTopologiesPreListenerNotify(long epoch) + protected synchronized void truncateTopologiesPreListenerNotify(long epoch) { - Invariants.checkState(state == State.STARTED); + checkStarted(); } @Override - protected void truncateTopologiesPostListenerNotify(long epoch) + protected synchronized void truncateTopologiesPostListenerNotify(long epoch) { if (state == State.STARTED) diskState = AccordKeyspace.truncateTopologyUntil(epoch, diskState); } + private void checkStarted() + { + State state = this.state; + Invariants.checkState(state == State.STARTED, "Expected state to be STARTED but was %s", state); + } + @VisibleForTesting public static class EpochSnapshot { diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordFastPathCoordinator.java b/src/java/org/apache/cassandra/service/accord/AccordFastPathCoordinator.java index c1fc73d80f..74a9603a39 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordFastPathCoordinator.java +++ b/src/java/org/apache/cassandra/service/accord/AccordFastPathCoordinator.java @@ -253,7 +253,7 @@ public abstract class AccordFastPathCoordinator implements ChangeListener, Confi private void scheduleMaintenanceTask(long delayMillis) { - ScheduledExecutors.scheduledTasks.schedule(this::maintenance, delayMillis, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleSelfRecurring(this::maintenance, delayMillis, TimeUnit.MILLISECONDS); } synchronized void maintenance() diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 28bf2c2cc5..617dadf9dd 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -172,9 +172,9 @@ public class AccordJournal implements Shutdownable } @Override - public int flushPeriod() + public int flushPeriodMillis() { - return 1000; + return DatabaseDescriptor.getCommitLogSyncPeriod(); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index fdebd70d46..358d007e25 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -26,11 +26,14 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import accord.coordinate.TopologyMismatch; import 
org.apache.cassandra.cql3.statements.RequestValidations; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.transformations.AddAccordTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +94,6 @@ import org.apache.cassandra.service.accord.interop.AccordInteropExecution; import org.apache.cassandra.service.accord.interop.AccordInteropPersist; import org.apache.cassandra.service.accord.txn.TxnResult; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.utils.Clock; @@ -116,6 +118,10 @@ public class AccordService implements IAccordService, Shutdownable { private static final Logger logger = LoggerFactory.getLogger(AccordService.class); + private enum State { INIT, STARTED, SHUTDOWN} + + public static final AccordClientRequestMetrics readMetrics = new AccordClientRequestMetrics("AccordRead"); + public static final AccordClientRequestMetrics writeMetrics = new AccordClientRequestMetrics("AccordWrite"); private static final Future<Void> BOOTSTRAP_SUCCESS = ImmediateFuture.success(null); private final Node node; @@ -128,6 +134,8 @@ public class AccordService implements IAccordService, Shutdownable private final AccordJournal journal; private final AccordVerbHandler<? 
extends Request> verbHandler; private final LocalConfig configuration; + @GuardedBy("this") + private State state = State.INIT; private static final IAccordService NOOP_SERVICE = new IAccordService() { @@ -307,13 +315,16 @@ public class AccordService implements IAccordService, Shutdownable } @Override - public void startup() + public synchronized void startup() { + if (state != State.INIT) + return; journal.start(node); configService.start(); ClusterMetadataService.instance().log().addListener(configService); fastPathCoordinator.start(); ClusterMetadataService.instance().log().addListener(fastPathCoordinator); + state = State.STARTED; } @Override @@ -525,15 +536,18 @@ public class AccordService implements IAccordService, Shutdownable } @Override - public void shutdown() + public synchronized void shutdown() { - ExecutorUtils.shutdown(Arrays.asList(scheduler, nodeShutdown, journal)); + if (state != State.STARTED) + return; + ExecutorUtils.shutdown(shutdownableSubsystems()); + state = State.SHUTDOWN; } @Override public Object shutdownNow() { - ExecutorUtils.shutdownNow(Arrays.asList(scheduler, nodeShutdown, journal)); + shutdown(); return null; } @@ -542,7 +556,7 @@ public class AccordService implements IAccordService, Shutdownable { try { - ExecutorUtils.awaitTermination(timeout, units, Arrays.asList(scheduler, nodeShutdown, journal)); + ExecutorUtils.awaitTermination(timeout, units, shutdownableSubsystems()); return true; } catch (TimeoutException e) @@ -551,11 +565,16 @@ public class AccordService implements IAccordService, Shutdownable } } + private List<Shutdownable> shutdownableSubsystems() + { + return Arrays.asList(scheduler, nodeShutdown, journal, configService); + } + @VisibleForTesting @Override public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - scheduler.shutdownNow(); + shutdown(); ExecutorUtils.shutdownAndWait(timeout, unit, this); } diff --git 
a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java index c9c253f1d5..5263518c29 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java @@ -20,9 +20,11 @@ package org.apache.cassandra.utils.concurrent; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.Intercept; import org.apache.cassandra.utils.Shared; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; @Shared(scope = SIMULATION) @@ -139,7 +141,7 @@ public interface Semaphore */ public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) throws InterruptedException { - long wait = nanoTimeDeadline - System.nanoTime(); + long wait = nanoTimeDeadline - nanoTime(); return tryAcquire(acquire, Math.max(0, wait), TimeUnit.NANOSECONDS); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index a7152665ae..df5a31199d 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -611,6 +611,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance { assert config.networkTopology().contains(config.broadcastAddress()) : String.format("Network topology %s doesn't contain the address %s", config.networkTopology(), config.broadcastAddress()); + // org.apache.cassandra.distributed.impl.AbstractCluster.startup sets the exception handler for the thread + // so extract it to populate ExecutorFactory.Global + ExecutorFactory.Global.tryUnsafeSet(new ExecutorFactory.Default(Thread.currentThread().getContextClassLoader(), null, Thread.getDefaultUncaughtExceptionHandler())); 
DistributedTestSnitch.assign(config.networkTopology()); CassandraDaemon.getInstanceForTesting().activate(false); // TODO: filters won't work for the messages dispatched during startup diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java index 68ff1e71c6..9e84d32df7 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java @@ -126,7 +126,7 @@ public class IsolatedExecutor implements IIsolatedExecutor public Future<Void> shutdown() { - isolatedExecutor.shutdownNow(); + isolatedExecutor.shutdown(); return shutdownExecutor.shutdown(name, classLoader, isolatedExecutor, () -> { // Shutdown logging last - this is not ideal as the logging subsystem is initialized diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/ClassTransformer.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/ClassTransformer.java index f9bab8eaed..70fa3a6f04 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/ClassTransformer.java +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/ClassTransformer.java @@ -189,6 +189,10 @@ class ClassTransformer extends ClassVisitor implements MethodWriterSink { if (dependentTypes != null) Utils.visitIfRefType(descriptor, dependentTypes); + // org.apache.cassandra.simulator.systems.SimulatedTime.InstanceTime.nanoTime does not change between invokes which causes AbstractQueuedSynchronizer to loop forever, + // so need to make the threshold negative to avoid the spin loop. 
+ if (className.equals("java/util/concurrent/locks/AbstractQueuedSynchronizer") && name.equals("SPIN_FOR_TIMEOUT_THRESHOLD")) + return super.visitField(makePublic(access), name, descriptor, signature, Long.MIN_VALUE); return super.visitField(makePublic(access), name, descriptor, signature, value); } diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/GlobalMethodTransformer.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/GlobalMethodTransformer.java index 9fe551fcba..064d205646 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/GlobalMethodTransformer.java +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/GlobalMethodTransformer.java @@ -97,7 +97,8 @@ class GlobalMethodTransformer extends MethodVisitor super.visitMethodInsn(Opcodes.INVOKESTATIC, "org/apache/cassandra/simulator/systems/InterceptorOfSystemMethods$Global", "sleep", "(Ljava/util/concurrent/TimeUnit;J)V", false); } else if ((globalMethods || deterministic) && opcode == Opcodes.INVOKESTATIC && - owner.equals("java/util/concurrent/ThreadLocalRandom") && (name.equals("getProbe") || name.equals("advanceProbe") || name.equals("localInit")) + ((owner.equals("java/util/concurrent/ThreadLocalRandom") && (name.equals("getProbe") || name.equals("advanceProbe") || name.equals("localInit"))) + || (owner.equals("java/util/concurrent/atomic/Striped64") && (name.equals("getProbe") || name.equals("advanceProbe")))) ) { transformer.witness(GLOBAL_METHOD); diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptAgent.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptAgent.java index 4cf1546ca8..ec63c505cd 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptAgent.java +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptAgent.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Objects; import 
java.util.function.BiFunction; import java.util.regex.Pattern; @@ -39,12 +40,14 @@ import org.objectweb.asm.ClassWriter; import org.objectweb.asm.FieldVisitor; import org.objectweb.asm.Label; import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.Opcodes; import static org.apache.cassandra.simulator.asm.Flag.DETERMINISTIC; import static org.apache.cassandra.simulator.asm.Flag.LOCK_SUPPORT; import static org.apache.cassandra.simulator.asm.Flag.NO_PROXY_METHODS; import static org.apache.cassandra.simulator.asm.Flag.SYSTEM_CLOCK; import static org.apache.cassandra.simulator.asm.InterceptClasses.BYTECODE_VERSION; +import static org.apache.cassandra.simulator.asm.TransformationKind.HASHCODE; import static org.objectweb.asm.Opcodes.ALOAD; import static org.objectweb.asm.Opcodes.GETFIELD; import static org.objectweb.asm.Opcodes.GETSTATIC; @@ -93,6 +96,9 @@ public class InterceptAgent if (className.equals("java/lang/Object")) return transformObject(bytecode); + if (className.equals("java/lang/Class")) + return transformClass(bytecode); + if (className.equals("java/lang/Enum")) return transformEnum(bytecode); @@ -103,10 +109,14 @@ public class InterceptAgent return transformThreadLocalRandom(bytecode); if (className.startsWith("java/util/concurrent/ConcurrentHashMap")) - return transformConcurrent(className, bytecode, DETERMINISTIC, NO_PROXY_METHODS); + return InterceptAgent.transform(className, bytecode, DETERMINISTIC, NO_PROXY_METHODS); if (className.startsWith("java/util/concurrent/locks")) - return transformConcurrent(className, bytecode, SYSTEM_CLOCK, LOCK_SUPPORT, NO_PROXY_METHODS); + { + if (className.equals("java/util/concurrent/locks/AbstractQueuedSynchronizer")) + return InterceptAgent.transformAbstractQueuedSynchronizer(className, bytecode, SYSTEM_CLOCK, LOCK_SUPPORT, NO_PROXY_METHODS); + return InterceptAgent.transform(className, bytecode, SYSTEM_CLOCK, LOCK_SUPPORT, NO_PROXY_METHODS); + } return null; } @@ -172,6 +182,29 @@ public class 
InterceptAgent return transform(bytes, ObjectVisitor::new); } + /** + * We want Class to have a deterministic hashCode(), so once the class has been visited we append the + * hashCode() method generated by StringHashcode + */ + private static byte[] transformClass(byte[] bytes) + { + class ClazzVisitor extends ClassVisitor + { + public ClazzVisitor(int api, ClassVisitor classVisitor) + { + super(api, classVisitor); + } + + @Override + public void visitEnd() + { + new StringHashcode(api).accept(this); + super.visitEnd(); + } + } + return transform(bytes, ClazzVisitor::new); + } + /** * We want Enum to have a deterministic hashCode() so we simply forward calls to ordinal() */ @@ -314,7 +347,7 @@ public class InterceptAgent else { MethodVisitor mv = super.visitMethod(access, name, descriptor, signature, exceptions); - if (determinismCheck && (name.equals("nextSeed") || name.equals("nextSecondarySeed"))) + if (determinismCheck && (name.equals("nextSeed") || name.equals("nextSecondarySeed") || name.equals("advanceProbe"))) mv = new ThreadLocalRandomCheckTransformer(api, mv); return mv; } @@ -323,7 +356,61 @@ public class InterceptAgent return transform(bytes, ThreadLocalRandomVisitor::new); } - private static byte[] transform(byte[] bytes, BiFunction<Integer, ClassWriter, ClassVisitor> constructor) + /** + * We replace AbstractQueuedSynchronizer's SPIN_FOR_TIMEOUT_THRESHOLD with zero, both in the field's constant value + * and where it has been inlined as an LDC in doAcquireNanos and doAcquireSharedNanos + */ + private static byte[] transformAbstractQueuedSynchronizer(String className, byte[] bytes, Flag flag, Flag ... 
flags) + { + class AbstractQueuedSynchronizerVisitor extends ClassVisitor + { + private long defaultSpinForTimeoutThreshold = 1000L; + + public AbstractQueuedSynchronizerVisitor(int api, ClassVisitor classVisitor) + { + super(api, classVisitor); + } + + @Override + public FieldVisitor visitField(int access, String name, String descriptor, String signature, Object value) + { + if (name.equals("SPIN_FOR_TIMEOUT_THRESHOLD")) + { + defaultSpinForTimeoutThreshold = (Long)value; + return super.visitField(access, name, descriptor, signature, 0L); + } + + return super.visitField(access, name, descriptor, signature, value); + } + + @Override + public MethodVisitor visitMethod(int access, String name, String descriptor, String signature, String[] exceptions) + { + /// !!!!! WARNING !!!!! + /// THIS IS SUPER BRITTLE BECAUSE rt.jar INLINES GETSTATIC AS LDC + // TODO (desired): visit constructor to fetch actual value of constant in case changes in future release - + // but this is brittle enough changes upstream will likely need revisiting anyway + MethodVisitor mv = super.visitMethod(access, name, descriptor, signature, exceptions); + if (!name.equals("doAcquireNanos") && !name.equals("doAcquireSharedNanos")) + return mv; + + return new MethodVisitor(api, mv) + { + @Override + public void visitLdcInsn(Object value) + { + if (Objects.equals(defaultSpinForTimeoutThreshold, value)) + super.visitLdcInsn(0L); + else + super.visitLdcInsn(value); + } + }; + } + } + return transform(className, bytes, AbstractQueuedSynchronizerVisitor::new, flag, flags); + } + + private static byte[] transform(byte[] bytes, BiFunction<Integer, ClassVisitor, ClassVisitor> constructor) { ClassWriter out = new ClassWriter(0); ClassReader in = new ClassReader(bytes); @@ -332,7 +419,7 @@ public class InterceptAgent return out.toByteArray(); } - private static byte[] transformConcurrent(String className, byte[] bytes, Flag flag, Flag ... 
flags) + private static byte[] transform(String className, byte[] bytes, Flag flag, Flag ... flags) { ClassTransformer transformer = new ClassTransformer(BYTECODE_VERSION, className, EnumSet.of(flag, flags), null); transformer.readAndTransform(bytes); @@ -340,4 +427,13 @@ public class InterceptAgent return null; return transformer.toBytes(); } + + private static byte[] transform(String className, byte[] bytes, BiFunction<Integer, ClassVisitor, ClassVisitor> constructor, Flag flag, Flag ... flags) + { + ClassReader in = new ClassReader(bytes); + ClassTransformer transformer = new ClassTransformer(BYTECODE_VERSION, className, EnumSet.of(flag, flags), null); + ClassVisitor extraTransformer = constructor.apply(BYTECODE_VERSION, transformer); + in.accept(extraTransformer, 0); + return transformer.toBytes(); + } } diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java index dd53ce067f..5043012472 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java @@ -62,6 +62,8 @@ public class InterceptClasses implements BiFunction<String, byte[], byte[]> "|org[/.]apache[/.]cassandra[/.]distributed[/.]impl[/.]DirectStreamingConnectionFactory.*" + "|org[/.]apache[/.]cassandra[/.]db[/.]commitlog[/.].*" + "|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*" + + "|org[/.]apache[/.]cassandra[/.]service[/.]accord[/.].*" + + "|org[/.]apache[/.]cassandra[/.]journal[/.].*" + "|accord[/.].*" ); diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/MonitorMethodTransformer.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/MonitorMethodTransformer.java index d9c9c7ad94..a7c21bbba7 100644 --- a/test/simulator/asm/org/apache/cassandra/simulator/asm/MonitorMethodTransformer.java +++ 
b/test/simulator/asm/org/apache/cassandra/simulator/asm/MonitorMethodTransformer.java @@ -122,8 +122,7 @@ class MonitorMethodTransformer extends MethodNode } int invokeCode; - if (isInstanceMethod && (access & Opcodes.ACC_PRIVATE) != 0) invokeCode = Opcodes.INVOKESPECIAL; - else if (isInstanceMethod) invokeCode = Opcodes.INVOKEVIRTUAL; + if (isInstanceMethod) invokeCode = Opcodes.INVOKESPECIAL; else invokeCode = Opcodes.INVOKESTATIC; return invokeCode; } diff --git a/test/simulator/asm/org/apache/cassandra/simulator/asm/StringHashcode.java b/test/simulator/asm/org/apache/cassandra/simulator/asm/StringHashcode.java new file mode 100644 index 0000000000..70102504e8 --- /dev/null +++ b/test/simulator/asm/org/apache/cassandra/simulator/asm/StringHashcode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.simulator.asm; + +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.tree.InsnNode; +import org.objectweb.asm.tree.IntInsnNode; +import org.objectweb.asm.tree.LabelNode; +import org.objectweb.asm.tree.MethodInsnNode; +import org.objectweb.asm.tree.MethodNode; + +/** + * Generate a new hashCode method in the class that invokes a deterministic hashCode generator + */ +class StringHashcode extends MethodNode +{ + StringHashcode(int api) + { + super(api, Opcodes.ACC_PUBLIC, "hashCode", "()I", null, null); + maxLocals = 1; + maxStack = 1; + instructions.add(new LabelNode()); + instructions.add(new MethodInsnNode(Opcodes.INVOKEVIRTUAL, "java/lang/Object", "toString", "()Ljava/lang/String;", false)); + instructions.add(new LabelNode()); + instructions.add(new MethodInsnNode(Opcodes.INVOKEVIRTUAL, "java/lang/Object", "hashCode", "(Ljava/lang/Object;)I", false)); + instructions.add(new InsnNode(Opcodes.IRETURN)); + } +} diff --git a/test/simulator/main/org/apache/cassandra/simulator/ActionSchedule.java b/test/simulator/main/org/apache/cassandra/simulator/ActionSchedule.java index 427a777abe..39666077ea 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/ActionSchedule.java +++ b/test/simulator/main/org/apache/cassandra/simulator/ActionSchedule.java @@ -281,6 +281,12 @@ public class ActionSchedule implements CloseableIterator<Object>, LongConsumer if (!runnable.isEmpty() || !scheduled.isEmpty()) return true; + while (moreWork()) + { + if (!runnable.isEmpty() || !scheduled.isEmpty()) + return true; + } + if (!sequences.isEmpty()) { // TODO (feature): detection of which action is blocking progress, and logging of its stack trace only @@ -313,15 +319,12 @@ public class ActionSchedule implements CloseableIterator<Object>, LongConsumer throw failWithOOM(); } - while (moreWork()) - { - if (!runnable.isEmpty() || !scheduled.isEmpty()) - return true; - } - return false; } + // NOTE: this is only here for debugging, its a 
quick way to see if pre (0), interleave (1), or post (2) is active + private int step = -1; + private boolean moreWork() { if (!moreWork.hasNext()) @@ -347,6 +350,8 @@ public class ActionSchedule implements CloseableIterator<Object>, LongConsumer work.actors.forEach(runnableScheduler::attachTo); work.actors.forEach(a -> a.forEach(Action::setConsequence)); work.actors.forEach(this::add); + + step++; return true; } diff --git a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java index c50c94374c..af551eb375 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java @@ -54,6 +54,7 @@ import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableBiCons import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableConsumer; import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable; import org.apache.cassandra.distributed.impl.DirectStreamingConnectionFactory; +import org.apache.cassandra.distributed.impl.InstanceIDDefiner; import org.apache.cassandra.distributed.impl.IsolatedExecutor; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.filesystem.ListenableFileSystem; @@ -93,6 +94,7 @@ import org.apache.cassandra.simulator.utils.KindOfSequence; import org.apache.cassandra.simulator.utils.LongRange; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.memory.BufferPool; @@ -781,6 +783,9 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable @Override public void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation) { + 
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableConsumer<Integer>) InstanceIDDefiner::setInstanceId, classLoader) + .accept(num); + List<Closeable> onShutdown = new ArrayList<>(); InterceptorOfGlobalMethods interceptorOfGlobalMethods = IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableQuadFunction<Capture, LongConsumer, Consumer<Throwable>, RandomSource, InterceptorOfGlobalMethods>) InterceptingGlobalMethods::new, classLoader) .apply(builder.capture, builder.onThreadLocalRandomCheck, failures, random); diff --git a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java index e326abf2cd..c22c64e341 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java @@ -51,6 +51,7 @@ import org.apache.cassandra.simulator.systems.InterceptibleThread; import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods; import org.apache.cassandra.simulator.utils.ChanceRange; import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -381,6 +382,7 @@ public class SimulationRunner protected void run(long seed, B builder) throws IOException { logger().error("Seed 0x{}", Long.toHexString(seed)); + logger().info("Cassandra {} / {}", FBUtilities.getReleaseVersionString(), FBUtilities.getGitSHA()); try (ClusterSimulation<?> cluster = builder.create(seed)) { @@ -459,6 +461,16 @@ public class SimulationRunner } } + @Command(name = "version", description = "Display version information") + protected static class VersionCommand<B extends ClusterSimulation.Builder<?>> implements ICommand<B> + { + @Override + public void run(B builder) throws IOException + { + System.out.println(FBUtilities.getReleaseVersionString()); + 
System.out.println(FBUtilities.getGitSHA()); + } + } private static Optional<Long> parseHex(Optional<String> value) { diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/KeyspaceActions.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/KeyspaceActions.java index 0b654bf2da..35fa091f09 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/cluster/KeyspaceActions.java +++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/KeyspaceActions.java @@ -207,6 +207,7 @@ public class KeyspaceActions extends ClusterActions for (int i = 0; i < rfs.length; i++) rf.put(factory.lookup().dc(i + 1), rfs[i]); + nodes.sort(PlacementSimulator.Node::compareTo); return new PlacementSimulator.NtsReplicationFactor(rfs).replicate(nodes); } diff --git a/test/simulator/main/org/apache/cassandra/simulator/debug/Record.java b/test/simulator/main/org/apache/cassandra/simulator/debug/Record.java index 42c7b082fe..17e3eee05e 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/debug/Record.java +++ b/test/simulator/main/org/apache/cassandra/simulator/debug/Record.java @@ -27,6 +27,7 @@ import org.apache.cassandra.simulator.SimulationRunner.RecordOption; import org.apache.cassandra.simulator.systems.SimulatedTime; import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Threads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class Record { private static final Logger logger = LoggerFactory.getLogger(Record.class); private static final Pattern NORMALISE_THREAD_RECORDING_OUT = Pattern.compile("(Thread\\[[^]]+:[0-9]+),[0-9](,node[0-9]+)_[0-9]+]"); - private static final Pattern NORMALISE_LAMBDA = Pattern.compile("((\\$\\$Lambda\\$[0-9]+/[0-9]+)?(@[0-9a-f]+)?)"); + private static final Pattern NORMALISE_LAMBDA = 
Pattern.compile("((\\$\\$Lambda\\$[0-9]+/(0x)?[a-f0-9]+)?(@[0-9a-f]+)?)"); public static void record(String saveToDir, long seed, RecordOption withRng, RecordOption withTime, ClusterSimulation.Builder<?> builder) { @@ -73,6 +74,7 @@ public class Record if (builder.capture().wakeSites) modifiers.add("WakeSites"); logger.error("Seed 0x{} ({}) (With: {})", Long.toHexString(seed), eventFile, modifiers); + logger.info("Cassandra {} / {}", FBUtilities.getReleaseVersionString(), FBUtilities.getGitSHA()); } try (PrintWriter eventOut = new PrintWriter(new GZIPOutputStream(eventFile.newOutputStream(OVERWRITE), 1 << 16)); diff --git a/test/simulator/main/org/apache/cassandra/simulator/debug/SelfReconcile.java b/test/simulator/main/org/apache/cassandra/simulator/debug/SelfReconcile.java index d57fe53bb3..92db7c3a6e 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/debug/SelfReconcile.java +++ b/test/simulator/main/org/apache/cassandra/simulator/debug/SelfReconcile.java @@ -42,6 +42,7 @@ import org.apache.cassandra.simulator.systems.InterceptibleThread; import org.apache.cassandra.simulator.systems.InterceptorOfConsequences; import org.apache.cassandra.simulator.systems.SimulatedTime; import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import org.apache.cassandra.utils.memory.HeapPool; @@ -247,6 +248,7 @@ public class SelfReconcile public static void reconcileWithSelf(long seed, RecordOption withRng, RecordOption withTime, boolean withAllocations, ClusterSimulation.Builder<?> builder) { logger.error("Seed 0x{}", Long.toHexString(seed)); + logger.info("Cassandra {} / {}", FBUtilities.getReleaseVersionString(), FBUtilities.getGitSHA()); InterceptReconciler reconciler = new InterceptReconciler(withRng == WITH_CALLSITES); if (withRng != NONE) builder.random(reconciler); diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java index 5bfb218c7e..ca6988ed59 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java @@ -39,11 +39,10 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.LogAction; import org.apache.cassandra.distributed.api.LogResult; -import org.apache.cassandra.distributed.impl.FileLogAction; import org.apache.cassandra.distributed.impl.Instance; import org.apache.cassandra.distributed.shared.Metrics; -import org.apache.cassandra.io.util.File; import org.apache.cassandra.simulator.Action; import org.apache.cassandra.simulator.ActionList; import org.apache.cassandra.simulator.ActionPlan; @@ -53,8 +52,6 @@ import org.apache.cassandra.simulator.RandomSource; import org.apache.cassandra.simulator.RunnableActionScheduler; import org.apache.cassandra.simulator.cluster.ClusterActions; import org.apache.cassandra.simulator.cluster.KeyspaceActions; -import org.apache.cassandra.simulator.logging.RunStartDefiner; -import org.apache.cassandra.simulator.logging.SeedDefiner; import org.apache.cassandra.simulator.systems.SimulatedActionTask; import org.apache.cassandra.simulator.systems.SimulatedSystems; import org.apache.cassandra.simulator.utils.IntRange; @@ -131,11 +128,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation @Override protected ActionList performSimple() { - // can't use inst.logs as that runs in the class loader, which uses in-memory file system - 
String suite = new RunStartDefiner().getPropertyValue() + "-" + new SeedDefiner().getPropertyValue(); - String instanceId = "node" + inst.config().num(); - File logFile = new File(String.format("build/test/logs/simulator/%s/%s/system.log", suite, instanceId)); - FileLogAction logs = new FileLogAction(logFile); + LogAction logs = inst.logs(); LogResult<List<String>> errors = logs.grepForErrors(); if (!errors.getResult().isEmpty()) diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java index 78e04454fa..a75a1ef461 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java @@ -43,7 +43,10 @@ class AccordClusterSimulation extends ClusterSimulation<PaxosSimulation> impleme public void applyHandicaps() { /** - * TODO: remove after partial replication patch + * TODO (required): remove + * We currently require coordinators to have a CommandStore to coordinate a query, but not every node + * is a replica under standard simulation + * * The current homekey implementation isn't compatible with the C* commands per key implementation when * a non-replica coordinates a query. 
* diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordSimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordSimulationRunner.java index f4bd21aaf9..f14ae9daa1 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordSimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordSimulationRunner.java @@ -21,12 +21,22 @@ package org.apache.cassandra.simulator.paxos; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.BeforeClass; + import io.airlift.airline.Cli; import io.airlift.airline.Command; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.simulator.SimulationRunner; +import org.apache.cassandra.utils.StorageCompatibilityMode; public class AccordSimulationRunner extends SimulationRunner { + @BeforeClass + public static void beforeAll() + { + CassandraRelevantProperties.JUNIT_STORAGE_COMPATIBILITY_MODE.setString(StorageCompatibilityMode.NONE.toString()); + } + @Command(name = "run") public static class Run extends SimulationRunner.Run<AccordClusterSimulation.Builder> { @@ -35,6 +45,7 @@ public class AccordSimulationRunner extends SimulationRunner @Override protected void run(long seed, AccordClusterSimulation.Builder builder) throws IOException { + beforeAll(); builder.applyHandicaps(); super.run(seed, builder); } @@ -44,12 +55,28 @@ public class AccordSimulationRunner extends SimulationRunner public static class Record extends SimulationRunner.Record<AccordClusterSimulation.Builder> { public Record() {} + + @Override + protected void run(long seed, AccordClusterSimulation.Builder builder) throws IOException + { + beforeAll(); + builder.applyHandicaps(); + super.run(seed, builder); + } } @Command(name = "reconcile") public static class Reconcile extends SimulationRunner.Reconcile<AccordClusterSimulation.Builder> { public Reconcile() {} + + @Override + protected void run(long 
seed, AccordClusterSimulation.Builder builder) throws IOException + { + beforeAll(); + builder.applyHandicaps(); + super.run(seed, builder); + } } public static class Help extends HelpCommand<AccordClusterSimulation.Builder> {} diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java index fc929a9460..8d6c8a0dcc 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.cursors.IntCursor; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.rows.Cell; @@ -139,7 +140,10 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo seed, primaryKeys, runForNanos, jitter); this.writeRatio = 1F - readRatio; - validator = new LoggingHistoryValidator(new StrictSerializabilityValidator(primaryKeys)); + HistoryValidator validator = new StrictSerializabilityValidator(primaryKeys); + if (CassandraRelevantProperties.TEST_HISTORY_VALIDATOR_LOGGING_ENABLED.getBoolean()) + validator = new LoggingHistoryValidator(validator); + this.validator = validator; } @Override diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java index 6c9f683c61..71734c6e68 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java @@ -137,6 +137,7 
@@ public class PaxosSimulationRunner extends SimulationRunner } public static class Help extends HelpCommand<PaxosClusterSimulation.Builder> {} + public static class Version extends VersionCommand<PaxosClusterSimulation.Builder> {} static void propagateTo(String consistency, boolean withStateCache, boolean withoutStateCache, String variant, String toVariant, PaxosClusterSimulation.Builder builder) { @@ -163,6 +164,7 @@ public class PaxosSimulationRunner extends SimulationRunner .withCommand(Run.class) .withCommand(Reconcile.class) .withCommand(Record.class) + .withCommand(Version.class) .withCommand(Help.class) .withDefaultCommand(Help.class) .build() diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingMonitors.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingMonitors.java index 68fd74a490..cfcff4b790 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingMonitors.java +++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingMonitors.java @@ -698,10 +698,7 @@ public abstract class InterceptingMonitors implements InterceptorOfGlobalMethods { if (!thread.isIntercepting() && disabled) return; else if (!thread.isIntercepting()) - { throw new AssertionError("Thread " + thread + " is running but is not simulated"); - } - checkForDeadlock(thread, state.heldBy); InterceptedMonitorWait wait = new InterceptedMonitorWait(UNBOUNDED_WAIT, 0L, state, thread, captureWaitSite(thread)); diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java index dcec241e5b..a7cb27dfee 100644 --- a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java +++ b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java @@ -35,6 +35,7 @@ import org.apache.cassandra.distributed.Cluster; import 
org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.distributed.impl.AbstractCluster; +import org.apache.cassandra.distributed.impl.InstanceIDDefiner; import org.apache.cassandra.distributed.impl.IsolatedExecutor; import org.apache.cassandra.distributed.shared.InstanceClassLoader; import org.apache.cassandra.simulator.*; diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java b/test/unit/org/apache/cassandra/journal/TestParams.java index 3beb378536..7c22e896b5 100644 --- a/test/unit/org/apache/cassandra/journal/TestParams.java +++ b/test/unit/org/apache/cassandra/journal/TestParams.java @@ -42,7 +42,7 @@ public class TestParams implements Params } @Override - public int flushPeriod() + public int flushPeriodMillis() { return 1000; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org