This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 8de6be8166fc1163c9f4fc78b0c252084f67eb72
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Oct 16 12:06:42 2024 +0100

    Use ExclusiveSyncPoints to join a new topology
    
    For correctness, the dependencies we adopt on joining a new topology must 
exclude the possibility of respondents accepting additional transactions with a 
lower TxnId, so piggy-backing on the existing `ExclusiveSyncPoint` mechanism is 
logical for the time being. This patch removes the `MajorityDepsFetcher` 
(`fetchMajorityDeps`) logic in favour of simply waiting for a suitable 
`ExclusiveSyncPoint` to be proposed.
    
    patch by Benedict, reviewed by Alex Petrov for CASSANDRA-20056
---
 .../src/main/java/accord/api/Scheduler.java        |   7 +
 ...tyScheduling.java => DurabilityScheduling.java} | 100 +++++---
 .../java/accord/impl/InMemoryCommandStore.java     |  43 +---
 .../main/java/accord/impl/MajorityDepsFetcher.java | 271 ---------------------
 .../src/main/java/accord/local/CommandStore.java   |  97 +++++---
 accord-core/src/main/java/accord/local/Node.java   |  10 +
 .../main/java/accord/local/RedundantBefore.java    |  32 ++-
 .../main/java/accord/local/RedundantStatus.java    |   1 +
 .../main/java/accord/local/SafeCommandStore.java   |  34 +--
 .../main/java/accord/local/cfk/CommandsForKey.java |  11 +-
 .../main/java/accord/local/cfk/PostProcess.java    |   3 +-
 .../java/accord/local/cfk/SafeCommandsForKey.java  |  11 +-
 .../java/accord/local/cfk/UpdateUnmanagedMode.java |  24 ++
 .../src/main/java/accord/local/cfk/Updating.java   |  99 +++-----
 .../java/accord/utils/ThreadPoolScheduler.java     |   6 +
 .../main/java/accord/utils/async/AsyncResults.java |   5 +-
 .../test/java/accord/impl/RemoteListenersTest.java |   9 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  39 +--
 .../accord/impl/basic/DelayedCommandStores.java    |   8 -
 .../src/test/java/accord/impl/basic/Journal.java   |  17 --
 .../java/accord/impl/basic/RandomDelayQueue.java   |  17 +-
 .../impl/basic/RecurringPendingRunnable.java       |   4 +-
 .../test/java/accord/impl/list/ListRequest.java    |   6 +-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  10 +-
 accord-core/src/test/resources/burn-logback.xml    |   2 +
 .../src/main/java/accord/maelstrom/Cluster.java    |  19 +-
 26 files changed, 335 insertions(+), 550 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Scheduler.java 
b/accord-core/src/main/java/accord/api/Scheduler.java
index 77a12c3d..f2511ae6 100644
--- a/accord-core/src/main/java/accord/api/Scheduler.java
+++ b/accord-core/src/main/java/accord/api/Scheduler.java
@@ -53,6 +53,12 @@ public interface Scheduler
             return CANCELLED;
         }
 
+        @Override
+        public Scheduled selfRecurring(Runnable run, long delay, TimeUnit 
units)
+        {
+            return CANCELLED;
+        }
+
         @Override
         public void now(Runnable run)
         {
@@ -70,5 +76,6 @@ public interface Scheduler
     Scheduled recurring(Runnable run, long delay, TimeUnit units);
 
     Scheduled once(Runnable run, long delay, TimeUnit units);
+    Scheduled selfRecurring(Runnable run, long delay, TimeUnit units);
     void now(Runnable run);
 }
diff --git 
a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java 
b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
similarity index 89%
rename from 
accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
rename to accord-core/src/main/java/accord/impl/DurabilityScheduling.java
index 7aa69073..10934617 100644
--- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
@@ -29,6 +29,7 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService;
 import accord.api.Scheduler;
 import accord.coordinate.CoordinateGloballyDurable;
 import accord.coordinate.CoordinationFailed;
@@ -46,12 +47,13 @@ import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+import org.agrona.BitUtil;
 
 import static accord.coordinate.CoordinateShardDurable.coordinate;
 import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
-import static java.util.concurrent.TimeUnit.MINUTES;
 
 /**
  * Helper methods and classes to invoke coordination to propagate information 
about durability.
@@ -78,9 +80,9 @@ import static java.util.concurrent.TimeUnit.MINUTES;
  * TODO (expected): do not start new ExclusiveSyncPoint if we have more than X 
already agreed and not yet applied
  * Didn't go with recurring because it doesn't play well with async execution 
of these tasks
  */
-public class CoordinateDurabilityScheduling
+public class DurabilityScheduling implements ConfigurationService.Listener
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(CoordinateDurabilityScheduling.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(DurabilityScheduling.class);
 
     private final Node node;
     private Scheduler.Scheduled scheduled;
@@ -129,9 +131,16 @@ public class CoordinateDurabilityScheduling
     private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>();
     private int globalIndex;
 
-    private long nextGlobalSyncTimeMicros;
+    boolean started;
     volatile boolean stop;
 
+    @Override
+    public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean 
isLoad, boolean startSync)
+    {
+        updateTopology(topology);
+        return AsyncResults.success(null);
+    }
+
     private class ShardScheduler
     {
         Shard shard;
@@ -196,6 +205,7 @@ public class CoordinateDurabilityScheduling
             if (defunct)
                 return;
 
+            Invariants.checkState(index < numberOfSplits);
             long nowMicros = node.elapsed(MICROSECONDS);
             long microsOffset = (index * shardCycleTimeMicros) / 
numberOfSplits;
             long scheduleAt = cycleStartedAtMicros + microsOffset;
@@ -206,7 +216,7 @@ public class CoordinateDurabilityScheduling
             if (numberOfSplits > targetShardSplits && index % 4 == 0)
             {
                 index /= 4;
-                numberOfSplits /=4;
+                numberOfSplits /= 4;
             }
             scheduleAt(nowMicros, scheduleAt);
         }
@@ -232,6 +242,7 @@ public class CoordinateDurabilityScheduling
             Range range;
             int nextIndex;
             {
+                Invariants.checkState(index < numberOfSplits);
                 int i = index;
                 Range selectRange = null;
                 while (selectRange == null)
@@ -241,12 +252,12 @@ public class CoordinateDurabilityScheduling
             }
 
             Runnable schedule = () -> {
-                // TODO (required): allocate stale HLC from a reservation of 
HLCs for this purpose
+                // TODO (expected): allocate stale HLC from a reservation of 
HLCs for this purpose
                 TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, 
Domain.Range);
                 startShardSync(syncId, Ranges.of(range), nextIndex);
             };
             if (scheduleAt <= nowMicros) schedule.run();
