This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit cc6c12cb914a03826bfc65635cdd17317eade84e
Author: Ariel Weisberg <aweisb...@apple.com>
AuthorDate: Thu Sep 19 16:09:36 2024 -0400

    Persists metadata syncrhonously
---
 .../java/accord/impl/InMemoryCommandStore.java     | 148 ++++++----
 .../java/accord/impl/InMemoryCommandStores.java    |  24 +-
 .../src/main/java/accord/local/Bootstrap.java      |  12 +-
 .../src/main/java/accord/local/CommandStore.java   | 307 +++++++++++++++++----
 .../src/main/java/accord/local/CommandStores.java  |  22 +-
 .../src/main/java/accord/local/Commands.java       |  40 ++-
 accord-core/src/main/java/accord/local/Node.java   |   2 +-
 .../java/accord/messages/SetGloballyDurable.java   |   5 +-
 .../main/java/accord/messages/SetShardDurable.java |   2 +-
 .../src/main/java/accord/utils/TriConsumer.java    |  24 ++
 .../main/java/accord/utils/async/AsyncChain.java   |  11 +
 .../test/java/accord/impl/RemoteListenersTest.java |  15 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  16 +-
 .../java/accord/local/BootstrapLocalTxnTest.java   |  20 +-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  14 +-
 15 files changed, 494 insertions(+), 168 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 872a6f25..c74e9f27 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -86,6 +86,8 @@ import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
@@ -123,9 +125,34 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
     private InMemorySafeStore current;
 
-    public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
+    // To simulate the delay in simulatedAsyncPersist
+    private final Scheduler scheduler;
+    private static <T> FieldPersister<T> 
simulatedAsyncPersistFactory(Scheduler scheduler)
     {
-        super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
+        return (commandStore, toPersist) -> simulatedAsyncPersist(scheduler, 
commandStore, toPersist);
+    }
+
+    private static <T> AsyncResult<?> simulatedAsyncPersist(Scheduler 
scheduler, CommandStore store, T toPersist)
+    {
+        AsyncResult.Settable<?> result = AsyncResults.settable();
+        scheduler.once(() -> result.trySuccess(null), 100, 
TimeUnit.MICROSECONDS);
+        return result;
+    }
+
+    public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
+    {
+        super(id,
+              time,
+              agent,
+              store,
+              progressLogFactory,
+              listenersFactory,
+              epochUpdateHolder,
+              simulatedAsyncPersistFactory(scheduler),
+              simulatedAsyncPersistFactory(scheduler),
+              simulatedAsyncPersistFactory(scheduler),
+              simulatedAsyncPersistFactory(scheduler));
+        this.scheduler = scheduler;
     }
 
     protected boolean canExposeUnloaded()
@@ -367,57 +394,59 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     }
 
     @Override
