This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit cf62c3071c7a0fbdb50598f4b98e8f296b3dbc46
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Sep 25 17:20:12 2024 +0100

    wip
---
 .../src/main/java/accord/api/DataStore.java        |   3 +-
 .../java/accord/impl/InMemoryCommandStore.java     | 179 +++++---------
 .../java/accord/impl/InMemoryCommandStores.java    |  17 +-
 .../java/accord/impl/progresslog/HomeState.java    |   2 +-
 .../java/accord/impl/progresslog/WaitingState.java |   4 +-
 .../src/main/java/accord/local/Bootstrap.java      |  34 +--
 .../src/main/java/accord/local/Cleanup.java        |   3 +-
 .../src/main/java/accord/local/Command.java        |   3 +-
 .../src/main/java/accord/local/CommandStore.java   | 265 +++++----------------
 .../src/main/java/accord/local/CommandStores.java  |  14 +-
 .../src/main/java/accord/local/Commands.java       |  51 ++--
 accord-core/src/main/java/accord/local/Node.java   |   2 +-
 .../main/java/accord/local/RedundantBefore.java    |  41 ++--
 .../main/java/accord/local/SafeCommandStore.java   |   2 +-
 .../main/java/accord/local/cfk/CommandsForKey.java |   4 +-
 .../src/main/java/accord/local/cfk/Updating.java   |   6 +-
 .../java/accord/messages/SetGloballyDurable.java   |   2 +-
 .../main/java/accord/messages/SetShardDurable.java |   2 +-
 .../test/java/accord/impl/RemoteListenersTest.java |   7 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   2 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  14 +-
 .../src/test/java/accord/impl/list/ListStore.java  | 107 +++++++--
 .../java/accord/local/BootstrapLocalTxnTest.java   | 131 ----------
 .../java/accord/local/cfk/CommandsForKeyTest.java  |   6 +-
 24 files changed, 288 insertions(+), 613 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/DataStore.java 