-            else scheduled = node.scheduler().once(schedule, scheduleAt - 
nowMicros, MICROSECONDS);
+            else scheduled = node.scheduler().selfRecurring(schedule, 
scheduleAt - nowMicros, MICROSECONDS);
         }
 
         /**
@@ -255,7 +266,7 @@ public class CoordinateDurabilityScheduling
          */
         private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
         {
-            scheduled = node.scheduler().once(() -> 
node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> {
+            scheduled = node.scheduler().selfRecurring(() -> 
node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> {
                 if (withEpochFailure != null)
                 {
                     // don't wait on epoch failure - we aren't the cause of 
any problems
@@ -295,7 +306,7 @@ public class CoordinateDurabilityScheduling
 
         private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, 
SyncPoint<Range> exclusiveSyncPoint, int nextIndex)
         {
-            scheduled = node.scheduler().once(() -> {
+            scheduled = node.scheduler().selfRecurring(() -> {
                 scheduled = null;
                 node.commandStores().any().execute(() -> {
                     coordinate(node, exclusiveSyncPoint)
@@ -353,14 +364,14 @@ public class CoordinateDurabilityScheduling
         }
     }
 
-    public CoordinateDurabilityScheduling(Node node)
+    public DurabilityScheduling(Node node)
     {
         this.node = node;
     }
 
     public void setTargetShardSplits(int targetShardSplits)
     {
-        this.targetShardSplits = targetShardSplits;
+        this.targetShardSplits = 
BitUtil.findNextPositivePowerOfTwo(targetShardSplits);
     }
 
     public void setDefaultRetryDelay(long retryDelay, TimeUnit units)
@@ -399,16 +410,21 @@ public class CoordinateDurabilityScheduling
     public synchronized void start()
     {
         Invariants.checkState(!stop); // cannot currently restart safely
+        started = true;
+        updateTopology();
         long nowMicros = node.elapsed(MICROSECONDS);
-        setNextGlobalSyncTime(nowMicros);
-        scheduled = node.scheduler().recurring(this::run, 1L, MINUTES);
+        long scheduleAt = computeNextGlobalSyncTime(nowMicros);
+        scheduled = node.scheduler().selfRecurring(this::run, scheduleAt - 
nowMicros, MICROSECONDS);
     }
 
-    public void stop()
+    public synchronized void stop()
     {
         if (scheduled != null)
             scheduled.cancel();
         stop = true;
+        for (ShardScheduler scheduler : shardSchedulers.values())
+            scheduler.markDefunct();
+        shardSchedulers.clear();
     }
 
     /**
@@ -419,17 +435,18 @@ public class CoordinateDurabilityScheduling
         if (stop)
             return;
 
-        // TODO (expected): invoke this as soon as topology is updated in 
topology manager
-        updateTopology();
-        if (currentGlobalTopology == null || currentGlobalTopology.size() == 0)
-            return;
-
-        // TODO (expected): schedule this directly based on the global sync 
frequency - this is an artefact of previously scheduling shard syncs as well
         long nowMicros = node.elapsed(MICROSECONDS);
-        if (nextGlobalSyncTimeMicros <= nowMicros)
+        try
         {
+            if (currentGlobalTopology == null || currentGlobalTopology.size() 
== 0)
+                return;
+
             startGlobalSync();
-            setNextGlobalSyncTime(nowMicros);
+        }
+        finally
+        {
+            long scheduleAt = computeNextGlobalSyncTime(nowMicros);
+            node.scheduler().selfRecurring(this::run, scheduleAt - nowMicros, 
MICROSECONDS);
         }
     }
 
@@ -453,7 +470,15 @@ public class CoordinateDurabilityScheduling
     public synchronized void updateTopology()
     {
         Topology latestGlobal = node.topology().current();
-        if (latestGlobal == currentGlobalTopology)
+        updateTopology(latestGlobal);
+    }
+
+    private synchronized void updateTopology(Topology latestGlobal)
+    {
+        if (!started)
+            return;
+
+        if (latestGlobal == currentGlobalTopology || (currentGlobalTopology != 
null && latestGlobal.epoch() < currentGlobalTopology.epoch()))
             return;
 
         Topology latestLocal = latestGlobal.forNode(node.id());
@@ -495,13 +520,10 @@ public class CoordinateDurabilityScheduling
      * It's assumed it is fine if nodes overlap or reorder or skip for 
whatever activity we are picking turns for as long as it is approximately
      * the right pacing.
      */
-    private void setNextGlobalSyncTime(long nowMicros)
+    private long computeNextGlobalSyncTime(long nowMicros)
     {
         if (currentGlobalTopology == null)
-        {
-            nextGlobalSyncTimeMicros = nowMicros;
-            return;
-        }
+            return nowMicros + globalCycleTimeMicros;
 
         // How long it takes for all nodes to go once
         long totalRoundDuration = currentGlobalTopology.nodes().size() * 
globalCycleTimeMicros;
@@ -516,6 +538,26 @@ public class CoordinateDurabilityScheduling
         if (targetTimeInCurrentRound < nowMicros)
             targetTime += totalRoundDuration;
 
-        nextGlobalSyncTimeMicros = targetTime;
+        return targetTime;
+    }
+
+    @Override
+    public void onRemoteSyncComplete(Node.Id node, long epoch)
+    {
+    }
+
+    @Override
+    public void truncateTopologyUntil(long epoch)
+    {
+    }
+
+    @Override
+    public void onEpochClosed(Ranges ranges, long epoch)
+    {
+    }
+
+    @Override
+    public void onEpochRedundant(Ranges ranges, long epoch)
+    {
     }
 }
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e01614c2..03a249a0 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -71,10 +71,10 @@ import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey;
 import accord.primitives.AbstractRanges;
 import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.Deps;
 import accord.primitives.PartialDeps;
 import accord.primitives.Participants;
 import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
 import accord.primitives.RoutableKey;
@@ -91,6 +91,8 @@ import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.Cancellable;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 
 import static accord.local.KeyHistory.COMMANDS;
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
@@ -1435,52 +1437,19 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         }
     }
 
-    @VisibleForTesting
-    public void load(Deps loading)
-    {
-        registerHistoricalTransactions(loading,
-                                       ((key, txnId) -> {
-                                           
executeInContext(InMemoryCommandStore.this,
-                                                            
PreLoadContext.contextFor(key, COMMANDS),
-                                                            safeStore -> {
-                                                                
safeStore.get(key).registerHistorical(safeStore, txnId);
-                                                                return null;
-                                                            });
-                                       }));
-    }
-
     @Override
-    protected void registerHistoricalTransactions(Range range, Deps deps, 
SafeCommandStore safeStore)
-    {
-        registerHistoricalTransactions(deps, (key, txnId) -> 
safeStore.get(key).registerHistorical(safeStore, txnId));
-    }
-
-    private void registerHistoricalTransactions(Deps deps, 
BiConsumer<RoutingKey, TxnId> registerHistorical)
+    protected void registerTransitive(SafeCommandStore safeStore, RangeDeps 
rangeDeps)
     {
         RangesForEpoch rangesForEpoch = this.rangesForEpoch;
         Ranges allRanges = rangesForEpoch.all();
-        deps.keyDeps.keys().forEach(allRanges, key -> {
-            deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
-                // TODO (desired, efficiency): this can be made more efficient 
by batching by epoch
-                if (rangesForEpoch.coordinates(txnId).contains(key))
-                    return; // already coordinates, no need to replicate
-                // TODO (required): check this logic, esp. next line, matches 
C*
-                if (!rangesForEpoch.allSince(txnId.epoch()).contains(key))
-                    return;
-
-                registerHistorical.accept(key, txnId);
-            });
-
-        });
 
         TreeMap<TxnId, RangeCommand> rangeCommands = this.rangeCommands;
         TreeMap<TxnId, Ranges> historicalRangeCommands = 
historicalRangeCommands();
-        deps.rangeDeps.forEachUniqueTxnId(allRanges, null, (ignore, txnId) -> {
-
+        rangeDeps.forEachUniqueTxnId(allRanges, null, (ignore, txnId) -> {
             if (rangeCommands.containsKey(txnId))
                 return;
 
-            Ranges ranges = deps.rangeDeps.ranges(txnId);
+            Ranges ranges = rangeDeps.ranges(txnId);
             if (rangesForEpoch.coordinates(txnId).intersects(ranges))
                 return; // already coordinates, no need to replicate
             // TODO (required): check this logic, esp. next line, matches C*
diff --git a/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java 
b/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java
deleted file mode 100644
index dcb94801..00000000
--- a/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.api.Scheduler;
-import accord.coordinate.CollectCalculatedDeps;
-import accord.coordinate.CoordinationFailed;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.ShardDistributor;
-import accord.primitives.FullRoute;
-import accord.primitives.Range;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.utils.async.AsyncResult;
-
-import static accord.local.KeyHistory.COMMANDS;
-import static accord.local.PreLoadContext.contextFor;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
-
-/**
- * Copied from CoordinateDurabilityScheduling
- * TODO (required): deprecate in favour of piggy-backing on exclusive sync 
points
- */
-public class MajorityDepsFetcher
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(MajorityDepsFetcher.class);
-
-    private final Node node;
-
-    private int targetShardSplits = 64;
-    private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1);
-    private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1);
-    private int maxNumberOfSplits = 1 << 10;
-
-    private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>();
-
-    private class ShardScheduler
-    {
-        final CommandStore commandStore;
-        final Range range;
-        final List<AsyncResult.Settable<Void>> waiting;
-
-        long epoch;
-        boolean defunct;
-
-        int index;
-        int numberOfSplits;
-        Scheduler.Scheduled scheduled;
-        long retryDelayMicros = defaultRetryDelayMicros;
-
-        private ShardScheduler(CommandStore commandStore, Range range, 
List<AsyncResult.Settable<Void>> waiting, long epoch)
-        {
-            this.commandStore = commandStore;
-            this.range = range;
-            this.waiting = waiting;
-            this.numberOfSplits = targetShardSplits;
-            this.epoch = epoch;
-        }
-
-        void markDefunct()
-        {
-            this.defunct = true;
-        }
-
-        void schedule()
-        {
-            synchronized (MajorityDepsFetcher.this)
-            {
-                if (defunct)
-                    return;
-
-                long nowMicros = node.elapsed(MICROSECONDS);
-                if (retryDelayMicros > defaultRetryDelayMicros)
-                    retryDelayMicros = Math.max(defaultRetryDelayMicros, 
(long) (0.9 * retryDelayMicros));
-                scheduleAt(nowMicros, nowMicros);
-            }
-        }
-
-        void retry()
-        {
-            synchronized (MajorityDepsFetcher.this)
-            {
-                if (defunct)
-                    return;
-
-                long nowMicros = node.elapsed(MICROSECONDS);
-                long scheduleAt = nowMicros + retryDelayMicros;
-                retryDelayMicros += retryDelayMicros / 2;
-                if (retryDelayMicros > maxRetryDelayMicros)
-                {
-                    retryDelayMicros = maxRetryDelayMicros;
-                }
-                if (numberOfSplits * 2 <= maxNumberOfSplits)
-                {
-                    index *= 2;
-                    numberOfSplits *= 2;
-                }
-                scheduleAt(nowMicros, scheduleAt);
-            }
-        }
-
-        void scheduleAt(long nowMicros, long scheduleAt)
-        {
-            synchronized (MajorityDepsFetcher.this)
-            {
-                ShardDistributor distributor = 
node.commandStores().shardDistributor();
-                Range range;
-                int nextIndex;
-                {
-                    int i = index;
-                    Range selectRange = null;
-                    while (selectRange == null)
-                        selectRange = distributor.splitRange(this.range, 
index, ++i, numberOfSplits);
-                    range = selectRange;
-                    nextIndex = i;
-                }
-
-                Runnable schedule = () -> start(range, nextIndex);
-                if (scheduleAt <= nowMicros) schedule.run();
-                else scheduled = node.scheduler().once(schedule, scheduleAt - 
nowMicros, MICROSECONDS);
-            }
-        }
-
-        /**
-         * The first step for coordinating shard durable is to run an 
exclusive sync point
-         * the result of which can then be used to run
-         */
-        private void start(Range slice, int nextIndex)
-        {
-            TxnId id = TxnId.fromValues(epoch - 1, 0, node.id());
-            Timestamp before = Timestamp.minForEpoch(epoch);
-
-            node.withEpoch(id.epoch(), (ignored, withEpochFailure) -> {
-                if (withEpochFailure != null)
-                {
-                    // don't wait on epoch failure - we aren't the cause of 
any problems
-                    start(slice, nextIndex);
-                    Throwable wrapped = 
CoordinationFailed.wrap(withEpochFailure);
-                    logger.trace("Exception waiting for epoch before 
coordinating exclusive sync point for local shard durability, epoch " + 
id.epoch(), wrapped);
-                    node.agent().onUncaughtException(wrapped);
-                    return;
-                }
-                scheduled = null;
-                FullRoute<Range> route = (FullRoute<Range>) 
node.computeRoute(id, Ranges.of(slice));
-                logger.debug("Fetching deps to sync epoch {} for range {}", 
epoch, slice);
-                CollectCalculatedDeps.withCalculatedDeps(node, id, route, 
route, before, (deps, fail) -> {
-                    if (fail != null)
-                    {
-                        logger.warn("Failed to fetch deps for syncing epoch {} 
for {}", epoch, slice, fail);
-                        retry();
-                    }
-                    else
-                    {
-                        // TODO (correctness) : PreLoadContext only works with 
Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys 
AND Ranges!
-                        // ATM all known implementations store ranges 
in-memory, but this will not be true soon, so this will need to be addressed
-                        commandStore.execute(contextFor(null, 
deps.keyDeps.keys(), COMMANDS), safeStore -> {
-                            safeStore.registerHistoricalTransactions(epoch, 
slice, deps);
-                        }).begin((success, fail2) -> {
-                            if (fail2 != null)
-                            {
-                                retry();
-                                logger.warn("Failed to apply deps for syncing 
epoch {} for range {}", epoch, slice, fail2);
-                            }
-                            else
-                            {
-                                try
-                                {
-                                    synchronized (MajorityDepsFetcher.this)
-                                    {
-                                        index = nextIndex;
-                                        if (index >= numberOfSplits)
-                                        {
-                                            waiting.forEach(w -> 
w.trySuccess(null));
-                                            logger.info("Successfully fetched 
majority deps for {} at epoch {}", this.range, epoch);
-                                            defunct = true;
-                                            shardSchedulers.remove(this.range, 
ShardScheduler.this);
-                                        }
-                                        else
-                                        {
-                                            schedule();
-                                        }
-                                    }
-                                }
-                                catch (Throwable t)
-                                {
-                                    retry();
-                                    logger.error("Unexpected exception 
handling durability scheduling callback; starting from scratch", t);
-                                }
-                            }
-                        });
-                    }
-                });
-            });
-        }
-    }
-
-    public MajorityDepsFetcher(Node node)
-    {
-        this.node = node;
-    }
-
-    public synchronized void cancel(Range range, long epoch)
-    {
-        ShardScheduler scheduler = shardSchedulers.get(range);
-        if (scheduler == null || scheduler.epoch > epoch)
-            return;
-
-        scheduler.markDefunct();
-        shardSchedulers.remove(range);
-    }
-
-    public void setTargetShardSplits(int targetShardSplits)
-    {
-        this.targetShardSplits = targetShardSplits;
-    }
-
-    public void setDefaultRetryDelay(long retryDelay, TimeUnit units)
-    {
-        this.defaultRetryDelayMicros = units.toMicros(retryDelay);
-    }
-
-    public void setMaxRetryDelay(long retryDelay, TimeUnit units)
-    {
-        this.maxRetryDelayMicros = units.toMicros(retryDelay);
-    }
-
-    public synchronized void fetchMajorityDeps(CommandStore commandStore, 
Range range, long epoch, AsyncResult.Settable<Void> waiting)
-    {
-        ShardScheduler scheduler = shardSchedulers.get(range);
-        List<AsyncResult.Settable<Void>> waitingList = 
Collections.singletonList(waiting);
-        if (scheduler != null)
-        {
-            if (scheduler.epoch >= epoch)
-                return;
-            scheduler.markDefunct();
-            waitingList = new ArrayList<>(waitingList);
-            waitingList.addAll(scheduler.waiting);
-        }
-        scheduler = new ShardScheduler(commandStore, range, waitingList, 
epoch);
-        scheduler.schedule();
-    }
-
-}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 29edb281..76e83b04 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -25,9 +25,8 @@ import accord.api.DataStore;
 import javax.annotation.Nullable;
 import accord.api.Agent;
 
-import accord.impl.MajorityDepsFetcher;
 import accord.local.CommandStores.RangesForEpoch;
-import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.Routables;
 import accord.primitives.Unseekables;
 import accord.utils.async.AsyncChain;
@@ -35,13 +34,10 @@ import accord.utils.async.AsyncChain;
 import accord.api.ConfigurationService.EpochReady;
 import accord.utils.DeterministicIdentitySet;
 import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
@@ -57,11 +53,11 @@ import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.primitives.Deps;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncResults;
+import org.agrona.collections.LongHashSet;
 
 import static accord.api.ConfigurationService.EpochReady.DONE;
 import static accord.local.PreLoadContext.empty;
@@ -174,6 +170,20 @@ public abstract class CommandStore implements AgentExecutor
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
     @Nullable private RejectBefore rejectBefore;
 
+    static class WaitingOnSync
+    {
+        final AsyncResults.SettableResult<Void> whenDone;
+        final Ranges allRanges;
+        Ranges ranges;
+
+        WaitingOnSync(AsyncResults.SettableResult<Void> whenDone, Ranges 
ranges)
+        {
+            this.whenDone = whenDone;
+            this.allRanges = this.ranges = ranges;
+        }
+    }
+    private final TreeMap<Long, WaitingOnSync> waitingOnSync = new TreeMap<>();
+
     protected CommandStore(int id,
                            NodeCommandStoreService node,
                            Agent agent,
@@ -238,7 +248,7 @@ public abstract class CommandStore implements AgentExecutor
     public abstract <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply);
     public abstract void shutdown();
 
-    protected abstract void registerHistoricalTransactions(Range range, Deps 
deps, SafeCommandStore safeStore);
+    protected abstract void registerTransitive(SafeCommandStore safeStore, 
RangeDeps deps);
 
     protected void unsafeSetRejectBefore(RejectBefore newRejectBefore)
     {
@@ -470,34 +480,11 @@ public abstract class CommandStore implements AgentExecutor
      */
     protected Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch, 
boolean isLoad)
     {
-        return () -> syncInternal(node, ranges, epoch, isLoad);
-    }
-
-    protected EpochReady syncInternal(Node node, Ranges ranges, long epoch, 
boolean isLoad)
-    {
-        AsyncResults.SettableResult<Void> whenDone = new 
AsyncResults.SettableResult<>();
-        fetchMajorityDeps(whenDone, node, epoch, ranges);
-        return new EpochReady(epoch, DONE, whenDone, whenDone, whenDone);
-    }
-
-    private MajorityDepsFetcher fetcher;
-    protected void cancelFetch(Range range, long epoch)
-    {
-        if (fetcher != null)
-            fetcher.cancel(range, epoch);
-    }
-    // TODO (required, correctness): replace with a simple wait on suitable exclusive sync point(s)
-    private void fetchMajorityDeps(AsyncResult.Settable<Void> coordination, 
Node node, long epoch, Ranges ranges)
-    {
-        if (fetcher == null) fetcher = new MajorityDepsFetcher(node);
-        List<AsyncResult.Settable<Void>> waiting = new ArrayList<>();
-        for (Range range : ranges)
-        {
-            AsyncResult.Settable<Void> rangeComplete = AsyncResults.settable();
-            fetcher.fetchMajorityDeps(this, range, epoch, rangeComplete);
-            waiting.add(rangeComplete);
-        }
-        AsyncChains.reduce(waiting, (a, b) -> 
null).begin(coordination.settingCallback());
+        return () -> {
+            AsyncResults.SettableResult<Void> whenDone = new 
AsyncResults.SettableResult<>();
+            waitingOnSync.put(epoch, new WaitingOnSync(whenDone, ranges));
+            return new EpochReady(epoch, DONE, whenDone, whenDone, whenDone);
+        };
     }
 
     Supplier<EpochReady> unbootstrap(long epoch, Ranges removedRanges)
@@ -534,7 +521,7 @@ public abstract class CommandStore implements AgentExecutor
     public void markShardDurable(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges durableRanges)
     {
         final Ranges slicedRanges = 
durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal);
-        TxnId locallyRedundantBefore = 
safeStore.redundantBefore().minLocallyAppliedOrInvalidatedBefore(slicedRanges);
+        TxnId locallyRedundantBefore = 
safeStore.redundantBefore().min(slicedRanges, e -> 
e.locallyAppliedOrInvalidatedBefore);
         RedundantBefore addShardRedundant = 
RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, 
TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE);
         safeStore.upsertRedundantBefore(addShardRedundant);
         updatedRedundantBefore(safeStore, globalSyncId, slicedRanges);
@@ -542,7 +529,12 @@ public abstract class CommandStore implements AgentExecutor
 
         if (locallyRedundantBefore.compareTo(globalSyncId) < 0)
         {
-            logger.warn("Trying to markShardDurable we have not yet caught-up 
to locally. Local: {}, Global: {}, Ranges: {}", locallyRedundantBefore, 
globalSyncId, slicedRanges);
+            // TODO (expected): if bootstrapping only part of the range, mark the rest for GC; or relax this as can safely GC behind bootstrap
+            TxnId maxBootstrap = safeStore.redundantBefore().max(slicedRanges, 
e -> e.bootstrappedAt);
+            if (maxBootstrap.compareTo(globalSyncId) >= 0)
+                logger.info("Ignoring markShardDurable for a point we are 
bootstrapping. Bootstrapping: {}, Global: {}, Ranges: {}", maxBootstrap, 
globalSyncId, slicedRanges);
+            else
+                logger.warn("Trying to markShardDurable a point we have not 
yet caught-up to locally. Local: {}, Global: {}, Ranges: {}", 
locallyRedundantBefore, globalSyncId, slicedRanges);
             return;
         }
 
@@ -564,6 +556,37 @@ public abstract class CommandStore implements AgentExecutor
     {
     }
 
+    protected void markSynced(TxnId syncId, Ranges ranges)
+    {
+        if (waitingOnSync.isEmpty())
+            return;
+
+        LongHashSet remove = null;
+        for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+        {
+            if (e.getKey() > syncId.epoch())
+                break;
+
+            Ranges remaining = e.getValue().ranges;
+            Ranges synced = remaining.slice(ranges, Minimal);
+            e.getValue().ranges = remaining = remaining.without(ranges);
+            if (e.getValue().ranges.isEmpty())
+            {
+                logger.info("Completed full sync for {} on epoch {} using {}", 
e.getValue().allRanges, e.getKey(), syncId);
+                e.getValue().whenDone.trySuccess(null);
+                if (remove == null)
+                    remove = new LongHashSet();
+                remove.add(e.getKey());
+            }
+            else
+            {
+                logger.info("Completed partial sync for {} on epoch {} using 
{}; {} still to sync", synced, e.getKey(), syncId, remaining);
+            }
+        }
+        if (remove != null)
+            remove.forEach(waitingOnSync::remove);
+    }
+
     // TODO (expected): we can immediately truncate dependencies locally once an exclusiveSyncPoint applies, we don't need to wait for the whole shard
     // TODO (required): integrate validation of staleness with implementation (e.g. C* should know it has been marked stale)
     //      also: we no longer expect epochs that are losing a range to be marked stale, make sure logic reflects this
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index f3f46c82..15347737 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -62,6 +62,7 @@ import accord.coordinate.CoordinationFailed;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
+import accord.impl.DurabilityScheduling;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.ReplyContext;
@@ -170,6 +171,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
 
     // TODO (expected, consider): this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others
     private final Scheduler scheduler;
+    private final DurabilityScheduling durabilityScheduling;
 
     // TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them
     private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = 
new ConcurrentHashMap<>();
@@ -202,10 +204,12 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         this.random = random;
         this.persistDurableBefore = new PersistentField<>(() -> durableBefore, 
DurableBefore::merge, durableBeforePersister, this::setPersistedDurableBefore);
         this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
+        this.durabilityScheduling = new DurabilityScheduling(this);
         // TODO (desired): make frequency configurable
         scheduler.recurring(() -> commandStores.forEachCommandStore(store -> 
store.progressLog.maybeNotify()), 1, SECONDS);
         scheduler.recurring(timeouts::maybeNotify, 100, MILLISECONDS);
         configService.registerListener(this);
+        configService.registerListener(durabilityScheduling);
     }
 
     public LocalConfig localConfig()
@@ -223,6 +227,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         persistDurableBefore.load();
     }
 
+    public DurabilityScheduling durabilityScheduling()
+    {
+        return durabilityScheduling;
+    }
+
     /**
      * This starts the node for tests and makes sure that the provided topology is acknowledged correctly.  This method is not
      * safe for production systems as it doesn't handle restarts and partially acknowledged histories
@@ -232,6 +241,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
     public AsyncResult<Void> unsafeStart()
     {
         EpochReady ready = 
onTopologyUpdateInternal(configService.currentTopology(), false, false);
+        durabilityScheduling.updateTopology();
         ready.coordination.addCallback(() -> 
this.topology.onEpochSyncComplete(id, topology.epoch()));
         configService.acknowledgeEpoch(ready, false);
         return ready.metadata;
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java
index f859f7bf..bcd6bcc1 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -21,6 +21,7 @@ package accord.local;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -152,6 +153,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             
Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.equals(TxnId.NONE)
 || locallyDecidedAndAppliedOrInvalidatedBefore.domain().isRange());
             
Invariants.checkArgument(shardAppliedOrInvalidatedBefore.equals(TxnId.NONE) || 
shardAppliedOrInvalidatedBefore.domain().isRange());
             Invariants.checkArgument(gcBefore.equals(TxnId.NONE) || 
gcBefore.domain().isRange());
+            
Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.compareTo(locallyAppliedOrInvalidatedBefore)
 <= 0);
+            
Invariants.checkArgument(shardAppliedOrInvalidatedBefore.compareTo(shardOnlyAppliedOrInvalidatedBefore)
 <= 0);
             
Invariants.checkArgument(gcBefore.compareTo(shardAppliedOrInvalidatedBefore) <= 
0);
         }
 
@@ -215,6 +218,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
 
             TxnId locallyAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMax(this.locallyAppliedOrInvalidatedBefore, newGcBefore);
             TxnId shardAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMax(this.shardAppliedOrInvalidatedBefore, newGcBefore);
+            TxnId shardOnlyAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMax(this.shardOnlyAppliedOrInvalidatedBefore, newGcBefore);
             return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, 
newGcBefore, bootstrappedAt, staleUntilAtLeast);
         }
 
@@ -299,26 +303,20 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             return safeToRead;
         }
 
-        static TxnId minGcBefore(Entry entry, @Nullable TxnId minGcBefore)
+        static TxnId min(Entry entry, @Nullable TxnId min, Function<Entry, 
TxnId> get)
         {
             if (entry == null)
-                return minGcBefore;
+                return min;
 
-            if (minGcBefore == null)
-                return entry.gcBefore;
-
-            return TxnId.min(minGcBefore, entry.gcBefore);
+            return TxnId.nonNullOrMin(min, get.apply(entry));
         }
 
-        static TxnId minLocallyAppliedOrInvalidatedBefore(Entry entry, 
@Nullable TxnId minLocallyAppliedOrInvalidatedBefore)
+        static TxnId max(Entry entry, @Nullable TxnId max, Function<Entry, 
TxnId> get)
         {
             if (entry == null)
-                return minLocallyAppliedOrInvalidatedBefore;
-
-            if (minLocallyAppliedOrInvalidatedBefore == null)
-                return entry.locallyAppliedOrInvalidatedBefore;
+                return max;
 
-            return TxnId.min(minLocallyAppliedOrInvalidatedBefore, 
entry.locallyAppliedOrInvalidatedBefore);
+            return TxnId.nonNullOrMax(max, get.apply(entry));
         }
 
         static Ranges expectToExecute(Entry entry, @Nonnull Ranges 
executeRanges, TxnId txnId, @Nullable Timestamp executeAt)
@@ -596,14 +594,14 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
         return foldl(ranges, Entry::validateSafeToRead, ranges, 
forBootstrapAt, null, r -> false);
     }
 
-    public TxnId minGcBefore(Routables<?> participants)
+    public TxnId min(Routables<?> participants, Function<Entry, TxnId> get)
     {
-        return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, 
Entry::minGcBefore, null, ignore -> false));
+        return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::min, 
null, get, ignore -> false));
     }
 
-    public TxnId minLocallyAppliedOrInvalidatedBefore(Routables<?> 
participants)
+    public TxnId max(Routables<?> participants, Function<Entry, TxnId> get)
     {
-        return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, 
Entry::minLocallyAppliedOrInvalidatedBefore, null, ignore -> false));
+        return foldl(participants, Entry::max, TxnId.NONE, get, ignore -> 
false);
     }
 
     /**
@@ -681,7 +679,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             return new Entry(a.range.newRange(
                 a.range.start().compareTo(b.range.start()) <= 0 ? 
a.range.start() : b.range.start(),
                 a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : 
b.range.end()
-            ), a.startOwnershipEpoch, a.endOwnershipEpoch, 
a.locallyDecidedAndAppliedOrInvalidatedBefore, 
a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, 
a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, 
a.staleUntilAtLeast);
+            ), a.startOwnershipEpoch, a.endOwnershipEpoch, 
a.locallyAppliedOrInvalidatedBefore, 
a.locallyDecidedAndAppliedOrInvalidatedBefore, 
a.shardAppliedOrInvalidatedBefore, a.shardOnlyAppliedOrInvalidatedBefore, 
a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java b/accord-core/src/main/java/accord/local/RedundantStatus.java
index e3338cd8..3ecdc073 100644
--- a/accord-core/src/main/java/accord/local/RedundantStatus.java
+++ b/accord-core/src/main/java/accord/local/RedundantStatus.java
@@ -93,6 +93,7 @@ public enum RedundantStatus
         NOT_OWNED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT);
         WAS_OWNED.merge = new EnumMap<>(RedundantStatus.class);
         WAS_OWNED.merge.put(NOT_OWNED, NOT_OWNED);
+        WAS_OWNED.merge.put(WAS_OWNED, WAS_OWNED);
         WAS_OWNED.merge.put(LIVE, LIVE);
         WAS_OWNED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, 
PARTIALLY_PRE_BOOTSTRAP_OR_STALE);
         WAS_OWNED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE);
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 5c80701b..5de54fed 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -29,12 +29,10 @@ import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.local.cfk.CommandsForKey;
 import accord.local.cfk.SafeCommandsForKey;
+import accord.local.cfk.UpdateUnmanagedMode;
 import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.Deps;
 import accord.primitives.Participants;
-import accord.primitives.Range;
 import accord.primitives.Ranges;
-import accord.primitives.Routable.Domain;
 import accord.primitives.RoutingKeys;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
@@ -48,6 +46,8 @@ import accord.utils.Invariants;
 
 import static accord.local.KeyHistory.COMMANDS;
 import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY;
+import static accord.local.cfk.UpdateUnmanagedMode.REGISTER;
+import static accord.primitives.Routable.Domain.Range;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.SaveStatus.Applied;
 
@@ -200,7 +200,7 @@ public abstract class SafeCommandStore
 
     public void updateExclusiveSyncPoint(Command prev, Command updated)
     {
-        if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || 
updated.txnId().domain() != Domain.Range) return;
+        if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || 
updated.txnId().domain() != Range) return;
         if (updated.route() == null) return;
 
         SaveStatus oldSaveStatus = prev == null ? SaveStatus.Uninitialised : 
prev.saveStatus();
@@ -265,11 +265,6 @@ public abstract class SafeCommandStore
         commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
     }
 
-    public void registerHistoricalTransactions(long epoch, Range range, Deps 
deps)
-    {
-        commandStore().registerHistoricalTransactions(range, deps, this);
-    }
-
     public void updateCommandsForKey(Command prev, Command next)
     {
         if (!CommandsForKey.needsUpdate(prev, next))
@@ -278,7 +273,10 @@ public abstract class SafeCommandStore
         TxnId txnId = next.txnId();
         if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, 
prev, next);
         if (!CommandsForKey.managesExecution(txnId) && 
next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) && 
!prev.hasBeen(Status.Stable))
-            updateUnmanagedExecutionCommandsForKey(this, next);
+            updateUnmanagedCommandsForKey(this, next, REGISTER);
+        // TODO (expected): register deps during Accept phase to more quickly sync epochs
+//        else if (txnId.is(Range) && 
next.known().deps.hasProposedOrDecidedDeps())
+//            updateUnmanagedCommandsForKey(this, next, REGISTER_DEPS_ONLY);
     }
 
     private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Command prev, Command next)
@@ -305,7 +303,7 @@ public abstract class SafeCommandStore
         }
     }
 
-    private static void 
updateUnmanagedExecutionCommandsForKey(SafeCommandStore safeStore, Command next)
+    private static void updateUnmanagedCommandsForKey(SafeCommandStore 
safeStore, Command next, UpdateUnmanagedMode mode)
     {
         TxnId txnId = next.txnId();
         // TODO (required): use StoreParticipants.executes()
@@ -320,19 +318,25 @@ public abstract class SafeCommandStore
             for (RoutingKey key : keys)
             {
                 if (!waitingOn.isWaitingOnKey(index++)) continue;
-                safeStore.get(key).registerUnmanaged(safeStore, 
safeStore.get(txnId));
+                safeStore.get(key).registerUnmanaged(safeStore, 
safeStore.get(txnId), mode);
+            }
+
+            if (next.txnId().is(Range))
+            {
+                CommandStore commandStore = safeStore.commandStore();
+                Ranges ranges = next.participants().touches.toRanges();
+                commandStore.registerTransitive(safeStore, 
next.partialDeps().rangeDeps);
+                commandStore.markSynced(txnId, ranges);
             }
         }
         else
         {
             safeStore = safeStore;
-            safeStore.commandStore().execute(context, safeStore0 -> 
updateUnmanagedExecutionCommandsForKey(safeStore0, next))
+            safeStore.commandStore().execute(context, safeStore0 -> 
updateUnmanagedCommandsForKey(safeStore0, next, mode))
                           .begin(safeStore.commandStore().agent);
         }
     }
 
-
-
     /**
      * Visits keys first and then ranges, both in ascending order.
      * Within each key or range visits all visible txnids needed for the given 
scope in ascending order of queried timestamp.
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index a5e5d056..4ee98a64 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -69,6 +69,7 @@ import static accord.local.cfk.Pruning.isWaitingOnPruned;
 import static accord.local.cfk.Pruning.loadingPrunedFor;
 import static accord.local.cfk.Pruning.pruneById;
 import static accord.local.cfk.Pruning.prunedBeforeId;
+import static accord.local.cfk.UpdateUnmanagedMode.UPDATE;
 import static accord.local.cfk.Updating.insertOrUpdate;
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
@@ -1414,9 +1415,10 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
         return new CommandsForKey(key, boundsInfo, byId, committedByExecuteAt, 
minUndecidedById, maxAppliedWriteByExecuteAt, newLoadingPruned, 
prunedBeforeById, unmanageds);
     }
 
-    CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand)
+    CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand, 
UpdateUnmanagedMode mode)
     {
-        return Updating.updateUnmanaged(this, safeCommand, true, null);
+        Invariants.checkState(mode != UPDATE);
+        return Updating.updateUnmanaged(this, safeCommand, mode, null);
     }
 
     void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, 
@Nullable Command command, NotifySink notifySink)
@@ -1700,11 +1702,6 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
         return Pruning.maybePrune(this, pruneInterval, minHlcDelta);
     }
 
-    CommandsForKeyUpdate registerHistorical(TxnId txnId)
-    {
-        return Updating.registerHistorical(this, txnId);
-    }
-
     int insertPos(Timestamp timestamp)
     {
         return insertPos(byId, timestamp);
diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
index 80c71666..8b21352b 100644
--- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
+++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
@@ -46,6 +46,7 @@ import static accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATE
 import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
 import static accord.local.cfk.CommandsForKey.maxContiguousManagedAppliedIndex;
+import static accord.local.cfk.UpdateUnmanagedMode.UPDATE;
 import static accord.local.cfk.Updating.updateUnmanaged;
 import static accord.local.cfk.Updating.updateUnmanagedAsync;
 import static accord.local.cfk.Utils.findApply;
@@ -198,7 +199,7 @@ abstract class PostProcess
                 SafeCommand safeCommand = 
safeStore.ifLoadedAndInitialised(txnId);
                 if (safeCommand != null)
                 {
-                    CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeCommand, false, addUnmanageds);
+                    CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeCommand, UPDATE, addUnmanageds);
                     if (update != cfk)
                     {
                         Invariants.checkState(update.cfk() == cfk);
diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
index 18e24219..f2e4c1a9 100644
--- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
@@ -28,7 +28,6 @@ import accord.local.RedundantBefore;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.primitives.Status;
-import accord.primitives.TxnId;
 
 public abstract class SafeCommandsForKey implements SafeState<CommandsForKey>
 {
@@ -88,16 +87,10 @@ public abstract class SafeCommandsForKey implements SafeState<CommandsForKey>
         updateCfk.postProcess(safeStore, prevCfk, command, notifySink);
     }
 
-    public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand 
unmanaged)
+    public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand 
unmanaged, UpdateUnmanagedMode mode)
     {
         CommandsForKey prevCfk = current();
-        update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged));
-    }
-
-    public void registerHistorical(SafeCommandStore safeStore, TxnId txnId)
-    {
-        CommandsForKey prevCfk = current();
-        update(safeStore, null, prevCfk, prevCfk.registerHistorical(txnId));
+        update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged, 
mode));
     }
 
     public void updateRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore.Entry redundantBefore)
diff --git a/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java b/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java
new file mode 100644
index 00000000..c5749bff
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local.cfk;
+
+public enum UpdateUnmanagedMode
+{
+    REGISTER_DEPS_ONLY, REGISTER, UPDATE
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java
index c4c5f966..97adbb49 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -49,15 +49,15 @@ import accord.utils.SortedCursor;
 import static accord.local.KeyHistory.COMMANDS;
 import static accord.local.cfk.CommandsForKey.InternalStatus.APPLIED;
 import static accord.local.cfk.CommandsForKey.InternalStatus.COMMITTED;
-import static accord.local.cfk.CommandsForKey.InternalStatus.HISTORICAL;
 import static 
accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATED_OR_PRUNED;
 import static 
accord.local.cfk.CommandsForKey.InternalStatus.TRANSITIVELY_KNOWN;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.COMMIT;
 import static accord.local.cfk.CommandsForKey.reportLinearizabilityViolations;
 import static accord.local.cfk.CommandsForKey.mayExecute;
-import static accord.local.cfk.Pruning.loadPruned;
 import static accord.local.cfk.Pruning.loadingPrunedFor;
+import static accord.local.cfk.UpdateUnmanagedMode.REGISTER_DEPS_ONLY;
+import static accord.local.cfk.UpdateUnmanagedMode.UPDATE;
 import static accord.local.cfk.Utils.insertMissing;
 import static accord.local.cfk.Utils.mergeAndFilterMissing;
 import static accord.local.cfk.Utils.missingTo;
@@ -721,18 +721,25 @@ class Updating
 
     static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommand safeCommand)
     {
-        return Updating.updateUnmanaged(cfk, safeCommand, false, null);
+        return Updating.updateUnmanaged(cfk, safeCommand, UPDATE, null);
+    }
+
+    static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, 
SafeCommand safeCommand)
+    {
+        return Updating.updateUnmanaged(cfk, safeCommand, REGISTER_DEPS_ONLY, 
null);
     }
 
     /**
-     * Three modes of operation:
-     *  - {@code register}: inserts any missing dependencies from safeCommand into the collection; may return CommandsForKeyUpdate
-     *  - {@code !register, update == null}: fails if any dependencies are missing; always returns a CommandsForKey
-     *  - {@code !register && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update}
+     * Four modes of operation:
+     *  - {@code REGISTER_DEPS_ONLY}: inserts any missing dependencies into the collection; may return CommandsForKeyUpdate for loading pruned commands
+     *  - {@code REGISTER}: inserts any missing dependencies into the collection and inserts the unmanaged command; may return CommandsForKeyUpdate
+     *  - {@code UPDATE, update == null}: fails if any dependencies are missing; always returns a CommandsForKey
+     *  - {@code UPDATE && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update}
      */
-    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommand safeCommand, boolean register, @Nullable 
List<CommandsForKey.Unmanaged> update)
+    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable 
List<CommandsForKey.Unmanaged> update)
     {
-        Invariants.checkArgument(!register || update == null);
+        boolean register = mode != UPDATE;
+        Invariants.checkArgument(mode == UPDATE || update == null);
         if (safeCommand.current().hasBeen(Status.Truncated))
             return cfk;
 
@@ -851,34 +858,38 @@ class Updating
                 }
                 cachedTxnIds().discard(missing, clearMissingCount);
 
-                CommandsForKey.Unmanaged newPendingRecord;
-                if (waitingToApply)
+                CommandsForKey.Unmanaged[] newUnmanaged = cfk.unmanageds;
+                if (mode != REGISTER_DEPS_ONLY)
                 {
-                    if (executesAt instanceof TxnInfo)
-                        executesAt = ((TxnInfo) executesAt).plainExecuteAt();
-
-                    if (waitingTxnId.awaitsOnlyDeps() && executesAt != null)
+                    CommandsForKey.Unmanaged newPendingRecord;
+                    if (waitingToApply)
                     {
-                        if 
(executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0)
+                        if (executesAt instanceof TxnInfo)
+                            executesAt = ((TxnInfo) 
executesAt).plainExecuteAt();
+
+                        if (waitingTxnId.awaitsOnlyDeps() && executesAt != 
null)
                         {
-                            Command.WaitingOn.Update waitingOn = new 
Command.WaitingOn.Update(command.waitingOn);
-                            waitingOn.updateExecuteAtLeast(executesAt);
-                            safeCommand.updateWaitingOn(waitingOn);
+                            if 
(executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0)
+                            {
+                                Command.WaitingOn.Update waitingOn = new 
Command.WaitingOn.Update(command.waitingOn);
+                                waitingOn.updateExecuteAtLeast(executesAt);
+                                safeCommand.updateWaitingOn(waitingOn);
+                            }
                         }
+
+                        newPendingRecord = new CommandsForKey.Unmanaged(APPLY, 
command.txnId(), executesAt);
                     }
+                    else newPendingRecord = new 
CommandsForKey.Unmanaged(COMMIT, command.txnId(), txnIds.get(txnIds.size() - 
1));
 
-                    newPendingRecord = new CommandsForKey.Unmanaged(APPLY, 
command.txnId(), executesAt);
-                }
-                else newPendingRecord = new CommandsForKey.Unmanaged(COMMIT, 
command.txnId(), txnIds.get(txnIds.size() - 1));
+                    if (update != null)
+                    {
+                        update.add(newPendingRecord);
+                        return cfk;
+                    }
 
-                if (update != null)
-                {
-                    update.add(newPendingRecord);
-                    return cfk;
+                    newUnmanaged = SortedArrays.insert(cfk.unmanageds, 
newPendingRecord, CommandsForKey.Unmanaged[]::new);
                 }
 
-                CommandsForKey.Unmanaged[] newUnmanaged = 
SortedArrays.insert(cfk.unmanageds, newPendingRecord, 
CommandsForKey.Unmanaged[]::new);
-
                 CommandsForKey result;
                 if (newById == byId) result = new CommandsForKey(cfk, 
newLoadingPruned, newUnmanaged);
                 else
@@ -897,36 +908,4 @@ class Updating
 
         return new CommandsForKeyUpdate.CommandsForKeyUpdateWithPostProcess(cfk, new PostProcess.NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() }));
     }
-
-    static CommandsForKeyUpdate registerHistorical(CommandsForKey cfk, TxnId txnId)
-    {
-        if (txnId.compareTo(cfk.redundantBefore()) < 0)
-            return cfk;
-
-        int i = Arrays.binarySearch(cfk.byId, txnId);
-        if (i >= 0)
-        {
-            if (cfk.byId[i].status().compareTo(HISTORICAL) >= 0)
-                return cfk;
-            return cfk.update(i, txnId, cfk.byId[i], TxnInfo.create(txnId, HISTORICAL, cfk.mayExecute(txnId), txnId, Ballot.ZERO), false, null);
-        }
-        else if (txnId.compareTo(cfk.prunedBefore()) >= 0)
-        {
-            return cfk.insert(-1 - i, txnId, TxnInfo.create(txnId, HISTORICAL, cfk.mayExecute(txnId), txnId, Ballot.ZERO), false, null);
-        }
-        else if (txnId.compareTo(cfk.safelyPrunedBefore()) < 0)
-        {
-            return cfk;
-        }
-        else
-        {
-            TxnId[] loadingPrunedFor = loadingPrunedFor(cfk.loadingPruned, txnId, null);
-            if (loadingPrunedFor != null && Arrays.binarySearch(loadingPrunedFor, txnId) >= 0)
-                return cfk;
-
-            TxnId[] txnIdArray = new TxnId[] { txnId };
-            Object[] newLoadingPruned = loadPruned(cfk.loadingPruned, txnIdArray, NO_TXNIDS);
-            return PostProcess.LoadPruned.load(txnIdArray, cfk.update(newLoadingPruned));
-        }
-    }
 }
diff --git a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
index 76009851..f26a94fd 100644
--- a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
+++ b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
@@ -87,6 +87,12 @@ public class ThreadPoolScheduler implements Scheduler
         return new FutureAsScheduled(exec.schedule(wrap(run), delay, units));
     }
 
+    @Override
+    public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units)
+    {
+        return once(run, delay, units);
+    }
+
     @Override
     public void now(Runnable run)
     {
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 0382d43b..9406781f 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -33,7 +33,7 @@ import static accord.utils.Invariants.illegalState;
 
 public class AsyncResults
 {
-    public static final AsyncResult<Void> SUCCESS_VOID = success(null);
+    public static final AsyncResult SUCCESS_NULL = new Immediate<>(null);
 
     private AsyncResults() {}
 
@@ -340,6 +340,9 @@ public class AsyncResults
 
     public static <V> AsyncResult<V> success(V value)
     {
+        if (value == null)
+            return SUCCESS_NULL;
+
         return new Immediate<>(value);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 80d26851..dcaf36bd 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -51,11 +51,10 @@ import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
-import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status.Durability;
 import accord.local.cfk.SafeCommandsForKey;
-import accord.primitives.Deps;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
@@ -65,6 +64,8 @@ import accord.utils.AccordGens;
 import accord.utils.RandomSource;
 import accord.utils.RandomTestRunner;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import org.agrona.collections.IntHashSet;
 import org.agrona.collections.ObjectHashSet;
 
@@ -399,8 +400,7 @@ public class RemoteListenersTest
         @Override public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return null; }
         @Override public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { return null; }
         @Override public void shutdown() {}
-        @Override protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore) {}
-
+        @Override protected void registerTransitive(SafeCommandStore safeStore, RangeDeps deps) { }
         @Override public <T> AsyncChain<T> submit(Callable<T> task) { return null; }
     }
 
@@ -434,6 +434,5 @@ public class RemoteListenersTest
         @Override public ProgressLog progressLog() { return null; }
         @Override public NodeCommandStoreService node() { return null; }
         @Override public CommandStores.RangesForEpoch ranges() { return null; }
-        @Override public void registerHistoricalTransactions(long epoch, Range range, Deps deps) { }
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5c23fd18..fbe83707 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -63,7 +63,7 @@ import accord.coordinate.Invalidated;
 import accord.coordinate.Preempted;
 import accord.coordinate.Timeout;
 import accord.coordinate.Truncated;
-import accord.impl.CoordinateDurabilityScheduling;
+import accord.impl.DurabilityScheduling;
 import accord.impl.DefaultLocalListeners;
 import accord.impl.DefaultRemoteListeners;
 import accord.impl.DefaultRequestTimeouts;
@@ -317,14 +317,22 @@ public class Cluster implements Scheduler
     @Override
     public Scheduled once(Runnable run, long delay, TimeUnit units)
     {
-        RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units);
+        RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units, false);
+        pending.add(result, delay, units);
+        return result;
+    }
+
+    @Override
+    public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units)
+    {
+        RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units, true);
         pending.add(result, delay, units);
         return result;
     }
 
     public Scheduled recurring(Runnable run, LongSupplier delay, TimeUnit units)
     {
-        RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, delay, units);
+        RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, delay, units, true);
         ++recurring;
         result.onCancellation(() -> --recurring);
         pending.add(result, delay.getAsLong(), units);
