This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new ff755a191b Accord: Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra
ff755a191b is described below

commit ff755a191b3650c99d98a978de894561177b831f
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri May 3 12:57:46 2024 -0700

    Accord: Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra
    
    patch by Benedict Elliott Smith, David Capwell; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-19618
---
 .gitmodules                                        |   2 +-
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |  60 ++++++
 .../cassandra/config/DatabaseDescriptor.java       |   5 +
 src/java/org/apache/cassandra/net/Verb.java        |   8 +-
 .../cassandra/service/accord/AccordJournal.java    | 124 +++++++-----
 .../cassandra/service/accord/AccordKeyspace.java   |  12 +-
 .../cassandra/service/accord/AccordService.java    |   7 +-
 .../service/accord/async/AsyncOperation.java       |   4 +-
 .../service/accord/async/ExecutionOrder.java       | 127 +++++++------
 .../migration/ConsensusRequestRouter.java          |   2 +-
 .../test/AccordJournalSimulationTest.java          |   3 +-
 .../config/DatabaseDescriptorRefTest.java          |   4 +
 .../compaction/CompactionAccordIteratorsTest.java  |   8 +-
 .../cassandra/service/accord/AccordTestUtils.java  |   3 +-
 .../cassandra/service/accord/MockJournal.java      |  28 +++
 .../accord/SimulatedAccordCommandStore.java        |  13 +-
 .../accord/async/SimulatedAsyncOperationTest.java  | 207 +++++++++++++++++++++
 18 files changed, 497 insertions(+), 122 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 60a9510e7a..616dacf610 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = ../cassandra-accord.git
+       url = https://github.com/apache/cassandra-accord.git
        branch = trunk
diff --git a/modules/accord b/modules/accord
index 202e673583..256b35e27d 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 202e67358396a1e413e29498bea71047bd586d06
+Subproject commit 256b35e27d170db9fcd8024d5678b4f6e9d3a956
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index d6fb1a5011..b035b0b9b5 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.config;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.cassandra.journal.Params;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 
 public class AccordSpec
@@ -71,4 +73,62 @@ public class AccordSpec
     public TransactionalMode default_transactional_mode = 
TransactionalMode.off;
     public boolean ephemeralReadEnabled = false;
     public boolean state_cache_listener_jfr_enabled = true;