b/accord-core/src/main/java/accord/api/DataStore.java
index 259b39cd..a93ff073 100644
--- a/accord-core/src/main/java/accord/api/DataStore.java
+++ b/accord-core/src/main/java/accord/api/DataStore.java
@@ -26,6 +26,7 @@ import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
 import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 
 /**
  * A marker interface for a shard instance's storage, that is passed to
@@ -111,6 +112,6 @@ public interface DataStore
     }
 
     FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, 
SyncPoint syncPoint, FetchRanges callback);
-    default void snapshot() {};
+    default AsyncResult<Void> snapshot(Ranges ranges) { return 
AsyncResults.success(null); };
     default void restoreFromSnapshot() {};
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 65bc1618..1b474779 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -42,7 +42,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,19 +50,16 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.api.Scheduler;
 import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.Commands;
-import accord.local.DurableBefore;
 import accord.local.KeyHistory;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
-import accord.local.RedundantBefore;
 import accord.local.RedundantStatus;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
@@ -90,8 +86,6 @@ import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
@@ -129,54 +123,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
     private InMemorySafeStore current;
 
-    // To simulate the delay in simulatedAsyncPersist
-    private final Scheduler scheduler;
-
-    private static final class SimulatedFieldPersister<T> implements 
FieldPersister<T>
-    {
-        private T lastValue;
-        private final Scheduler scheduler;
-        private final Node node;
-        private final int id;
-        public SimulatedFieldPersister(Scheduler scheduler, T defaultValue, 
Node node, int id)
-        {
-            this.scheduler = scheduler;
-            this.lastValue = defaultValue;
-            this.node = node;
-            this.id = id;
-        }
-
-        public AsyncResult<?> persist(CommandStore store, T toPersist)
-        {
-            System.out.println("Persisting for " + node.id() + "-store-" + id);
-            AsyncResult.Settable<?> result = AsyncResults.settable();
-            scheduler.once(() -> {
-                lastValue = toPersist;
-                result.trySuccess(null);
-            }, 100, TimeUnit.MICROSECONDS);
-            return result;
-        }
-
-        public T restore()
-        {
-            return lastValue;
-        }
-    }
-
-    public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
+    public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
     {
-        super(id,
-              time,
-              agent,
-              store,
-              progressLogFactory,
-              listenersFactory,
-              epochUpdateHolder,
-              new SimulatedFieldPersister<>(scheduler, DurableBefore.EMPTY, 
(Node) time, id),
-              new SimulatedFieldPersister<>(scheduler, RedundantBefore.EMPTY, 
(Node) time, id),
-              new SimulatedFieldPersister<>(scheduler, 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id),
-              new SimulatedFieldPersister<>(scheduler, 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id));
-        this.scheduler = scheduler;
+        super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
     }
 
     protected boolean canExposeUnloaded()
@@ -418,59 +367,57 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     }
 
     @Override
-    public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore, TxnId 
syncId, Ranges ranges)
+    public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, 
Ranges ranges)
     {
-        // We know it completes immediately for InMemoryCommandStore
-        AsyncChain<Void> markShardDurableChain = 
super.markShardDurable(safeStore, syncId, ranges);
-        markShardDurableChain = markShardDurableChain.map(ignored ->
-         {
-             if (!rangeCommands.containsKey(syncId))
-                 historicalRangeCommands.merge(syncId, ranges, Ranges::with);
-
-             // TODO (now): apply on retrieval
-             historicalRangeCommands.entrySet().removeIf(next -> 
next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
-             rangeCommands.entrySet().removeIf(tx -> {
-                 if (tx.getKey().compareTo(syncId) >= 0)
-                     return false;
-                 Ranges newRanges = tx.getValue().ranges.without(ranges);
-                 if (!newRanges.isEmpty())
-                 {
-                     tx.getValue().ranges = newRanges;
-                     return false;
-                 }
-                 else
-                 {
-                     maxRedundant = Timestamp.nonNullOrMax(maxRedundant, 
tx.getValue().command.value().executeAt());
-                     return true;
-                 }
-             });
-
-             // verify we're clearing the progress log
-             ((Node)time).scheduler().once(() -> {
-                 DefaultProgressLog progressLog = (DefaultProgressLog) 
this.progressLog;
-                 commands.headMap(syncId, false).forEach((id, cmd) -> {
-                     Command command = cmd.value();
-                     if (!command.hasBeen(PreCommitted)) return;
-                     if (!command.txnId().kind().isGloballyVisible()) return;
-
-                     Ranges allRanges = 
unsafeRangesForEpoch().allBetween(id.epoch(), 
command.executeAtOrTxnId().epoch());
-                     boolean done = command.hasBeen(Truncated);
-                     if (!done)
-                     {
-                         if (redundantBefore().status(cmd.txnId, 
command.executeAtOrTxnId(), command.route()) == 
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
-                             return;
-
-                         Route<?> route = cmd.value().route().slice(allRanges);
-                         done = !route.isEmpty() && ranges.containsAll(route);
-                     }
-
-                     if (done) Invariants.checkState(progressLog.get(id) == 
null);
-                 });
-             }, 5L, TimeUnit.SECONDS);
-             return null;
-         }, this);
-
-        return markShardDurableChain;
+        super.markShardDurable(safeStore, syncId, ranges);
+        markShardDurable(syncId, ranges);
+    }
+
+    private void markShardDurable(TxnId syncId, Ranges ranges)
+    {
+        if (!rangeCommands.containsKey(syncId))
+            historicalRangeCommands.merge(syncId, ranges, Ranges::with);
+
+        // TODO (now): apply on retrieval
+        historicalRangeCommands.entrySet().removeIf(next -> 
next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
+        rangeCommands.entrySet().removeIf(tx -> {
+            if (tx.getKey().compareTo(syncId) >= 0)
+                return false;
+            Ranges newRanges = tx.getValue().ranges.without(ranges);
+            if (!newRanges.isEmpty())
+            {
+                tx.getValue().ranges = newRanges;
+                return false;
+            }
+            else
+            {
+                maxRedundant = Timestamp.nonNullOrMax(maxRedundant, 
tx.getValue().command.value().executeAt());
+                return true;
+            }
+        });
+
+        // verify we're clearing the progress log
+        ((Node)time).scheduler().once(() -> {
+            DefaultProgressLog progressLog = (DefaultProgressLog) 
this.progressLog;
+            commands.headMap(syncId, false).forEach((id, cmd) -> {
+                Command command = cmd.value();
+                if (!command.hasBeen(PreCommitted)) return;
+                if (!command.txnId().kind().isGloballyVisible()) return;
+
+                Ranges allRanges = 
unsafeRangesForEpoch().allBetween(id.epoch(), 
command.executeAtOrTxnId().epoch());
+                boolean done = command.hasBeen(Truncated);
+                if (!done)
+                {
+                    if (redundantBefore().status(cmd.txnId, 
command.executeAtOrTxnId(), command.route()) == 
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
+                        return;
+
+                    Route<?> route = cmd.value().route().slice(allRanges);
+                    done = !route.isEmpty() && ranges.containsAll(route);
+                }
+
+                if (done) Invariants.checkState(progressLog.get(id) == null);
+            });
+        }, 5L, TimeUnit.SECONDS);
     }
 
     protected InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges,
@@ -1081,9 +1028,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         Runnable active = null;
         final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
-        public Synchronized(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
+        public Synchronized(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
         }
 
         private synchronized void maybeRun()
@@ -1173,9 +1120,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         private Thread thread; // when run in the executor this will be 
non-null, null implies not running in this store
         private final ExecutorService executor;
 
-        public SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
+        public SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
             this.executor = Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
                 thread.setName(CommandStore.class.getSimpleName() + '[' + 
time.id() + ']');
@@ -1257,9 +1204,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             }
         }
 
-        public Debug(int id, NodeTimeService time, Agent agent, DataStore 
store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
+        public Debug(int id, NodeTimeService time, Agent agent, DataStore 
store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
         }
 
         @Override
@@ -1382,16 +1329,6 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         commandsForKey.clear();
         rangeCommands.clear();
         historicalRangeCommands.clear();
-
-        durableBeforePersistentField.clearAndRestore();
-        redundantBeforePersistentField.clearAndRestore();
-        bootstrapBeganAtPersistentField.clearAndRestore();
-        safeToReadPersistentField.clearAndRestore();
-    }
-
-    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
-    {
-        super.setRedundantBefore(newRedundantBefore);
     }
 
     public interface Loader
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index 9c7f56e0..1596bb19 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -22,7 +22,6 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.api.Scheduler;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.NodeTimeService;
@@ -33,30 +32,30 @@ public class InMemoryCommandStores
 {
     public static class Synchronized extends CommandStores
     {
-        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, Scheduler scheduler)
+        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new, 
scheduler);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new);
         }
     }
 
     public static class SingleThread extends CommandStores
     {
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, Scheduler scheduler)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new, 
scheduler);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new);
         }
 
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, CommandStore.Factory shardFactory)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory, scheduler);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory);
         }
     }
 
     public static class Debug extends InMemoryCommandStores.SingleThread
     {
-        public Debug(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler 
scheduler)
+        public Debug(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, LocalListeners.Factory listenersFactory)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new, 
scheduler);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index cf68e186..d9268b61 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -131,7 +131,7 @@ abstract class HomeState extends WaitingState
     {
         Invariants.checkState(!isHomeDoneOrUninitialised());
         Command command = safeCommand.current();
-        Invariants.checkState(!safeStore.isTruncated(command), () -> 
String.format("Command %s is truncated", command));
+        Invariants.checkState(!safeStore.isTruncated(command), "Command %s is 
truncated", command);
 
         // TODO (expected): when invalidated, safer to maintain HomeState 
until known to be globally invalidated
         // TODO (now): validate that we clear HomeState when we receive a 
Durable reply, to replace the token check logic
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index 58980feb..d60f2a28 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -304,9 +304,7 @@ abstract class WaitingState extends BaseTxnState
         Command command = safeCommand.current();
         Invariants.checkState(!owner.hasActive(Waiting, txnId));
         
Invariants.checkState(command.saveStatus().compareTo(blockedUntil.minSaveStatus)
 < 0,
-                              () -> String.format("Command has met desired 
criteria (%s) but progress log entry has not been cancelled: %s",
-                                                  blockedUntil.minSaveStatus,
-                                                  command));
+                              "Command has met desired criteria (%s) but 
progress log entry has not been cancelled: %s", blockedUntil.minSaveStatus, 
command);
 
         set(safeStore, owner, blockedUntil, Querying);
         TxnId txnId = safeCommand.txnId();
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index b1ed9846..d77df6f5 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -41,7 +41,6 @@ import accord.utils.Invariants;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
-import static accord.local.PreLoadContext.contextFor;
 import static accord.local.PreLoadContext.empty;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
@@ -96,7 +95,7 @@ class Bootstrap
     }
 
     // an attempt to fetch some portion of the range we are bootstrapping
-    class Attempt implements FetchRanges, BiConsumer<Void, Throwable>
+    class Attempt implements FetchRanges, BiConsumer<Object, Throwable>
     {
         final List<SafeToRead> states = new ArrayList<>();
         Runnable cancel;
@@ -117,7 +116,7 @@ class Bootstrap
             this.valid = ranges;
         }
 
-        void start(SafeCommandStore safeStore0)
+        void start(SafeCommandStore safeStore)
         {
             if (valid.isEmpty())
             {
@@ -142,23 +141,16 @@ class Bootstrap
             // we fix here the ranges we use for the synthetic command, even 
though we may end up only finishing a subset
             // of these ranges as part of this attempt
             Ranges commitRanges = valid;
-            store.markBootstrapping(safeStore0.commandStore(), globalSyncId, 
valid).flatMap(ignore ->
-            CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
-                               // ATM all known implementations store ranges 
in-memory, but this will not be true soon, so this will need to be addressed
-                               .flatMap(syncPoint -> node.withEpoch(epoch, () 
-> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), 
KeyHistory.COMMANDS), safeStore1 -> {
-                                   if (valid.isEmpty()) // we've lost 
ownership of the range
-                                       return 
AsyncResults.success(Ranges.EMPTY);
-
-                                   
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, 
valid);
-                                   
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, 
safeStore1);
-                                   return fetch = 
safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this);
-                               })))
-                               .flatMap(i -> i)
-                               .flatMap(ranges -> 
store.execute(contextFor(localSyncId), safeStore -> {
-                                   if (!ranges.isEmpty())
-                                       
Commands.markBootstrapComplete(safeStore, localSyncId, ranges);
-                               })))
-                 .begin(this);
+            safeStore = safeStore;
+            // we submit a separate execution so that we know 
markBootstrapping is durable before we initiate the fetch
+            safeStore.commandStore().submit(empty(), safeStore0 -> {
+                store.markBootstrapping(safeStore0, globalSyncId, 
commitRanges);
+                return CoordinateSyncPoint.exclusive(node, globalSyncId, 
commitRanges);
+            }).flatMap(i -> i).flatMap(syncPoint -> node.withEpoch(epoch, () 
-> store.submit(empty(), safeStore1 -> {
+                if (valid.isEmpty()) // we've lost ownership of the range
+                    return AsyncResults.success(Ranges.EMPTY);
+                return fetch = safeStore1.dataStore().fetch(node, safeStore1, 
valid, syncPoint, this);
+            }))).flatMap(i -> i).begin(this);
         }
 
         // we no longer want to fetch these ranges (perhaps we no longer own 
them)
@@ -379,7 +371,7 @@ class Bootstrap
         }
 
         @Override
-        public void accept(Void success, Throwable failure)
+        public void accept(Object success, Throwable failure)
         {
             if (completed)
                 return;
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index c86192bd..578912a6 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -98,7 +98,6 @@ public enum Cleanup
                              commandStore.redundantBefore(), 
commandStore.durableBefore(), enforceInvariants);
     }
 
-
     public static Cleanup shouldCleanup(TxnId txnId, Status status, Durability 
durability, EpochSupplier toEpoch, Route<?> route, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
     {
         return shouldCleanup(txnId, status, durability, toEpoch, route, 
redundantBefore, durableBefore, true);
@@ -135,7 +134,7 @@ public enum Cleanup
                 //      - we can impose additional validations here IF we 
receive an epoch upper bound
                 //      - we should be more robust to the presence/absence of 
executeAt
                 //      - be cognisant of future epochs that participated only 
for PreAccept/Accept, but where txn was not committed to execute in the epoch 
(this is why we provide null toEpoch here)
-                illegalState(String.format("Command %s that is being loaded is 
not owned by this shard on route %s. Redundant before: %s", txnId, route, 
redundantBefore));
+                illegalState("Command %s that is being loaded is not owned by 
this shard on route %s. Redundant before: %s", txnId, route, redundantBefore);
             }
         }
         switch (redundant)
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index f1876624..ddc4b5a8 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -340,8 +340,7 @@ public abstract class Command implements CommonAttributes
                         Invariants.checkState(result != null, "Result is 
null");
                         break;
                     case Invalidated:
-                        
Invariants.checkState(validate.durability().isMaybeInvalidated(),
-                                              () -> String.format("%s is not 
invalidated", validate.durability()));
+                        
Invariants.checkState(validate.durability().isMaybeInvalidated(), "%s is not 
invalidated", validate.durability());
                     case Unknown:
                         Invariants.checkState(validate.durability() != Local);
                     case Erased:
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index b57569ac..298d0c04 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -28,15 +28,11 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +42,6 @@ import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.api.Scheduler;
 import accord.api.VisibleForImplementationTesting;
 import accord.coordinate.CollectCalculatedDeps;
 import accord.local.Command.WaitingOn;
@@ -68,7 +63,6 @@ import accord.utils.DeterministicIdentitySet;
 import accord.utils.Invariants;
 import accord.utils.ReducingRangeMap;
 import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import org.agrona.collections.Int2ObjectHashMap;
@@ -82,7 +76,6 @@ import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.utils.Invariants.checkState;
 import static accord.utils.Invariants.illegalState;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Single threaded internal shard of accord transaction metadata
@@ -119,13 +112,13 @@ public abstract class CommandStore implements 
AgentExecutor
         // TODO (desired): can better encapsulate by accepting only the 
newRangesForEpoch and deriving the add/remove ranges
         public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges 
addRanges)
         {
-            RedundantBefore addRedundantBefore = 
RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, 
TxnId.NONE, TxnId.minForEpoch(epoch));
+            RedundantBefore addRedundantBefore = 
RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, 
TxnId.NONE, TxnId.NONE, TxnId.minForEpoch(epoch));
             update(newRangesForEpoch, addRedundantBefore);
         }
 
         public void remove(long epoch, RangesForEpoch newRangesForEpoch, 
Ranges removeRanges)
         {
-            RedundantBefore addRedundantBefore = 
RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, 
TxnId.NONE, TxnId.NONE);
+            RedundantBefore addRedundantBefore = 
RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, 
TxnId.NONE, TxnId.NONE, TxnId.NONE);
             update(newRangesForEpoch, addRedundantBefore);
         }
 
@@ -146,8 +139,7 @@ public abstract class CommandStore implements AgentExecutor
                             DataStore store,
                             ProgressLog.Factory progressLogFactory,
                             LocalListeners.Factory listenersFactory,
-                            EpochUpdateHolder rangesForEpoch,
-                            Scheduler scheduler);
+                            EpochUpdateHolder rangesForEpoch);
     }
 
     private static final ThreadLocal<CommandStore> CURRENT_STORE = new 
ThreadLocal<>();
@@ -189,22 +181,13 @@ public abstract class CommandStore implements 
AgentExecutor
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
     @Nullable private ReducingRangeMap<Timestamp> rejectBefore;
 
-    protected final PersistentField<DurableBefore, DurableBefore> 
durableBeforePersistentField;
-    protected final PersistentField<RedundantBefore, RedundantBefore> 
redundantBeforePersistentField;
-    protected final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, 
Ranges>> bootstrapBeganAtPersistentField;
-    protected final PersistentField<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> safeToReadPersistentField;
-
     protected CommandStore(int id,
                            NodeTimeService time,
                            Agent agent,
                            DataStore store,
                            ProgressLog.Factory progressLogFactory,
                            LocalListeners.Factory listenersFactory,
-                           EpochUpdateHolder epochUpdateHolder,
-                           FieldPersister<DurableBefore> persistDurableBefore,
-                           FieldPersister<RedundantBefore> 
persistRedundantBefore,
-                           FieldPersister<NavigableMap<TxnId, Ranges>> 
persistBootstrapBeganAt,
-                           FieldPersister<NavigableMap<Timestamp, Ranges>> 
persistSafeToReadAt)
+                           EpochUpdateHolder epochUpdateHolder)
     {
         this.id = id;
         this.time = time;
@@ -213,11 +196,6 @@ public abstract class CommandStore implements AgentExecutor
         this.progressLog = progressLogFactory.create(this);
         this.listeners = listenersFactory.create(this);
         this.epochUpdateHolder = epochUpdateHolder;
-        this.durableBeforePersistentField = new 
PersistentField<>(this::durableBefore, DurableBefore::merge, 
persistDurableBefore, durableBefore -> setDurableBefore(durableBefore));
-        this.redundantBeforePersistentField = new 
PersistentField<>(this::redundantBefore, RedundantBefore::merge, 
persistRedundantBefore, redundantBefore -> setRedundantBefore(redundantBefore));
-        this.bootstrapBeganAtPersistentField = new 
PersistentField<>(this::bootstrapBeganAt, CommandStore::bootstrap, 
persistBootstrapBeganAt, bootstrapBeganAt -> 
setBootstrapBeganAt(bootstrapBeganAt));
-        this.safeToReadPersistentField = new 
PersistentField<>(this::safeToRead, null, persistSafeToReadAt, safeToRead -> 
setSafeToRead(safeToRead));
-
     }
 
     public final int id()
@@ -238,18 +216,12 @@ public abstract class CommandStore implements 
AgentExecutor
             return rangesForEpoch;
 
         update = epochUpdateHolder.getAndSet(null);
-
         if (!update.addGlobalRanges.isEmpty())
-            // Intentionally don't care if this persists since it will be 
replayed from topology at startup
             setDurableBefore(DurableBefore.merge(durableBefore, 
DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)));
-
         if (update.addRedundantBefore.size() > 0)
-            // Intentionally don't care if this persists since it will be 
replayed from topology at startup
             setRedundantBefore(RedundantBefore.merge(redundantBefore, 
update.addRedundantBefore));
-
         if (update.newRangesForEpoch != null)
             rangesForEpoch = update.newRangesForEpoch;
-
         return rangesForEpoch;
     }
 
@@ -279,12 +251,6 @@ public abstract class CommandStore implements AgentExecutor
         this.rejectBefore = newRejectBefore;
     }
 
-    protected AsyncResult<?> mergeAndUpdateBootstrapBeganAt(BootstrapSyncPoint 
globalSyncPoint)
-    {
-        return bootstrapBeganAtPersistentField.mergeAndUpdate(globalSyncPoint, 
null, null, false);
-    }
-
-    // This will not work correctly if called outside PersistentField without 
remerging
     protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
     {
         this.bootstrapBeganAt = newBootstrapBeganAt;
@@ -295,23 +261,21 @@ public abstract class CommandStore implements 
AgentExecutor
         return durableBefore;
     }
 
-    public final AsyncResult<?> mergeAndUpdateDurableBefore(DurableBefore 
newDurableBefore)
+    public final void upsertDurableBefore(DurableBefore addDurableBefore)
     {
-        return durableBeforePersistentField.mergeAndUpdate(newDurableBefore, 
null, null, true);
+        durableBefore = DurableBefore.merge(durableBefore, addDurableBefore);
     }
 
-    // For implementations to use after persistence
     protected final void setDurableBefore(DurableBefore newDurableBefore)
     {
         durableBefore = newDurableBefore;
     }
 
-    protected final AsyncResult<?> 
mergeAndUpdateRedundantBefore(RedundantBefore newRedundantBefore, Timestamp 
gcBefore, Ranges updatedRanges)
+    protected void upsertRedundantBefore(RedundantBefore addRedundantBefore)
     {
-        return 
redundantBeforePersistentField.mergeAndUpdate(newRedundantBefore, gcBefore, 
updatedRanges, true);
+        redundantBefore = RedundantBefore.merge(redundantBefore, 
addRedundantBefore);
     }
 
-    // For implementations to use after persistence
     protected void setRedundantBefore(RedundantBefore newRedundantBefore)
     {
         redundantBefore = newRedundantBefore;
@@ -336,13 +300,6 @@ public abstract class CommandStore implements AgentExecutor
         setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt));
     }
 
-    protected AsyncResult<?> 
mergeAndUpdateSafeToRead(Function<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> computeNewValue)
-    {
-        // The input values are bound into the merge function to satisfy the 
fact that there are two different sets of inputs types to the merge function
-        // depending on whether it is purgeHistory or purgeAndInsert
-        return safeToReadPersistentField.mergeAndUpdate(computeNewValue);
-    }
-
     /**
      * This method may be invoked on a non-CommandStore thread
      */