@@ -455,7 +463,7 @@ public class Cluster implements Scheduler
                 messageListener.onTopologyChange(t);
             };
             TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get, schemaApply);
-            List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>();
+            List<DurabilityScheduling> durabilityScheduling = new ArrayList<>();
             List<Service> services = new ArrayList<>();
             for (Id id : nodes)
             {
@@ -475,24 +483,24 @@ public class Cluster implements Scheduler
                                      randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultRequestTimeouts::new,
                                      DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(),
                                      DurableBefore.NOOP_PERSISTER, localConfig);
-                CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node);
+                DurabilityScheduling durability = node.durabilityScheduling();
                 // TODO (desired): randomise
                 durability.setShardCycleTime(30, SECONDS);
                 durability.setGlobalCycleTime(180, SECONDS);
                 durabilityScheduling.add(durability);
                 nodeMap.put(id, node);
-                durabilityScheduling.add(new CoordinateDurabilityScheduling(node));
+                durabilityScheduling.add(new DurabilityScheduling(node));
                 services.add(new BarrierService(node, randomSupplier.get()));
             }
 
             Runnable updateDurabilityRate;
             {
                 IntSupplier targetSplits           = random.biasedUniformIntsSupplier(1, 16,  2,  4, 4, 16).get();
-                IntSupplier shardCycleTimeSeconds  = random.biasedUniformIntsSupplier(5, 60, 10, 30, 1, 30).get();
+                IntSupplier shardCycleTimeSeconds  = random.biasedUniformIntsSupplier(5, 60, 10, 60, 1, 30).get();
                 IntSupplier globalCycleTimeSeconds = random.biasedUniformIntsSupplier(1, 90, 10, 30,10, 60).get();
                 updateDurabilityRate = () -> {
                     int c = targetSplits.getAsInt();
-                    int s = shardCycleTimeSeconds.getAsInt();
+                    int s = shardCycleTimeSeconds.getAsInt() * topologyFactory.rf;
                     int g = globalCycleTimeSeconds.getAsInt();
                     durabilityScheduling.forEach(d -> {
                         d.setTargetShardSplits(c);
@@ -551,7 +559,6 @@ public class Cluster implements Scheduler
                         DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s;
                         store.clearForTesting();
                         journal.reconstructAll(store.loader(), store.id());
-                        journal.loadHistoricalTransactions(store::load, store.id());
                     }
                     while (sinks.drain(pred));
                     CommandsForKey.enableLinearizabilityViolationsReporting();
@@ -559,16 +566,17 @@ public class Cluster implements Scheduler
                 });
             }, () -> random.nextInt(1, 10), SECONDS);
 