+    public final JournalSpec journal = new JournalSpec();
+
+    public static class JournalSpec implements Params
+    {
+        public int segmentSize = 32 << 20;
+        public FailurePolicy failurePolicy = FailurePolicy.STOP;
+        public FlushMode flushMode = FlushMode.BATCH;
+        public DurationSpec.IntMillisecondsBound flushPeriod; // pulls default 
from 'commitlog_sync_period'
+        public DurationSpec.IntMillisecondsBound periodicFlushLagBlock = new 
DurationSpec.IntMillisecondsBound("1500ms");
+
+        @Override
+        public int segmentSize()
+        {
+            return segmentSize;
+        }
+
+        @Override
+        public FailurePolicy failurePolicy()
+        {
+            return failurePolicy;
+        }
+
+        @Override
+        public FlushMode flushMode()
+        {
+            return flushMode;
+        }
+
+        @JsonIgnore
+        @Override
+        public int flushPeriodMillis()
+        {
+            return flushPeriod == null ? 
DatabaseDescriptor.getCommitLogSyncPeriod()
+                                       : flushPeriod.toMilliseconds();
+        }
+
+        @JsonIgnore
+        @Override
+        public int periodicFlushLagBlock()
+        {
+            return periodicFlushLagBlock.toMilliseconds();
+        }
+
+        /**
+         * This is required by the journal, but we don't have multiple 
versions, so block it from showing up, so we don't need to worry about 
maintaining it
+         */
+        @JsonIgnore
+        @Override
+        public int userVersion()
+        {
+            /*
+             * NOTE: when accord journal version gets bumped, expose it via 
yaml.
+             * This way operators can force previous version on upgrade, 
temporarily,
+             * to allow easier downgrades if something goes wrong.
+             */
+            return 1;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 384872684b..285d7308b7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3302,6 +3302,11 @@ public class DatabaseDescriptor
         return conf.paxos_topology_repair_strict_each_quorum;
     }
 
+    public static AccordSpec getAccord()
+    {
+        return conf.accord;
+    }
+
     public static AccordSpec.TransactionalRangeMigration 
getTransactionalRangeMigration()
     {
         return conf.accord.range_migration;
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 500ca80943..eb244f1bc6 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -344,15 +344,15 @@ public enum Verb
 
     ACCORD_SYNC_NOTIFY_REQ          (151, P2, writeTimeout, IMMEDIATE,         
 () -> Notification.listSerializer,          () -> 
AccordSyncPropagator.verbHandler,       ACCORD_SIMPLE_RSP             ),
 
-    ACCORD_APPLY_AND_WAIT_REQ       (152, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.readData,         () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP),
+    ACCORD_APPLY_AND_WAIT_REQ       (152, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.readData,         AccordService::verbHandlerOrNoop, 
ACCORD_READ_RSP),
 
     CONSENSUS_KEY_MIGRATION         (153, P1, writeTimeout,  MUTATION,         
 () -> ConsensusKeyMigrationFinished.serializer,() -> 
ConsensusKeyMigrationState.consensusKeyMigrationFinishedHandler),
 
     ACCORD_INTEROP_READ_RSP         (154, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropRead.replySerializer,        RESPONSE_HANDLER),
-    ACCORD_INTEROP_READ_REQ         (155, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropRead.requestSerializer,      () -> 
AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_RSP),
-    ACCORD_INTEROP_COMMIT_REQ       (156, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropCommit.serializer,           () -> 
AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_RSP),
+    ACCORD_INTEROP_READ_REQ         (155, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropRead.requestSerializer,      
AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_RSP),
+    ACCORD_INTEROP_COMMIT_REQ       (156, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropCommit.serializer,           
AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_RSP),
     ACCORD_INTEROP_READ_REPAIR_RSP  (157, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropReadRepair.replySerializer,  RESPONSE_HANDLER),
-    ACCORD_INTEROP_READ_REPAIR_REQ  (158, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropReadRepair.requestSerializer, () -> 
AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_REPAIR_RSP),
+    ACCORD_INTEROP_READ_REPAIR_REQ  (158, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropReadRepair.requestSerializer, 
AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP),
     ACCORD_INTEROP_APPLY_REQ        (160, P2, writeTimeout, IMMEDIATE,         
 () -> AccordInteropApply.serializer,             
AccordService::verbHandlerOrNoop,             ACCORD_APPLY_RSP),
 
     // generic failure response
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index b659cf4733..ce90b26747 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -42,6 +42,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 
+import accord.messages.ApplyThenWaitUntilApplied;
 import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.LongArrayList;
 import org.agrona.collections.ObjectHashSet;
@@ -156,50 +157,6 @@ public class AccordJournal implements IJournal, Shutdownable
 
     private static final ThreadLocal<byte[]> keyCRCBytes = 
ThreadLocal.withInitial(() -> new byte[21]);
 
-    static final Params PARAMS = new Params()
-    {
-        @Override
-        public int segmentSize()
-        {
-            return 32 << 20;
-        }
-
-        @Override
-        public FailurePolicy failurePolicy()
-        {
-            return FailurePolicy.STOP;
-        }
-
-        @Override
-        public FlushMode flushMode()
-        {
-            return FlushMode.BATCH;
-        }
-
-        @Override
-        public int flushPeriodMillis()
-        {
-            return DatabaseDescriptor.getCommitLogSyncPeriod();
-        }
-
-        @Override
-        public int periodicFlushLagBlock()
-        {
-            return 1500;
-        }
-
-        @Override
-        public int userVersion()
-        {
-            /*
-             * NOTE: when accord journal version gets bumped, expose it via 
yaml.
-             * This way operators can force previous version on upgrade, 
temporarily,
-             * to allow easier downgrades if something goes wrong.
-             */
-            return 1;
-        }
-    };
-
     private final File directory;
     private final Journal<Key, Object> journal;
     private final AccordEndpointMapper endpointMapper;
@@ -219,10 +176,10 @@ public class AccordJournal implements IJournal, Shutdownable
     private final FrameApplicator frameApplicator = new FrameApplicator();
 
     @VisibleForTesting
-    public AccordJournal(AccordEndpointMapper endpointMapper)
+    public AccordJournal(AccordEndpointMapper endpointMapper, Params params)
     {
         this.directory = new 
File(DatabaseDescriptor.getAccordJournalDirectory());
-        this.journal = new Journal<>("AccordJournal", directory, PARAMS, new 
JournalCallbacks(), Key.SUPPORT, RECORD_SERIALIZER);
+        this.journal = new Journal<>("AccordJournal", directory, params, new 
JournalCallbacks(), Key.SUPPORT, RECORD_SERIALIZER);
         this.endpointMapper = endpointMapper;
     }
 
@@ -969,6 +926,22 @@ public class AccordJournal implements IJournal, Shutdownable
                 }
             }
             msgTypeToSynonymousTypesMap = 
ImmutableListMultimap.copyOf(msgTypeToSynonymousTypes);
+
+            //TODO (now): enable as this shows we are currently missing a 
message
+//            IllegalStateException e = null;
+//            for (MessageType t : MessageType.values)
+//            {
+//                if (!t.hasSideEffects()) continue;
+//                Type matches = msgTypeToTypeMap.get(t);
+//                if (matches == null)
+//                {
+//                    IllegalStateException ise = new 
IllegalStateException("Missing MessageType " + t);
+//                    if (e == null) e = ise;
+//                    else e.addSuppressed(ise);
+//                }
+//            }
+//            if (e != null)
+//                throw e;
         }
 
         static Type fromId(int id)