@@ -360,15 +317,13 @@ public abstract class CommandStore implements 
AgentExecutor
         setRejectBefore(newRejectBefore);
     }
 
-    public final AsyncChain<?> 
markExclusiveSyncPointLocallyApplied(CommandStore commandStore, TxnId txnId, 
Ranges ranges)
+    public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint);
-        RedundantBefore newRedundantBefore = 
RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, 
TxnId.NONE, TxnId.NONE));
-        AsyncResult<?> setRedundantBeforeChain = 
mergeAndUpdateRedundantBefore(newRedundantBefore, txnId, ranges);
-        return setRedundantBeforeChain.flatMap(
-                   ignored -> commandStore.execute(contextFor(txnId),
-                       safeStore -> updatedRedundantBefore(safeStore, txnId, 
ranges)));
+        RedundantBefore newRedundantBefore = 
RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, 
TxnId.NONE, TxnId.NONE, TxnId.NONE));
+        setRedundantBefore(newRedundantBefore);
+        updatedRedundantBefore(safeStore, txnId, ranges);
     }
 
     /**
@@ -558,35 +513,37 @@ public abstract class CommandStore implements 
AgentExecutor
         bootstraps.remove(bootstrap);
     }
 
-    final AsyncChain<?> markBootstrapping(CommandStore commandStore, TxnId 
globalSyncId, Ranges ranges)
+    final void markBootstrapping(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
     {
-        store.snapshot();
-        AsyncResult<?> setBootstrapBeganAtResult = 
mergeAndUpdateBootstrapBeganAt(new BootstrapSyncPoint(globalSyncId, ranges));
-        RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId);
-        // TODO (review): What is the correct txnId to provide here to 
restrict what memtables are flushed?
-        AsyncResult<?> setRedundantBeforeResult = 
mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), globalSyncId, ranges);
-        DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
-        AsyncResult<?> setDurableBeforeResult = 
mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, 
addDurableBefore));
-        AsyncChain<?> combinedChain = 
AsyncChains.allOf(ImmutableList.of(setBootstrapBeganAtResult, 
setRedundantBeforeResult, setDurableBeforeResult));
-        return combinedChain.flatMap(
-                   ignored -> 
commandStore.execute(PreLoadContext.contextFor(globalSyncId),
-                       safeStore -> updatedRedundantBefore(safeStore, 
globalSyncId, ranges)));
+        setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt));
+        RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, 
globalSyncId);
+        upsertRedundantBefore(addRedundantBefore);
+        upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, 
TxnId.NONE));
+        updatedRedundantBefore(safeStore, globalSyncId, ranges);
     }
 
     // TODO (expected): we can immediately truncate dependencies locally once 
an exclusiveSyncPoint applies, we don't need to wait for the whole shard
-    public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore0, 
TxnId globalSyncId, Ranges ranges)
-    {
-        store.snapshot();
-        ranges = 
ranges.slice(safeStore0.ranges().allUntil(globalSyncId.epoch()), Minimal);
-        RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE);
-        AsyncResult<?> setRedundantBeforeChain = 
mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), globalSyncId, ranges);
-        DurableBefore addDurableBefore = DurableBefore.create(ranges, 
globalSyncId, globalSyncId);
-        AsyncResult<?> setDurableBeforeChain = 
mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, 
addDurableBefore));
-        Ranges slicedRanges = ranges;
-        AsyncChain<?> combinedChain = 
AsyncChains.allOf(ImmutableList.of(setRedundantBeforeChain, 
setDurableBeforeChain));
-        return combinedChain.flatMap(
-                   ignored -> 
safeStore0.commandStore().execute(PreLoadContext.contextFor(globalSyncId),
-                       safeStore1 -> updatedRedundantBefore(safeStore1, 
globalSyncId, slicedRanges)));
+    public void markShardDurable(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges durableRanges)
+    {
+        final Ranges slicedRanges = 
durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal);
+        RedundantBefore addShardRedundant = 
RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, 
TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE);
+        upsertRedundantBefore(addShardRedundant);
+        DurableBefore addDurableBefore = DurableBefore.create(slicedRanges, 
globalSyncId, globalSyncId);
+        upsertDurableBefore(addDurableBefore);
+        updatedRedundantBefore(safeStore, globalSyncId, slicedRanges);
+        safeStore = safeStore; // make unusable in lambda
+        safeStore.dataStore().snapshot(slicedRanges).begin((success, fail) -> {
+            if (fail != null)
+            {
+                logger.error("Unsuccessful dataStore snapshot; unable to 
update GC markers", fail);
+                return;
+            }
+
+            execute(PreLoadContext.empty(), safeStore0 -> {
+                RedundantBefore addGc = RedundantBefore.create(slicedRanges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId, 
TxnId.NONE);
+                upsertRedundantBefore(addGc);
+            });
+        });
     }
 
     protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId 
syncId, Ranges ranges)
@@ -598,7 +555,6 @@ public abstract class CommandStore implements AgentExecutor
     //      also: we no longer expect epochs that are losing a range to be 
marked stale, make sure logic reflects this
     public void markShardStale(SafeCommandStore safeStore, Timestamp 
staleSince, Ranges ranges, boolean isSincePrecise)
     {
-        store.snapshot();
         Timestamp staleUntilAtLeast = staleSince;
         if (isSincePrecise)
         {
@@ -612,10 +568,8 @@ public abstract class CommandStore implements AgentExecutor
         }
         agent.onStale(staleSince, ranges);
 
-        RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast);
-        // TODO (review): Is it ok for this to be asynchronous here?
-        // TODO (review): Is stale since the right txnid here?
-        mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), staleSince, ranges).addCallback(agent);
+        RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast);
+        setRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
         // find which ranges need to bootstrap, subtracting those already in 
progress that cover the id
 
         markUnsafeToRead(ranges);
@@ -631,7 +585,7 @@ public abstract class CommandStore implements AgentExecutor
     {
         // Merge in a base for any ranges that needs to be covered
         DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
-        setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore));
+        upsertDurableBefore(addDurableBefore);
         // TODO (review): Convoluted check to not overwrite existing 
bootstraps with TxnId.NONE
         // If loading from disk didn't finish before this then we might 
initialize the range at TxnId.NONE?
         // Does CommandStores.topology ensure that doesn't happen? Is it fine 
if it does because it will get superseded?
@@ -639,7 +593,7 @@ public abstract class CommandStore implements AgentExecutor
         for (Ranges existing : bootstrapBeganAt.values())
             newBootstrapRanges = newBootstrapRanges.without(existing);
         if (!newBootstrapRanges.isEmpty())
-            bootstrapBeganAt = bootstrap(new BootstrapSyncPoint(TxnId.NONE, 
newBootstrapRanges), bootstrapBeganAt);
+            bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, 
bootstrapBeganAt);
         safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges);
         return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE);
     }
@@ -696,9 +650,9 @@ public abstract class CommandStore implements AgentExecutor
                 Keys prev = partiallyBootstrapping.get(txnIdx);
                 Keys remaining = prev;
                 if (remaining == null) remaining = 
builder.directKeyDeps.participatingKeys(txnIdx);
-                else checkState(!remaining.isEmpty());
+                else Invariants.checkState(!remaining.isEmpty());
                 remaining = remaining.without(range);
-                if (prev == null) checkState(!remaining.isEmpty());
+                if (prev == null) Invariants.checkState(!remaining.isEmpty());
                 partiallyBootstrapping.put(txnIdx, remaining);
                 return remaining.isEmpty();
             }
@@ -761,9 +715,9 @@ public abstract class CommandStore implements AgentExecutor
                 Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
                 Ranges remaining = prev;
                 if (remaining == null) remaining = 
builder.directRangeDeps.ranges(rangeTxnIdx);
-                else checkState(!remaining.isEmpty());
+                else Invariants.checkState(!remaining.isEmpty());
                 remaining = remaining.without(Ranges.of(range));
-                if (prev == null) checkState(!remaining.isEmpty());
+                if (prev == null) Invariants.checkState(!remaining.isEmpty());
                 partiallyBootstrapping.put(rangeTxnIdx, remaining);
                 return remaining.isEmpty();
             }
@@ -817,19 +771,19 @@ public abstract class CommandStore implements 
AgentExecutor
     final synchronized void markUnsafeToRead(Ranges ranges)
     {
         if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges)))
-            mergeAndUpdateSafeToRead(safeToRead -> purgeHistory(safeToRead, 
ranges));
+            setSafeToRead(purgeHistory(safeToRead, ranges));
     }
 
     final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp 
at, Ranges ranges)
     {
         Ranges validatedSafeToRead = 
redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
-        mergeAndUpdateSafeToRead(safeToRead -> purgeAndInsert(safeToRead, at, 
validatedSafeToRead));
+        setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead));
     }
 
     protected static class BootstrapSyncPoint
     {
-        TxnId syncTxnId;
-        Ranges ranges;
+        final TxnId syncTxnId;
+        final Ranges ranges;
 
         protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges)
         {
@@ -838,14 +792,12 @@ public abstract class CommandStore implements 
AgentExecutor
         }
     }
 
-    protected static ImmutableSortedMap<TxnId, Ranges> 
bootstrap(BootstrapSyncPoint syncPoint, NavigableMap<TxnId, Ranges> 
bootstrappedAt)
+    protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, 
Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt)
     {
-        TxnId at = syncPoint.syncTxnId;
-        Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || 
syncPoint.syncTxnId == TxnId.NONE);
-        if (syncPoint.syncTxnId == TxnId.NONE)
-            for (Ranges ranges : bootstrappedAt.values())
-                checkState(!syncPoint.ranges.intersects(ranges));
-        Ranges ranges = syncPoint.ranges;
+        Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || 
at == TxnId.NONE);
+        if (at == TxnId.NONE)
+            for (Ranges rs : bootstrappedAt.values())
+                checkState(!ranges.intersects(rs));
         Invariants.checkArgument(!ranges.isEmpty());
         // if we're bootstrapping these ranges, then any period we previously 
owned the ranges for is effectively invalidated
         return purgeAndInsert(bootstrappedAt, at, ranges);
@@ -882,109 +834,4 @@ public abstract class CommandStore implements 
AgentExecutor
             return in;
         return new SimpleImmutableEntry<>(in.getKey(), without);
     }
-
-    protected interface FieldPersister<T>
-    {
-        default AsyncResult<?> persist(CommandStore store, Timestamp 
timestamp, Ranges ranges, T toPersist)
-        {
-            return persist(store, toPersist);
-        }
-
-        AsyncResult<?> persist(CommandStore store, T toPersist);
-
-        default T restore() { return  null; };
-    }
-
-    // A helper class for implementing fields that needs to be asynchronously 
persisted and concurrent updates
-    // need to be merged and ordered
-    protected class PersistentField<I, T>
-    {
-        @Nonnull
-        private final Supplier<T> currentValue;
-        // The update can be bound into the merge function in which case it 
will be null
-        // Useful when the merge/update function takes multiple types of input 
arguments
-        @Nullable
-        private final BiFunction<I, T, T> merge;
-        @Nonnull
-        private final FieldPersister<T> persister;
-        @Nonnull
-        private final Consumer<T> set;
-
-        private T pendingValue;
-        private AsyncResult<?> pendingResult;
-
-        public PersistentField(@Nonnull Supplier<T> currentValue, @Nonnull 
BiFunction<I, T, T> merge, @Nonnull FieldPersister<T> persist, @Nullable 
Consumer<T> set)
-        {
-            checkNotNull(currentValue, "currentValue cannot be null");
-            checkNotNull(persist, "persist cannot be null");
-            checkNotNull(set, "set cannot be null");
-            this.currentValue = currentValue;
-            this.merge = merge;
-            this.persister = persist;
-            this.set = set;
-        }
-
-        public void clearAndRestore()
-        {
-            pendingValue = null;
-            set.accept(persister.restore());
-        }
-
-        public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable 
Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean 
remergeAfterPersistence)
-        {
-            checkNotNull(merge, "merge cannot be null");
-            checkNotNull(inputValue, "inputValue cannot be null");
-            return mergeAndUpdate(inputValue, merge, gcBefore, updatedRanges, 
remergeAfterPersistence);
-        }
-
-        public AsyncResult<?> mergeAndUpdate(@Nonnull Function<T, T> update)
-        {
-            checkNotNull(update, "merge cannot be null");
-            return mergeAndUpdate(null, (ignored, existingValue) -> 
update.apply(existingValue), null, null, false);
-        }
-
-        private AsyncResult<?> mergeAndUpdate(@Nullable I inputValue, @Nonnull 
BiFunction<I, T, T> merge, @Nullable Timestamp gcBefore, @Nullable Ranges 
updatedRanges, boolean remergeAfterPersistence)
-        {
-            checkNotNull(merge, "merge cannot be null");
-            AsyncResult.Settable<Void> result = AsyncResults.settable();
-            AsyncResult<?> oldPendingResult = pendingResult;
-            T startingValue = currentValue.get();
-            T newValue = pendingValue != null ? merge.apply(inputValue, 
pendingValue) : merge.apply(inputValue, startingValue);
-            this.pendingResult = result;
-            this.pendingValue= newValue;
-
-            AsyncResult<?> pendingWrite = persister.persist(CommandStore.this, 
gcBefore, updatedRanges, newValue);
-
-            final T newValueFinal = newValue;
-            BiConsumer<Object, Throwable> callback = (ignored, failure) -> {
-                if (PersistentField.this.pendingResult == result)
-                {
-                    PersistentField.this.pendingResult = null;
-                    PersistentField.this.pendingValue = null;
-                }
-                if (failure != null)
-                    result.tryFailure(failure);
-                else
-                {
-                    // DurableBefore and RedundantBefore can have initial 
values set non-persistently in updateRangesForEpoch so remerge them here
-                    // updateRangesForEpoch really doesn't integrate well with 
is callers if it is asynchronous updating these values
-                    // so this complexity is better than the alternative
-                    if (remergeAfterPersistence && currentValue.get() != 
startingValue)
-                        // I and T will have to be the same for remerge to work
-                        set.accept(merge.apply((I)newValueFinal, 
currentValue.get()));
-                    else
-                        set.accept(newValueFinal);
-                    result.trySuccess(null);
-                }
-            };
-
-            // Order completion after previous updates, this is probably 
stricter than necessary but easy to implement
-            if (oldPendingResult != null)
-                oldPendingResult.addCallback(() -> 
pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent));
-            else
-                
pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent);
-
-            return result;
-        }
-    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 4c8efabc..a6b7b030 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -42,7 +42,6 @@ import accord.api.Key;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
-import accord.api.Scheduler;
 import accord.local.CommandStore.EpochUpdateHolder;
 import accord.primitives.EpochSupplier;
 import accord.primitives.Participants;
@@ -86,8 +85,7 @@ public abstract class CommandStores
                              RandomSource random,
                              ShardDistributor shardDistributor,
                              ProgressLog.Factory progressLogFactory,
-                             LocalListeners.Factory listenersFactory,
-                             Scheduler scheduler);
+                             LocalListeners.Factory listenersFactory);
     }
 
     private static class StoreSupplier
@@ -99,9 +97,8 @@ public abstract class CommandStores
         private final LocalListeners.Factory listenersFactory;
         private final CommandStore.Factory shardFactory;
         private final RandomSource random;
-        private final Scheduler scheduler;
 
-        StoreSupplier(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, 
Scheduler scheduler)
+        StoreSupplier(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
         {
             this.time = time;
             this.agent = agent;
@@ -110,12 +107,11 @@ public abstract class CommandStores
             this.progressLogFactory = progressLogFactory;
             this.listenersFactory = listenersFactory;
             this.shardFactory = shardFactory;
-            this.scheduler = scheduler;
         }
 
         CommandStore create(int id, EpochUpdateHolder rangesForEpoch)
         {
-            return shardFactory.create(id, time, agent, this.store, 
progressLogFactory, listenersFactory, rangesForEpoch, scheduler);
+            return shardFactory.create(id, time, agent, this.store, 
progressLogFactory, listenersFactory, rangesForEpoch);
         }
     }
 
@@ -369,9 +365,9 @@ public abstract class CommandStores
     }
 
     public CommandStores(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor,
-                         ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, 
Scheduler scheduler)
+                         ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
     {
-        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory, scheduler), shardDistributor);
+        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory), shardDistributor);
     }
 
     public Topology local()
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 5d83e33d..e164ec11 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -551,9 +551,7 @@ public class Commands
     {
         Command.Executed command = safeStore.get(txnId).current().asExecuted();
         if (command.hasBeen(Applied))
-        {
             return AsyncChains.success(null);
-        }
         return apply(safeStore, context, txnId);
     }
 
@@ -564,7 +562,8 @@ public class Commands
         //  that was pre-bootstrap for some range (so redundant and we may 
have gone ahead of), but had to be executed locally
         //  for another range
         CommandStore unsafeStore = safeStore.commandStore();
-        // TODO (required): avoid allocating a timestamp here
+        // TODO (required, API): do we care about tracking the write 
persistence latency, when this is just a memtable write?
+        //  the only reason it will be slow is because Memtable flushes are 
backed-up (which will be reported elsewhere)
         long t0 = safeStore.time().now();
         return command.writes().apply(safeStore, applyRanges(safeStore, 
command.executeAt()), command.partialTxn())
                .flatMap(unused -> unsafeStore.submit(context, ss -> {
@@ -617,7 +616,7 @@ public class Commands
         return maybeExecute(safeStore, safeCommand, safeCommand.current(), 
alwaysNotifyListeners, notifyWaitingOn);
     }
 
-    public static boolean maybeExecute(SafeCommandStore safeStore0, 
SafeCommand safeCommand0, Command command, boolean alwaysNotifyListeners, 
boolean notifyWaitingOn)
+    public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand 
safeCommand, Command command, boolean alwaysNotifyListeners, boolean 
notifyWaitingOn)
     {
         if (logger.isTraceEnabled())
             logger.trace("{}: Maybe executing with status {}. Will notify 
listeners on noop: {}", command.txnId(), command.status(), 
alwaysNotifyListeners);
@@ -625,7 +624,7 @@ public class Commands
         if (command.status() != Stable && command.status() != PreApplied)
         {
             if (alwaysNotifyListeners)
-                safeStore0.notifyListeners(safeCommand0, command);
+                safeStore.notifyListeners(safeCommand, command);
             return false;
         }
 
@@ -633,10 +632,10 @@ public class Commands
         if (waitingOn.isWaiting())
         {
             if (alwaysNotifyListeners)
-                safeStore0.notifyListeners(safeCommand0, command);
+                safeStore.notifyListeners(safeCommand, command);
 
             if (notifyWaitingOn && waitingOn.isWaitingOnCommand())
-                new NotifyWaitingOn(safeCommand0).accept(safeStore0);
+                new NotifyWaitingOn(safeCommand).accept(safeStore);
             return false;
         }
 
@@ -649,23 +648,23 @@ public class Commands
                 // TODO (required): we can have dangling transactions in some 
cases when proposing in a future epoch but
                 //   later deciding on an earlier epoch. We should probably 
turn this into an erased vestigial command,
                 //   but we should tighten up our semantics there in general.
-                safeCommand0.readyToExecute(safeStore0);
+                safeCommand.readyToExecute(safeStore);
                 logger.trace("{}: set to ReadyToExecute", command.txnId());
-                safeStore0.notifyListeners(safeCommand0, command);
+                safeStore.notifyListeners(safeCommand, command);
                 return true;
 
             case PreApplied:
-                Ranges executeRanges = executeRanges(safeStore0, 
command.executeAt());
+                Ranges executeRanges = executeRanges(safeStore, 
command.executeAt());
                 Command.Executed executed = command.asExecuted();
                 boolean intersects = 
executed.writes().keys.intersects(executeRanges);
 
                 if (intersects)
                 {
                     // TODO (now): we should set applying within apply to 
avoid applying multiple times
-                    safeCommand0.applying(safeStore0);
-                    safeStore0.notifyListeners(safeCommand0, command);
+                    safeCommand.applying(safeStore);
+                    safeStore.notifyListeners(safeCommand, command);
                     logger.trace("{}: applying", command.txnId());
-                    apply(safeStore0, executed);
+                    apply(safeStore, executed);
                     return true;
                 }
                 else
@@ -673,26 +672,8 @@ public class Commands
                     // TODO (desirable, performance): This could be performed 
immediately upon Committed
                     //      but: if we later support transitive dependency 
elision this could be dangerous
                     logger.trace("{}: applying no-op", command.txnId());
-                    if (command.txnId().kind() == ExclusiveSyncPoint)
-                    {
-                        Ranges ranges = 
safeStore0.ranges().allAt(command.txnId().epoch());
-                        ranges = command.route().slice(ranges, 
Minimal).participants().toRanges();
-                        CommandStore commandStore = safeStore0.commandStore();
-                        safeCommand0.applying(safeStore0);
-                        safeStore0.notifyListeners(safeCommand0, command);
-                        
commandStore.markExclusiveSyncPointLocallyApplied(commandStore, 
command.txnId(), ranges).flatMap(ignored ->
-                            
commandStore.execute(PreLoadContext.contextFor(command.txnId()), safeStore1 -> {
-                                    SafeCommand safeCommand1 = 
safeStore1.get(command.txnId());
-                                    safeCommand1.applied(safeStore1);
-                                    safeStore1.notifyListeners(safeCommand1, 
command);
-                                })
-                        ).begin(commandStore.agent);
-                    }
-                    else
-                    {
-                        safeCommand0.applied(safeStore0);
-                        safeStore0.notifyListeners(safeCommand0, command);
-                    }
+                    safeCommand.applied(safeStore);
+                    safeStore.notifyListeners(safeCommand, command);
                     return true;
                 }
             default:
@@ -910,7 +891,7 @@ public class Commands
                 break;
 
             case TRUNCATE_WITH_OUTCOME:
-                Invariants.checkArgument(!command.hasBeen(Truncated), 
command.toString());
+                Invariants.checkArgument(!command.hasBeen(Truncated), "%s", 
command);
                 Invariants.checkState(command.hasBeen(PreApplied));
                 result = truncatedApplyWithOutcome(command.asExecuted());
                 break;
@@ -1206,7 +1187,7 @@ public class Commands
     public static Command updateRouteOrParticipants(SafeCommandStore 
safeStore, SafeCommand safeCommand, Unseekables<?> participants)
     {
         Command current = safeCommand.current();
-        if (current.hasBeen(Invalidated) || current.hasBeen(Truncated))
+        if (current.saveStatus().compareTo(Erased) >= 0)
             return current;
 
         CommonAttributes updated = current;
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 22e300f7..f008edf1 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -192,7 +192,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.now = new 
AtomicReference<>(Timestamp.fromValues(topology.epoch(), 
nowSupplier.getAsLong(), id));
         this.agent = agent;
         this.random = random;
-        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this), scheduler);
+        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
         // TODO review these leak a reference to an object that hasn't 
finished construction, possibly to other threads
         configService.registerListener(this);
     }
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 26942bf6..a39cdf98 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -95,6 +95,12 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
          */
         public final @Nonnull TxnId shardAppliedOrInvalidatedBefore;
 