-            durabilityScheduling.forEach(CoordinateDurabilityScheduling::start);
+            durabilityScheduling.forEach(DurabilityScheduling::start);
             services.forEach(Service::start);
 
-            noMoreWorkSignal.accept(() -> {
+            Runnable stop = () -> {
                 reconfigure.cancel();
                 purge.cancel();
                 restart.cancel();
-                durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
+                durabilityScheduling.forEach(DurabilityScheduling::stop);
                 services.forEach(Service::close);
-            });
+            };
+            noMoreWorkSignal.accept(stop);
             readySignal.accept(nodeMap);
 
             Packet next;
@@ -577,10 +585,7 @@ public class Cluster implements Scheduler
 
             while (sinks.processPending());
 
-            chaos.cancel();
-            reconfigure.cancel();
-            durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
-            services.forEach(Service::close);
+            stop.run();
             sinks.links = sinks.linkConfig.defaultLinks;
 
             // give progress log et al a chance to finish
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index b5747eb8..ed1df416 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -55,7 +55,6 @@ import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.local.ShardDistributor;
-import accord.primitives.Deps;
 import accord.primitives.Range;
 import accord.primitives.RoutableKey;
 import accord.primitives.Txn;
@@ -287,13 +286,6 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
             return new DelayedSafeStore(this, ranges, context, commands, timestampsForKey, commandsForKeys);
         }
 