@@ -1164,7 +1137,7 @@ public class AccordJournal implements IJournal, Shutdownable
             while (null != (request = unframedRequests.poll()))
             {
                 long waitForEpoch = request.waitForEpoch;
-                if (!node.topology().hasEpoch(waitForEpoch))
+                if (waitForEpoch != 0 && 
!node.topology().hasEpoch(waitForEpoch))
                 {
                     delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
                     if (!waitForEpochs.containsLong(waitForEpoch))
@@ -1394,6 +1367,12 @@ public class AccordJournal implements IJournal, Shutdownable
             this.txnId = txnId;
         }
 
+        @Override
+        public TxnId txnId()
+        {
+            return txnId;
+        }
+
         @Override
         public Set<MessageType> test(Set<MessageType> messages)
         {
@@ -1464,6 +1443,12 @@ public class AccordJournal implements IJournal, Shutdownable
             return readMessage(txnId, STABLE_FAST_PATH_REQ, Commit.class);
         }
 
+        @Override
+        public Commit stableSlowPath()
+        {
+            return readMessage(txnId, STABLE_SLOW_PATH_REQ, Commit.class);
+        }
+
         @Override
         public Commit stableMaximal()
         {
@@ -1493,6 +1478,18 @@ public class AccordJournal implements IJournal, Shutdownable
         {
             return readMessage(txnId, PROPAGATE_APPLY_MSG, Propagate.class);
         }
+
+        @Override
+        public Propagate propagateOther()
+        {
+            return readMessage(txnId, PROPAGATE_OTHER_MSG, Propagate.class);
+        }
+
+        @Override
+        public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+        {
+            return readMessage(txnId, APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, 
ApplyThenWaitUntilApplied.class);
+        }
     }
 
     private final class LoggingMessageProvider implements 
SerializerSupport.MessageProvider
@@ -1506,6 +1503,12 @@ public class AccordJournal implements IJournal, Shutdownable
             this.provider = provider;
         }
 
+        @Override
+        public TxnId txnId()
+        {
+            return txnId;
+        }
+
         @Override
         public Set<MessageType> test(Set<MessageType> messages)
         {
@@ -1587,6 +1590,15 @@ public class AccordJournal implements IJournal, Shutdownable
             return commit;
         }
 
+        @Override
+        public Commit stableSlowPath()
+        {
+            logger.debug("Fetching {} message for {}", STABLE_SLOW_PATH_REQ, 
txnId);
+            Commit commit = provider.stableSlowPath();
+            logger.debug("Fetched {} message for {}: {}", 
STABLE_SLOW_PATH_REQ, txnId, commit);
+            return commit;
+        }
+
         @Override
         public Commit stableMaximal()
         {
@@ -1631,5 +1643,23 @@ public class AccordJournal implements IJournal, Shutdownable
             logger.debug("Fetched {} message for {}: {}", PROPAGATE_APPLY_MSG, 
txnId, propagate);
             return propagate;
         }
+
+        @Override
+        public Propagate propagateOther()
+        {
+            logger.debug("Fetching {} message for {}", PROPAGATE_OTHER_MSG, 
txnId);
+            Propagate propagate = provider.propagateOther();
+            logger.debug("Fetched {} message for {}: {}", PROPAGATE_OTHER_MSG, 
txnId, propagate);
+            return propagate;
+        }
+
+        @Override
+        public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+        {
+            logger.debug("Fetching {} message for {}", 
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, txnId);
+            ApplyThenWaitUntilApplied apply = 
provider.applyThenWaitUntilApplied();
+            logger.debug("Fetched {} message for {}: {}", 
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, txnId, apply);
+            return apply;
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index ee5c0ae619..ab6701d6d6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -249,6 +249,7 @@ public class AccordKeyspace
               + format("execute_at %s,", TIMESTAMP_TUPLE)
               + format("promised_ballot %s,", TIMESTAMP_TUPLE)
               + format("accepted_ballot %s,", TIMESTAMP_TUPLE)
+              + format("execute_atleast %s,", TIMESTAMP_TUPLE)
               + "waiting_on blob,"
               + "listeners set<blob>, "
               + "PRIMARY KEY((store_id, domain, txn_id))"
@@ -297,6 +298,7 @@ public class AccordKeyspace
         public static final ColumnMetadata execute_at = getColumn(Commands, 
"execute_at");
         static final ColumnMetadata promised_ballot = getColumn(Commands, 
"promised_ballot");
         static final ColumnMetadata accepted_ballot = getColumn(Commands, 
"accepted_ballot");
+        static final ColumnMetadata execute_atleast = getColumn(Commands, 
"execute_atleast");
         static final ColumnMetadata waiting_on = getColumn(Commands, 
"waiting_on");
         static final ColumnMetadata listeners = getColumn(Commands, 
"listeners");
 
@@ -856,6 +858,8 @@ public class AccordKeyspace
             addCellIfModified(CommandsColumns.execute_at, Command::executeAt, 
AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, 
original, command);
             addCellIfModified(CommandsColumns.promised_ballot, 
Command::promised, AccordKeyspace::serializeTimestamp, builder, 
timestampMicros, nowInSeconds, original, command);
             addCellIfModified(CommandsColumns.accepted_ballot, 
Command::acceptedOrCommitted, AccordKeyspace::serializeTimestamp, builder, 
timestampMicros, nowInSeconds, original, command);
+            if (command.txnId().kind().awaitsOnlyDeps())
+                addCellIfModified(CommandsColumns.execute_atleast, 
Command::executesAtLeast, AccordKeyspace::serializeTimestamp, builder, 
timestampMicros, nowInSeconds, original, command);
 
             if (command.isStable() && !command.isTruncated())
             {
@@ -1229,11 +1233,12 @@ public class AccordKeyspace
             Timestamp executeAt = deserializeExecuteAtOrNull(row);
             Ballot promised = deserializePromisedOrNull(row);
             Ballot accepted = deserializeAcceptedOrNull(row);
+            Timestamp executeAtLeast = status.is(Status.Truncated) && 
txnId.kind().awaitsOnlyDeps() ? deserializeExecuteAtLeastOrNull(row) : null;
 
             WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row);
             MessageProvider messages = commandStore.makeMessageProvider(txnId);
 
-            return 
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, 
status, executeAt, promised, accepted, waitingOn, messages);
+            return 
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, 
status, executeAt, executeAtLeast, promised, accepted, waitingOn, messages);
         }
         catch (Throwable t)
         {
@@ -1306,6 +1311,11 @@ public class AccordKeyspace
         return deserializeTimestampOrNull(row, "execute_at", 
Timestamp::fromBits);
     }
 