-    public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, 
Ranges ranges)
+    public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore, TxnId 
syncId, Ranges ranges)
     {
-        super.markShardDurable(safeStore, syncId, ranges);
-        markShardDurable(syncId, ranges);
-    }
-
-    private void markShardDurable(TxnId syncId, Ranges ranges)
-    {
-        if (!rangeCommands.containsKey(syncId))
-            historicalRangeCommands.merge(syncId, ranges, Ranges::with);
-
-        // TODO (now): apply on retrieval
-        historicalRangeCommands.entrySet().removeIf(next -> 
next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
-        rangeCommands.entrySet().removeIf(tx -> {
-            if (tx.getKey().compareTo(syncId) >= 0)
-                return false;
-            Ranges newRanges = tx.getValue().ranges.without(ranges);
-            if (!newRanges.isEmpty())
-            {
-                tx.getValue().ranges = newRanges;
-                return false;
-            }
-            else
-            {
-                maxRedundant = Timestamp.nonNullOrMax(maxRedundant, 
tx.getValue().command.value().executeAt());
-                return true;
-            }
-        });
-
-        // verify we're clearing the progress log
-        ((Node)time).scheduler().once(() -> {
-            DefaultProgressLog progressLog = (DefaultProgressLog) 
this.progressLog;
-            commands.headMap(syncId, false).forEach((id, cmd) -> {
-                Command command = cmd.value();
-                if (!command.hasBeen(PreCommitted)) return;
-                if (!command.txnId().kind().isGloballyVisible()) return;
-
-                Ranges allRanges = 
unsafeRangesForEpoch().allBetween(id.epoch(), 
command.executeAtOrTxnId().epoch());
-                boolean done = command.hasBeen(Truncated);
-                if (!done)
-                {
-                    if (redundantBefore().status(cmd.txnId, 
command.executeAtOrTxnId(), command.route()) == 
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
-                        return;
-
-                    Route<?> route = cmd.value().route().slice(allRanges);
-                    done = !route.isEmpty() && ranges.containsAll(route);
-                }
-
-                if (done) Invariants.checkState(progressLog.get(id) == null);
-            });
-        }, 5L, TimeUnit.SECONDS);
+        // We know it completes immediately for InMemoryCommandStore
+        AsyncChain<Void> markShardDurableChain = 
super.markShardDurable(safeStore, syncId, ranges);
+        markShardDurableChain = markShardDurableChain.map(ignored ->
+         {
+             if (!rangeCommands.containsKey(syncId))
+                 historicalRangeCommands.merge(syncId, ranges, Ranges::with);
+
+             // TODO (now): apply on retrieval
+             historicalRangeCommands.entrySet().removeIf(next -> 
next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges));
+             rangeCommands.entrySet().removeIf(tx -> {
+                 if (tx.getKey().compareTo(syncId) >= 0)
+                     return false;
+                 Ranges newRanges = tx.getValue().ranges.without(ranges);
+                 if (!newRanges.isEmpty())
+                 {
+                     tx.getValue().ranges = newRanges;
+                     return false;
+                 }
+                 else
+                 {
+                     maxRedundant = Timestamp.nonNullOrMax(maxRedundant, 
tx.getValue().command.value().executeAt());
+                     return true;
+                 }
+             });
+
+             // verify we're clearing the progress log
+             ((Node)time).scheduler().once(() -> {
+                 DefaultProgressLog progressLog = (DefaultProgressLog) 
this.progressLog;
+                 commands.headMap(syncId, false).forEach((id, cmd) -> {
+                     Command command = cmd.value();
+                     if (!command.hasBeen(PreCommitted)) return;
+                     if (!command.txnId().kind().isGloballyVisible()) return;
+
+                     Ranges allRanges = 
unsafeRangesForEpoch().allBetween(id.epoch(), 
command.executeAtOrTxnId().epoch());
+                     boolean done = command.hasBeen(Truncated);
+                     if (!done)
+                     {
+                         if (redundantBefore().status(cmd.txnId, 
command.executeAtOrTxnId(), command.route()) == 
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
+                             return;
+
+                         Route<?> route = cmd.value().route().slice(allRanges);
+                         done = !route.isEmpty() && ranges.containsAll(route);
+                     }
+
+                     if (done) Invariants.checkState(progressLog.get(id) == 
null);
+                 });
+             }, 5L, TimeUnit.SECONDS);
+             return null;
+         }, this);
+
+        return markShardDurableChain;
     }
 
     protected InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges,
@@ -485,7 +514,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     {
         if (current != null)
             throw illegalState("Another operation is in progress or it's store 
was not cleared");
-        current = createSafeStore(context, updateRangesForEpoch());
+        RangesForEpoch rangesForEpoch = updateRangesForEpoch();
+        current = createSafeStore(context, rangesForEpoch);
         return current;
     }
 
@@ -1027,9 +1057,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         Runnable active = null;
         final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
-        public Synchronized(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
+        public Synchronized(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
         }
 
         private synchronized void maybeRun()
@@ -1119,9 +1149,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         private Thread thread; // when run in the executor this will be 
non-null, null implies not running in this store
         private final ExecutorService executor;
 
-        public SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
+        public SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
             this.executor = Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
                 thread.setName(CommandStore.class.getSimpleName() + '[' + 
time.id() + ']');
@@ -1203,9 +1233,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             }
         }
 
-        public Debug(int id, NodeTimeService time, Agent agent, DataStore 
store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
+        public Debug(int id, NodeTimeService time, Agent agent, DataStore 
store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index fdb94d8a..9c7f56e0 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -18,41 +18,45 @@
 
 package accord.impl;
 
-import accord.api.LocalListeners;
-import accord.local.*;
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.LocalListeners;
 import accord.api.ProgressLog;
+import accord.api.Scheduler;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.NodeTimeService;
+import accord.local.ShardDistributor;
 import accord.utils.RandomSource;
 
 public class InMemoryCommandStores
 {
     public static class Synchronized extends CommandStores
     {
-        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
+        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, Scheduler scheduler)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new, 
scheduler);
         }
     }
 
     public static class SingleThread extends CommandStores
     {
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, Scheduler scheduler)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new, 
scheduler);
         }
 
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, CommandStore.Factory shardFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory, scheduler);
         }
     }
 
     public static class Debug extends InMemoryCommandStores.SingleThread
     {
-        public Debug(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, LocalListeners.Factory listenersFactory)
+        public Debug(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler 
scheduler)
         {
-            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new, 
scheduler);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index cae6da0e..fe2fa9ed 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -142,12 +142,12 @@ class Bootstrap
             // we fix here the ranges we use for the synthetic command, even 
though we may end up only finishing a subset
             // of these ranges as part of this attempt
             Ranges commitRanges = valid;
-            store.markBootstrapping(safeStore0, globalSyncId, valid);
-            CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
-               // ATM all known implementations store ranges in-memory, but 
this will not be true soon, so this will need to be addressed
-               .flatMap(syncPoint -> node.withEpoch(epoch, () -> 
store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), 
KeyHistory.COMMANDS), safeStore1 -> {
-                   if (valid.isEmpty()) // we've lost ownership of the range
-                       return AsyncResults.success(Ranges.EMPTY);
+            store.markBootstrapping(safeStore0.commandStore(), globalSyncId, 
valid).flatMap(ignore ->
+                CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
+                   // ATM all known implementations store ranges in-memory, 
but this will not be true soon, so this will need to be addressed
+                   .flatMap(syncPoint -> node.withEpoch(epoch, () -> 
store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), 
KeyHistory.COMMANDS), safeStore1 -> {
+                       if (valid.isEmpty()) // we've lost ownership of the 
range
+                           return AsyncResults.success(Ranges.EMPTY);
 
                    
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, 
valid);
                    
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, 
safeStore1);
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index a6326a8a..ab91b32c 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -52,20 +52,47 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSortedMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
+import accord.api.ConfigurationService.EpochReady;
+import accord.api.DataStore;
+import accord.api.LocalListeners;
+import accord.api.ProgressLog;
+import accord.api.Scheduler;
+import accord.api.VisibleForImplementationTesting;
+import accord.coordinate.CollectCalculatedDeps;
+import accord.local.Command.WaitingOn;
+import accord.local.CommandStores.RangesForEpoch;
 import accord.primitives.FullRoute;
+import accord.primitives.KeyDeps;
+import accord.primitives.Keys;
 import accord.primitives.Participants;
+import accord.primitives.Range;
 import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
+import accord.primitives.Routables;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
+import accord.utils.DeterministicIdentitySet;
+import accord.utils.Invariants;
+import accord.utils.ReducingRangeMap;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import org.agrona.collections.Int2ObjectHashMap;
 
@@ -76,13 +103,17 @@ import static accord.local.PreLoadContext.empty;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Invariants.checkState;
 import static accord.utils.Invariants.illegalState;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Single threaded internal shard of accord transaction metadata
  */
 public abstract class CommandStore implements AgentExecutor
 {
+    public static final Logger logger = 
LoggerFactory.getLogger(CommandStore.class);
+
     static class EpochUpdate
     {
         final RangesForEpoch newRangesForEpoch;
@@ -138,7 +169,8 @@ public abstract class CommandStore implements AgentExecutor
                             DataStore store,
                             ProgressLog.Factory progressLogFactory,
                             LocalListeners.Factory listenersFactory,
-                            EpochUpdateHolder rangesForEpoch);
+                            EpochUpdateHolder rangesForEpoch,
+                            Scheduler scheduler);
     }
 
     private static final ThreadLocal<CommandStore> CURRENT_STORE = new 
ThreadLocal<>();
@@ -151,9 +183,9 @@ public abstract class CommandStore implements AgentExecutor
     protected final LocalListeners listeners;
     protected final EpochUpdateHolder epochUpdateHolder;
 
-    // TODO (expected): schedule regular pruning of these collections
-    // bootstrapBeganAt and shardDurableAt are both canonical data sets mostly 
used for debugging / constructing
-    private NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once 
inserted, rolled-over until invalidated, and the floor entry contains additions)
+    // Used in markShardStale to make sure the staleness includes in progresss 
bootstraps
+    private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once 
inserted, rolled-over until invalidated, and the floor entry contains additions)
+
     private RedundantBefore redundantBefore = RedundantBefore.EMPTY;
     // TODO (expected): store this only once per node
     private DurableBefore durableBefore = DurableBefore.EMPTY;