+        /**
+         * Represents the maximum TxnId we know to have fully executed until 
across all healthy replicas for the range in question.
+         * Unless we are stale or pre-bootstrap, in which case no such 
guarantees can be made.
+         */
+        public final @Nonnull TxnId gcBefore;
+
         /**
          * bootstrappedAt defines the txnId bounds we expect to maintain data 
for locally.
          *
@@ -116,12 +122,12 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
          */
         public final @Nullable Timestamp staleUntilAtLeast;
 
-        public Entry(Range range, long startEpoch, long endEpoch, @Nonnull 
TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable 
Timestamp staleUntilAtLeast)
+        public Entry(Range range, long startEpoch, long endEpoch, @Nonnull 
TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
         {
-            this(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast);
+            this(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast);
         }
 
-        public Entry(Range range, long startEpoch, long endEpoch, @Nonnull 
TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable 
Timestamp staleUntilAtLeast)
+        public Entry(Range range, long startEpoch, long endEpoch, @Nonnull 
TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
         {
             this.range = range;
             this.startEpoch = startEpoch;
@@ -129,11 +135,14 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             this.locallyAppliedOrInvalidatedBefore = 
locallyAppliedOrInvalidatedBefore;
             this.locallyDecidedAndAppliedOrInvalidatedBefore = 
locallyDecidedAndAppliedOrInvalidatedBefore;
             this.shardAppliedOrInvalidatedBefore = 
shardAppliedOrInvalidatedBefore;
+            this.gcBefore = gcBefore;
             this.bootstrappedAt = bootstrappedAt;
             this.staleUntilAtLeast = staleUntilAtLeast;
             
Invariants.checkArgument(locallyAppliedOrInvalidatedBefore.equals(TxnId.NONE) 
|| locallyAppliedOrInvalidatedBefore.domain().isRange());
             
Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.equals(TxnId.NONE)
 || locallyDecidedAndAppliedOrInvalidatedBefore.domain().isRange());
             
Invariants.checkArgument(shardAppliedOrInvalidatedBefore.equals(TxnId.NONE) || 
shardAppliedOrInvalidatedBefore.domain().isRange());
+            Invariants.checkArgument(gcBefore.equals(TxnId.NONE) || 
gcBefore.domain().isRange());
+            
Invariants.checkArgument(gcBefore.compareTo(shardAppliedOrInvalidatedBefore) <= 
0);
         }
 
         public static Entry reduce(Entry a, Entry b)
@@ -154,6 +163,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             int cl = 
cur.locallyAppliedOrInvalidatedBefore.compareTo(add.locallyAppliedOrInvalidatedBefore);
             int cd = 
cur.locallyDecidedAndAppliedOrInvalidatedBefore.compareTo(add.locallyDecidedAndAppliedOrInvalidatedBefore);
             int cs = 
cur.shardAppliedOrInvalidatedBefore.compareTo(add.shardAppliedOrInvalidatedBefore);
+            int cg = cur.gcBefore.compareTo(add.gcBefore);
             int cb = cur.bootstrappedAt.compareTo(add.bootstrappedAt);
             int csu = compareStaleUntilAtLeast(cur.staleUntilAtLeast, 
add.staleUntilAtLeast);
 
@@ -165,6 +175,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             TxnId locallyAppliedOrInvalidatedBefore = cl >= 0 ? 
cur.locallyAppliedOrInvalidatedBefore : add.locallyAppliedOrInvalidatedBefore;
             TxnId locallyDecidedAndAppliedOrInvalidatedBefore = cd >= 0 ? 
cur.locallyDecidedAndAppliedOrInvalidatedBefore : 
add.locallyDecidedAndAppliedOrInvalidatedBefore;
             TxnId shardAppliedOrInvalidatedBefore = cs >= 0 ? 
cur.shardAppliedOrInvalidatedBefore : add.shardAppliedOrInvalidatedBefore;
+            TxnId gcBefore = cg >= 0 ? cur.gcBefore : add.gcBefore;
             TxnId bootstrappedAt = cb >= 0 ? cur.bootstrappedAt : 
add.bootstrappedAt;
             Timestamp staleUntilAtLeast = csu >= 0 ? cur.staleUntilAtLeast : 
add.staleUntilAtLeast;
 
@@ -182,7 +193,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             if (staleUntilAtLeast != null && 
bootstrappedAt.compareTo(staleUntilAtLeast) >= 0)
                 staleUntilAtLeast = null;
 
-            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast);
+            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast);
         }
 
         static @Nonnull RedundantStatus getAndMerge(Entry entry, @Nonnull 
RedundantStatus prev, TxnId txnId, EpochSupplier executeAt)
@@ -305,7 +316,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
 
         Entry withRange(Range range)
         {
-            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
bootstrappedAt, staleUntilAtLeast);
+            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         }
 
         public boolean equals(Object that)