+    public static Timestamp 
deserializeExecuteAtLeastOrNull(UntypedResultSet.Row row)
+    {
+        return deserializeTimestampOrNull(row, "execute_atleast", 
Timestamp::fromBits);
+    }
+
     public static Ballot deserializePromisedOrNull(UntypedResultSet.Row row)
     {
         return deserializeTimestampOrNull(row.getBlob("promised_ballot"), 
Ballot::fromBits);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 54f8c369ff..01331bb2d6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -43,6 +43,7 @@ import accord.impl.CoordinateDurabilityScheduling;
 import accord.primitives.SyncPoint;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.statements.RequestValidations;
+import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import 
org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory;
@@ -310,7 +311,7 @@ public class AccordService implements IAccordService, Shutdownable
         this.scheduler = new AccordScheduler();
         this.dataStore = new AccordDataStore();
         this.configuration = new 
AccordConfiguration(DatabaseDescriptor.getRawConfig());
-        this.journal = new AccordJournal(configService);
+        this.journal = new AccordJournal(configService, 
DatabaseDescriptor.getAccord().journal);
         this.node = new Node(localId,
                              messageSink,
                              this::handleLocalRequest,
@@ -441,7 +442,7 @@ public class AccordService implements IAccordService, Shutdownable
     private long doWithRetries(LongSupplier action, int retryAttempts, long 
initialBackoffMillis, long maxBackoffMillis) throws InterruptedException
     {
         // Since we could end up having the barrier transaction or the 
transaction it listens to invalidated
-        CoordinationFailed existingFailures = null;
+        RuntimeException existingFailures = null;
         Long success = null;
         long backoffMillis = 0;
         for (int attempt = 0; attempt < retryAttempts; attempt++)
@@ -462,7 +463,7 @@ public class AccordService implements IAccordService, Shutdownable
                 success = action.getAsLong();
                 break;
             }
-            catch (CoordinationFailed newFailures)
+            catch (RequestExecutionException | CoordinationFailed newFailures)
             {
                 existingFailures = Throwables.merge(existingFailures, 
newFailures);
             }
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 51041aab58..72e673f48b 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -215,7 +215,7 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R
                     commandStore.abortCurrentOperation();
                 case LOADING:
                     context.releaseResources(commandStore);
-                    commandStore.executionOrder().unregister(this);
+                    commandStore.executionOrder().unregisterOutOfOrder(this);
                 case INITIALIZED:
                     break; // nothing to clean up, call callback
             }
@@ -239,6 +239,8 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R
             default: throw new IllegalStateException("Unexpected state " + 
state);
             case INITIALIZED:
                 canRun = commandStore.executionOrder().register(this);
+                if (Invariants.isParanoid())
+                    Invariants.checkState(canRun.booleanValue() == 
commandStore.executionOrder().canRun(this), "Register of %s returned canRun=%s 
but canRun returned %s!", this, canRun, !canRun);
                 state(LOADING);
             case LOADING:
                 if (null == canRun)
diff --git a/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java b/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java
index 1ef3ff6c8d..19cf170966 100644
--- a/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java
+++ b/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java
@@ -21,6 +21,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.function.Consumer;
 
 import accord.api.Key;
 import accord.api.RoutingKey;
@@ -82,31 +83,9 @@ public class ExecutionOrder
             }
         }
 
-        Conflicts remove(AsyncOperation<?> operation)
+        Conflicts remove(AsyncOperation<?> operation, boolean allowOutOfOrder)
         {
-            if (operationOrQueue instanceof AsyncOperation<?>)
-            {
-                Invariants.checkState(operationOrQueue == operation);
-                rangeQueues.remove(range);
-            }
-            else
-            {
-                @SuppressWarnings("unchecked")
-                ArrayDeque<AsyncOperation<?>> queue = 
(ArrayDeque<AsyncOperation<?>>) operationOrQueue;
-                AsyncOperation<?> head = queue.poll();
-                Invariants.checkState(head == operation);
-
-                if (queue.isEmpty())
-                {
-                    rangeQueues.remove(range);
-                }
-                else
-                {
-                    head = queue.peek();
-                    if (canRun(head))
-                        head.onUnblocked();
-                }
-            }
+            unregister("range", range, operationOrQueue, operation, 
allowOutOfOrder, () -> rangeQueues.remove(range));
             return operationToConflicts.remove(operation);
         }
 
@@ -182,20 +161,12 @@ public class ExecutionOrder
                 result.rangeConflicts.add(e.getKey());
             }
             RangeState state = e.getValue();