@@ -180,7 +212,22 @@ public abstract class CommandStore implements AgentExecutor
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
     @Nullable private ReducingRangeMap<Timestamp> rejectBefore;
 
-    protected CommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder)
+    private final PersistentField<DurableBefore, DurableBefore> 
durableBeforePersistentField;
+    private final PersistentField<RedundantBefore, RedundantBefore> 
redundantBeforePersistentField;
+    private final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, 
Ranges>> bootstrapBeganAtPersistentField;
+    private final PersistentField<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> safeToReadPersistentField;
+
+    protected CommandStore(int id,
+                           NodeTimeService time,
+                           Agent agent,
+                           DataStore store,
+                           ProgressLog.Factory progressLogFactory,
+                           LocalListeners.Factory listenersFactory,
+                           EpochUpdateHolder epochUpdateHolder,
+                           FieldPersister<DurableBefore> persistDurableBefore,
+                           FieldPersister<RedundantBefore> 
persistRedundantBefore,
+                           FieldPersister<NavigableMap<TxnId, Ranges>> 
persistBootstrapBeganAt,
+                           FieldPersister<NavigableMap<Timestamp, Ranges>> 
persistSafeToReadAt)
     {
         this.id = id;
         this.time = time;
@@ -189,6 +236,11 @@ public abstract class CommandStore implements AgentExecutor
         this.progressLog = progressLogFactory.create(this);
         this.listeners = listenersFactory.create(this);
         this.epochUpdateHolder = epochUpdateHolder;
+        this.durableBeforePersistentField = new 
PersistentField<>(this::durableBefore, DurableBefore::merge, 
persistDurableBefore, durableBefore -> setDurableBefore(durableBefore));
+        this.redundantBeforePersistentField = new 
PersistentField<>(this::redundantBefore, RedundantBefore::merge, 
persistRedundantBefore, redundantBefore -> setRedundantBefore(redundantBefore));
+        this.bootstrapBeganAtPersistentField = new 
PersistentField<>(this::bootstrapBeganAt, CommandStore::bootstrap, 
persistBootstrapBeganAt, bootstrapBeganAt -> 
setBootstrapBeganAt(bootstrapBeganAt));
+        this.safeToReadPersistentField = new 
PersistentField<>(this::safeToRead, null, persistSafeToReadAt, safeToRead -> 
setSafeToRead(safeToRead));
+
     }
 
     public final int id()
@@ -209,12 +261,18 @@ public abstract class CommandStore implements 
AgentExecutor
             return rangesForEpoch;
 
         update = epochUpdateHolder.getAndSet(null);
+
         if (!update.addGlobalRanges.isEmpty())
+            // Intentionally don't care if this persists since it will be 
replayed from topology at startup
             setDurableBefore(DurableBefore.merge(durableBefore, 
DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)));
+
         if (update.addRedundantBefore.size() > 0)
+            // Intentionally don't care if this persists since it will be 
replayed from topology at startup
             setRedundantBefore(RedundantBefore.merge(redundantBefore, 
update.addRedundantBefore));
+
         if (update.newRangesForEpoch != null)
             rangesForEpoch = update.newRangesForEpoch;
+
         return rangesForEpoch;
     }
 
@@ -244,14 +302,13 @@ public abstract class CommandStore implements 
AgentExecutor
         this.rejectBefore = newRejectBefore;
     }
 
-    /**
-     * To be overridden by implementations, to ensure the new state is 
persisted
-     *
-     * TODO (required): consider handling asynchronicity of persistence
-     *  (could leave to impls to call this parent method once persisted)
-     * TODO (desired): compact Ranges, merging overlaps
-     */
-    protected void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
+    protected AsyncResult<?> mergeAndUpdateBootstrapBeganAt(BootstrapSyncPoint 
globalSyncPoint)
+    {
+        return bootstrapBeganAtPersistentField.mergeAndUpdate(globalSyncPoint, 
null, null, false);
+    }
+
+    // This will not work correctly if called outside PersistentField without 
remerging
+    protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
     {
         this.bootstrapBeganAt = newBootstrapBeganAt;
     }
@@ -261,20 +318,26 @@ public abstract class CommandStore implements 
AgentExecutor
         return durableBefore;
     }
 
-    /**
-     * To be overridden by implementations, to ensure the new state is 
persisted.
-     */
-    public void setDurableBefore(DurableBefore durableBefore)
+    public final AsyncResult<?> mergeAndUpdateDurableBefore(DurableBefore 
newDurableBefore)
     {
-        this.durableBefore = durableBefore;
+        return durableBeforePersistentField.mergeAndUpdate(newDurableBefore, 
null, null, true);
     }
 