-        @Override
-        protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore)
-        {
-            journal.registerHistoricalTransactions(id(), deps);
-            super.registerHistoricalTransactions(range, deps, safeStore);
-        }
-
         @Override
         public void unsafeRunIn(Runnable fn)
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java
index f9ba1a42..ec0c1803 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -28,7 +28,6 @@ import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
@@ -46,7 +45,6 @@ import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
-import accord.primitives.Deps;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Timestamp;
@@ -63,7 +61,6 @@ import static accord.utils.Invariants.illegalState;
 public class Journal
 {
     private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Long2ObjectHashMap<>();
-    private final Map<Integer, List<Deps>> historicalTransactions = new HashMap<>();
 
     private final Node.Id id;
 
@@ -155,15 +152,6 @@ public class Journal
         Last
     }
 
-    public void loadHistoricalTransactions(Consumer<Deps> consumer, int commandStoreId)
-    {
-        List<Deps> depsList = historicalTransactions.get(commandStoreId);
-        if (depsList == null)
-            return;
-        for (Deps deps : depsList)
-            consumer.accept(deps);
-    }
-
     public Command reconstruct(int commandStoreId, TxnId txnId)
     {
         List<Diff> diffs = this.diffsPerCommandStore.get(commandStoreId).get(txnId);
@@ -339,11 +327,6 @@ public class Journal
         }
     }
 