-            Object operationOrQueue = state.operationOrQueue;
-            if (operationOrQueue instanceof AsyncOperation)
-            {
-                ArrayDeque<AsyncOperation<?>> queue = new ArrayDeque<>(4);
-                queue.add((AsyncOperation<?>) operationOrQueue);
-                queue.add(operation);
-                state.operationOrQueue = queue;
-            }
-            else
-            {
-                @SuppressWarnings("unchecked")
-                ArrayDeque<AsyncOperation<?>> queue = 
(ArrayDeque<AsyncOperation<?>>) operationOrQueue;
-                queue.add(operation);
-            }
+            // a single range could conflict with multiple other ranges, so it 
is possible that the operation
+            // exists in the queue already due to another range in the txn... 
simple example is
+            // keys = (0, 10], (12, 15]
+            // e.getKey() == (-100, 100]
+            // in this case the operation would attempt to double add since it 
has 2 keys that conflict with this single range
+            register(state.operationOrQueue, operation, q -> 
state.operationOrQueue = q);
         });
         if (result.sameRange != null)
         {
@@ -205,7 +176,7 @@ public class ExecutionOrder
         {
             rangeQueues.add(range, new RangeState(range, keyConflicts, 
result.rangeConflicts, operation));
         }
-        return keyConflicts == null && result.rangeConflicts == null;
+        return keyConflicts == null && result.rangeConflicts == null && 
result.sameRange == null;
     }
 
     /**
@@ -221,12 +192,19 @@ public class ExecutionOrder
             return true;
         }
 
+        register(operationOrQueue, operation, q -> queues.put(keyOrTxnId, q));
+        return false;
+    }
+
+    private void register(Object operationOrQueue, AsyncOperation<?> 
operation, Consumer<ArrayDeque<AsyncOperation<?>>> onCreateQueue)
+    {
         if (operationOrQueue instanceof AsyncOperation)
         {
+            Invariants.checkState(operationOrQueue != operation, "Attempted to 
double register operation %s", operation);
             ArrayDeque<AsyncOperation<?>> queue = new ArrayDeque<>(4);
             queue.add((AsyncOperation<?>) operationOrQueue);
             queue.add(operation);
-            queues.put(keyOrTxnId, queue);
+            onCreateQueue.accept(queue);
         }
         else
         {
@@ -234,23 +212,35 @@ public class ExecutionOrder
             ArrayDeque<AsyncOperation<?>> queue = 
(ArrayDeque<AsyncOperation<?>>) operationOrQueue;
             queue.add(operation);
         }
-        return false;
+    }
+
+    /**
+     * Unregister the operation as being a dependency for its keys and TxnIds, 
but do so even if it is unable to run now.
+     */
+    void unregisterOutOfOrder(AsyncOperation<?> operation)
+    {
+        unregister(operation, true);
     }
 
     /**
      * Unregister the operation as being a dependency for its keys and TxnIds
      */
     void unregister(AsyncOperation<?> operation)