-    /**
-     * To be overridden by implementations, to ensure the new state is 
persisted.
-     */
-    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
+    // For implementations to use after persistence
+    protected final void setDurableBefore(DurableBefore newDurableBefore)
+    {
+        durableBefore = newDurableBefore;
+    }
+
+    protected final AsyncResult<?> 
mergeAndUpdateRedundantBefore(RedundantBefore newRedundantBefore, Timestamp 
gcBefore, Ranges updatedRanges)
+    {
+        return 
redundantBeforePersistentField.mergeAndUpdate(newRedundantBefore, gcBefore, 
updatedRanges, true);
+    }
+
+    // For implementations to use after persistence
+    protected final void setRedundantBefore(RedundantBefore newRedundantBefore)
     {
-        this.redundantBefore = newRedundantBefore;
+        redundantBefore = newRedundantBefore;
     }
 
     /**
@@ -296,10 +359,17 @@ public abstract class CommandStore implements 
AgentExecutor
         setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt));
     }
 
+    protected AsyncResult<?> 
mergeAndUpdateSafeToRead(Function<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> computeNewValue)
+    {
+        // The input values are bound into the merge function to satisfy the 
fact that there are two different sets of inputs types to the merge function
+        // depending on whether it is purgeHistory or purgeAndInsert
+        return safeToReadPersistentField.mergeAndUpdate(computeNewValue);
+    }
+
     /**
      * This method may be invoked on a non-CommandStore thread
      */
-    protected synchronized void setSafeToRead(NavigableMap<Timestamp, Ranges> 
newSafeToRead)
+    protected final synchronized void setSafeToRead(NavigableMap<Timestamp, 
Ranges> newSafeToRead)
     {
         this.safeToRead = newSafeToRead;
     }
@@ -313,13 +383,15 @@ public abstract class CommandStore implements 
AgentExecutor
         setRejectBefore(newRejectBefore);
     }
 
-    public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
+    public final AsyncChain<?> 
markExclusiveSyncPointLocallyApplied(CommandStore commandStore, TxnId txnId, 
Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint);
         RedundantBefore newRedundantBefore = 
RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, 
TxnId.NONE, TxnId.NONE));
-        setRedundantBefore(newRedundantBefore);
-        updatedRedundantBefore(safeStore, txnId, ranges);
+        AsyncResult<?> setRedundantBeforeChain = 
mergeAndUpdateRedundantBefore(newRedundantBefore, txnId, ranges);
+        return setRedundantBeforeChain.flatMap(
+                   ignored -> commandStore.execute(contextFor(txnId),
+                       safeStore -> updatedRedundantBefore(safeStore, txnId, 
ranges)));
     }
 
     /**
@@ -509,27 +581,35 @@ public abstract class CommandStore implements 
AgentExecutor
         bootstraps.remove(bootstrap);
     }
 
-    final void markBootstrapping(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
+    final AsyncChain<?> markBootstrapping(CommandStore commandStore, TxnId 
globalSyncId, Ranges ranges)
     {
         store.snapshot();
-        setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt));
+        AsyncResult<?> setBootstrapBeganAtResult = 
mergeAndUpdateBootstrapBeganAt(new BootstrapSyncPoint(globalSyncId, ranges));
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId);
-        setRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
+        // TODO (review): What is the correct txnId to provide here to 
restrict what memtables are flushed?
+        AsyncResult<?> setRedundantBeforeResult = 
mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), globalSyncId, ranges);
         DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
-        setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore));
-        updatedRedundantBefore(safeStore, globalSyncId, ranges);
+        AsyncResult<?> setDurableBeforeResult = 
mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, 
addDurableBefore));
+        AsyncChain<?> combinedChain = 
AsyncChains.allOf(ImmutableList.of(setBootstrapBeganAtResult, 
setRedundantBeforeResult, setDurableBeforeResult));
+        return combinedChain.flatMap(
+                   ignored -> 
commandStore.execute(PreLoadContext.contextFor(globalSyncId),
+                       safeStore -> updatedRedundantBefore(safeStore, 
globalSyncId, ranges)));
     }
 
     // TODO (expected): we can immediately truncate dependencies locally once 
an exclusiveSyncPoint applies, we don't need to wait for the whole shard
-    public void markShardDurable(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
+    public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore0, 
TxnId globalSyncId, Ranges ranges)
     {
         store.snapshot();
-        ranges = 
ranges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal);
+        ranges = 
ranges.slice(safeStore0.ranges().allUntil(globalSyncId.epoch()), Minimal);
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE);
-        setRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
+        AsyncResult<?> setRedundantBeforeChain = 
mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), globalSyncId, ranges);
         DurableBefore addDurableBefore = DurableBefore.create(ranges, 
globalSyncId, globalSyncId);
-        setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore));
-        updatedRedundantBefore(safeStore, globalSyncId, ranges);
+        AsyncResult<?> setDurableBeforeChain = 
mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, 
addDurableBefore));
+        Ranges slicedRanges = ranges;
+        AsyncChain<?> combinedChain = 
AsyncChains.allOf(ImmutableList.of(setRedundantBeforeChain, 
setDurableBeforeChain));
+        return combinedChain.flatMap(
+                   ignored -> 
safeStore0.commandStore().execute(PreLoadContext.contextFor(globalSyncId),
+                       safeStore1 -> updatedRedundantBefore(safeStore1, 
globalSyncId, slicedRanges)));
     }
 
     protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId 
syncId, Ranges ranges)
@@ -556,19 +636,34 @@ public abstract class CommandStore implements 
AgentExecutor
         agent.onStale(staleSince, ranges);
 
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast);
-        setRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
+        // TODO (review): Is it ok for this to be asynchronous here?
+        // TODO (review): Is stale since the right txnid here?
+        mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore), staleSince, ranges).addCallback(agent);
         // find which ranges need to bootstrap, subtracting those already in 
progress that cover the id
 
         markUnsafeToRead(ranges);
     }
 
     // MUST be invoked before CommandStore reference leaks to anyone
+    // The integration may have already loaded persisted values for these 
fields before this is called
+    // so it must be a merge for each field with the initialization values. 
These starting values don't need to be
+    // persisted since we can synthesize them at startup every time
+    // TODO (review): This needs careful thought about not persisting and that 
purgeAndInsert is doing the right thing
+    // with safeToRead
     Supplier<EpochReady> initialise(long epoch, Ranges ranges)
     {
+        // Merge in a base for any ranges that needs to be covered
         DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
         setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore));
-        setBootstrapBeganAt(ImmutableSortedMap.of(TxnId.NONE, ranges));
-        setSafeToRead(ImmutableSortedMap.of(Timestamp.NONE, ranges));
+        // TODO (review): Convoluted check to not overwrite existing 
bootstraps with TxnId.NONE
+        // If loading from disk didn't finish before this then we might 
initialize the range at TxnId.NONE?
+        // Does CommandStores.topology ensure that doesn't happen? Is it fine 
if it does because it will get superseded?
+        Ranges newBootstrapRanges = ranges;
+        for (Ranges existing : bootstrapBeganAt.values())
+            newBootstrapRanges = newBootstrapRanges.without(existing);
+        if (!newBootstrapRanges.isEmpty())
+            bootstrapBeganAt = bootstrap(new BootstrapSyncPoint(TxnId.NONE, 
newBootstrapRanges), bootstrapBeganAt);
+        safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges);
         return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE);
     }
 
@@ -589,7 +684,7 @@ public abstract class CommandStore implements AgentExecutor
     }
 
     @VisibleForImplementationTesting
-    public NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
bootstrapBeganAt; }
+    public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
bootstrapBeganAt; }
 
     @VisibleForImplementationTesting
     public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; }
@@ -624,9 +719,9 @@ public abstract class CommandStore implements AgentExecutor
                 Keys prev = partiallyBootstrapping.get(txnIdx);
                 Keys remaining = prev;
                 if (remaining == null) remaining = 
builder.directKeyDeps.participatingKeys(txnIdx);
-                else Invariants.checkState(!remaining.isEmpty());
+                else checkState(!remaining.isEmpty());
                 remaining = remaining.without(range);
-                if (prev == null) Invariants.checkState(!remaining.isEmpty());
+                if (prev == null) checkState(!remaining.isEmpty());
                 partiallyBootstrapping.put(txnIdx, remaining);
                 return remaining.isEmpty();
             }
@@ -689,9 +784,9 @@ public abstract class CommandStore implements AgentExecutor
                 Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
                 Ranges remaining = prev;
                 if (remaining == null) remaining = 
builder.directRangeDeps.ranges(rangeTxnIdx);
-                else Invariants.checkState(!remaining.isEmpty());
+                else checkState(!remaining.isEmpty());
                 remaining = remaining.without(Ranges.of(range));
-                if (prev == null) Invariants.checkState(!remaining.isEmpty());
+                if (prev == null) checkState(!remaining.isEmpty());
                 partiallyBootstrapping.put(rangeTxnIdx, remaining);
                 return remaining.isEmpty();
             }
@@ -745,18 +840,35 @@ public abstract class CommandStore implements 
AgentExecutor
     final synchronized void markUnsafeToRead(Ranges ranges)
     {
         if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges)))
-            setSafeToRead(purgeHistory(safeToRead, ranges));
+            mergeAndUpdateSafeToRead(safeToRead -> purgeHistory(safeToRead, 
ranges));
     }
 
     final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp 
at, Ranges ranges)
     {
         Ranges validatedSafeToRead = 
redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
-        setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead));
+        mergeAndUpdateSafeToRead(safeToRead -> purgeAndInsert(safeToRead, at, 
validatedSafeToRead));
     }
 
-    private static <T extends Timestamp> ImmutableSortedMap<T, Ranges> 
bootstrap(T at, Ranges ranges, NavigableMap<T, Ranges> bootstrappedAt)
+    protected static class BootstrapSyncPoint
     {
-        Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0);
+        TxnId syncTxnId;
+        Ranges ranges;
+
+        protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges)
+        {
+            this.syncTxnId = syncTxnId;
+            this.ranges = ranges;
+        }
+    }
+
+    protected static ImmutableSortedMap<TxnId, Ranges> 
bootstrap(BootstrapSyncPoint syncPoint, NavigableMap<TxnId, Ranges> 
bootstrappedAt)
+    {
+        TxnId at = syncPoint.syncTxnId;
+        Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || 
syncPoint.syncTxnId == TxnId.NONE);
+        if (syncPoint.syncTxnId == TxnId.NONE)
+            for (Ranges ranges : bootstrappedAt.values())
+                checkState(!syncPoint.ranges.intersects(ranges));
+        Ranges ranges = syncPoint.ranges;
         Invariants.checkArgument(!ranges.isEmpty());
         // if we're bootstrapping these ranges, then any period we previously 
owned the ranges for is effectively invalidated
         return purgeAndInsert(bootstrappedAt, at, ranges);
@@ -793,4 +905,101 @@ public abstract class CommandStore implements 
AgentExecutor
             return in;
         return new SimpleImmutableEntry<>(in.getKey(), without);
     }
+
+    protected interface FieldPersister<T>
+    {
+        default AsyncResult<?> persist(CommandStore store, Timestamp 
timestamp, Ranges ranges, T toPersist)
+        {
+            return persist(store, toPersist);
+        }
+
+        AsyncResult<?> persist(CommandStore store, T toPersist);
+    }
+
+    // A helper class for implementing fields that needs to be asynchronously 
persisted and concurrent updates
+    // need to be merged and ordered
+    protected class PersistentField<I, T>
+    {
+        @Nonnull
+        private final Supplier<T> currentValue;
+        // The update can be bound into the merge function in which case it 
will be null
+        // Useful when the merge/update function takes multiple types of input 
arguments
+        @Nullable
+        private final BiFunction<I, T, T> merge;
+        @Nonnull
+        private final FieldPersister<T> persister;
+        @Nonnull
+        private final Consumer<T> set;
+
+        private T pendingValue;
+        private AsyncResult<?> pendingResult;
+
+        public PersistentField(@Nonnull Supplier<T> currentValue, @Nonnull 
BiFunction<I, T, T> merge, @Nonnull FieldPersister<T> persist, @Nullable 
Consumer<T> set)
+        {
+            checkNotNull(currentValue, "currentValue cannot be null");
+            checkNotNull(persist, "persist cannot be null");
+            checkNotNull(set, "set cannot be null");
+            this.currentValue = currentValue;
+            this.merge = merge;
+            this.persister = persist;
+            this.set = set;
+        }
+
+        public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable 
Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean 
remergeAfterPersistence)
+        {
+            checkNotNull(merge, "merge cannot be null");
+            checkNotNull(inputValue, "inputValue cannot be null");
+            return mergeAndUpdate(inputValue, merge, gcBefore, updatedRanges, 
remergeAfterPersistence);
+        }
+
+        public AsyncResult<?> mergeAndUpdate(@Nonnull Function<T, T> update)
+        {
+            checkNotNull(update, "merge cannot be null");
+            return mergeAndUpdate(null, (ignored, existingValue) -> 
update.apply(existingValue), null, null, false);
+        }
+
+        private AsyncResult<?> mergeAndUpdate(@Nullable I inputValue, @Nonnull 
BiFunction<I, T, T> merge, @Nullable Timestamp gcBefore, @Nullable Ranges 
updatedRanges, boolean remergeAfterPersistence)
+        {
+            checkNotNull(merge, "merge cannot be null");
+            AsyncResult.Settable<Void> result = AsyncResults.settable();
+            AsyncResult<?> oldPendingResult = pendingResult;
+            T startingValue = currentValue.get();
+            T newValue = pendingValue != null ? merge.apply(inputValue, 
pendingValue) : merge.apply(inputValue, startingValue);
+            this.pendingResult = result;
+            this.pendingValue= newValue;
+
+            AsyncResult<?> pendingWrite = persister.persist(CommandStore.this, 
gcBefore, updatedRanges, newValue);
+
+            final T newValueFinal = newValue;
+            BiConsumer<Object, Throwable> callback = (ignored, failure) -> {
+                if (PersistentField.this.pendingResult == result)
+                {
+                    PersistentField.this.pendingResult = null;
+                    PersistentField.this.pendingValue = null;
+                }
+                if (failure != null)
+                    result.tryFailure(failure);
+                else
+                {
+                    // DurableBefore and RedundantBefore can have initial 
values set non-persistently in updateRangesForEpoch so remerge them here
+                    // updateRangesForEpoch really doesn't integrate well with 
is callers if it is asynchronous updating these values
+                    // so this complexity is better than the alternative
+                    if (remergeAfterPersistence && currentValue.get() != 
startingValue)
+                        // I and T will have to be the same for remerge to work
+                        set.accept(merge.apply((I)newValueFinal, 
currentValue.get()));
+                    else
+                        set.accept(newValueFinal);
+                    result.trySuccess(null);
+                }
+            };
+
+            // Order completion after previous updates, this is probably 
stricter than necessary but easy to implement
+            if (oldPendingResult != null)
+                oldPendingResult.addCallback(() -> 
pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent));
+            else
+                
pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent);
+
+            return result;
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 55a923a3..4c8efabc 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -29,8 +29,11 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import javax.annotation.Nonnull;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.ConfigurationService.EpochReady;
@@ -39,6 +42,7 @@ import accord.api.Key;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.api.Scheduler;
 import accord.local.CommandStore.EpochUpdateHolder;
 import accord.primitives.EpochSupplier;
 import accord.primitives.Participants;
@@ -56,14 +60,9 @@ import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
-import javax.annotation.Nonnull;
-
 import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2ObjectHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static accord.api.ConfigurationService.EpochReady.done;
 import static accord.local.PreLoadContext.empty;
 import static accord.primitives.Routables.Slice.Minimal;
@@ -87,7 +86,8 @@ public abstract class CommandStores
                              RandomSource random,
                              ShardDistributor shardDistributor,
                              ProgressLog.Factory progressLogFactory,
-                             LocalListeners.Factory listenersFactory);
+                             LocalListeners.Factory listenersFactory,
+                             Scheduler scheduler);
     }
 
     private static class StoreSupplier
@@ -99,8 +99,9 @@ public abstract class CommandStores
         private final LocalListeners.Factory listenersFactory;
         private final CommandStore.Factory shardFactory;
         private final RandomSource random;
+        private final Scheduler scheduler;
 
-        StoreSupplier(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
+        StoreSupplier(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, 
Scheduler scheduler)
         {
             this.time = time;
             this.agent = agent;
@@ -109,11 +110,12 @@ public abstract class CommandStores
             this.progressLogFactory = progressLogFactory;
             this.listenersFactory = listenersFactory;
             this.shardFactory = shardFactory;
+            this.scheduler = scheduler;
         }
 
         CommandStore create(int id, EpochUpdateHolder rangesForEpoch)
         {
-            return shardFactory.create(id, time, agent, this.store, 
progressLogFactory, listenersFactory, rangesForEpoch);
+            return shardFactory.create(id, time, agent, this.store, 
progressLogFactory, listenersFactory, rangesForEpoch, scheduler);
         }
     }
 
@@ -367,9 +369,9 @@ public abstract class CommandStores
     }
 
     public CommandStores(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor,
-                         ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
+                         ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, 
Scheduler scheduler)
     {
-        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory), shardDistributor);
+        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory, scheduler), shardDistributor);
     }
 
     public Topology local()
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 1ae4c582..956edf77 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -623,7 +623,7 @@ public class Commands
         if (command.status() != Stable && command.status() != PreApplied)
         {
             if (alwaysNotifyListeners)
-                safeStore.notifyListeners(safeCommand, command);
+                safeStore0.notifyListeners(safeCommand0, command);
             return false;
         }
 
@@ -631,10 +631,10 @@ public class Commands
         if (waitingOn.isWaiting())
         {
             if (alwaysNotifyListeners)
-                safeStore.notifyListeners(safeCommand, command);
+                safeStore0.notifyListeners(safeCommand0, command);
 
             if (notifyWaitingOn && waitingOn.isWaitingOnCommand())
-                new NotifyWaitingOn(safeCommand).accept(safeStore);
+                new NotifyWaitingOn(safeCommand0).accept(safeStore0);
             return false;
         }
 
@@ -647,23 +647,23 @@ public class Commands
                 // TODO (required): we can have dangling transactions in some 
cases when proposing in a future epoch but
                 //   later deciding on an earlier epoch. We should probably 
turn this into an erased vestigial command,
                 //   but we should tighten up our semantics there in general.
-                safeCommand.readyToExecute(safeStore);
+                safeCommand0.readyToExecute(safeStore0);
                 logger.trace("{}: set to ReadyToExecute", command.txnId());
-                safeStore.notifyListeners(safeCommand, command);
+                safeStore0.notifyListeners(safeCommand0, command);
                 return true;
 
             case PreApplied:
-                Ranges executeRanges = executeRanges(safeStore, 
command.executeAt());
+                Ranges executeRanges = executeRanges(safeStore0, 
command.executeAt());
                 Command.Executed executed = command.asExecuted();
                 boolean intersects = 
executed.writes().keys.intersects(executeRanges);
 
                 if (intersects)
                 {
                     // TODO (now): we should set applying within apply to 
avoid applying multiple times
-                    safeCommand.applying(safeStore);
-                    safeStore.notifyListeners(safeCommand, command);
+                    safeCommand0.applying(safeStore0);
+                    safeStore0.notifyListeners(safeCommand0, command);
                     logger.trace("{}: applying", command.txnId());
-                    apply(safeStore, executed);
+                    apply(safeStore0, executed);
                     return true;
                 }
                 else
@@ -671,8 +671,26 @@ public class Commands
                     // TODO (desirable, performance): This could be performed 
immediately upon Committed
                     //      but: if we later support transitive dependency 
elision this could be dangerous
                     logger.trace("{}: applying no-op", command.txnId());
-                    safeCommand.applied(safeStore);
-                    safeStore.notifyListeners(safeCommand, command);
+                    if (command.txnId().kind() == ExclusiveSyncPoint)
+                    {
+                        Ranges ranges = 
safeStore0.ranges().allAt(command.txnId().epoch());
+                        ranges = command.route().slice(ranges, 
Minimal).participants().toRanges();
+                        CommandStore commandStore = safeStore0.commandStore();
+                        safeCommand0.applying(safeStore0);
+                        safeStore0.notifyListeners(safeCommand0, command);
+                        
commandStore.markExclusiveSyncPointLocallyApplied(commandStore, 
command.txnId(), ranges).flatMap(ignored ->
+                            
commandStore.execute(PreLoadContext.contextFor(command.txnId()), safeStore1 -> {
+                                    SafeCommand safeCommand1 = 
safeStore1.get(command.txnId());
+                                    safeCommand1.applied(safeStore1);
+                                    safeStore1.notifyListeners(safeCommand1, 
command);
+                                })
+                        ).begin(commandStore.agent);
+                    }
+                    else
+                    {
+                        safeCommand0.applied(safeStore0);
+                        safeStore0.notifyListeners(safeCommand0, command);
+                    }
                     return true;
                 }
             default:
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index f008edf1..22e300f7 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -192,7 +192,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.now = new 
AtomicReference<>(Timestamp.fromValues(topology.epoch(), 
nowSupplier.getAsLong(), id));
         this.agent = agent;
         this.random = random;
-        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
+        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this), scheduler);
         // TODO review these leak a reference to an object that hasn't 
finished construction, possibly to other threads
         configService.registerListener(this);
     }
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java 
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index 3fa39321..61a08bca 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -18,8 +18,8 @@
 
 package accord.messages;
 
-import accord.local.PreLoadContext;
 import accord.local.DurableBefore;
+import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.primitives.TxnId;
 
@@ -51,7 +51,8 @@ public class SetGloballyDurable extends 
AbstractEpochRequest<SimpleReply>
     {
         DurableBefore cur = safeStore.commandStore().durableBefore();
         DurableBefore upd = DurableBefore.merge(durableBefore, cur);
-        safeStore.commandStore().setDurableBefore(upd);
+        // This is done asynchronously
+        
safeStore.commandStore().mergeAndUpdateDurableBefore(upd).begin(node.agent());
         return Ok;
     }
 
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java 
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 09c7b236..c0c39069 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -46,7 +46,7 @@ public class SetShardDurable extends 
AbstractEpochRequest<SimpleReply>
     @Override
     public SimpleReply apply(SafeCommandStore safeStore)
     {
-        safeStore.commandStore().markShardDurable(safeStore, 
exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges);
+        safeStore.commandStore().markShardDurable(safeStore, 
exclusiveSyncPoint.syncId, 
(Ranges)exclusiveSyncPoint.keysOrRanges).begin(node.agent());
         return Ok;
     }
 
diff --git a/accord-core/src/main/java/accord/utils/TriConsumer.java 
b/accord-core/src/main/java/accord/utils/TriConsumer.java
new file mode 100644
index 00000000..72d1ec03
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/TriConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+public interface TriConsumer<P1, P2, P3>
+{
+    void consume(P1 p1, P2 p2, P3 p3);
+}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index 66f97f16..a642321b 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -21,6 +21,7 @@ package accord.utils.async;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -48,6 +49,16 @@ public interface AsyncChain<V>
         return AsyncChains.flatMap(this, mapper, executor);
     }
 
+    default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper, 
BooleanSupplier inExecutor, Executor executor)
+    {
+        return flatMap(input -> {
+            if (inExecutor.getAsBoolean())
+                return AsyncChains.success(mapper.apply(input));
+            else
+                return AsyncChains.ofCallable(executor, () -> 
mapper.apply(input));
+        });
+    }
+
     /**
      * When the chain has failed, this allows the chain to attempt to recover 
if possible.  The provided function may return a {@code null} to represent
      * that recovery was not possible and that the original exception should 
propgate.
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 4329114f..60f59895 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -31,7 +31,6 @@ import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
-
 import javax.annotation.Nullable;
 
 import org.junit.jupiter.api.Assertions;
@@ -65,6 +64,7 @@ import accord.utils.AccordGens;
 import accord.utils.RandomSource;
 import accord.utils.RandomTestRunner;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResults;
 import org.agrona.collections.IntHashSet;
 import org.agrona.collections.ObjectHashSet;
 
@@ -384,7 +384,18 @@ public class RemoteListenersTest
 
         protected TestCommandStore(int id)
         {
-            super(id, null, null, null, ignore -> new 
ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), 
DefaultLocalListeners.DefaultNotifySink.INSTANCE), new EpochUpdateHolder());
+            super(id,
+                  null,
+                  null,
+                  null,
+                  ignore -> new ProgressLog.NoOpProgressLog(),
+                  ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}),
+                                                      
DefaultLocalListeners.DefaultNotifySink.INSTANCE),
+                  new EpochUpdateHolder(),
+                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
+                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
+                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID,
+                  (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID);
             this.storeId = id;
         }
 
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 0de6debc..b583b70b 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -40,6 +40,8 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
+import accord.api.Result;
+import accord.api.Scheduler;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.InMemorySafeCommand;
@@ -108,15 +110,15 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         return false;
     }
 
-    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, SimulatedDelayedExecutorService executorService, 
BooleanSupplier isLoadedCheck, Journal journal)
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, SimulatedDelayedExecutorService executorService, 
BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler)
     {
-        super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal));
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal), 
scheduler);
     }
 
     public static CommandStores.Factory factory(PendingQueue pending, 
BooleanSupplier isLoadedCheck, Journal journal)
     {
-        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory) ->
-               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenersFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal);
+        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, scheduler) ->
+               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenersFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal, 
scheduler);
     }
 
     @Override
@@ -167,9 +169,9 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         private final BooleanSupplier isLoadedCheck;
         private final Journal journal;
 
-        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, 
SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, 
Journal journal)
+        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, 
SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, 
Journal journal, Scheduler scheduler)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, scheduler);
             this.executor = executor;
             this.isLoadedCheck = isLoadedCheck;
             this.journal = journal;
@@ -200,7 +202,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
 
         private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor, BooleanSupplier 
isLoadedCheck, Journal journal)
         {
-            return (id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, 
store, progressLogFactory, listenersFactory, rangesForEpoch, executor, 
isLoadedCheck, journal);
+            return (id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch, scheduler) -> new DelayedCommandStore(id, 
time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, 
executor, isLoadedCheck, journal, scheduler);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java 
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
index 4d598d9e..cd639f78 100644
--- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
+++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
 
 import org.junit.jupiter.api.Test;
 
+import accord.api.Agent;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.basic.Cluster;
 import accord.impl.basic.DelayedCommandStores.DelayedCommandStore;
@@ -43,6 +44,8 @@ import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.assertj.core.api.Assertions;
 
 import static accord.local.PreLoadContext.contextFor;
@@ -67,7 +70,7 @@ class BootstrapLocalTxnTest
                 for (int storeId : on.commandStores().ids())
                 {
                     DelayedCommandStore store = (DelayedCommandStore) 
on.commandStores().forId(storeId);
-                    // this is a bit redudent but here to make the test easier 
to maintain.  Pre/Post execute we validate each command to make sure everything 
is fine
+                    // this is a bit redundent but here to make the test 
easier to maintain.  Pre/Post execute we validate each command to make sure 
everything is fine
                     // but that logic could be changed and this test has a 
dependency on validation the command... so to make this dependency explicit
                     // the test will call the validation logic within the test 
even though it will be called again in the background...
                     Consumer<Command> validate = store::validateRead;
@@ -80,23 +83,24 @@ class BootstrapLocalTxnTest
                     SyncPoint<Ranges> syncPoint = new 
SyncPoint<>(globalSyncId, Deps.NONE, ranges, route);
                     Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2, 
r) -> rs2.nextInt(1, 4)).next(rs);
                     
Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid));
+                    Agent agent = store.agent;
                     store.execute(contextFor(localSyncId, 
syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe -> 
Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, valid))
                          .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
                          .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
Commands.markBootstrapComplete(safe, localSyncId, ranges)))
                          .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
                          // cleanup txn
-                         .flatMap(ignore -> 
store.submit(PreLoadContext.empty(), safe -> {
+                         .withExecutor(store).flatMap(ignore -> {
                              Cleanup target = cleanupGen.next(rs);
                              if (target == Cleanup.NO)
-                                 return Cleanup.NO;
-                             
safe.commandStore().setRedundantBefore(RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, 
TxnId.NONE));
+                                 return AsyncResults.success(Cleanup.NO);
+                             AsyncResult<?> result = 
store.mergeAndUpdateRedundantBefore(RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, 
TxnId.NONE), nextGlobalSyncId, ranges);
                              switch (target)
                              {
                                  case ERASE:
-                                     
safe.commandStore().setDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, nextGlobalSyncId));
+                                     result = result.flatMap(ignored -> 
store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, nextGlobalSyncId)).beginAsResult()).beginAsResult();
                                      break;
                                  case TRUNCATE:
-                                     
safe.commandStore().setDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, globalSyncId));
+                                     result = result.flatMap(ignored -> 
store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, globalSyncId)).beginAsResult()).beginAsResult();
                                      break;
                                  case TRUNCATE_WITH_OUTCOME:
                                  case INVALIDATE:
@@ -105,8 +109,8 @@ class BootstrapLocalTxnTest
                                  default:
                                      throw new 
UnsupportedOperationException(target.name());
                              }
-                             return target;
-                         }))
+                             return result.map(ignored -> target);
+                         })
                          // validateRead is called implicitly _on command 
completion_
                          .flatMap(target -> 
store.execute(contextFor(localSyncId), safe -> {
                              SafeCommand cmd = safe.get(localSyncId, 
route.homeKey());
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 470d23c3..608b2add 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -47,10 +47,10 @@ import accord.api.Read;
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Update;
-import accord.impl.IntKey;
 import accord.impl.DefaultLocalListeners;
 import accord.impl.DefaultLocalListeners.DefaultNotifySink;
 import accord.impl.DefaultRemoteListeners;
+import accord.impl.IntKey;
 import accord.local.Command;
 import accord.local.Command.AbstractCommand;
 import accord.local.CommandStore;
@@ -903,7 +903,17 @@ public class CommandsForKeyTest
 
         protected TestCommandStore(int pruneInterval, int pruneHlcDelta)
         {
-            super(0, null, null, null, ignore -> new 
ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), new 
EpochUpdateHolder());
+            super(0,
+                  null,
+                  null,
+                  null,
+                  ignore -> new ProgressLog.NoOpProgressLog(),
+                  ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE),
+                  new EpochUpdateHolder(),
+                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
+                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
+                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID,
+                  (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID);
             this.pruneInterval = pruneInterval;
             this.pruneHlcDelta = pruneHlcDelta;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to