-    public void registerHistoricalTransactions(int commandStoreId, Deps deps)
-    {
-        this.historicalTransactions.computeIfAbsent(commandStoreId, (k) -> new ArrayList<>()).add(deps);
-    }
-
     public void onExecute(int commandStoreId, Command before, Command after, boolean isPrimary)
     {
         if (loading || (before == null && after == null))
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
index 5bc40e69..a90f150a 100644
--- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -123,7 +123,7 @@ public class RandomDelayQueue implements PendingQueue
     public void addNoDelay(Pending item)
     {
         queue.add(new Item(now, seq++, item));
-        if (item instanceof RecurringPendingRunnable)
+        if (isRecurring(item))
             ++recurring;
     }
 
@@ -133,16 +133,16 @@ public class RandomDelayQueue implements PendingQueue
         if (delay < 0)
             throw illegalArgument("Delay must be positive or 0, but given " + delay);
         queue.add(new Item(now + units.toMillis(delay) + jitterMillis.getAsLong(), seq++, item));
-        if (item instanceof RecurringPendingRunnable)
+        if (isRecurring(item))
             ++recurring;
     }
 
     @Override
     public boolean remove(Pending item)
     {
-        if (item instanceof RecurringPendingRunnable)
+        if (isRecurring(item))
             --recurring;
-        return queue.remove(item);
+        return queue.removeIf(i -> i.item == item);
     }
 
     @Override