+    {
+        unregister(operation, false);
+    }
+
+    private void unregister(AsyncOperation<?> operation, boolean 
allowOutOfOrder)
     {
         for (Seekable seekable : operation.keys())
         {
             switch (seekable.domain())
             {
                 case Key:
-                    unregister(seekable.asKey(), operation);
+                    unregister(seekable.asKey(), operation, allowOutOfOrder);
                     break;
                 case Range:
-                    unregister(seekable.asRange(), operation);
+                    unregister(seekable.asRange(), operation, allowOutOfOrder);
                     break;
                 default:
                     throw new AssertionError("Unexpected domain: " + 
seekable.domain());
@@ -259,48 +249,69 @@ public class ExecutionOrder
         }
         TxnId primaryTxnId = operation.primaryTxnId();
         if (null != primaryTxnId)
-            unregister(primaryTxnId, operation);
+            unregister(primaryTxnId, operation, allowOutOfOrder);
     }
 
-    private void unregister(Range range, AsyncOperation<?> operation)
+    private void unregister(Range range, AsyncOperation<?> operation, boolean 
allowOutOfOrder)
     {
         var state = state(range);
-        var conflicts = state.remove(operation);
+        var conflicts = state.remove(operation, allowOutOfOrder);
         if (conflicts.rangeConflicts != null)
-            conflicts.rangeConflicts.forEach(r -> state(r).remove(operation));
+            conflicts.rangeConflicts.forEach(r -> state(r).remove(operation, 
allowOutOfOrder));
         if (conflicts.keyConflicts != null)
-            conflicts.keyConflicts.forEach(k -> unregister(k, operation));
+            conflicts.keyConflicts.forEach(k -> unregister(k, operation, 
allowOutOfOrder));
     }
 
     /**
      * Unregister the operation as being a dependency for key or TxnId
      */
-    private void unregister(Object keyOrTxnId, AsyncOperation<?> operation)
+    private void unregister(Object keyOrTxnId, AsyncOperation<?> operation, 
boolean allowOutOfOrder)
     {
         Object operationOrQueue = queues.get(keyOrTxnId);
         Invariants.nonNull(operationOrQueue);
 
+        unregister("Key or TxnId", keyOrTxnId, operationOrQueue, operation, 
allowOutOfOrder, () -> queues.remove(keyOrTxnId));
+    }
+
+    private void unregister(String name, Object key, Object operationOrQueue, 
AsyncOperation<?> operation, boolean allowOutOfOrder, Runnable onEmpty)
+    {
         if (operationOrQueue instanceof AsyncOperation<?>)
         {
-            Invariants.checkState(operationOrQueue == operation);
-            queues.remove(keyOrTxnId);
+            Invariants.checkState(operationOrQueue == operation, "Only single 
operation present and was not %s; %s %s", name, key);
+            onEmpty.run();
         }
         else
         {
             @SuppressWarnings("unchecked")
             ArrayDeque<AsyncOperation<?>> queue = 
(ArrayDeque<AsyncOperation<?>>) operationOrQueue;
-            AsyncOperation<?> head = queue.poll();
-            Invariants.checkState(head == operation);
+            if (allowOutOfOrder)
+            {
+                Invariants.checkState(queue.remove(operation), "Operation %s 
was not found in queue: %s; %s %s", operation, queue, name, key);
+            }
+            else
+            {
+                Invariants.checkState(queue.peek() == operation, "Operation %s 
is not at the top of the queue; %s; %s %s", operation, queue, name, key);
+                queue.poll();
+            }
 
             if (queue.isEmpty())
             {
-                queues.remove(keyOrTxnId);
+                onEmpty.run();
             }
             else
             {
-                head = queue.peek();
-                if (canRun(head))
-                    head.onUnblocked();
+                AsyncOperation<?> next = queue.peek();
+                if (next == operation)
+                {
+                    // a single range could conflict with multiple other 
ranges, so it is possible that the operation
+                    // exists in the queue already due to another range in the 
txn... simple example is
+                    // keys = (0, 10], (12, 15]
+                    // e.getKey() == (-100, 100]
+                    // in this case the operation would attempt to double add 
since it has 2 keys that conflict with this single range
+                    return;
+                }
+                if (canRun(next))
+                    next.onUnblocked();
             }
         }
     }
@@ -357,7 +368,7 @@ public class ExecutionOrder
     private RangeState state(Range range)
     {
         var list = rangeQueues.get(range);
-        assert list.size() == 1 : String.format("Expected 1 element but saw 
list %s", list);
+        assert list.size() == 1 : String.format("Expected 1 element for range 
%s but saw list %s", range, list);
         return list.get(0);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java
index fb6232c560..3c2422dfa4 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java
@@ -120,7 +120,7 @@ public class ConsensusRequestRouter
         ClusterMetadata cm = ClusterMetadata.current();
         TableMetadata metadata = cm.schema.getTableMetadata(tableId);
         if (metadata == null)
-            throw new IllegalStateException("Can't route consensus request for 
nonexistent table %s".format(tableId.toString()));
+            throw new IllegalStateException(String.format("Can't route 
consensus request for nonexistent table %s", tableId));
 
         if (!mayWriteThroughAccord(metadata))
             return false;
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index cd98c71f18..22dd55bb31 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import com.google.common.collect.ImmutableMap;
 
 import accord.topology.TopologyUtils;
+import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.schema.*;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -166,7 +167,7 @@ public class AccordJournalSimulationTest extends 
SimulationTestBase
             }
         }
         private static final ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("name", 10);
-        private static final AccordJournal journal = new AccordJournal(null);
+        private static final AccordJournal journal = new AccordJournal(null, 
new AccordSpec.JournalSpec());
         private static final int events = 100;
         private static final CountDownLatch eventsWritten = 
CountDownLatch.newCountDownLatch(events);
         private static final CountDownLatch eventsDurable = 
CountDownLatch.newCountDownLatch(events);
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 7fbfc89cc8..fb47302904 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -78,6 +78,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.auth.INetworkAuthorizer",
     "org.apache.cassandra.auth.IRoleManager",
     "org.apache.cassandra.config.AccordSpec",
+    "org.apache.cassandra.config.AccordSpec$JournalSpec",
     "org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration",
     "org.apache.cassandra.config.CassandraRelevantProperties",
     
"org.apache.cassandra.config.CassandraRelevantProperties$PropertyConverter",
@@ -267,6 +268,9 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.PathUtils$IOToLongFunction",
     "org.apache.cassandra.io.util.RebufferingInputStream",
     "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy",
+    "org.apache.cassandra.journal.Params",
+    "org.apache.cassandra.journal.Params$FailurePolicy",
+    "org.apache.cassandra.journal.Params$FlushMode",
     "org.apache.cassandra.locator.Endpoint",
     "org.apache.cassandra.locator.IEndpointSnitch",
     "org.apache.cassandra.locator.InetAddressAndPort",
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index ee38ffceb9..5cbf0915b5 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -91,6 +91,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
 import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.assertj.core.api.Assertions;
 
 import static accord.impl.TimestampsForKey.NO_LAST_EXECUTED_HLC;
 import static accord.local.KeyHistory.COMMANDS;
@@ -369,9 +370,14 @@ public class CompactionAccordIteratorsTest
             assertEquals(1, Iterators.size(partition.unfilteredIterator()));
             ByteBuffer[] partitionKeyComponents = 
CommandRows.splitPartitionKey(partition.partitionKey());
             Row row = (Row)partition.unfilteredIterator().next();
-            assertEquals(commands.metadata().regularColumns().size(), 
row.columnCount());
+
+            // execute_atleast is null, so when we read from the scanner the 
column won't be present in the partition
+            Assertions.assertThat(new 
ArrayList<>(row.columns())).isEqualTo(commands.metadata().regularColumns().stream().filter(c
 -> !c.name.toString().equals("execute_atleast")).collect(Collectors.toList()));
             for (ColumnMetadata cm : commands.metadata().regularColumns())
+            {
+                if (cm.name.toString().equals("execute_atleast")) continue;
                 assertNotNull(row.getColumnData(cm));
+            }
             assertEquals(TXN_ID, CommandRows.getTxnId(partitionKeyComponents));
             assertEquals(SaveStatus.Applied, 
AccordKeyspace.CommandRows.getStatus(row));
         };
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index a608f41f8d..0d3b3d005c 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -76,6 +76,7 @@ import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.concurrent.ManualExecutor;
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
@@ -399,7 +400,7 @@ public class AccordTestUtils
             public long unix(TimeUnit timeUnit) { return 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, 
this::now).applyAsLong(timeUnit); }
         };
 
-        AccordJournal journal = new AccordJournal(null);
+        AccordJournal journal = new AccordJournal(null, new 
AccordSpec.JournalSpec());
         journal.start(null);
 
         SingleEpochRanges holder = new 
SingleEpochRanges(topology.rangesForNode(node));
diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java 
b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
index 8a68163ede..dc22f540ed 100644
--- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java
+++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import accord.local.SerializerSupport;
 import accord.messages.Accept;
 import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
 import accord.messages.BeginRecovery;
 import accord.messages.Commit;
 import accord.messages.Message;
@@ -43,15 +44,18 @@ import 
org.apache.cassandra.service.accord.AccordJournal.Type;
 import static accord.messages.MessageType.ACCEPT_REQ;
 import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
 import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
 import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
 import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
 import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
 import static accord.messages.MessageType.PRE_ACCEPT_REQ;
 import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
+import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
 import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
 import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
 import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
 import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
+import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
 
 public class MockJournal implements IJournal
 {
@@ -61,6 +65,12 @@ public class MockJournal implements IJournal
     {
         return new SerializerSupport.MessageProvider()
         {
+            @Override
+            public TxnId txnId()
+            {
+                return txnId;
+            }
+
             @Override
             public Set<MessageType> test(Set<MessageType> messages)
             {
@@ -146,6 +156,12 @@ public class MockJournal implements IJournal
                 return get(STABLE_FAST_PATH_REQ);
             }
 
+            @Override
+            public Commit stableSlowPath()
+            {
+                return get(STABLE_SLOW_PATH_REQ);
+            }
+
             @Override
             public Commit stableMaximal()
             {
@@ -175,6 +191,18 @@ public class MockJournal implements IJournal
             {
                 return get(PROPAGATE_APPLY_MSG);
             }
+
+            @Override
+            public Propagate propagateOther()
+            {
+                return get(PROPAGATE_OTHER_MSG);
+            }
+
+            @Override
+            public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+            {
+                return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+            }
         };
     }
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index bf28d696c8..1a1b7f98d2 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 
 import accord.impl.SizeOfIntersectionSorter;
@@ -76,7 +77,7 @@ import static 
org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
 import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
 import static org.apache.cassandra.utils.AccordGenerators.fromQT;
 
-class SimulatedAccordCommandStore implements AutoCloseable
+public class SimulatedAccordCommandStore implements AutoCloseable
 {
     private final List<Throwable> failures = new ArrayList<>();
     private final SimulatedExecutorFactory globalExecutor;
@@ -90,8 +91,9 @@ class SimulatedAccordCommandStore implements AutoCloseable
     public final MockJournal journal;
     public final ScheduledExecutorPlus unorderedScheduled;
     public final List<String> evictions = new ArrayList<>();
+    public Predicate<Throwable> ignoreExceptions = ignore -> false;
 
-    SimulatedAccordCommandStore(RandomSource rs)
+    public SimulatedAccordCommandStore(RandomSource rs)
     {
         globalExecutor = new SimulatedExecutorFactory(rs.fork(), 
fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs),
 failures::add);
         this.unorderedScheduled = globalExecutor.scheduled("ignored");
@@ -151,6 +153,13 @@ class SimulatedAccordCommandStore implements AutoCloseable
                                                 {
                                                     return false;
                                                 }
+
+                                                @Override
+                                                public void 
onUncaughtException(Throwable t)
+                                                {
+                                                    if 
(ignoreExceptions.test(t)) return;
+                                                    
super.onUncaughtException(t);
+                                                }
                                             },
                                             null,
                                             ignore -> 
AccordTestUtils.NOOP_PROGRESS_LOG,
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
new file mode 100644
index 0000000000..6e216ff56d
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import accord.api.Key;
+import accord.impl.basic.SimulatedFault;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.SimulatedAccordCommandStore;
+import org.apache.cassandra.service.accord.SimulatedAccordCommandStoreTestBase;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTestBase
+{
+    @Before
+    public void precondition()
+    {
+        
Assertions.assertThat(intTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
+    }
+
+    @Test
+    public void happyPath()
+    {
+        qt().withExamples(100).check(rs -> test(rs, 100, intTbl, ignore -> 
Action.SUCCESS));
+    }
+
+    @Test
+    public void fuzz()
+    {
+        Gen<Action> actionGen = Gens.enums().allWithWeights(Action.class, 10, 
1, 1);
+        qt().withExamples(100).check(rs -> test(rs, 100, intTbl, actionGen));
+    }
+
+    private static void test(RandomSource rs, int numSamples, TableMetadata 
tbl, Gen<Action> actionGen) throws Exception
+    {
+        AccordKeyspace.unsafeClear();
+
+        int numKeys = rs.nextInt(20, 1000);
+        long minToken = 0;
+        long maxToken = numKeys;
+
+        Gen<Key> keyGen = Gens.longs().between(minToken + 1, maxToken).map(t 
-> new PartitionKey(tbl.id, 
tbl.partitioner.decorateKey(LongToken.keyForToken(t))));
+
+
+        Gen<Keys> keysGen = Gens.lists(keyGen).unique().ofSizeBetween(1, 
10).map(l -> Keys.of(l));
+        Gen<Ranges> rangesGen = Gens.lists(rangeInsideRange(tbl.id, minToken, 
maxToken)).uniqueBestEffort().ofSizeBetween(1, 10).map(l -> 
Ranges.of(l.toArray(Range[]::new)));
+        Gen<Seekables<?, ?>> seekablesGen = Gens.oneOf(keysGen, rangesGen);
+
+        try (var instance = new SimulatedAccordCommandStore(rs))
+        {
+            instance.ignoreExceptions = t -> t instanceof SimulatedFault;
+            Counter counter = new Counter();
+            for (int i = 0; i < numSamples; i++)
+            {
+                PreLoadContext ctx = 
PreLoadContext.contextFor(seekablesGen.next(rs));
+                operation(instance, ctx, actionGen.next(rs), 
rs::nextBoolean).begin((ignore, failure) -> {
+                    counter.counter++;
+                    if (failure != null && !(failure instanceof 
SimulatedFault)) throw new AssertionError("Unexpected error", failure);
+                });
+            }
+            instance.processAll();
+            Assertions.assertThat(counter.counter).isEqualTo(numSamples);
+        }
+    }
+
+    private static Gen<Range> rangeInsideRange(TableId tableId, long minToken, 
long maxToken)
+    {
+        if (minToken + 1 == maxToken)
+        {
+            // only one range is possible...
+            return Gens.constant(range(tableId, minToken, maxToken));
+        }
+        return rs -> {
+            long a = rs.nextLong(minToken, maxToken + 1);
+            long b = rs.nextLong(minToken, maxToken + 1);
+            while (a == b)
+                b = rs.nextLong(minToken, maxToken + 1);
+            if (a > b)
+            {
+                long tmp = a;
+                a = b;
+                b = tmp;
+            }
+            return range(tableId, a, b);
+        };
+    }
+
+    private static TokenRange range(TableId tableId, long start, long end)
+    {
+        return new TokenRange(new TokenKey(tableId, new LongToken(start)), new 
TokenKey(tableId, new LongToken(end)));
+    }
+
+    private enum Action {SUCCESS, FAILURE, LOAD_FAILURE}
+
+    private static AsyncOperation<Void> operation(SimulatedAccordCommandStore 
instance, PreLoadContext ctx, Action action, BooleanSupplier delay)
+    {
+        return new SimulatedOperation(instance.store, ctx, action == 
Action.FAILURE ? SimulatedOperation.Action.FAILURE : 
SimulatedOperation.Action.SUCCESS)
+        {
+            @Override
+            AsyncLoader createAsyncLoader(AccordCommandStore commandStore, 
PreLoadContext preLoadContext)
+            {
+                return new SimulatedLoader(action == 
SimulatedAsyncOperationTest.Action.LOAD_FAILURE ? 
SimulatedLoader.Action.FAILURE : SimulatedLoader.Action.SUCCESS, 
delay.getAsBoolean(), instance.unorderedScheduled);
+            }
+        };
+    }
+
+    private static class Counter
+    {
+        int counter = 0;
+    }
+
+    private static class SimulatedOperation extends AsyncOperation<Void>
+    {
+        enum Action { SUCCESS, FAILURE}
+        private final Action action;
+
+        public SimulatedOperation(AccordCommandStore commandStore, 
PreLoadContext preLoadContext, Action action)
+        {
+            super(commandStore, preLoadContext);
+            this.action = action;
+        }
+
+        @Override
+        public Void apply(SafeCommandStore safe)
+        {
+            if (action == Action.FAILURE)
+                throw new SimulatedFault("Operation failed for keys " + 
keys());
+            return null;
+        }
+    }
+
+    private static class SimulatedLoader extends AsyncLoader
+    {
+
+        enum Action { SUCCESS, FAILURE}
+
+        private final Action action;
+        private boolean delay;
+        private final ScheduledExecutorService executor;
+        SimulatedLoader(Action action, boolean delay, ScheduledExecutorService 
executor)
+        {
+            super(null, null, null, null);
+            this.action = action;
+            this.delay = delay;
+            this.executor = executor;
+        }
+
+        @Override
+        public boolean load(AsyncOperation.Context context, BiConsumer<Object, 
Throwable> callback)
+        {
+            if (delay)
+            {
+                executor.schedule(() -> {
+                    callback.accept(null, action == Action.FAILURE ? new 
SimulatedFault("Failure loading " + context) : null);
+                }, 1, TimeUnit.SECONDS);
+                delay = false;
+                return false;
+            }
+            if (action == Action.FAILURE)
+                throw new SimulatedFault("Failure loading " + context);
+
+            return true;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to