@@ -376,27 +387,27 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
         return Ranges.ofSortedAndDeoverlapped(staleRanges).mergeTouching();
     }
 
-    public static RedundantBefore create(Ranges ranges, @Nonnull TxnId 
locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt)
+    public static RedundantBefore create(Ranges ranges, @Nonnull TxnId 
locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt)
     {
-        return create(ranges, locallyAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, bootstrappedAt, null);
+        return create(ranges, locallyAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, null);
     }
 
-    public static RedundantBefore create(Ranges ranges, @Nonnull TxnId 
locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable 
Timestamp staleUntilAtLeast)
+    public static RedundantBefore create(Ranges ranges, @Nonnull TxnId 
locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
     {
-        return create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
bootstrappedAt, staleUntilAtLeast);
+        return create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
     }
 
-    public static RedundantBefore create(Ranges ranges, long startEpoch, long 
endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt)
+    public static RedundantBefore create(Ranges ranges, long startEpoch, long 
endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt)
     {
-        return create(ranges, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
bootstrappedAt, null);
+        return create(ranges, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, null);
     }
 
-    public static RedundantBefore create(Ranges ranges, long startEpoch, long 
endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable 
Timestamp staleUntilAtLeast)
+    public static RedundantBefore create(Ranges ranges, long startEpoch, long 
endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
     {
         if (ranges.isEmpty())
             return new RedundantBefore();
 
-        Entry entry = new Entry(null, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
bootstrappedAt, staleUntilAtLeast);
+        Entry entry = new Entry(null, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         Builder builder = new Builder(ranges.get(0).endInclusive(), 
ranges.size() * 2);
         for (int i = 0 ; i < ranges.size() ; ++i)
         {
@@ -509,7 +520,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             if (v.range.start().equals(start) && v.range.end().equals(end))
                 return v;
 
-            return new Entry(v.range.newRange(start, end), v.startEpoch, 
v.endEpoch, v.locallyAppliedOrInvalidatedBefore, 
v.shardAppliedOrInvalidatedBefore, v.bootstrappedAt, v.staleUntilAtLeast);
+            return new Entry(v.range.newRange(start, end), v.startEpoch, 
v.endEpoch, v.locallyAppliedOrInvalidatedBefore, 
v.shardAppliedOrInvalidatedBefore, v.gcBefore, v.bootstrappedAt, 
v.staleUntilAtLeast);
         }
 
         @Override
@@ -528,7 +539,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             return new Entry(a.range.newRange(
                 a.range.start().compareTo(b.range.start()) <= 0 ? 
a.range.start() : b.range.start(),
                 a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : 
b.range.end()
-            ), a.startEpoch, a.endEpoch, a.locallyAppliedOrInvalidatedBefore, 
a.shardAppliedOrInvalidatedBefore, a.bootstrappedAt, a.staleUntilAtLeast);
+            ), a.startEpoch, a.endEpoch, a.locallyAppliedOrInvalidatedBefore, 
a.shardAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, 
a.staleUntilAtLeast);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index bcabdb90..c2a02d18 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -230,7 +230,7 @@ public abstract class SafeCommandStore
         if (newSaveStatus == Applied && oldSaveStatus != Applied)
         {
             Ranges ranges = updated.route().slice(ranges().all(), 
Minimal).toRanges();
-            
commandStore().markExclusiveSyncPointLocallyApplied(this.commandStore(), txnId, 
ranges);
+            commandStore().markExclusiveSyncPointLocallyApplied(this, txnId, 
ranges);
         }
     }
 
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index ad52b511..7cc2bb84 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -1573,12 +1573,12 @@ public class CommandsForKey extends 
CommandsForKeyUpdate implements CommandsSumm
 
     private void checkBehindCommitForLinearizabilityViolation(TxnInfo newInfo, 
TxnInfo maxAppliedWrite)
     {
-        if (!isPreBootstrap(newInfo) && 
CommandsForKey.reportLinearizabilityViolations())
+        if (!isPreBootstrap(newInfo))
         {
             for (int i = maxAppliedWriteByExecuteAt ; i >= 0 ; --i)
             {
                 TxnInfo txn = committedByExecuteAt[i];
-                if (newInfo == txn)
+                if (newInfo == txn && 
CommandsForKey.reportLinearizabilityViolations())
                 {
                     // we haven't found anything pre-bootstrap that follows 
this command, so log a linearizability violation
                     // TODO (expected): this should be a rate-limited logger; 
need to integrate with Cassandra
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java 
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index 2b653064..fdf8eb9d 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -610,13 +610,13 @@ class Updating
                 System.arraycopy(committedByExecuteAt, pos, result, pos + 1, 
committedByExecuteAt.length - pos);
 
                 int maxAppliedWriteByExecuteAt = 
cfk.maxAppliedWriteByExecuteAt;
-                if (reportLinearizabilityViolations() && pos <= 
maxAppliedWriteByExecuteAt)
+                if (pos <= maxAppliedWriteByExecuteAt)
                 {
                     if (pos < maxAppliedWriteByExecuteAt && !wasPruned && 
cfk.isPostBootstrap(newInfo))
                     {
                         for (int i = pos; i <= maxAppliedWriteByExecuteAt; ++i)
                         {
-                            if 
(committedByExecuteAt[pos].kind().witnesses(newInfo))
+                            if 
(committedByExecuteAt[pos].kind().witnesses(newInfo) && 
reportLinearizabilityViolations())
                                 logger.error("Linearizability violation on key 
{}: {} is committed to execute (at {}) before {} that should witness it but has 
already applied (at {})", cfk.key, newInfo.plainTxnId(), 
newInfo.plainExecuteAt(), committedByExecuteAt[i].plainTxnId(), 
committedByExecuteAt[i].plainExecuteAt());
                         }
                     }
@@ -674,7 +674,7 @@ class Updating
             TxnInfo[] committedByExecuteAt = cfk.committedByExecuteAt;
             for (int i = cfk.maxAppliedWriteByExecuteAt + 1; i < appliedPos ; 
++i)
             {
-                if (reportLinearizabilityViolations() && 
committedByExecuteAt[i].status != APPLIED && 
appliedKind.witnesses(committedByExecuteAt[i]))
+                if (committedByExecuteAt[i].status != APPLIED && 
appliedKind.witnesses(committedByExecuteAt[i]) && 
reportLinearizabilityViolations())
                     logger.error("Linearizability violation on key {}: {} is 
committed to execute (at {}) before {} that should witness it but has already 
applied (at {})", cfk.key, committedByExecuteAt[i].plainTxnId(), 
committedByExecuteAt[i].plainExecuteAt(), applied.plainTxnId(), 
applied.plainExecuteAt());
             }
         }
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java 
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index 61a08bca..6a295d88 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -52,7 +52,7 @@ public class SetGloballyDurable extends 
AbstractEpochRequest<SimpleReply>
         DurableBefore cur = safeStore.commandStore().durableBefore();
         DurableBefore upd = DurableBefore.merge(durableBefore, cur);
         // This is done asynchronously
-        
safeStore.commandStore().mergeAndUpdateDurableBefore(upd).begin(node.agent());
+        safeStore.commandStore().upsertDurableBefore(upd);
         return Ok;
     }
 
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java 
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index c0c39069..09c7b236 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -46,7 +46,7 @@ public class SetShardDurable extends 
AbstractEpochRequest<SimpleReply>
     @Override
     public SimpleReply apply(SafeCommandStore safeStore)
     {
-        safeStore.commandStore().markShardDurable(safeStore, 
exclusiveSyncPoint.syncId, 
(Ranges)exclusiveSyncPoint.keysOrRanges).begin(node.agent());
+        safeStore.commandStore().markShardDurable(safeStore, 
exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges);
         return Ok;
     }
 
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 60f59895..664f025a 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -64,7 +64,6 @@ import accord.utils.AccordGens;
 import accord.utils.RandomSource;
 import accord.utils.RandomTestRunner;
 import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncResults;
 import org.agrona.collections.IntHashSet;
 import org.agrona.collections.ObjectHashSet;
 
@@ -391,11 +390,7 @@ public class RemoteListenersTest
                   ignore -> new ProgressLog.NoOpProgressLog(),
                   ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}),
                                                       
DefaultLocalListeners.DefaultNotifySink.INSTANCE),
-                  new EpochUpdateHolder(),
-                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
-                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
-                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
-                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID);
+                  new EpochUpdateHolder());
             this.storeId = id;
         }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index baa4ba6f..a938052e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -468,7 +468,7 @@ public class Cluster implements Scheduler
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
                 BooleanSupplier isLoadedCheck = 