@@ -152,7 +152,7 @@ public class RandomDelayQueue implements PendingQueue
         if (item == null)
             return null;
 
-        if (item.item instanceof RecurringPendingRunnable)
+        if (isRecurring(item.item))
             --recurring;
 
         now = item.time;
@@ -173,7 +173,7 @@ public class RandomDelayQueue implements PendingQueue
             else
             {
                 queue.add(item);
-                if (item.item instanceof RecurringPendingRunnable)
+                if (isRecurring(item.item))
                     ++recurring;
             }
         }
@@ -301,4 +301,9 @@ public class RandomDelayQueue implements PendingQueue
             this.seed = seed;
         }
     }
+
+    private static boolean isRecurring(Pending pending)
+    {
+        return pending instanceof RecurringPendingRunnable && ((RecurringPendingRunnable) pending).isRecurring;
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
index d8b876a0..6dbfd754 100644
--- a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
+++ b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
@@ -28,15 +28,17 @@ class RecurringPendingRunnable implements PendingRunnable, Scheduled
     final PendingQueue requeue;
     final LongSupplier delay;
     final TimeUnit units;
+    final boolean isRecurring;
     Runnable run;
     Runnable onCancellation;
 
-    RecurringPendingRunnable(PendingQueue requeue, Runnable run, LongSupplier delay, TimeUnit units)
+    RecurringPendingRunnable(PendingQueue requeue, Runnable run, LongSupplier delay, TimeUnit units, boolean isRecurring)
     {
         this.requeue = requeue;
         this.run = run;
         this.delay = delay;
         this.units = units;
+        this.isRecurring = isRecurring;
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index ea43e67e..46ac4c2b 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -18,6 +18,7 @@
 
 package accord.impl.list;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
@@ -30,7 +31,6 @@ import accord.coordinate.Invalidated;
 import accord.coordinate.Truncated;
 import accord.coordinate.Timeout;
 import accord.impl.MessageListener;
-import accord.impl.basic.Cluster;
 import accord.impl.basic.Packet;
 import accord.impl.basic.SimulatedFault;
 import accord.local.Node;
@@ -166,12 +166,12 @@ public class ListRequest implements Request
                     }
 
                     node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null);
