This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 09dd3d69af CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked
09dd3d69af is described below

commit 09dd3d69af6755d6f68b6e0d019238f793f3246a
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Jun 18 13:55:38 2024 -0700

    CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19719
---
 build.xml                                          |   3 +-
 modules/accord                                     |   2 +-
 .../service/accord/AccordConfigurationService.java | 138 +++-
 .../cassandra/service/accord/AccordKeyspace.java   |   3 +-
 .../service/accord/AccordSyncPropagator.java       |  82 ++-
 .../concurrent/SimulatedExecutorFactory.java       |  18 +
 .../index/accord/AccordIndexStressTest.java        |   8 +-
 .../cassandra/index/accord/RouteIndexTest.java     |  15 +-
 .../cassandra/net/SimulatedMessageDelivery.java    | 333 +++++++++
 .../org/apache/cassandra/repair/FuzzTestBase.java  | 272 +-------
 .../accord/AccordConfigurationServiceTest.java     |  11 +-
 .../service/accord/AccordKeyspaceTest.java         |   4 +-
 .../service/accord/AccordSyncPropagatorTest.java   |   8 +
 .../cassandra/service/accord/EpochSyncTest.java    | 753 +++++++++++++++++++++
 .../service/accord/LoggingDiskStateManager.java    |  93 +++
 .../service/accord/MockDiskStateManager.java       |  79 +++
 .../cassandra/utils/StatefulRangeTreeTest.java     |  14 +-
 17 files changed, 1548 insertions(+), 288 deletions(-)
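
In brief, the patch routes all epoch disk-state writes through a new DiskStateManager seam (so tests can substitute persistence) and, when a topology update drops nodes, completes their outstanding sync tracking so earlier epochs cannot block forever. A simplified restatement of the core flow (the full code is in the AccordConfigurationService hunk below; locking and state checks are omitted here):

    // On a metadata update, diff the node sets of the old and new topology.
    Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), topology.nodes());
    if (!removedNodes.isEmpty())
    {
        syncPropagator.onNodesRemoved(removedNodes);           // discard their pending notifications
        for (long oldEpoch : nonCompletedEpochsBefore(epoch))  // epochs still awaiting acks
            for (Node.Id node : removedNodes)
                receiveRemoteSyncComplete(node, oldEpoch);     // treat the departed node as synced
        listeners.forEach(l -> l.onRemoveNodes(epoch, removedNodes));
    }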

diff --git a/build.xml b/build.xml
index d3943b1a35..4ed704de7e 100644
--- a/build.xml
+++ b/build.xml
@@ -1278,13 +1278,12 @@
       <condition property="maxMemory" value="8G" else="4G">
           <equals arg1="${test.classlistprefix}" arg2="distributed"/>
       </condition>
-      <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" exclude="**/*.java" timeout="${test.timeout}">
+      <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" exclude="**/*.java" timeout="${test.timeout}" maxmemory="${maxMemory}">
         <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
         <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
         <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
         <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
         <jvmarg value="-Dcassandra.skip_sync=true" />
-        <jvmarg value="-Xmx${maxMemory}"/>
       </testmacrohelper>
     </sequential>
   </macrodef>
diff --git a/modules/accord b/modules/accord
index f1f5ea5ccb..694ae39e2e 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit f1f5ea5ccbd6e0a8abf579a4331fa84a1b3d9f95
+Subproject commit 694ae39e2e00075bdabd47632dced0db12a9981d
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index d2a14a44db..1e6cb1d769 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -34,6 +34,8 @@ import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import org.agrona.collections.LongArrayList;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.concurrent.Stage;
@@ -57,6 +59,7 @@ import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 public class AccordConfigurationService extends AbstractConfigurationService<AccordConfigurationService.EpochState, AccordConfigurationService.EpochHistory> implements ChangeListener, AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable
 {
     private final AccordSyncPropagator syncPropagator;
+    private final DiskStateManager diskStateManager;
 
     private EpochDiskState diskState = EpochDiskState.EMPTY;
 
@@ -114,15 +117,88 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
         }
     }
 
-    public AccordConfigurationService(Node.Id node, MessageDelivery messagingService, IFailureDetector failureDetector)
+    @VisibleForTesting
+    interface DiskStateManager
+    {
+        EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer);
+        EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, EpochDiskState diskState);
+
+        EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState);
+
+        EpochDiskState markLocalSyncAck(Node.Id id, long epoch, EpochDiskState diskState);
+
+        EpochDiskState saveTopology(Topology topology, EpochDiskState diskState);
+
+        EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState);
+
+        EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState);
+
+        EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState);
+    }
+
+    enum SystemTableDiskStateManager implements DiskStateManager
+    {
+        instance;
+
+        @Override
+        public EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer)
+        {
+            return AccordKeyspace.loadTopologies(consumer);
+        }
+
+        @Override
+        public EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> notify, EpochDiskState diskState)
+        {
+            return AccordKeyspace.setNotifyingLocalSync(epoch, notify, diskState);
+        }
+
+        @Override
+        public EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState)
+        {
+            return AccordKeyspace.setCompletedLocalSync(epoch, diskState);
+        }
+
+        @Override
+        public EpochDiskState markLocalSyncAck(Node.Id id, long epoch, EpochDiskState diskState)
+        {
+            return AccordKeyspace.markLocalSyncAck(id, epoch, diskState);
+        }
+
+        @Override
+        public EpochDiskState saveTopology(Topology topology, EpochDiskState diskState)
+        {
+            return AccordKeyspace.saveTopology(topology, diskState);
+        }
+
+        @Override
+        public EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState)
+        {
+            return AccordKeyspace.markRemoteTopologySync(node, epoch, diskState);
+        }
+
+        @Override
+        public EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState)
+        {
+            return AccordKeyspace.markClosed(ranges, epoch, diskState);
+        }
+
+        @Override
+        public EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState)
+        {
+            return AccordKeyspace.truncateTopologyUntil(epoch, diskState);
+        }
+    }
+
+    public AccordConfigurationService(Node.Id node, MessageDelivery messagingService, IFailureDetector failureDetector, DiskStateManager diskStateManager, ScheduledExecutorPlus scheduledTasks)
     {
         super(node);
-        this.syncPropagator = new AccordSyncPropagator(localId, this, messagingService, failureDetector, ScheduledExecutors.scheduledTasks, this);
+        this.syncPropagator = new AccordSyncPropagator(localId, this, messagingService, failureDetector, scheduledTasks, this);
+        this.diskStateManager = diskStateManager;
     }
 
     public AccordConfigurationService(Node.Id node)
     {
-        this(node, MessagingService.instance(), FailureDetector.instance);
+        this(node, MessagingService.instance(), FailureDetector.instance, SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks);
     }
 
     @Override
@@ -137,7 +213,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
         state = State.LOADING;
         updateMapping(ClusterMetadata.current());
         EndpointMapping snapshot = mapping;
-        diskState = AccordKeyspace.loadTopologies(((epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> {
+        diskState = diskStateManager.loadTopologies(((epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> {
             if (topology != null)
                 reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED);
 
@@ -221,12 +297,41 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
             synchronized (AccordConfigurationService.this)
             {
                 updateMapping(metadata);
-                reportTopology(AccordTopology.createAccordTopology(metadata));
+                Topology topology = AccordTopology.createAccordTopology(metadata);
+                Topology current = isEmpty() ? Topology.EMPTY : currentTopology();
+                reportTopology(topology);
+                Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), topology.nodes());
+                if (!removedNodes.isEmpty())
+                    onNodesRemoved(topology.epoch(), removedNodes);
             }
         });
     }
 
-    private void maybeReportMetadata(ClusterMetadata metadata)
+    private synchronized void onNodesRemoved(long epoch, Set<Node.Id> removed)
+    {
+        syncPropagator.onNodesRemoved(removed);
+        for (long oldEpoch : nonCompletedEpochsBefore(epoch))
+        {
+            for (Node.Id node : removed)
+                receiveRemoteSyncComplete(node, oldEpoch);
+        }
+        listeners.forEach(l -> l.onRemoveNodes(epoch, removed));
+    }
+
+    private long[] nonCompletedEpochsBefore(long max)
+    {
+        LongArrayList notComplete = new LongArrayList();
+        for (long epoch = epochs.minEpoch(); epoch <= max && epoch <= epochs.maxEpoch(); epoch++)
+        {
+            EpochSnapshot snapshot = getEpochSnapshot(epoch);
+            if (snapshot.syncStatus != SyncStatus.COMPLETED)
+                notComplete.add(epoch);
+        }
+        return notComplete.toLongArray();
+    }
+
+    @VisibleForTesting
+    void maybeReportMetadata(ClusterMetadata metadata)
     {
         // don't report metadata until the previous one has been acknowledged
         synchronized (this)
@@ -265,7 +370,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
             return;
 
         Set<Node.Id> notify = topology.nodes().stream().filter(i -> !localId.equals(i)).collect(Collectors.toSet());
-        diskState = AccordKeyspace.setNotifyingLocalSync(epoch, notify, diskState);
+        diskState = diskStateManager.setNotifyingLocalSync(epoch, notify, diskState);
         epochState.setSyncStatus(SyncStatus.NOTIFYING);
         syncPropagator.reportSyncComplete(epoch, notify, localId);
     }
@@ -276,7 +381,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
         EpochState epochState = getOrCreateEpochState(epoch);
         if (epochState.syncStatus != SyncStatus.NOTIFYING)
             return;
-        diskState = AccordKeyspace.markLocalSyncAck(id, epoch, diskState);
+        diskState = diskStateManager.markLocalSyncAck(id, epoch, diskState);
     }
 
     @Override
@@ -284,21 +389,21 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
     {
         EpochState epochState = getOrCreateEpochState(epoch);
         epochState.setSyncStatus(SyncStatus.COMPLETED);
-        diskState = AccordKeyspace.setCompletedLocalSync(epoch, diskState);
+        diskState = diskStateManager.setCompletedLocalSync(epoch, diskState);
     }
 
     @Override
     protected synchronized void topologyUpdatePreListenerNotify(Topology topology)
     {
         if (state == State.STARTED)
-            diskState = AccordKeyspace.saveTopology(topology, diskState);
+            diskState = diskStateManager.saveTopology(topology, diskState);
     }
 
     @Override
     protected synchronized void receiveRemoteSyncCompletePreListenerNotify(Node.Id node, long epoch)
     {
         if (state == State.STARTED)
-            diskState = AccordKeyspace.markRemoteTopologySync(node, epoch, diskState);
+            diskState = diskStateManager.markRemoteTopologySync(node, epoch, diskState);
     }
 
     @Override
@@ -309,6 +414,11 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
         syncPropagator.reportClosed(epoch, topology.nodes(), ranges);
     }
 
+    public AccordSyncPropagator syncPropagator()
+    {
+        return syncPropagator;
+    }
+
     @Override
     public synchronized void reportEpochRedundant(Ranges ranges, long epoch)
     {
@@ -321,14 +431,14 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
     @Override
     public synchronized void receiveClosed(Ranges ranges, long epoch)
     {
-        diskState = AccordKeyspace.markClosed(ranges, epoch, diskState);
+        diskState = diskStateManager.markClosed(ranges, epoch, diskState);
         super.receiveClosed(ranges, epoch);
     }
 
     @Override
     public synchronized void receiveRedundant(Ranges ranges, long epoch)
     {
-        diskState = AccordKeyspace.markClosed(ranges, epoch, diskState);
+        diskState = diskStateManager.markClosed(ranges, epoch, diskState);
         super.receiveRedundant(ranges, epoch);
     }
 
@@ -342,7 +452,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc
     protected synchronized void truncateTopologiesPostListenerNotify(long epoch)
     {
         if (state == State.STARTED)
-            diskState = AccordKeyspace.truncateTopologyUntil(epoch, diskState);
+            diskState = diskStateManager.truncateTopologyUntil(epoch, diskState);
     }
 
     private void checkStarted()
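
The widened constructor exists so tests can substitute persistence and scheduling. A hedged sketch of how a test might wire it up (the fixture names here are assumptions; MockDiskStateManager refers to the test helper added by this commit):

    AccordConfigurationService service =
        new AccordConfigurationService(nodeId,               // Node.Id under test
                                       simulatedMessaging,   // test MessageDelivery
                                       mockFailureDetector,  // test IFailureDetector
                                       mockDiskStateManager, // in-memory DiskStateManager instead of system tables
                                       simulatedScheduler);  // deterministic ScheduledExecutorPlus
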
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index b96ffa9bd0..a2eb1c23d5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -1586,7 +1586,8 @@ public class AccordKeyspace
             return minEpoch == maxEpoch && maxEpoch == 0;
         }
 
-        private EpochDiskState withNewMaxEpoch(long epoch)
+        @VisibleForTesting
+        EpochDiskState withNewMaxEpoch(long epoch)
         {
             Invariants.checkArgument(epoch > maxEpoch, "Epoch %d <= %d (max)", epoch, maxEpoch);
             return EpochDiskState.create(Math.max(1, minEpoch), epoch);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java b/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java
index e16facee4a..2c9626718d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -29,6 +30,9 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.local.Node;
 import accord.messages.SimpleReply;
 import accord.primitives.Ranges;
@@ -38,6 +42,7 @@ import org.agrona.collections.Long2ObjectHashMap;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -50,6 +55,7 @@ import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.CollectionSerializers;
 
 import static org.apache.cassandra.utils.CollectionSerializers.newListSerializer;
@@ -59,6 +65,8 @@ import static org.apache.cassandra.utils.CollectionSerializers.newListSerializer
  */
 public class AccordSyncPropagator
 {
+    private static final Logger logger = LoggerFactory.getLogger(AccordSyncPropagator.class);
+
     public static final IVerbHandler<List<Notification>> verbHandler = message -> {
         if (!AccordService.isSetup())
             return;
@@ -120,6 +128,11 @@ public class AccordSyncPropagator
             return new Notification(epoch, Collections.emptySet(), Ranges.EMPTY, addRedundant);
         }
 
+        boolean isEmpty()
+        {
+            return syncComplete.isEmpty() && closed.isEmpty() && redundant.isEmpty();
+        }
+
         boolean ack(Notification notification)
         {
             if (!notification.syncComplete.isEmpty())
@@ -201,6 +214,15 @@ public class AccordSyncPropagator
         return !pending.isEmpty();
     }
 
+    synchronized boolean hasPending(long epoch)
+    {
+        if (pending.isEmpty()) return false;
+        return pending.values().stream().allMatch(n -> {
+            PendingEpoch p = n.get(epoch);
+            return p != null && !p.isEmpty();
+        });
+    }
+
     @Override
     public String toString()
     {
@@ -210,6 +232,28 @@ public class AccordSyncPropagator
                '}';
     }
 
+    public synchronized void onNodesRemoved(Set<Node.Id> removed)
+    {
+        for (Node.Id node : removed)
+        {
+            PendingEpochs pendingEpochs = pending.get(node.id);
+            if (pendingEpochs == null) continue;
+            long[] toComplete = new long[pendingEpochs.size()];
+            Long2ObjectHashMap<PendingEpoch>.KeyIterator it = pendingEpochs.keySet().iterator();
+            for (int i = 0; it.hasNext(); i++)
+                toComplete[i] = it.nextLong();
+            Arrays.sort(toComplete);
+            for (long epoch : toComplete)
+                listener.onEndpointAck(node, epoch);
+            pending.remove(node.id);
+            for (long epoch : toComplete)
+            {
+                if (hasSyncCompletedFor(epoch))
+                    listener.onComplete(epoch);
+            }
+        }
+    }
+
     public void reportSyncComplete(long epoch, Collection<Node.Id> notify, Node.Id syncCompleteId)
     {
         if (notify.isEmpty())
@@ -258,17 +302,13 @@ public class AccordSyncPropagator
     private boolean notify(Node.Id to, List<Notification> notifications)
     {
         InetAddressAndPort toEp = endpointMapper.mappedEndpoint(to);
-        if (!failureDetector.isAlive(toEp))
-        {
-            scheduler.schedule(() -> notify(to, notifications), 1, TimeUnit.MINUTES);
-            return false;
-        }
         Message<List<Notification>> msg = Message.out(Verb.ACCORD_SYNC_NOTIFY_REQ, notifications);
-        messagingService.sendWithCallback(msg, toEp, new RequestCallback<SimpleReply>(){
+        RequestCallback<SimpleReply> cb = new RequestCallback<>()
+        {
             @Override
             public void onResponse(Message<SimpleReply> msg)
             {
-                Invariants.checkState(msg.payload == SimpleReply.Ok, "Unexpected message: %s",  msg);
+                Invariants.checkState(msg.payload == SimpleReply.Ok, "Unexpected message: %s", msg);
                 Set<Long> completedEpochs = new HashSet<>();
                 // TODO review is it a good idea to call the listener while not holding the `AccordSyncPropagator` lock?
                 synchronized (AccordSyncPropagator.this)
@@ -304,7 +344,22 @@ public class AccordSyncPropagator
             {
                 return true;
             }
-        });
+        };
+        if (!failureDetector.isAlive(toEp))
+        {
+            // was the endpoint removed from membership?
+            ClusterMetadata metadata = ClusterMetadata.current();
+            if (Gossiper.instance.getEndpointStateForEndpoint(toEp) == null && !metadata.directory.allJoinedEndpoints().contains(toEp) && !metadata.fullCMSMembers().contains(toEp))
+            {
+                // endpoint no longer exists...
+                cb.onResponse(msg.responseWith(SimpleReply.Ok));
+                return true;
+            }
+            logger.warn("Node{} is not alive, unable to notify of {}", to, 
notifications);
+            scheduler.schedule(() -> notify(to, notifications), 1, 
TimeUnit.MINUTES);
+            return false;
+        }
+        messagingService.sendWithCallback(msg, toEp, cb);
         return true;
     }
 
@@ -352,5 +407,16 @@ public class AccordSyncPropagator
             this.closed = closed;
             this.redundant = redundant;
         }
+
+        @Override
+        public String toString()
+        {
+            return "Notification{" +
+                   "epoch=" + epoch +
+                   ", syncComplete=" + syncComplete +
+                   ", closed=" + closed +
+                   ", redundant=" + redundant +
+                   '}';
+        }
     }
 }
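
The new onNodesRemoved hook acks every epoch a removed node still had pending and then completes epochs with no remaining pending acks, so a departed node can no longer hold an epoch open. A minimal sketch of the expected behaviour (test scaffolding such as propagator, node2, and localId is assumed):

    // node2 never acks the notification and then leaves the cluster;
    // the epoch must still be able to complete.
    propagator.reportSyncComplete(epoch, Set.of(node2), localId);
    propagator.onNodesRemoved(Set.of(node2));
    assert !propagator.hasPending(epoch); // node2's pending state was discarded
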
diff --git a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
index 884927bd9a..677f33e704 100644
--- a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
+++ b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.concurrent;
 
+import java.sql.Timestamp;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
 import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.Generators;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
@@ -50,6 +52,7 @@ import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.InternalState
 import static org.apache.cassandra.concurrent.Interruptible.State.INTERRUPTED;
 import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
+import static org.apache.cassandra.utils.AccordGenerators.fromQT;
 
 public class SimulatedExecutorFactory implements ExecutorFactory, Clock
 {
@@ -96,6 +99,16 @@ public class SimulatedExecutorFactory implements ExecutorFactory, Clock
     private long nowNanos;
     private int repeatedTasks = 0;
 
+    public SimulatedExecutorFactory(RandomSource rs, Consumer<Throwable> onError)
+    {
+        this(rs, fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs), onError);
+    }
+
+    public SimulatedExecutorFactory(RandomSource rs)
+    {
+        this(rs, null);
+    }
+
     public SimulatedExecutorFactory(RandomSource rs, long startTimeNanos)
     {
         this(rs, startTimeNanos, null);
@@ -117,6 +130,11 @@ public class SimulatedExecutorFactory implements ExecutorFactory, Clock
         });
     }
 
+    public boolean hasWork()
+    {
+        return queue.size() > repeatedTasks;
+    }
+
     public boolean processOne()
     {
         // if we count the repeated tasks, then processAll will never complete
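
Together with the new constructor that draws a random start time itself, hasWork() lets a test drive the simulated executors to quiescence deterministically. A sketch of the intended loop (assumed usage; rs and failures are test-local fixtures):

    SimulatedExecutorFactory factory = new SimulatedExecutorFactory(rs, failures::add);
    // Drain every queued task; recurring tasks are excluded so the loop terminates.
    while (factory.hasWork())
        factory.processOne();
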
diff --git a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
index 02221eb200..c4085c6083 100644
--- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
@@ -257,11 +257,11 @@ public class AccordIndexStressTest extends CQLTester
                 {
                     case Key:
                     {
-                        store = rs.pick(storeToTableToRoutingKeysToTxns.keySet());
+                        store = rs.pickUnorderedSet(storeToTableToRoutingKeysToTxns.keySet());
                         var actual = this.storeToTableToRoutingKeysToTxns.get(store);
                         var tableToTokens = store2Table2Tokens.get(store);
 
-                        table = rs.pick(actual.keySet());
+                        table = rs.pickUnorderedSet(actual.keySet());
                         var tokens = tableToTokens.get(table);
 
                         var offset = rs.nextInt(0, tokens.length);
@@ -274,11 +274,11 @@ public class AccordIndexStressTest extends CQLTester
                     break;
                     case Range:
                     {
-                        store = rs.pick(storeToTableToRangesToTxns.keySet());
+                        store = rs.pickUnorderedSet(storeToTableToRangesToTxns.keySet());
                         var tableToRangesToTxns = storeToTableToRangesToTxns.get(store);
                         var tableToRanges = store2Table2Ranges.get(store);
 
-                        table = rs.pick(tableToRangesToTxns.keySet());
+                        table = rs.pickUnorderedSet(tableToRangesToTxns.keySet());
                         var wrapper = tableToRanges.get(table);
                         var ranges = wrapper.ranges;
                         var tree = wrapper.tree;
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index 84c64bcb45..5200316a2f 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,7 +126,7 @@ public class RouteIndexTest extends CQLTester.InMemory
             @Override
             public Gen<Command<State, ColumnFamilyStore, ?>> commands(State state)
             {
-                Map<Gen<Command<State, ColumnFamilyStore, ?>>, Integer> possible = new HashMap<>();
+                Map<Gen<Command<State, ColumnFamilyStore, ?>>, Integer> possible = new LinkedHashMap<>();
                 possible.put(ignore -> FLUSH, 1);
                 possible.put(ignore -> COMPACT, 1);
                 possible.put(rs -> {
@@ -139,9 +140,9 @@ public class RouteIndexTest extends CQLTester.InMemory
                 if (!state.storeToTableToRangesToTxns.isEmpty())
                 {
                     possible.put(rs -> {
-                        int storeId = rs.pick(state.storeToTableToRangesToTxns.keySet());
+                        int storeId = rs.pickUnorderedSet(state.storeToTableToRangesToTxns.keySet());
                         var tables = state.storeToTableToRangesToTxns.get(storeId);
-                        TableId tableId = rs.pick(tables.keySet());
+                        TableId tableId = rs.pickUnorderedSet(tables.keySet());
                         var ranges = tables.get(tableId);
                         TreeSet<TokenRange> distinctRanges = ranges.stream().map(Map.Entry::getKey).collect(Collectors.toCollection(() -> new TreeSet<>(TokenRange::compareTo)));
                         TokenRange range;
@@ -154,14 +155,14 @@ public class RouteIndexTest extends CQLTester.InMemory
                             switch (rs.nextInt(0, 2))
                             {
                                 case 0: // perfect match
-                                    range = rs.pick(distinctRanges);
+                                    range = rs.pickOrderedSet(distinctRanges);
                                     break;
                                 case 1: // multi-match
                                 {
-                                    TokenRange a = rs.pick(distinctRanges);
-                                    TokenRange b = rs.pick(distinctRanges);
+                                    TokenRange a = rs.pickOrderedSet(distinctRanges);
+                                    TokenRange b = rs.pickOrderedSet(distinctRanges);
                                     while (a.equals(b))
-                                        b = rs.pick(distinctRanges);
+                                        b = rs.pickOrderedSet(distinctRanges);
                                     if (b.compareTo(a) < 0)
                                     {
                                         TokenRange tmp = a;
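
The switch from HashMap to LinkedHashMap, and from pick(Set) to pickUnorderedSet/pickOrderedSet, looks aimed at seed reproducibility: hash iteration order is unspecified, so a seeded RandomSource could otherwise make different choices across runs. Illustratively (assumed rationale):

    Map<Gen<Command<State, ColumnFamilyStore, ?>>, Integer> possible = new LinkedHashMap<>(); // stable insertion order
    TableId tableId = rs.pickUnorderedSet(tables.keySet());  // pick that does not depend on set iteration order
    TokenRange range = rs.pickOrderedSet(distinctRanges);    // TreeSet supplies a total order
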
diff --git a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
new file mode 100644
index 0000000000..ed24832974
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class SimulatedMessageDelivery implements MessageDelivery
+{
+    public enum Action { DELIVER, DELIVER_WITH_FAILURE, DROP, DROP_PARTITIONED, FAILURE }
+
+    public interface ActionSupplier
+    {
+        Action get(InetAddressAndPort self, Message<?> message, InetAddressAndPort to);
+    }
+
+    public interface Scheduler
+    {
+        void schedule(Runnable command, long delay, TimeUnit unit);
+    }
+
+    public interface DropListener
+    {
+        void onDrop(Action action, InetAddressAndPort from, Message<?> msg);
+    }
+
+    private final InetAddressAndPort self;
+    private final ActionSupplier actions;
+    private final BiConsumer<InetAddressAndPort, Message<?>> receiver;
+    private final DropListener onDropped;
+    private final Scheduler scheduler;
+    private final Consumer<Throwable> onError;
+    private final Map<CallbackKey, CallbackContext> callbacks = new HashMap<>();
+    private enum Status { Up, Down }
+    private Status status = Status.Up;
+
+    public SimulatedMessageDelivery(InetAddressAndPort self,
+                                    ActionSupplier actions,
+                                    BiConsumer<InetAddressAndPort, Message<?>> receiver,
+                                    DropListener onDropped,
+                                    Scheduler scheduler,
+                                    Consumer<Throwable> onError)
+    {
+        this.self = self;
+        this.actions = actions;
+        this.receiver = receiver;
+        this.onDropped = onDropped;
+        this.scheduler = scheduler;
+        this.onError = onError;
+    }
+
+    public void stop()
+    {
+        callbacks.clear();
+        status = Status.Down;
+    }
+
+    @Override
+    public <REQ> void send(Message<REQ> message, InetAddressAndPort to)
+    {
+        message = message.withFrom(self);
+        maybeEnqueue(message, to, null);
+    }
+
+    @Override
+    public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb)
+    {
+        message = message.withFrom(self);
+        maybeEnqueue(message, to, cb);
+    }
+
+    @Override
+    public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection)
+    {
+        message = message.withFrom(self);
+        maybeEnqueue(message, to, cb);
+    }
+
+    @Override
+    public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to)
+    {
+        AsyncPromise<Message<RSP>> promise = new AsyncPromise<>();
+        sendWithCallback(message, to, new RequestCallback<RSP>()
+        {
+            @Override
+            public void onResponse(Message<RSP> msg)
+            {
+                promise.trySuccess(msg);
+            }
+
+            @Override
+            public void onFailure(InetAddressAndPort from, RequestFailure failure)
+            {
+                promise.tryFailure(new MessagingService.FailureResponseException(from, failure.reason));
+            }
+
+            @Override
+            public boolean invokeOnFailure()
+            {
+                return true;
+            }
+        });
+        return promise;
+    }
+
+    @Override
+    public <V> void respond(V response, Message<?> message)
+    {
+        send(message.responseWith(response), message.respondTo());
+    }
+
+    private <REQ, RSP> void maybeEnqueue(Message<REQ> message, InetAddressAndPort to, @Nullable RequestCallback<RSP> callback)
+    {
+        if (status != Status.Up)
+            return;
+        CallbackContext cb;
+        if (callback != null)
+        {
+            CallbackKey key = new CallbackKey(message.id(), to);
+            if (callbacks.containsKey(key))
+                throw new AssertionError("Message id " + message.id() + " to " + to + " already has a callback");
+            cb = new CallbackContext(callback);
+            callbacks.put(key, cb);
+        }
+        else
+        {
+            cb = null;
+        }
+        Action action = actions.get(self, message, to);
+        switch (action)
+        {
+            case DELIVER:
+                receiver.accept(to, message);
+                break;
+            case DROP:
+            case DROP_PARTITIONED:
+                onDropped.onDrop(action, to, message);
+                break;
+            case DELIVER_WITH_FAILURE:
+                receiver.accept(to, message);
+            case FAILURE:
+                if (action == Action.FAILURE)
+                    onDropped.onDrop(action, to, message);
+                if (callback != null)
+                    scheduler.schedule(() -> callback.onFailure(to, RequestFailure.UNKNOWN),
+                                       message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS);
+                return;
+            default:
+                throw new UnsupportedOperationException("Unknown action type: " + action);
+        }
+        if (cb != null)
+        {
+            scheduler.schedule(() -> {
+                CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to));
+                if (ctx != null)
+                {
+                    assert ctx == cb;
+                    try
+                    {
+                        ctx.onFailure(to, RequestFailure.TIMEOUT);
+                    }
+                    catch (Throwable t)
+                    {
+                        onError.accept(t);
+                    }
+                }
+            }, message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public SimulatedMessageReceiver receiver(IVerbHandler onMessage)
+    {
+        return new SimulatedMessageReceiver(onMessage);
+    }
+
+    public class SimulatedMessageReceiver
+    {
+        @SuppressWarnings("rawtypes")
+        final IVerbHandler onMessage;
+
+        @SuppressWarnings("rawtypes")
+        public SimulatedMessageReceiver(IVerbHandler onMessage)
+        {
+            this.onMessage = onMessage;
+        }
+
+        public void receive(Message<?> msg)
+        {
+            if (status != Status.Up)
+                return;
+            if (msg.verb().isResponse())
+            {
+                CallbackKey key = new CallbackKey(msg.id(), msg.from());
+                if (callbacks.containsKey(key))
+                {
+                    CallbackContext callback = callbacks.remove(key);
+                    if (callback == null)
+                        return;
+                    try
+                    {
+                        if (msg.isFailureResponse())
+                            callback.onFailure(msg.from(), (RequestFailure) msg.payload);
+                        else callback.onResponse(msg);
+                    }
+                    catch (Throwable t)
+                    {
+                        onError.accept(t);
+                    }
+                }
+            }
+            else
+            {
+                try
+                {
+                    //noinspection unchecked
+                    onMessage.doVerb(msg);
+                }
+                catch (Throwable t)
+                {
+                    onError.accept(t);
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static class SimpleVerbHandler implements IVerbHandler
+    {
+        private final Map<Verb, IVerbHandler<?>> handlers;
+
+        public SimpleVerbHandler(Map<Verb, IVerbHandler<?>> handlers)
+        {
+            this.handlers = handlers;
+        }
+
+        @Override
+        public void doVerb(Message msg) throws IOException
+        {
+            IVerbHandler<?> handler = handlers.get(msg.verb());
+            if (handler == null)
+                throw new AssertionError("Unexpected verb: " + msg.verb());
+            //noinspection unchecked
+            handler.doVerb(msg);
+        }
+    }
+
+    private static class CallbackContext
+    {
+        @SuppressWarnings("rawtypes")
+        final RequestCallback callback;
+
+        @SuppressWarnings("rawtypes")
+        private CallbackContext(RequestCallback callback)
+        {
+            this.callback = Objects.requireNonNull(callback);
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public void onResponse(Message msg)
+        {
+            callback.onResponse(msg);
+        }
+
+        public void onFailure(InetAddressAndPort from, RequestFailure failure)
+        {
+            if (callback.invokeOnFailure()) callback.onFailure(from, failure);
+        }
+    }
+
+    private static class CallbackKey
+    {
+        private final long id;
+        private final InetAddressAndPort peer;
+
+        private CallbackKey(long id, InetAddressAndPort peer)
+        {
+            this.id = id;
+            this.peer = peer;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            CallbackKey that = (CallbackKey) o;
+            return id == that.id && peer.equals(that.peer);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(id, peer);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "CallbackKey{" +
+                   "id=" + id +
+                   ", peer=" + peer +
+                   '}';
+        }
+    }
+}
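
A hedged usage sketch for the new SimulatedMessageDelivery (fixture names such as deliverTo, scheduler, failures, and verbHandler are assumptions, not part of the commit):

    SimulatedMessageDelivery messaging = new SimulatedMessageDelivery(
        self,
        (from, msg, to) -> SimulatedMessageDelivery.Action.DELIVER,     // deterministic: always deliver
        (to, msg) -> deliverTo.accept(to, msg),                         // hand the message to the destination
        (action, to, msg) -> logger.warn("{} message {}", action, msg), // log drops/failures
        scheduler::schedule,                                            // a simulated scheduler drives timeouts
        failures::add);
    SimulatedMessageDelivery.SimulatedMessageReceiver receiver = messaging.receiver(verbHandler);
    // Both requests and responses are fed back through receiver.receive(msg).
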
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index c97fc8b28a..7cd6b5f3ab 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -84,7 +83,6 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
-import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.HeartBeatState;
@@ -98,12 +96,12 @@ import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.net.ConnectionType;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.SimulatedMessageDelivery;
+import org.apache.cassandra.net.SimulatedMessageDelivery.SimulatedMessageReceiver;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.repair.messages.ValidationResponse;
@@ -139,6 +137,7 @@ import org.apache.cassandra.streaming.StreamingDataInputPlus;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tools.nodetool.Repair;
 import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.AccordGenerators;
 import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.Closeable;
@@ -149,8 +148,6 @@ import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.assertj.core.api.Assertions;
@@ -688,7 +685,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
         {
             ClockAccess.includeThreadAsOwner();
             this.rs = rs;
-            globalExecutor = new SimulatedExecutorFactory(rs, fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs));
+            globalExecutor = new SimulatedExecutorFactory(rs);
             orderedExecutor = globalExecutor.configureSequential("ignore").build();
             unorderedScheduled = globalExecutor.scheduled("ignored");
 
@@ -802,169 +799,23 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
             }
         }
 
-        private class CallbackContext
+        private SimulatedMessageDelivery.Action action(InetAddressAndPort self, Message<?> msg, InetAddressAndPort to)
         {
-            final RequestCallback callback;
-
-            private CallbackContext(RequestCallback callback)
-            {
-                this.callback = Objects.requireNonNull(callback);
-            }
-
-            public void onResponse(Message msg)
-            {
-                callback.onResponse(msg);
-            }
-
-            public void onFailure(InetAddressAndPort from, RequestFailure failure)
-            {
-                if (callback.invokeOnFailure()) callback.onFailure(from, failure);
-            }
+            boolean toSelf = self.equals(to);
+            Node node = nodes.get(to);
+            Set<Faults> allowedFaults = allowedMessageFaults.apply(node, msg);
+            if (allowedFaults.contains(Faults.DROP) && !toSelf && networkDrops(self, to)) return SimulatedMessageDelivery.Action.DROP;
+            return SimulatedMessageDelivery.Action.DELIVER;
         }
 
-        private static class CallbackKey
+        private boolean networkDrops(InetAddressAndPort self, InetAddressAndPort to)
         {
-            private final long id;
-            private final InetAddressAndPort peer;
-
-            private CallbackKey(long id, InetAddressAndPort peer)
-            {
-                this.id = id;
-                this.peer = peer;
-            }
-
-            @Override
-            public boolean equals(Object o)
-            {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-                CallbackKey that = (CallbackKey) o;
-                return id == that.id && peer.equals(that.peer);
-            }
-
-            @Override
-            public int hashCode()
-            {
-                return Objects.hash(id, peer);
-            }
-
-            @Override
-            public String toString()
-            {
-                return "CallbackKey{" +
-                       "id=" + id +
-                       ", peer=" + peer +
-                       '}';
-            }
+            return networkDrops.computeIfAbsent(new Connection(self, to), ignore -> Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).asSupplier(rs)).get();
         }
 
-        private class Messaging implements MessageDelivery
-        {
-            final InetAddressAndPort broadcastAddressAndPort;
-            final Map<CallbackKey, CallbackContext> callbacks = new 
HashMap<>();
-
-            private Messaging(InetAddressAndPort broadcastAddressAndPort)
+            private long networkJitterNanos(InetAddressAndPort self, InetAddressAndPort to)
             {
-                this.broadcastAddressAndPort = broadcastAddressAndPort;
-            }
-
-            @Override
-            public <REQ> void send(Message<REQ> message, InetAddressAndPort to)
-            {
-                message = message.withFrom(broadcastAddressAndPort);
-                maybeEnqueue(message, to, null);
-            }
-
-            @Override
-            public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb)
-            {
-                message = message.withFrom(broadcastAddressAndPort);
-                maybeEnqueue(message, to, cb);
-            }
-
-            @Override
-            public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection)
-            {
-                message = message.withFrom(broadcastAddressAndPort);
-                maybeEnqueue(message, to, cb);
-            }
-
-            private <REQ, RSP> void maybeEnqueue(Message<REQ> message, InetAddressAndPort to, @Nullable RequestCallback<RSP> callback)
-            {
-                CallbackContext cb;
-                if (callback != null)
-                {
-                    CallbackKey key = new CallbackKey(message.id(), to);
-                    if (callbacks.containsKey(key))
-                        throw new AssertionError("Message id " + message.id() + " to " + to + " already has a callback");
-                    cb = new CallbackContext(callback);
-                    callbacks.put(key, cb);
-                }
-                else
-                {
-                    cb = null;
-                }
-                boolean toSelf = this.broadcastAddressAndPort.equals(to);
-                Node node = nodes.get(to);
-                Set<Faults> allowedFaults = allowedMessageFaults.apply(node, message);
-                if (allowedFaults.isEmpty())
-                {
-                    // enqueue so stack overflow doesn't happen with the inlining
-                    unorderedScheduled.submit(() -> node.handle(message));
-                }
-                else
-                {
-                    Runnable enqueue = () -> {
-                        if (!allowedFaults.contains(Faults.DELAY))
-                        {
-                            unorderedScheduled.submit(() -> node.handle(message));
-                        }
-                        else
-                        {
-                            if (toSelf) unorderedScheduled.submit(() -> node.handle(message));
-                            else
-                                unorderedScheduled.schedule(() -> node.handle(message), networkJitterNanos(to), TimeUnit.NANOSECONDS);
-                        }
-                    };
-
-                    if (!allowedFaults.contains(Faults.DROP)) enqueue.run();
-                    else
-                    {
-                        if (!toSelf && networkDrops(to))
-                        {
-//                            logger.warn("Dropped message {}", message);
-                            // drop
-                        }
-                        else
-                        {
-                            enqueue.run();
-                        }
-                    }
-
-                    if (cb != null)
-                    {
-                        unorderedScheduled.schedule(() -> {
-                            CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to));
-                            if (ctx != null)
-                            {
-                                assert ctx == cb;
-                                try
-                                {
-                                    ctx.onFailure(to, RequestFailure.TIMEOUT);
-                                }
-                                catch (Throwable t)
-                                {
-                                    failures.add(t);
-                                }
-                            }
-                        }, message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS);
-                    }
-                }
-            }
-
-            private long networkJitterNanos(InetAddressAndPort to)
-            {
-                return networkLatencies.computeIfAbsent(new Connection(broadcastAddressAndPort, to), ignore -> {
+                return networkLatencies.computeIfAbsent(new Connection(self, to), ignore -> {
                     long min = TimeUnit.MICROSECONDS.toNanos(500);
                     long maxSmall = TimeUnit.MILLISECONDS.toNanos(5);
                     long max = TimeUnit.SECONDS.toNanos(5);
@@ -974,42 +825,23 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                 }).getAsLong();
             }
 
-            private boolean networkDrops(InetAddressAndPort to)
-            {
-                return networkDrops.computeIfAbsent(new Connection(broadcastAddressAndPort, to), ignore -> Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).asSupplier(rs)).get();
-            }
-
-            @Override
-            public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to)
-            {
-                AsyncPromise<Message<RSP>> promise = new AsyncPromise<>();
-                sendWithCallback(message, to, new RequestCallback<RSP>()
-                {
-                    @Override
-                    public void onResponse(Message<RSP> msg)
-                    {
-                        promise.trySuccess(msg);
-                    }
-
-                    @Override
-                    public void onFailure(InetAddressAndPort from, RequestFailure failure)
-                    {
-                        promise.tryFailure(new MessagingService.FailureResponseException(from, failure.reason));
-                    }
-
-                    @Override
-                    public boolean invokeOnFailure()
-                    {
-                        return true;
-                    }
-                });
-                return promise;
-            }
-
-            @Override
-            public <V> void respond(V response, Message<?> message)
+        private class Messaging extends SimulatedMessageDelivery
+        {
+            private Messaging(InetAddressAndPort broadcastAddressAndPort)
             {
-                send(message.responseWith(response), message.respondTo());
+                super(broadcastAddressAndPort,
+                      Cluster.this::action,
+                      (to, msg) -> {
+                          Node node = nodes.get(to);
+                          Set<Faults> allowedFaults = allowedMessageFaults.apply(node, msg);
+                          if (!allowedFaults.contains(Faults.DELAY) || broadcastAddressAndPort.equals(to))
+                              unorderedScheduled.submit(() -> node.handle(msg));
+                          else
+                              unorderedScheduled.schedule(() -> node.handle(msg), networkJitterNanos(broadcastAddressAndPort, to), TimeUnit.NANOSECONDS);
+                      },
+                      (action, to, msg) -> logger.warn("{} message {}", action, msg),
+                      unorderedScheduled::schedule,
+                      failures::add);
             }
         }
 
@@ -1059,7 +891,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
             final InetAddressAndPort addressAndPort;
             final Collection<Token> tokens;
             final ActiveRepairService activeRepairService;
-            final IVerbHandler verbHandler;
+            final SimulatedMessageReceiver receiver;
             final Messaging messaging;
             final IValidationManager validationManager;
             private FailingBiConsumer<ColumnFamilyStore, Validator> doValidation = DEFAULT_VALIDATION;
@@ -1093,7 +925,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                         validator.fail(e);
                     }
                 });
-                this.verbHandler = new IVerbHandler<>()
+                this.receiver = messaging.receiver(new IVerbHandler<>()
                 {
                     private final RepairMessageVerbHandler repairVerbHandler = new RepairMessageVerbHandler(Node.this);
                     private final IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = PaxosStartPrepareCleanup.createVerbHandler(Node.this);
@@ -1125,7 +957,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                                 repairVerbHandler.doVerb(message);
                         }
                     }
-                };
+                });
 
                 activeRepairService.start();
             }
@@ -1165,38 +997,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                 }
                 for (MessageListener l : listeners)
                     l.preHandle(this, msg);
-                if (msg.verb().isResponse())
-                {
-                    // handle callbacks
-                    CallbackKey key = new CallbackKey(msg.id(), msg.from());
-                    if (messaging.callbacks.containsKey(key))
-                    {
-                        CallbackContext callback = messaging.callbacks.remove(key);
-                        if (callback == null)
-                            return;
-                        try
-                        {
-                            if (msg.isFailureResponse())
-                                callback.onFailure(msg.from(), (RequestFailure) msg.payload);
-                            else callback.onResponse(msg);
-                        }
-                        catch (Throwable t)
-                        {
-                            failures.add(t);
-                        }
-                    }
-                }
-                else
-                {
-                    try
-                    {
-                        verbHandler.doVerb(msg);
-                    }
-                    catch (Throwable e)
-                    {
-                        failures.add(e);
-                    }
-                }
+                receiver.receive(msg);
             }
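
The response/verb dispatch deleted above is now encapsulated behind
SimulatedMessageDelivery.SimulatedMessageReceiver. Reconstructed from the removed
lines, the receiver does the equivalent of the following (a sketch, not the new
class's actual body):

    void receive(Message<?> msg)
    {
        if (msg.verb().isResponse())
        {
            // responses complete a registered callback, if one is still pending
            CallbackContext callback = callbacks.remove(new CallbackKey(msg.id(), msg.from()));
            if (callback == null)
                return;
            try
            {
                if (msg.isFailureResponse()) callback.onFailure(msg.from(), (RequestFailure) msg.payload);
                else callback.onResponse(msg);
            }
            catch (Throwable t)
            {
                failures.add(t);
            }
        }
        else
        {
            // requests route to the verb handler; errors feed the test's failure sink
            try
            {
                verbHandler.doVerb(msg);
            }
            catch (Throwable e)
            {
                failures.add(e);
            }
        }
    }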
 
             public UUID hostId()
@@ -1379,10 +1180,7 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
 
     private static <T> Gen<T> fromQT(org.quicktheories.core.Gen<T> qt)
     {
-        return rs -> {
-            JavaRandom r = new JavaRandom(rs.asJdkRandom());
-            return qt.generate(r);
-        };
+        return AccordGenerators.fromQT(qt);
     }
 
     public static class HackStrat extends LocalStrategy
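
fromQT itself now delegates to AccordGenerators.fromQT; presumably the shared helper
keeps the shape of the inline adapter removed above, bridging accord's RandomSource
to a quicktheories generator through its JDK Random view:

    // Sketch of such a bridge (this is the removed inline implementation)
    private static <T> Gen<T> fromQT(org.quicktheories.core.Gen<T> qt)
    {
        return rs -> {
            JavaRandom r = new JavaRandom(rs.asJdkRandom());
            return qt.generate(r);
        };
    }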
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index 7b77791c6e..2f689187ac 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -40,6 +40,7 @@ import accord.topology.Shard;
 import accord.topology.Topology;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -176,7 +177,7 @@ public class AccordConfigurationServiceTest
     @Test
     public void initialEpochTest() throws Throwable
     {
-        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector());
+        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), 
AccordConfigurationService.SystemTableDiskStateManager.instance, 
ScheduledExecutors.scheduledTasks);
         Assert.assertEquals(null, AccordKeyspace.loadEpochDiskState());
         service.start();
         Assert.assertEquals(null, AccordKeyspace.loadEpochDiskState());
@@ -201,7 +202,7 @@ public class AccordConfigurationServiceTest
     @Test
     public void loadTest() throws Throwable
     {
-        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector());
+        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), 
AccordConfigurationService.SystemTableDiskStateManager.instance, 
ScheduledExecutors.scheduledTasks);
         service.start();
 
         Topology topology1 = new Topology(1, new 
Shard(AccordTopology.fullRange(TBL1), ID_LIST, ID_SET));
@@ -221,7 +222,7 @@ public class AccordConfigurationServiceTest
         service.reportTopology(topology3);
         service.acknowledgeEpoch(EpochReady.done(3), true);
 
-        AccordConfigurationService loaded = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector());
+        AccordConfigurationService loaded = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), 
AccordConfigurationService.SystemTableDiskStateManager.instance, 
ScheduledExecutors.scheduledTasks);
         
loaded.updateMapping(mappingForEpoch(ClusterMetadata.current().epoch.getEpoch() 
+ 1));
         AbstractConfigurationServiceTest.TestListener listener = new 
AbstractConfigurationServiceTest.TestListener(loaded, true);
         loaded.registerListener(listener);
@@ -240,7 +241,7 @@ public class AccordConfigurationServiceTest
     @Test
     public void truncateTest()
     {
-        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector());
+        AccordConfigurationService service = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), 
AccordConfigurationService.SystemTableDiskStateManager.instance, 
ScheduledExecutors.scheduledTasks);
         TestListener serviceListener = new TestListener(service, true);
         service.registerListener(serviceListener);
         service.start();
@@ -258,7 +259,7 @@ public class AccordConfigurationServiceTest
         Assert.assertEquals(EpochDiskState.create(3), service.diskState());
         serviceListener.assertTruncates(3L);
 
-        AccordConfigurationService loaded = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector());
+        AccordConfigurationService loaded = new 
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), 
AccordConfigurationService.SystemTableDiskStateManager.instance, 
ScheduledExecutors.scheduledTasks);
         
loaded.updateMapping(mappingForEpoch(ClusterMetadata.current().epoch.getEpoch() 
+ 1));
         TestListener loadListener = new TestListener(loaded, true);
         loaded.registerListener(loadListener);
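
These tests still exercise the system-table-backed disk state via
SystemTableDiskStateManager. Since the disk state manager is now injectable, a test
that must avoid system-table I/O entirely can swap in the mock this patch introduces,
for example:

    AccordConfigurationService service =
        new AccordConfigurationService(ID1,
                                       new Messaging(),
                                       new MockFailureDetector(),
                                       MockDiskStateManager.instance, // no disk writes
                                       ScheduledExecutors.scheduledTasks);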
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
index 55d1e7e117..9c613624fe 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
@@ -186,7 +186,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
                 // else this will loop forever...
                 for (int attempt = 0; attempt < 10; attempt++)
                 {
-                    TableId tableId = rs.pick(tables.keySet());
+                    TableId tableId = 
rs.pickOrderedSet(tables.navigableKeySet());
                     IPartitioner partitioner = tables.get(tableId);
                     ByteBuffer data = !(partitioner instanceof 
LocalPartitioner) ? Int32Type.instance.decompose(rs.nextInt())
                                                                                
  : fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs);
@@ -258,7 +258,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
 
                 for (int i = 0, queries = rs.nextInt(1, 5); i < queries; i++)
                 {
-                    int store = rs.pick(storesToKeys.keySet());
+                    int store = 
rs.pickOrderedSet(storesToKeys.navigableKeySet());
                     var keysForStore = new 
ArrayList<>(storesToKeys.get(store));
 
                     int offset;
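
pickOrderedSet exists for reproducibility: the iteration order of a hash-based
keySet() can differ between runs, so the same seed could replay a different test.
Selecting by index from a NavigableSet makes the choice a pure function of the seed.
Illustratively (a stand-alone sketch, not accord's actual implementation):

    // deterministic pick: sorted iteration order + seeded index
    static <T> T pickOrdered(java.util.Random rnd, java.util.NavigableSet<T> set)
    {
        int target = rnd.nextInt(set.size());
        for (T t : set)
            if (target-- == 0)
                return t;
        throw new AssertionError("unreachable for a non-empty set");
    }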
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
index f57a3c1238..12d5c75c01 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
@@ -60,6 +60,9 @@ import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -94,6 +97,10 @@ public class AccordSyncPropagatorTest
         Gen<Ranges> rangesGen = AccordGenerators.ranges().filter(r -> 
!r.isEmpty());
         Gen<List<Node.Id>> nodesGen = 
Gens.lists(AccordGens.nodes()).unique().ofSizeBetween(1, 40);
         qt().withExamples(100).check(rs -> {
+            // When neither gossip nor cluster metadata knows an endpoint, retries are skipped (the node
+            // counts as removed), so instances created here register with gossip to satisfy the membership check.
+            Gossiper.instance.clearUnsafe();
+
             List<Node.Id> nodes = nodesGen.next(rs);
             Set<Node.Id> nodesAsSet = ImmutableSet.copyOf(nodes);
 
@@ -214,6 +221,7 @@ public class AccordSyncPropagatorTest
                 Sink sink = new Sink(id);
                 IFailureDetector fd = new FailureDetector(address);
                instances.put(id, new Instance(id, address, cs, sink, fd, cs, new AccordSyncPropagator(id, Cluster.this, sink, fd, scheduler, cs)));
+                Gossiper.instance.endpointStateMap.put(address, new 
EndpointState(HeartBeatState.empty()));
             }
             this.nodeToAddress = nodeToAddress.build();
             this.instances = instances.build();
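
The Gossiper wiring follows from the comment at the top of the test: sync is only
retried against endpoints that gossip or cluster metadata consider members, so each
simulated instance registers an empty endpoint state, and clearUnsafe() resets that
state between examples. As a reusable helper (a sketch; the test inlines this):

    static void registerWithGossip(InetAddressAndPort address)
    {
        // satisfy the membership check: an empty heartbeat is enough to be "known"
        Gossiper.instance.endpointStateMap.put(address, new EndpointState(HeartBeatState.empty()));
    }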
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java 
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
new file mode 100644
index 0000000000..b0e2fec47d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -0,0 +1,753 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.api.ConfigurationService.EpochReady;
+import accord.impl.SizeOfIntersectionSorter;
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import accord.topology.TopologyManager;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.UnitCommand;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.SimulatedExecutorFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.SimulatedMessageDelivery;
+import org.apache.cassandra.net.SimulatedMessageDelivery.Action;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Tables;
+import 
org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.StubClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.Pair;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.stateful;
+
+public class EpochSyncTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(EpochSyncTest.class);
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        
ClusterMetadataService.setInstance(StubClusterMetadataService.forTesting());
+    }
+
+    @Test
+    public void test()
+    {
+        stateful().withExamples(50).check(new Commands<Cluster, Void>()
+        {
+            @Override
+            public Gen<Cluster> genInitialState()
+            {
+                return Cluster::new;
+            }
+
+            @Override
+            public Void createSut(Cluster cluster)
+            {
+                return null;
+            }
+
+            @Override
+            public Gen<Command<Cluster, Void, ?>> commands(Cluster cluster)
+            {
+                List<Node.Id> alive = cluster.alive();
+                Map<Gen<Command<Cluster, Void, ?>>, Integer> possible = new 
LinkedHashMap<>();
+                if (alive.size() < cluster.maxNodes)
+                {
+                    // add node
+                    possible.put(rs -> {
+                        Node.Id id = new Node.Id(++cluster.nodeCounter);
+                        long token = cluster.tokenGen.nextLong(rs);
+                        while (cluster.tokens.contains(token))
+                            token = cluster.tokenGen.nextLong(rs);
+                        long epoch = cluster.current.epoch.getEpoch() + 1;
+                        long finalToken = token;
+                        return new SimpleCommand("Add Node " + id + "; token=" 
+ token + ", epoch=" + epoch,
+                                                 c -> c.addNode(id, 
finalToken));
+                    }, 5);
+                }
+                if (alive.size() > cluster.minNodes)
+                {
+                    possible.put(rs -> {
+                        Node.Id pick = rs.pick(alive);
+                        long token = cluster.instances.get(pick).token;
+                        long epoch = cluster.current.epoch.getEpoch() + 1;
+                        return new SimpleCommand("Remove Node " + pick + "; 
token=" + token + "; epoch=" + epoch, c -> c.removeNode(pick));
+                    }, 3);
+                }
+                if (cluster.hasWork())
+                {
+                    possible.put(rs -> new SimpleCommand("Process Some",
+                                                         c -> {//noinspection 
StatementWithEmptyBody
+                                                             for (int i = 0, 
attempts = rs.nextInt(1, 100); i < attempts && c.processOne(); i++)
+                                                             {
+                                                             }
+                                                         }), 10);
+                }
+
+                possible.put(rs -> new SimpleCommand("Validate",
+                                                     c -> c.validate(false)), 
1);
+                possible.put(rs -> new SimpleCommand("Bump Epoch " + 
(cluster.current.epoch.getEpoch() + 1),
+                                                     Cluster::bumpEpoch), 10);
+                return Gens.oneOf(possible);
+            }
+
+            @Override
+            public void destroyState(Cluster cluster)
+            {
+                cluster.processAll();
+                cluster.validate(true);
+            }
+        });
+    }
+
+    private static class SimpleCommand implements UnitCommand<Cluster, Void>
+    {
+        private final String name;
+        private final Consumer<Cluster> fn;
+
+        private SimpleCommand(String name, Consumer<Cluster> fn)
+        {
+            this.name = name;
+            this.fn = fn;
+        }
+
+        @Override
+        public String detailed(Cluster cluster)
+        {
+            return name;
+        }
+
+        @Override
+        public void applyUnit(Cluster cluster)
+        {
+            fn.accept(cluster);
+        }
+
+        @Override
+        public void runUnit(Void sut)
+        {
+
+        }
+    }
+
+    private static class Cluster
+    {
+        private static final int rf = 2;
+        private static final ReplicationParams replication_params = 
ReplicationParams.simple(rf);
+        private static final ReplicationParams meta = 
ReplicationParams.simpleMeta(1, Collections.singleton("dc1"));
+
+        private final RandomSource rs;
+        private final int minNodes, maxNodes;
+        private final Gen.LongGen tokenGen;
+        private final SortedSet<Long> tokens = new TreeSet<>();
+        private final Map<Node.Id, Instance> instances = new HashMap<>();
+        private final Set<Node.Id> removed = new HashSet<>();
+        private final List<Throwable> failures = new ArrayList<>();
+        private final SimulatedExecutorFactory globalExecutor;
+        private final ScheduledExecutorPlus scheduler;
+        private int nodeCounter = 0;
+        private ClusterMetadata current = new 
ClusterMetadata(Murmur3Partitioner.instance, Directory.EMPTY,
+                                                              new 
DistributedSchema(Keyspaces.of(
+                                                              
DistributedMetadataLogKeyspace.initialMetadata(Collections.singleton("dc1")),
+                                                              
KeyspaceMetadata.create("test", KeyspaceParams.simple(rf), 
Tables.of(TableMetadata.minimal("test", 
"tb1").unbuild().params(TableParams.builder().transactionalMode(TransactionalMode.full).build()).build())))));
+        private final IFailureDetector fd = new IFailureDetector()
+        {
+            @Override
+            public boolean isAlive(InetAddressAndPort ep)
+            {
+                return !removed.contains(nodeId(ep));
+            }
+
+            @Override
+            public void interpret(InetAddressAndPort ep)
+            {
+
+            }
+
+            @Override
+            public void report(InetAddressAndPort ep)
+            {
+
+            }
+
+            @Override
+            public void remove(InetAddressAndPort ep)
+            {
+
+            }
+
+            @Override
+            public void forceConviction(InetAddressAndPort ep)
+            {
+
+            }
+
+            @Override
+            public void 
registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+            {
+
+            }
+
+            @Override
+            public void 
unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+            {
+
+            }
+        };
+
+        private static InetAddressAndPort address(Node.Id id)
+        {
+            try
+            {
+                return 
InetAddressAndPort.getByAddress(ByteArrayUtil.bytes(id.id));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new AssertionError("Unable to create address for id " + 
id, e);
+            }
+        }
+
+        public enum EpochTracker { topologyManager, accordSyncPropagator, configurationService }
+
+        Set<EpochTracker> globalSynced(long epoch)
+        {
+            return alive().stream()
+                   .filter(n -> instances.get(n).epoch.getEpoch() <= epoch)
+                   .map(n -> instances.get(n).synced(epoch))
+                   .reduce(EnumSet.allOf(EpochTracker.class), 
Sets::intersection);
+        }
+
+        boolean allSynced(long epoch)
+        {
+            Set<EpochTracker> done = globalSynced(epoch);
+            return done.contains(EpochTracker.topologyManager);
+        }
+
+        private static Node.Id nodeId(InetAddressAndPort address)
+        {
+            return new Node.Id(ByteArrayUtil.getInt(address.addressBytes));
+        }
+
+        public Cluster(RandomSource rs)
+        {
+            this.rs = rs;
+            this.minNodes = 3;
+            this.maxNodes = 10;
+            this.tokenGen = rs2 -> rs2.nextLong(Long.MIN_VALUE + 1, 
Long.MAX_VALUE);
+
+            this.globalExecutor = new SimulatedExecutorFactory(rs, 
failures::add);
+            this.scheduler = globalExecutor.scheduled("ignored");
+            Stage.MISC.unsafeSetExecutor(scheduler);
+
+            scheduler.scheduleWithFixedDelay(() -> {
+                if (aliveCount() < 2) return;
+                if (!partitions.isEmpty() && rs.nextBoolean())
+                {
+                    // remove partition
+                    if (partitions.size() == 1)
+                    {
+                        partitions.clear();
+                        return;
+                    }
+                    partitions.remove(rs.pickOrderedSet(partitions));
+                }
+                else
+                {
+                    // add partition
+                    List<Node.Id> alive = alive();
+                    InetAddressAndPort a = address(rs.pick(alive));
+                    InetAddressAndPort b = address(rs.pick(alive));
+                    while (a.equals(b))
+                        b = address(rs.pick(alive));
+                    partitions.add(new Connection(a, b));
+                }
+            }, 1, 1, TimeUnit.MINUTES);
+        }
+
+        void validate(boolean isDone)
+        {
+            for (Node.Id id : alive())
+            {
+                Instance inst = instances.get(id);
+                if (removed.contains(id)) continue; // ignore removed nodes
+                AccordConfigurationService conf = inst.config;
+                TopologyManager tm = inst.topology;
+                for (long epoch = inst.epoch.getEpoch(); epoch <= 
current.epoch.getEpoch(); epoch++)
+                {
+                    // validate config
+                    EpochSnapshot snapshot = conf.getEpochSnapshot(epoch);
+                    if (isDone)
+                    {
+                        Assertions.assertThat(snapshot).describedAs("node%s 
does not have epoch %d", id, epoch).isNotNull();
+                        
Assertions.assertThat(snapshot.syncStatus).isEqualTo(AccordConfigurationService.SyncStatus.COMPLETED);
+
+                        // validate topology manager
+                        
Assertions.assertThat(tm.hasEpoch(epoch)).describedAs("node%s does not have 
epoch %d", id, epoch).isTrue();
+                        Ranges ranges = 
tm.globalForEpoch(epoch).ranges().mergeTouching();
+                        Ranges actual = tm.syncComplete(epoch).mergeTouching();
+                        Assertions.assertThat(actual).describedAs("node%s does 
not have all expected sync ranges for epoch %d; missing %s", id, epoch, 
ranges.subtract(actual)).isEqualTo(ranges);
+                    }
+                    else
+                    {
+                        if (snapshot == null || snapshot.syncStatus != 
AccordConfigurationService.SyncStatus.COMPLETED) continue;
+
+                        if (!allSynced(epoch))
+                            continue;
+
+                        
Assertions.assertThat(tm.hasEpoch(epoch)).describedAs("node%s does not have 
epoch %d", id, epoch).isTrue();
+                        Topology topology = tm.globalForEpoch(epoch);
+                        Ranges ranges = topology.ranges().mergeTouching();
+                        Ranges actual = tm.syncComplete(epoch).mergeTouching();
+                        // TopologyManager only treats an epoch as syncComplete once (epoch - 1) is itself syncComplete,
+                        // so an epoch may have reached quorum yet still be missing ranges until previous epochs complete.
+                        if (!ranges.equals(actual) && tm.minEpoch() != epoch 
&& !ranges.equals(tm.syncComplete(epoch - 1).mergeTouching()))
+                            continue;
+                        Assertions.assertThat(actual)
+                                  .describedAs("node%s does not have all 
expected sync ranges for epoch %d; missing %s; peers=%s; previous epochs %s", 
id, epoch, ranges.subtract(actual), topology.nodes(),
+                                               
LongStream.range(inst.epoch.getEpoch(), epoch + 1).mapToObj(e -> e + " -> " + 
conf.getEpochSnapshot(e).syncStatus + "(synced=" + globalSynced(e) + "): " + 
tm.syncComplete(e)).collect(Collectors.joining("\n")))
+                                  .isEqualTo(ranges);
+                    }
+                }
+            }
+        }
+
+        String displayTopology()
+        {
+            List<Node.Id> alive = alive();
+            List<Pair<Node.Id, Long>> withToken = new 
ArrayList<>(alive.size());
+            for (Node.Id n : alive)
+                withToken.add(Pair.create(n, instances.get(n).token));
+            withToken.sort(Comparator.comparing(a -> a.right));
+            StringBuilder sb = new StringBuilder();
+            for (var p : withToken)
+                sb.append(p.left).append('\t').append(p.right).append('\n');
+            return sb.toString();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Topology:\n" + displayTopology();
+        }
+
+        boolean hasWork()
+        {
+            return globalExecutor.hasWork();
+        }
+
+        boolean processOne()
+        {
+            boolean result = globalExecutor.processOne();
+            checkFailures();
+            return result;
+        }
+
+        @SuppressWarnings("StatementWithEmptyBody")
+        void processAll()
+        {
+            while (processOne())
+            {
+            }
+        }
+
+        public void checkFailures()
+        {
+            if (Thread.interrupted())
+                failures.add(new InterruptedException());
+            if (failures.isEmpty()) return;
+            AssertionError error = new AssertionError("Unexpected exceptions 
found");
+            failures.forEach(error::addSuppressed);
+            failures.clear();
+            throw error;
+        }
+
+        List<Node.Id> alive()
+        {
+            ArrayList<Node.Id> ids = new 
ArrayList<>(Sets.difference(instances.keySet(), removed));
+            ids.sort(Comparator.naturalOrder());
+            return ids;
+        }
+
+        int aliveCount()
+        {
+            return instances.size() - removed.size();
+        }
+
+        private final NavigableSet<Connection> partitions = new TreeSet<>();
+
+        private boolean partitioned(InetAddressAndPort self, 
InetAddressAndPort to)
+        {
+            return partitions.contains(new Connection(self, to));
+        }
+
+        private SimulatedMessageDelivery createMessaging(Node.Id id)
+        {
+            InetAddressAndPort address = address(id);
+            return new SimulatedMessageDelivery(address,
+                                                (self, msg, to) -> {
+                                                    if 
(removed.contains(nodeId(self)) || removed.contains(nodeId(to)))
+                                                        return Action.DROP;
+                                                    if (!self.equals(to) && 
partitioned(self, to))
+                                                        return 
Action.DROP_PARTITIONED;
+                                                    if (rs.decide(.01))
+                                                        return 
rs.nextBoolean() ? Action.DELIVER_WITH_FAILURE : Action.FAILURE;
+                                                    return Action.DELIVER;
+                                                },
+                                                (to, msg) -> instances.get(nodeId(to)).receiver.receive(msg),
+                                                (action, to, msg) -> 
logger.warn("{} message {}", action, msg),
+                                                scheduler::schedule,
+                                                failures::add);
+        }
+
+        void addNode(Node.Id id, long token)
+        {
+            Invariants.checkState(!tokens.contains(token), "Attempted to add 
token %d for node %s but token is already taken", token, id);
+            Epoch epoch = Epoch.create(current.epoch.getEpoch() + 1);
+
+            Instance instance = new Instance(id, token, epoch, 
createMessaging(id), fd);
+            instances.put(id, instance);
+            tokens.add(token);
+
+            current = current.forceEpoch(epoch)
+                             .withPlacements(DataPlacements.builder(2)
+                                                           .with(meta, 
DataPlacement.empty())
+                                                           
.with(replication_params, rebuildPlacements(epoch))
+                                                           .build())
+                             .withDirectory(current.directory.with(new 
NodeAddresses(address(id)), new Location("dc1", "r1")));
+            notify(current);
+        }
+
+        void removeNode(Node.Id pick)
+        {
+            Instance inst = Objects.requireNonNull(instances.get(pick), 
"Unknown id " + pick);
+            Invariants.checkState(!removed.contains(pick), "Cannot remove node twice; node " + pick);
+            tokens.remove(inst.token);
+            removed.add(pick);
+            inst.stop();
+            current = current.forceEpoch(Epoch.create(current.epoch.getEpoch() 
+ 1))
+                             .withDirectory(current.directory.without(new 
NodeId(pick.id)));
+
+            current = current.withPlacements(DataPlacements.builder(2)
+                                                           .with(meta, 
DataPlacement.empty())
+                                                           
.with(replication_params, rebuildPlacements(current.epoch))
+                                                           .build());
+            notify(current);
+        }
+
+        private DataPlacement rebuildPlacements(Epoch epoch)
+        {
+            DataPlacement.Builder builder = DataPlacement.builder();
+            for (Node.Id inst : alive())
+                for (Replica replica : instances.get(inst).replica())
+                    builder.withReadReplica(epoch, 
replica).withWriteReplica(epoch, replica);
+            return builder.build();
+        }
+
+        void bumpEpoch()
+        {
+            current = current.forceEpoch(Epoch.create(current.epoch.getEpoch() 
+ 1));
+            notify(current);
+        }
+
+        private void notify(ClusterMetadata current)
+        {
+            Ranges ranges = 
AccordTopology.createAccordTopology(current).ranges().mergeTouching();
+            if (!current.directory.isEmpty())
+                Assertions.assertThat(ranges).hasSize(1);
+            ((StubClusterMetadataService) 
ClusterMetadataService.instance()).setMetadata(current);
+            for (Node.Id id : alive())
+            {
+                Instance inst = instances.get(id);
+                inst.maybeStart();
+                inst.config.maybeReportMetadata(current);
+            }
+        }
+
+        @SuppressWarnings("SameParameterValue")
+        private <T> AsyncChain<T> schedule(long time, TimeUnit unit, 
Callable<T> task)
+        {
+            return new AsyncChains.Head<>()
+            {
+                @Override
+                protected void start(BiConsumer<? super T, Throwable> callback)
+                {
+                    scheduler.schedule(() -> {
+                        T value;
+                        try
+                        {
+                            value = task.call();
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                            return;
+                        }
+                        callback.accept(value, null);
+                    }, time, unit);
+                }
+            };
+        }
+
+        private enum Status { Init, Started }
+        private class Instance
+        {
+            private final Node.Id id;
+            private final long token;
+            private final AccordConfigurationService config;
+            private final SimulatedMessageDelivery messaging;
+            private final SimulatedMessageDelivery.SimulatedMessageReceiver receiver;
+            private final TopologyManager topology;
+            private final Epoch epoch;
+            private Status status = Status.Init;
+
+            Instance(Node.Id node, long token, Epoch epoch, 
SimulatedMessageDelivery messagingService, IFailureDetector failureDetector)
+            {
+                this.id = node;
+                this.token = token;
+                this.epoch = epoch;
+                this.topology = new 
TopologyManager(SizeOfIntersectionSorter.SUPPLIER, id);
+                AccordConfigurationService.DiskStateManager instance = 
MockDiskStateManager.instance;
+                config = new AccordConfigurationService(node, 
messagingService, failureDetector, instance, scheduler);
+                config.registerListener(new ConfigurationService.Listener()
+                {
+                    @Override
+                    public AsyncResult<Void> onTopologyUpdate(Topology 
topology, boolean startSync)
+                    {
+                        AsyncResult<Void> metadata = schedule(rs.nextInt(1, 
10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult();
+                        AsyncResult<Void> coordination = 
metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, 
(Callable<Void>) () -> null)).beginAsResult();
+                        AsyncResult<Void> data = coordination.flatMap(ignore 
-> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> 
null)).beginAsResult();
+                        AsyncResult<Void> reads = data.flatMap(ignore -> 
schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> 
null)).beginAsResult();
+                        EpochReady ready = new EpochReady(topology.epoch(), 
metadata, coordination, data, reads);
+
+                        topology().onTopologyUpdate(topology, () -> ready);
+                        ready.coordination.addCallback(() -> 
topology().onEpochSyncComplete(id, topology.epoch()));
+                        if (topology().minEpoch() == topology.epoch() && 
topology().epoch() != topology.epoch())
+                            return ready.coordination;
+                        config.acknowledgeEpoch(ready, startSync);
+                        return ready.coordination;
+                    }
+
+                    @Override
+                    public void onRemoteSyncComplete(Node.Id node, long epoch)
+                    {
+                        topology.onEpochSyncComplete(node, epoch);
+                    }
+
+                    @Override
+                    public void onRemoveNodes(long epoch, Collection<Node.Id> 
removed)
+                    {
+                        topology.onRemoveNodes(epoch, removed);
+                    }
+
+                    @Override
+                    public void truncateTopologyUntil(long epoch)
+                    {
+                        topology.truncateTopologyUntil(epoch);
+                    }
+
+                    @Override
+                    public void onEpochClosed(Ranges ranges, long epoch)
+                    {
+                        topology.onEpochClosed(ranges, epoch);
+                    }
+
+                    @Override
+                    public void onEpochRedundant(Ranges ranges, long epoch)
+                    {
+                        topology.onEpochRedundant(ranges, epoch);
+                    }
+                });
+
+                Map<Verb, IVerbHandler<?>> handlers = new 
EnumMap<>(Verb.class);
+                //noinspection unchecked
+                handlers.put(Verb.ACCORD_SYNC_NOTIFY_REQ, msg -> 
AccordService.receive(messagingService, config, 
(Message<List<AccordSyncPropagator.Notification>>) (Message<?>) msg));
+                this.messaging = messagingService;
+                this.receiver = messagingService.receiver(new SimulatedMessageDelivery.SimpleVerbHandler(handlers));
+            }
+
+            void maybeStart()
+            {
+                if (status == Status.Init)
+                {
+                    start();
+                    status = Status.Started;
+                }
+            }
+
+            private void start()
+            {
+                config.start();
+            }
+
+            TopologyManager topology()
+            {
+                return topology;
+            }
+
+            Collection<Replica> replica()
+            {
+                InetAddressAndPort address = Cluster.address(id);
+                SortedSet<Long> lessThan = tokens.headSet(token);
+                if (lessThan.isEmpty())
+                {
+                    // wrap around
+                    return Arrays.asList(new Replica(address, new 
LongToken(Long.MIN_VALUE), new LongToken(token), true),
+                                         new Replica(address, new 
LongToken(tokens.last()), new LongToken(Long.MIN_VALUE), true));
+                }
+
+                return Collections.singletonList(new Replica(address, new 
LongToken(lessThan.last()), new LongToken(token), true));
+            }
+
+            Set<EpochTracker> synced(long epoch)
+            {
+                if (epoch < this.epoch.getEpoch()) throw new 
IllegalArgumentException("Asked for epoch before this instance existed");
+                EnumSet<EpochTracker> done = 
EnumSet.noneOf(EpochTracker.class);
+                EpochSnapshot snapshot = config.getEpochSnapshot(epoch);
+                if (snapshot != null && snapshot.syncStatus == 
AccordConfigurationService.SyncStatus.COMPLETED)
+                    done.add(EpochTracker.configurationService);
+                if (topology.hasReachedQuorum(epoch))
+                    done.add(EpochTracker.topologyManager);
+                if (!config.syncPropagator().hasPending(epoch))
+                    done.add(EpochTracker.accordSyncPropagator);
+                return done;
+            }
+
+            void stop()
+            {
+                messaging.stop();
+            }
+        }
+    }
+
+    private static class Connection implements Comparable<Connection>
+    {
+        final InetAddressAndPort from, to;
+
+        private Connection(InetAddressAndPort from, InetAddressAndPort to)
+        {
+            this.from = from;
+            this.to = to;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Connection that = (Connection) o;
+            return from.equals(that.from) && to.equals(that.to);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(from, to);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Connection{" + "from=" + from + ", to=" + to + '}';
+        }
+
+        @Override
+        public int compareTo(Connection o)
+        {
+            int rc = from.compareTo(o.from);
+            if (rc == 0)
+                rc = to.compareTo(o.to);
+            return rc;
+        }
+    }
+}
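
EpochSyncTest follows the same stateful property-test pattern as StatefulRangeTreeTest
below: each step draws a command from a weighted table, applies it to the model
cluster, and destroyState() drains all outstanding work before a final validate(true).
Condensed from the command table built above:

    Map<Gen<Command<Cluster, Void, ?>>, Integer> possible = new LinkedHashMap<>();
    possible.put(rs -> new SimpleCommand("Bump Epoch", Cluster::bumpEpoch), 10);
    possible.put(rs -> new SimpleCommand("Validate", c -> c.validate(false)), 1);
    Gen<Command<Cluster, Void, ?>> commands = Gens.oneOf(possible); // weighted choice

The LinkedHashMap matters for the same reason as the pickOrderedSet changes: the
weighted choice must see the table in a deterministic order for a seed to replay the
same command sequence.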
diff --git 
a/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java 
b/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java
new file mode 100644
index 0000000000..7b8ce0e335
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * Wraps a {@link AccordConfigurationService.DiskStateManager} and logs each call, which helps when inspecting
+ * the order in which disk state is modified. This class exists primarily to aid debugging in tests.
+ */
+@SuppressWarnings("unused")
+@VisibleForTesting
+public class LoggingDiskStateManager implements 
AccordConfigurationService.DiskStateManager {
+    private static final Logger logger = 
LoggerFactory.getLogger(LoggingDiskStateManager.class);
+    private final Node.Id self;
+    private final AccordConfigurationService.DiskStateManager delegate;
+
+    public LoggingDiskStateManager(Node.Id self, 
AccordConfigurationService.DiskStateManager delegate) {
+        this.self = self;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState 
loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer) {
+        logger.info("[node={}] Calling loadTopologies()", self);
+        return delegate.loadTopologies(consumer);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState setNotifyingLocalSync(long epoch, 
Set<Node.Id> pending, AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling setNotifyingLocalSync({}, {}, {})", 
self, epoch, pending, diskState);
+        return delegate.setNotifyingLocalSync(epoch, pending, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState setCompletedLocalSync(long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling setCompletedLocalSync({}, {})", self, 
epoch, diskState);
+        return delegate.setCompletedLocalSync(epoch, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markLocalSyncAck(Node.Id id, long 
epoch, AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling markLocalSyncAck({}, {}, {})", self, 
id, epoch, diskState);
+        return delegate.markLocalSyncAck(id, epoch, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState saveTopology(Topology topology, 
AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling saveTopology({}, {})", self, 
topology.epoch(), diskState);
+        return delegate.saveTopology(topology, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markRemoteTopologySync(Node.Id id, 
long epoch, AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling markRemoteTopologySync({}, {}, {})", 
self, id, epoch, diskState);
+        return delegate.markRemoteTopologySync(id, epoch, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markClosed(Ranges ranges, long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling markClosed({}, {}, {})", self, ranges, 
epoch, diskState);
+        return delegate.markClosed(ranges, epoch, diskState);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState truncateTopologyUntil(long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        logger.info("[node={}] Calling truncateTopologyUntil({}, {})", self, 
epoch, diskState);
+        return delegate.truncateTopologyUntil(epoch, diskState);
+    }
+}
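
LoggingDiskStateManager is a plain decorator over DiskStateManager, so it composes
with either backing implementation. For example (a sketch using the names from
EpochSyncTest's Instance constructor), a test wanting a per-node call trace could
wrap the mock:

    AccordConfigurationService.DiskStateManager diskState =
        new LoggingDiskStateManager(node, MockDiskStateManager.instance);
    AccordConfigurationService config =
        new AccordConfigurationService(node, messagingService, failureDetector, diskState, scheduler);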
diff --git 
a/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java 
b/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java
new file mode 100644
index 0000000000..9e37602634
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+
+import java.util.Set;
+
+public enum MockDiskStateManager implements 
AccordConfigurationService.DiskStateManager {
+    instance;
+
+    @Override
+    public AccordKeyspace.EpochDiskState 
loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer) {
+        return AccordKeyspace.EpochDiskState.EMPTY;
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState setNotifyingLocalSync(long epoch, 
Set<Node.Id> pending, AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState setCompletedLocalSync(long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markLocalSyncAck(Node.Id id, long 
epoch, AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState saveTopology(Topology topology, 
AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, topology.epoch());
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markRemoteTopologySync(Node.Id node, 
long epoch, AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState markClosed(Ranges ranges, long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    @Override
+    public AccordKeyspace.EpochDiskState truncateTopologyUntil(long epoch, 
AccordKeyspace.EpochDiskState diskState) {
+        return maybeUpdateMaxEpoch(diskState, epoch);
+    }
+
+    private static AccordKeyspace.EpochDiskState 
maybeUpdateMaxEpoch(AccordKeyspace.EpochDiskState diskState, long epoch) {
+        if (diskState.isEmpty())
+            return AccordKeyspace.EpochDiskState.create(epoch);
+        Invariants.checkArgument(epoch >= diskState.minEpoch, "Epoch %d < %d 
(min)", epoch, diskState.minEpoch);
+        if (epoch > diskState.maxEpoch)
+            diskState = diskState.withNewMaxEpoch(epoch);
+        return diskState;
+    }
+}
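
The mock keeps only the [minEpoch, maxEpoch] bounds: every mutation funnels through
maybeUpdateMaxEpoch, which seeds the state on first use, rejects any epoch below
minEpoch, and grows maxEpoch monotonically. Traced by hand (assuming
EpochDiskState.create(e) initializes both bounds to e):

    // EMPTY          --saveTopology(epoch=3)----->  [min=3, max=3]
    // [min=3, max=3] --markLocalSyncAck(n, 4)---->  [min=3, max=4]
    // [min=3, max=4] --markClosed(ranges, 4)----->  [min=3, max=4]  (unchanged)
    // [min=3, max=4] --setCompletedLocalSync(2)-->  rejected: epoch 2 < minEpoch 3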
diff --git a/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java 
b/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java
index ceed706236..e3e471b550 100644
--- a/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.utils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -99,7 +99,7 @@ public class StatefulRangeTreeTest
             @Override
             public Gen<Command<State, Sut, ?>> commands(State state)
             {
-                Map<Gen<Command<State, Sut, ?>>, Integer> possible = new 
HashMap<>();
+                Map<Gen<Command<State, Sut, ?>>, Integer> possible = new 
LinkedHashMap<>();
                 possible.put(rs -> new Create(state.newRange(rs), 
SMALL_INT_GEN.nextInt(rs)), state.createWeight);
                 possible.put(rs -> new Read(state.newRange(rs)), 
state.readWeight);
                 possible.put(rs -> new 
KeyRead(IntKey.routing(state.tokenGen.nextInt(rs))), state.readWeight);
@@ -108,15 +108,15 @@ public class StatefulRangeTreeTest
                 possible.put(ignore -> Clear.instance, state.clearWeight);
                 if (!state.uniqRanges.isEmpty())
                 {
-                    possible.put(rs -> new Read(rs.pick(state.uniqRanges)), 
state.readWeight);
+                    possible.put(rs -> new 
Read(rs.pickOrderedSet(state.uniqRanges)), state.readWeight);
                     possible.put(rs -> {
-                        Range range = rs.pick(state.uniqRanges);
+                        Range range = rs.pickOrderedSet(state.uniqRanges);
                         int token = rs.nextInt(((IntKey.Routing) 
range.start()).key, ((IntKey.Routing) range.end()).key) + 1;
                         return new KeyRead(IntKey.routing(token));
                     }, state.readWeight);
-                    possible.put(rs -> new 
RangeRead(rs.pick(state.uniqRanges)), state.readWeight);
-                    possible.put(rs -> new Update(rs.pick(state.uniqRanges), 
SMALL_INT_GEN.nextInt(rs)), state.updateWeight);
-                    possible.put(rs -> new Delete(rs.pick(state.uniqRanges)), 
state.deleteWeight);
+                    possible.put(rs -> new 
RangeRead(rs.pickOrderedSet(state.uniqRanges)), state.readWeight);
+                    possible.put(rs -> new 
Update(rs.pickOrderedSet(state.uniqRanges), SMALL_INT_GEN.nextInt(rs)), 
state.updateWeight);
+                    possible.put(rs -> new 
Delete(rs.pickOrderedSet(state.uniqRanges)), state.deleteWeight);
                 }
                 return Gens.oneOf(possible);
             }