Gens.supplier(Gens.bools().mixedDistribution().next(random), random);
                 Node node = new Node(id, messageSink, configService, 
nowSupplier, 
NodeTimeService.elapsedWrapperFromNonMonotonicSource(TimeUnit.MILLISECONDS, 
nowSupplier),
-                                     () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()),
+                                     () -> new ListStore(sinks, random, id), 
new ShardDistributor.EvenSplit<>(8, ignore -> new 
PrefixedIntHashKey.Splitter()),
                                      nodeExecutor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, 
DefaultRequestTimeouts::new,
                                      DefaultProgressLogs::new, 
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, 
isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(),
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index c1cb4fac..fd2256a3 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -109,15 +109,15 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         return false;
     }
 
-    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, SimulatedDelayedExecutorService executorService, 
BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler)
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, SimulatedDelayedExecutorService executorService, 
BooleanSupplier isLoadedCheck, Journal journal)
     {
-        super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal), 
scheduler);
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal));
     }
 
     public static CommandStores.Factory factory(PendingQueue pending, 
BooleanSupplier isLoadedCheck, Journal journal)
     {
-        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, scheduler) ->
-               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenersFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal, 
scheduler);
+        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory) ->
+               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenersFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal);
     }
 
     @Override
@@ -168,9 +168,9 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         private final BooleanSupplier isLoadedCheck;
         private final Journal journal;
 