-                    ((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, id, 0, null));
+                    node.scheduler().once(() -> checkOnResult(homeKey, id, 0, null), 5L, TimeUnit.MINUTES);
                 }
                 else if (fail instanceof SimulatedFault)
                 {
                     node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null);
-                    ((Cluster) node.scheduler()).onDone(() -> checkOnResult(null, id, 0, null));
+                    node.scheduler().once(() -> checkOnResult(null, id, 0, null), 5L, TimeUnit.MINUTES);
                 }
                 else
                 {
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 710780fe..9d3f9fd4 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -61,6 +61,7 @@ import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.primitives.RangeDeps;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.local.StoreParticipants;
@@ -87,9 +88,11 @@ import accord.utils.DefaultRandom;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
 import static accord.local.Command.NotDefined.notDefined;
+import static accord.local.cfk.UpdateUnmanagedMode.REGISTER;
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Status.Durability.NotDurable;
 
@@ -636,7 +639,7 @@ public class CommandsForKeyTest
                 if (!CommandsForKey.managesExecution(update.next.txnId()) && update.next.hasBeen(Status.Stable) && !update.next.hasBeen(Status.Truncated))
                 {
                     CommandsForKey prev = safeCfk.current();
-                    result = prev.registerUnmanaged(safeCommand);
+                    result = prev.registerUnmanaged(safeCommand, REGISTER);
                     safeCfk.set(result.cfk());
                     result.postProcess(safeStore, prev, null, canon);
                 }
@@ -961,10 +964,7 @@ public class CommandsForKeyTest
         }
 
         @Override
-        protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore)
-        {
-            throw new UnsupportedOperationException();
-        }
+        protected void registerTransitive(SafeCommandStore safeStore, RangeDeps deps){ }
 
         @Override
         public <T> AsyncChain<T> submit(Callable<T> task)
diff --git a/accord-core/src/test/resources/burn-logback.xml b/accord-core/src/test/resources/burn-logback.xml
index 9f4e1b18..30f90316 100644
--- a/accord-core/src/test/resources/burn-logback.xml
+++ b/accord-core/src/test/resources/burn-logback.xml
@@ -36,6 +36,8 @@
         </filter>
     </appender>
 
+    <logger name="accord.impl.DurabilityScheduling" level="ERROR"/>
+
     <root level="INFO">
         <appender-ref ref="FILE"/>
         <appender-ref ref="STDOUT"/>
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index fa8a2048..46d497c6 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -242,14 +242,16 @@ public class Cluster implements Scheduler
     class CancellableRunnable implements Runnable, Scheduled
     {
         final boolean recurring;
+        final boolean selfRecurring;
         final long delay;
         final TimeUnit units;
         Runnable run;
 
-        CancellableRunnable(Runnable run, boolean recurring, long delay, TimeUnit units)
+        CancellableRunnable(Runnable run, boolean recurring, boolean selfRecurring, long delay, TimeUnit units)
         {
             this.run = run;
             this.recurring = recurring;
+            this.selfRecurring = selfRecurring;
             this.delay = delay;
             this.units = units;
         }
@@ -260,7 +262,7 @@ public class Cluster implements Scheduler
             if (run != null)
             {
                 run.run();
-                if (recurring) pending.add(this, delay, units);
+                if (recurring && !selfRecurring) pending.add(this, delay, units);
                 else run = null;
             }
         }
@@ -281,7 +283,7 @@ public class Cluster implements Scheduler
     @Override
     public Scheduled recurring(Runnable run, long delay, TimeUnit units)
     {
-        CancellableRunnable result = new CancellableRunnable(run, true, delay, units);
+        CancellableRunnable result = new CancellableRunnable(run, true, false, delay, units);
         ++recurring;
         pending.add(result, delay, units);
         return result;
@@ -290,7 +292,16 @@ public class Cluster implements Scheduler
     @Override
     public Scheduled once(Runnable run, long delay, TimeUnit units)
     {
-        CancellableRunnable result = new CancellableRunnable(run, false, delay, units);
+        CancellableRunnable result = new CancellableRunnable(run, false, false, delay, units);
+        pending.add(result, delay, units);
+        return result;
+    }
+
+    @Override
+    public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units)
+    {
+        CancellableRunnable result = new CancellableRunnable(run, true, true, delay, units);
+        ++recurring;
         pending.add(result, delay, units);
         return result;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to