-        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, 
SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, 
Journal journal, Scheduler scheduler)
+        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, 
SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, 
Journal journal)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
             this.executor = executor;
             this.isLoadedCheck = isLoadedCheck;
             this.journal = journal;
@@ -201,7 +201,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
 
         private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor, BooleanSupplier 
isLoadedCheck, Journal journal)
         {
-            return (id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch, scheduler) -> new DelayedCommandStore(id, 
time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, 
executor, isLoadedCheck, journal, scheduler);
+            return (id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, 
store, progressLogFactory, listenersFactory, rangesForEpoch, executor, 
isLoadedCheck, journal);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 27b40ec1..81416540 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -18,8 +18,10 @@
 
 package accord.impl.list;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -30,6 +32,7 @@ import java.util.stream.Collectors;
 
 import accord.api.DataStore;
 import accord.api.Key;
+import accord.api.Scheduler;
 import accord.coordinate.CoordinateSyncPoint;
 import accord.coordinate.ExecuteSyncPoint.SyncPointErased;
 import accord.coordinate.Invalidated;
@@ -54,9 +57,12 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.RandomSource;
 import accord.utils.Timestamped;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.agrona.collections.LongArrayList;
@@ -136,6 +142,9 @@ public class ListStore implements DataStore
 
     static final Timestamped<int[]> EMPTY = new Timestamped<>(Timestamp.NONE, 
new int[0], Arrays::toString);
     final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>();
+    final Scheduler scheduler;
+    final RandomSource random;
+
     private final List<ChangeAt> addedAts = new ArrayList<>();
     private final List<ChangeAt> removedAts = new ArrayList<>();
     private final List<PurgeAt> purgedAts = new ArrayList<>();
@@ -148,38 +157,82 @@ public class ListStore implements DataStore
     // when out of order epochs are detected, this holds the callbacks to try 
again
     private final List<Runnable> onRemovalDone = new ArrayList<>();
 
+    private static final class Snapshot
+    {
+        private final NavigableMap<RoutableKey, Timestamped<int[]>> data;
+        private final List<ChangeAt> addedAts;
+        private final List<ChangeAt> removedAts;
+        private final List<PurgeAt> purgedAts;
+        private final List<FetchComplete> fetchCompletes;
+        private final LongArrayList pendingRemoves;
+
+        private Snapshot(NavigableMap<RoutableKey, Timestamped<int[]>> data, 
List<ChangeAt> addedAts, List<ChangeAt> removedAts, List<PurgeAt> purgedAts, 
List<FetchComplete> fetchCompletes, LongArrayList pendingRemoves)
+        {
+            this.data = new TreeMap<>(data);
+            this.addedAts = new ArrayList<>(addedAts);
+            this.removedAts = new ArrayList<>(removedAts);
+            this.purgedAts = new ArrayList<>(purgedAts);
+            this.fetchCompletes = new ArrayList<>(fetchCompletes);
+            this.pendingRemoves = new LongArrayList();
+            this.pendingRemoves.addAll(pendingRemoves);
+        }
+    }
+
+    private static final class PendingSnapshot
+    {
+        final long delay;
+        final Runnable runnable;
+
+        private PendingSnapshot(long delay, Runnable runnable)
+        {
+            this.delay = delay;
+            this.runnable = runnable;
+        }
+    }
+
+    private Snapshot snapshot;
+    private final Deque<PendingSnapshot> pendingSnapshots = new ArrayDeque<>();
+    private long pendingDelay = 0;
 
-    private final NavigableMap<RoutableKey, Timestamped<int[]>> snapshot = new 
TreeMap<>();
-    private final List<ChangeAt> addedAtSnapshot = new ArrayList<>();
-    private final List<ChangeAt> removedAtSnapshot = new ArrayList<>();
-    private final List<PurgeAt> purgedAtSnapshot = new ArrayList<>();
-    private final List<FetchComplete> fetchCompleteSnapshot = new 
ArrayList<>();
-    private final LongArrayList pendingRemovesSnapshot = new LongArrayList();
+    public AsyncResult<Void> snapshot()
+    {
+        Snapshot snapshot = new Snapshot(data, addedAts, removedAts, 
purgedAts, fetchCompletes, pendingRemoves);
+        AsyncResult.Settable<Void> result = new 
AsyncResults.SettableResult<>();
+        long delay = Math.max(1, random.nextBiasedLong(100, 1000, 5000) - 
pendingDelay);
+        pendingDelay += delay;
+        pendingSnapshots.add(new PendingSnapshot(delay, () -> {
+            this.snapshot = snapshot;
+            result.setSuccess(null);
+        }));
+
+        if (pendingSnapshots.size() == 1)
+            scheduleRunSnapshot();
+        return result;
+    }
 
-    public void snapshot()
+    private void scheduleRunSnapshot()
     {
-        snapshot.clear();
-        snapshot.putAll(data);
-        addedAtSnapshot.clear();
-        addedAtSnapshot.addAll(addedAts);
-        removedAtSnapshot.clear();
-        removedAtSnapshot.addAll(removedAts);
-        purgedAtSnapshot.clear();
-        purgedAtSnapshot.addAll(purgedAts);
-        fetchCompleteSnapshot.clear();
-        fetchCompleteSnapshot.addAll(fetchCompletes);
-        pendingRemovesSnapshot.clear();
-        pendingRemovesSnapshot.addAll(pendingRemoves);
+        Invariants.checkState(!pendingSnapshots.isEmpty());
+        scheduler.once(() -> {
+            if (pendingSnapshots.isEmpty())
+                return;
+
+            PendingSnapshot pendingSnapshot = pendingSnapshots.pollFirst();
+            pendingSnapshot.runnable.run();
+            pendingDelay -= pendingSnapshot.delay;
+            if (!pendingSnapshots.isEmpty())
+                scheduleRunSnapshot();
+        }, pendingSnapshots.peekFirst().delay, TimeUnit.MILLISECONDS);
     }
 
     public void restoreFromSnapshot()
     {
-        data.putAll(snapshot);
-        addedAts.addAll(addedAtSnapshot);
-        removedAts.addAll(removedAtSnapshot);
-        purgedAts.addAll(purgedAtSnapshot);
-        fetchCompletes.addAll(fetchCompleteSnapshot);
-        pendingRemoves.addAll(pendingRemovesSnapshot);
+        data.putAll(snapshot.data);
+        addedAts.addAll(snapshot.addedAts);
+        removedAts.addAll(snapshot.removedAts);
+        purgedAts.addAll(snapshot.purgedAts);
+        fetchCompletes.addAll(snapshot.fetchCompletes);
+        pendingRemoves.addAll(snapshot.pendingRemoves);
     }
 
     public void clear()
@@ -195,8 +248,10 @@ public class ListStore implements DataStore
     // adding here to help trace burn test queries
     public final Node.Id node;
 
-    public ListStore(Node.Id node)
+    public ListStore(Scheduler scheduler, RandomSource random, Node.Id node)
     {
+        this.scheduler = scheduler;
+        this.random = random;
         this.node = node;
     }
 
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java 
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
deleted file mode 100644
index cd639f78..00000000
--- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.local;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Consumer;
-
-import org.junit.jupiter.api.Test;
-
-import accord.api.Agent;
-import accord.impl.PrefixedIntHashKey;
-import accord.impl.basic.Cluster;
-import accord.impl.basic.DelayedCommandStores.DelayedCommandStore;
-import accord.messages.MessageType;
-import accord.messages.ReplyContext;
-import accord.messages.Request;
-import accord.primitives.Deps;
-import accord.primitives.FullRoute;
-import accord.primitives.Ranges;
-import accord.primitives.Routable;
-import accord.primitives.SyncPoint;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topology;
-import accord.topology.TopologyUtils;
-import accord.utils.AccordGens;
-import accord.utils.Gen;
-import accord.utils.Gens;
-import accord.utils.Invariants;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
-import org.assertj.core.api.Assertions;
-
-import static accord.local.PreLoadContext.contextFor;
-import static accord.utils.Property.qt;
-
-class BootstrapLocalTxnTest
-{
-    private static final Gen<Gen<Cleanup>> CLEANUP_DISTRIBUTION = 
Gens.mixedDistribution(Cleanup.NO, Cleanup.TRUNCATE, 
Cleanup.TRUNCATE_WITH_OUTCOME, Cleanup.ERASE);
-
-    @Test
-    public void localOnlyTxnLifeCycle()
-    {
-        Ranges ranges = 
Ranges.ofSortedAndDeoverlapped(PrefixedIntHashKey.ranges(0, 1));
-        List<Node.Id> nodes = Collections.singletonList(new Node.Id(42));
-        Topology t = TopologyUtils.topology(1, nodes, ranges, 2);
-        qt().check(rs -> Cluster.run(rs::fork, nodes, t, nodeMap -> new 
Request()
-        {
-            @Override
-            public void process(Node on, Node.Id from, ReplyContext 
replyContext)
-            {
-                Gen<Cleanup> cleanupGen = CLEANUP_DISTRIBUTION.next(rs);
-                for (int storeId : on.commandStores().ids())
-                {
-                    DelayedCommandStore store = (DelayedCommandStore) 
on.commandStores().forId(storeId);
-                    // this is a bit redundent but here to make the test 
easier to maintain.  Pre/Post execute we validate each command to make sure 
everything is fine
-                    // but that logic could be changed and this test has a 
dependency on validation the command... so to make this dependency explicit
-                    // the test will call the validation logic within the test 
even though it will be called again in the background...
-                    Consumer<Command> validate = store::validateRead;
-                    TxnId globalSyncId = 
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range);
-                    TxnId localSyncId = globalSyncId.as(Txn.Kind.LocalOnly);
-                    TxnId nextGlobalSyncId = 
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, 
Routable.Domain.Range).withEpoch(globalSyncId.epoch() + 1);
-                    Ranges ranges = 
AccordGens.rangesInsideRanges(store.updateRangesForEpoch().currentRanges(), 
(rs2, r) -> rs2.nextInt(1, 4)).next(rs);
-
-                    FullRoute<?> route = on.computeRoute(globalSyncId, ranges);
-                    SyncPoint<Ranges> syncPoint = new 
SyncPoint<>(globalSyncId, Deps.NONE, ranges, route);
-                    Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2, 
r) -> rs2.nextInt(1, 4)).next(rs);
-                    
Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid));
-                    Agent agent = store.agent;
-                    store.execute(contextFor(localSyncId, 
syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe -> 
Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, valid))
-                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
-                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
Commands.markBootstrapComplete(safe, localSyncId, ranges)))
-                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
-                         // cleanup txn
-                         .withExecutor(store).flatMap(ignore -> {
-                             Cleanup target = cleanupGen.next(rs);
-                             if (target == Cleanup.NO)
-                                 return AsyncResults.success(Cleanup.NO);
-                             AsyncResult<?> result = 
store.mergeAndUpdateRedundantBefore(RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, 
TxnId.NONE), nextGlobalSyncId, ranges);
-                             switch (target)
-                             {
-                                 case ERASE:
-                                     result = result.flatMap(ignored -> 
store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, nextGlobalSyncId)).beginAsResult()).beginAsResult();
-                                     break;
-                                 case TRUNCATE:
-                                     result = result.flatMap(ignored -> 
store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, globalSyncId)).beginAsResult()).beginAsResult();
-                                     break;
-                                 case TRUNCATE_WITH_OUTCOME:
-                                 case INVALIDATE:
-                                     // no update to DurableBefore = 
TRUNCATE_WITH_OUTCOME
-                                     break;
-                                 default:
-                                     throw new 
UnsupportedOperationException(target.name());
-                             }
-                             return result.map(ignored -> target);
-                         })
-                         // validateRead is called implicitly _on command 
completion_
-                         .flatMap(target -> 
store.execute(contextFor(localSyncId), safe -> {
-                             SafeCommand cmd = safe.get(localSyncId, 
route.homeKey());
-                             Command current = cmd.current();
-                             
Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ? 
SaveStatus.Applied : target.appliesIfNot);
-                         }))
-                         .begin(on.agent());
-                }
-            }
-
-            @Override
-            public MessageType type()
-            {
-                return null;
-            }
-        }));
-    }
-}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 608b2add..6b7c0b09 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -909,11 +909,7 @@ public class CommandsForKeyTest
                   null,
                   ignore -> new ProgressLog.NoOpProgressLog(),
                   ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE),
-                  new EpochUpdateHolder(),
-                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
-                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
-                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
-                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID);
+                  new EpochUpdateHolder());
             this.pruneInterval = pruneInterval;
             this.pruneHlcDelta = pruneHlcDelta;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to