This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb070c6  Faster SimpleProgressLog and BurnTest (#16)
bb070c6 is described below

commit bb070c6b47616e47d6370c41438715634bcd8b48
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Jan 4 16:00:30 2023 +0000

    Faster SimpleProgressLog and BurnTest (#16)
    
    Improve the SimpleProgressLog to only perform work as necessary, and make 
some other minor improvements to performance to improve burn test throughput.
    
    Co-authored-by: Aleksey Yeschenko <alek...@apache.org>
---
 .../main/java/accord/coordinate/CheckShards.java   |    4 +-
 .../java/accord/coordinate/ReadCoordinator.java    |    8 +-
 .../coordinate/tracking/AbstractTracker.java       |    2 +-
 .../accord/coordinate/tracking/ReadTracker.java    |   11 +-
 .../java/accord/impl/InMemoryCommandStore.java     |    4 +-
 .../main/java/accord/impl/SimpleProgressLog.java   | 1072 ++++++++++----------
 .../java/accord/impl/SizeOfIntersectionSorter.java |    2 +-
 .../src/main/java/accord/local/Command.java        |    2 +-
 .../src/main/java/accord/local/CommandStores.java  |    4 +-
 .../main/java/accord/messages/BeginRecovery.java   |   12 +-
 .../src/main/java/accord/messages/Defer.java       |    5 +-
 .../main/java/accord/messages/InformDurable.java   |    2 +
 .../main/java/accord/primitives/AbstractKeys.java  |   10 +-
 .../java/accord/primitives/AbstractRanges.java     |    2 +-
 .../src/main/java/accord/primitives/Deps.java      |   74 +-
 .../src/main/java/accord/primitives/Routables.java |    8 +-
 .../src/main/java/accord/primitives/Txn.java       |    2 +-
 .../src/main/java/accord/primitives/Writes.java    |    2 +-
 .../src/main/java/accord/topology/Topologies.java  |    2 +-
 .../src/main/java/accord/topology/Topology.java    |  126 ++-
 .../main/java/accord/topology/TopologyManager.java |   14 +-
 .../src/main/java/accord/utils/ArrayBuffers.java   |   14 +-
 .../main/java/accord/utils/IndexedBiFunction.java  |    4 +-
 .../main/java/accord/utils/IndexedConsumer.java    |    4 +-
 .../src/main/java/accord/utils/IndexedFold.java    |    3 +-
 .../accord/utils/IndexedFoldIntersectToLong.java   |    4 +-
 .../main/java/accord/utils/IndexedFoldToLong.java  |    4 +-
 .../main/java/accord/utils/IndexedFunction.java    |    4 +-
 .../main/java/accord/utils/IndexedIntFunction.java |    4 +-
 .../main/java/accord/utils/IndexedPredicate.java   |    4 +-
 .../java/accord/utils/IndexedRangeFoldToLong.java  |    2 +-
 .../java/accord/utils/IndexedRangeTriConsumer.java |    6 +
 .../main/java/accord/utils/IndexedTriConsumer.java |    7 +
 .../main/java/accord/utils/IndexedTriFunction.java |    4 +-
 .../java/accord/utils/IntrusiveLinkedList.java     |  109 ++
 ...oldToLong.java => IntrusiveLinkedListNode.java} |   24 +-
 .../src/main/java/accord/utils/SortedArrays.java   |    2 +-
 accord-core/src/test/java/accord/KeysTest.java     |   27 +-
 .../src/test/java/accord/burn/BurnTest.java        |    9 +-
 .../coordinate/tracking/TrackerReconciler.java     |    2 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   10 +-
 41 files changed, 920 insertions(+), 695 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java 
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 965e6a9..dd91993 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -59,7 +59,9 @@ public abstract class CheckShards extends 
ReadCoordinator<CheckStatusReply>
     @Override
     protected Action process(Id from, CheckStatusReply reply)
     {
-        debug.put(from, reply);
+        if (debug != null)
+            debug.put(from, reply);
+        
         if (reply.isOk())
         {
             CheckStatusOk ok = (CheckStatusOk) reply;
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java 
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 4154b21..3c9ccbf 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -16,6 +16,8 @@ import static 
com.google.common.collect.Sets.newHashSetWithExpectedSize;
 
 abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends 
ReadTracker implements Callback<Reply>
 {
+    private static final boolean DEBUG = false;
+
     protected enum Action
     {
         /**
@@ -53,7 +55,7 @@ abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> extends Read
     final TxnId txnId;
     private boolean isDone;
     private Throwable failure;
-    Map<Id, Object> debug = new HashMap<>();
+    Map<Id, Object> debug = DEBUG ? new HashMap<>() : null;
 
     ReadCoordinator(Node node, Topologies topologies, TxnId txnId)
     {
@@ -69,7 +71,9 @@ abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> extends Read
     @Override
     public void onSuccess(Id from, Reply reply)
     {
-        if (debug != null) debug.put(from, reply);
+        if (debug != null)
+            debug.put(from, reply);
+
         if (isDone)
             return;
 
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index e8fadff..68a3201 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -138,7 +138,7 @@ public abstract class AbstractTracker<ST extends 
ShardTracker, P>
     }
 
     static <ST extends ShardTracker, P, T extends AbstractTracker<ST, P>>
-    ShardOutcomes apply(int trackerIndex, T tracker, BiFunction<? super ST, P, 
? extends ShardOutcome<? super T>> function, P param)
+    ShardOutcomes apply(T tracker, BiFunction<? super ST, P, ? extends 
ShardOutcome<? super T>> function, P param, int trackerIndex)
     {
         return function.apply(tracker.trackers[trackerIndex], 
param).apply(tracker, trackerIndex);
     }
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index 4d4142e..e86f2d4 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -158,10 +158,9 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker, B
     }
 
     // TODO: abstract the candidate selection process so the implementation 
may prioritise based on distance/health etc
-    // TODO: faster Id sets and arrays using primitive ints when unambiguous
-    final Set<Id> inflight;
-    final List<Id> candidates;
-    private Set<Id> slow;
+    final Set<Id> inflight;    // TODO: use Agrona's IntHashSet as soon as 
Node.Id switches from long to int
+    final List<Id> candidates; // TODO: use Agrona's IntArrayList as soon as 
Node.Id switches from long to int
+    private Set<Id> slow;      // TODO: use Agrona's IntHashSet as soon as 
Node.Id switches from long to int
     protected int waitingOnData;
 
     public ReadTracker(Topologies topologies)
@@ -265,9 +264,9 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker, B
         while (i >= 0)
         {
             Id candidate = candidates.get(i);
-            topologies().forEach((ti, topology) -> {
+            topologies().forEach((topology, ti) -> {
                 int offset = topologyOffset(ti);
-                topology.forEachOn(candidate, (si, s) -> toRead.clear(offset + 
si));
+                topology.forEachOn(candidate, (s, si) -> toRead.clear(offset + 
si));
             });
 
             if (toRead.isEmpty())
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 8cd1de2..ce4562d 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -22,10 +22,10 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
-import accord.local.CommandStore; // java8 fails compilation if this is in 
correct position
-import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails 
compilation if this is in correct position
 import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
 import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
+import accord.local.CommandStore; // java8 fails compilation if this is in 
correct position
+import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails 
compilation if this is in correct position
 import accord.local.Command;
 import accord.local.CommandStore.RangesForEpoch;
 import accord.local.CommandsForKey;
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java 
b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index cfde871..f295235 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -29,38 +29,34 @@ import java.util.function.BiConsumer;
 
 import javax.annotation.Nullable;
 
-import accord.coordinate.*;
-import accord.local.*;
-import accord.local.Status.Known;
-import accord.primitives.*;
-import accord.utils.Invariants;
-
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.coordinate.*;
+import accord.impl.SimpleProgressLog.Instance.State.Monitoring;
+import accord.local.*;
 import accord.local.Node.Id;
-import accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus;
+import accord.local.Status.Known;
 import accord.messages.Callback;
 import accord.messages.InformDurable;
 import accord.messages.SimpleReply;
+import accord.primitives.*;
 import accord.topology.Topologies;
+import accord.utils.IntrusiveLinkedList;
+import accord.utils.IntrusiveLinkedListNode;
+import accord.utils.Invariants;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import static accord.api.ProgressLog.ProgressShard.Home;
 import static accord.api.ProgressLog.ProgressShard.Unsure;
 import static accord.coordinate.InformHomeOfTxn.inform;
-import static 
accord.impl.SimpleProgressLog.DisseminateState.DisseminateStatus.NotExecuted;
-import static 
accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.NotWitnessed;
-import static 
accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.ReadyToExecute;
-import static 
accord.impl.SimpleProgressLog.CoordinateState.CoordinateStatus.Uncommitted;
-import static accord.impl.SimpleProgressLog.NonHomeState.Safe;
-import static accord.impl.SimpleProgressLog.NonHomeState.StillUnsafe;
-import static accord.impl.SimpleProgressLog.NonHomeState.Unsafe;
+import static accord.impl.SimpleProgressLog.CoordinateStatus.ReadyToExecute;
+import static accord.impl.SimpleProgressLog.CoordinateStatus.Uncommitted;
+import static accord.impl.SimpleProgressLog.DisseminateStatus.NotExecuted;
 import static accord.impl.SimpleProgressLog.Progress.Done;
 import static accord.impl.SimpleProgressLog.Progress.Expected;
 import static accord.impl.SimpleProgressLog.Progress.Investigating;
 import static accord.impl.SimpleProgressLog.Progress.NoProgress;
 import static accord.impl.SimpleProgressLog.Progress.NoneExpected;
-import static accord.impl.SimpleProgressLog.Progress.advance;
 import static accord.local.PreLoadContext.contextFor;
 import static accord.local.Status.Durability.Durable;
 import static accord.local.Status.Known.Nothing;
@@ -69,565 +65,576 @@ import static accord.local.Status.PreCommitted;
 import static accord.primitives.Route.isFullRoute;
 
 // TODO: consider propagating invalidations in the same way as we do applied
-public class SimpleProgressLog implements Runnable, ProgressLog.Factory
+public class SimpleProgressLog implements ProgressLog.Factory
 {
-    enum Progress
-    {
-        NoneExpected, Expected, NoProgress, Investigating, Done;
+    enum Progress { NoneExpected, Expected, NoProgress, Investigating, Done }
 
-        static Progress advance(Progress current)
-        {
-            switch (current)
-            {
-                default: throw new IllegalStateException();
-                case NoneExpected:
-                case Investigating:
-                case Done:
-                    return current;
-                case Expected:
-                case NoProgress:
-                    return NoProgress;
-            }
-        }
-    }
-
-    // exists only on home shard
-    static class CoordinateState
+    enum CoordinateStatus
     {
-        enum CoordinateStatus
-        {
-            NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
-            boolean isAtMost(CoordinateStatus equalOrLessThan)
-            {
-                return compareTo(equalOrLessThan) <= 0;
-            }
-            boolean isAtLeast(CoordinateStatus equalOrGreaterThan)
-            {
-                return compareTo(equalOrGreaterThan) >= 0;
-            }
-        }
-
-        CoordinateStatus status = NotWitnessed;
-        Progress progress = NoneExpected;
-        ProgressToken token = ProgressToken.NONE;
-
-        Object debugInvestigating;
+        NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
 
-        void ensureAtLeast(Command command, CoordinateStatus newStatus, 
Progress newProgress)
+        boolean isAtMostReadyToExecute()
         {
-            ensureAtLeast(newStatus, newProgress);
-            updateMax(command);
+            return compareTo(CoordinateStatus.ReadyToExecute) <= 0;
         }
 
-        void ensureAtLeast(CoordinateStatus newStatus, Progress newProgress)
+        boolean isAtLeastCommitted()
         {
-            if (newStatus.compareTo(status) > 0)
-            {
-                status = newStatus;
-                progress = newProgress;
-            }
+            return compareTo(CoordinateStatus.Committed) >= 0;
         }
+    }
 
-        void updateMax(Command command)
-        {
-            token = token.merge(new ProgressToken(command.durability(), 
command.status(), command.promised(), command.accepted()));
-        }
+    enum DisseminateStatus { NotExecuted, Durable, Done }
 
-        void updateMax(ProgressToken ok)
-        {
-            // TODO: perhaps set localProgress back to Waiting if 
Investigating and we update anything?
-            token = token.merge(ok);
-        }
+    final Node node;
+    final List<Instance> instances = new CopyOnWriteArrayList<>();
 
-        void durableGlobal()
-        {
-            switch (status)
-            {
-                default: throw new IllegalStateException();
-                case NotWitnessed:
-                case Uncommitted:
-                case Committed:
-                case ReadyToExecute:
-                    status = CoordinateStatus.Done;
-                    progress = NoneExpected;
-                case Done:
-            }
-        }
+    public SimpleProgressLog(Node node)
+    {
+        this.node = node;
+    }
 
-        void update(Node node, CommandStore commandStore, TxnId txnId, Command 
command)
+    class Instance extends IntrusiveLinkedList<Monitoring> implements 
ProgressLog, Runnable
+    {
+        class State
         {
-            if (progress != NoProgress)
+            abstract class Monitoring extends IntrusiveLinkedListNode
             {
-                progress = advance(progress);
-                return;
-            }
+                private Progress progress = NoneExpected;
 
-            progress = Investigating;
-            switch (status)
-            {
-                default: throw new AssertionError();
-                case NotWitnessed: // can't make progress if we haven't 
witnessed it yet
-                case Committed: // can't make progress if we aren't yet 
ReadyToExecute
-                case Done: // shouldn't be trying to make progress, as we're 
done
-                    throw new IllegalStateException();
-
-                case Uncommitted:
-                case ReadyToExecute:
+                void setProgress(Progress newProgress)
                 {
-                    if (status.isAtLeast(CoordinateStatus.Committed) && 
command.durability().isDurable())
+                    this.progress = newProgress;
+                    switch (newProgress)
                     {
-                        // must also be committed, as at the time of writing 
we do not guarantee dissemination of Commit
-                        // records to the home shard, so we only know the 
executeAt shards will have witnessed this
-                        // if the home shard is at an earlier phase, it must 
run recovery
-                        long epoch = command.executeAt().epoch;
-                        node.withEpoch(epoch, () -> debugInvestigating = 
FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, 
(success, fail) -> {
-                            // should have found enough information to apply 
the result, but in case we did not reset progress
-                            if (progress == Investigating)
-                                progress = Expected;
-                        }));
+                        default: throw new AssertionError();
+                        case NoneExpected:
+                        case Done:
+                        case Investigating:
+                            remove();
+                            break;
+                        case Expected:
+                        case NoProgress:
+                            if (isFree())
+                                addFirst(this);
                     }
-                    else
+                }
+
+                boolean shouldRun()
+                {
+                    switch (progress)
                     {
-                        RoutingKey homeKey = command.homeKey();
-                        node.withEpoch(txnId.epoch, () -> {
-
-                            Future<? extends Outcome> recover = 
node.maybeRecover(txnId, homeKey, command.route(), token);
-                            recover.addCallback((success, fail) -> {
-                                if (status.isAtMost(ReadyToExecute) && 
progress == Investigating)
-                                {
-                                    progress = Expected;
-                                    if (fail != null)
-                                        return;
-
-                                    ProgressToken token = 
success.asProgressToken();
-                                    // TODO: avoid returning null (need to 
change semantics here in this case, though, as Recover doesn't return 
CheckStatusOk)
-                                    if (token.durability.isDurable())
-                                    {
-                                        
commandStore.execute(contextFor(txnId), safeStore -> {
-                                            Command cmd = 
safeStore.command(txnId);
-                                            cmd.setDurability(safeStore, 
token.durability, homeKey, null);
-                                            
safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
-                                        }).addCallback(commandStore.agent());
-                                    }
-
-                                    updateMax(token);
-                                }
-                            });
-
-                            debugInvestigating = recover;
-                        });
+                        default: throw new AssertionError();
+                        case NoneExpected:
+                        case Done:
+                        case Investigating:
+                            throw new IllegalStateException();
+                        case Expected:
+                            if (isFree())
+                                throw new IllegalStateException();
+                            progress = NoProgress;
+                            return false;
+                        case NoProgress:
+                            remove();
+                            return true;
                     }
                 }
-            }
-        }
-
-        @Override
-        public String toString()
-        {
-            return "{" + status + ',' + progress + '}';
-        }
-    }
 
-    // exists only on home shard
-    static class DisseminateState
-    {
-        enum DisseminateStatus { NotExecuted, Durable, Done }
+                abstract void run(Command command);
 
-        // TODO: thread safety (schedule on progress log executor)
-        class CoordinateAwareness implements Callback<SimpleReply>
-        {
-            @Override
-            public void onSuccess(Id from, SimpleReply reply)
-            {
-                notAwareOfDurability.remove(from);
-                maybeDone();
-            }
+                Progress progress()
+                {
+                    return progress;
+                }
 
-            @Override
-            public void onFailure(Id from, Throwable failure)
-            {
+                TxnId txnId()
+                {
+                    return txnId;
+                }
             }
 
-            @Override
-            public void onCallbackFailure(Id from, Throwable failure)
+            // exists only on home shard
+            class CoordinateState extends Monitoring
             {
-            }
-        }
+                CoordinateStatus status = CoordinateStatus.NotWitnessed;
+                ProgressToken token = ProgressToken.NONE;
 
-        DisseminateStatus status = NotExecuted;
-        Progress progress = NoneExpected;
-        Set<Id> notAwareOfDurability;
-        Set<Id> notPersisted;
+                Object debugInvestigating;
 
-        List<Runnable> whenReady;
-
-        CoordinateAwareness investigating;
+                void ensureAtLeast(Command command, CoordinateStatus 
newStatus, Progress newProgress)
+                {
+                    ensureAtLeast(newStatus, newProgress);
+                    updateMax(command);
+                }
 
-        private void whenReady(Node node, Command command, Runnable runnable)
-        {
-            if (notAwareOfDurability != null || maybeReady(node, command))
-            {
-                runnable.run();
-            }
-            else
-            {
-                if (whenReady == null)
-                    whenReady = new ArrayList<>();
-                whenReady.add(runnable);
-            }
-        }
+                void ensureAtLeast(CoordinateStatus newStatus, Progress 
newProgress)
+                {
+                    if (newStatus.compareTo(status) > 0)
+                    {
+                        status = newStatus;
+                        setProgress(newProgress);
+                    }
+                }
 
-        private void whenReady(Runnable runnable)
-        {
-            if (notAwareOfDurability != null)
-            {
-                runnable.run();
-            }
-            else
-            {
-                if (whenReady == null)
-                    whenReady = new ArrayList<>();
-                whenReady.add(runnable);
-            }
-        }
+                void updateMax(Command command)
+                {
+                    token = token.merge(new 
ProgressToken(command.durability(), command.status(), command.promised(), 
command.accepted()));
+                }
 
-        // must know the epoch information, and have a valid Route
-        private boolean maybeReady(Node node, Command command)
-        {
-            if (!command.status().hasBeen(Status.PreCommitted))
-                return false;
+                void updateMax(ProgressToken ok)
+                {
+                    // TODO: perhaps set localProgress back to Waiting if 
Investigating and we update anything?
+                    token = token.merge(ok);
+                }
 
-            if (!isFullRoute(command.route()))
-                return false;
+                void durableGlobal()
+                {
+                    switch (status)
+                    {
+                        default: throw new IllegalStateException();
+                        case NotWitnessed:
+                        case Uncommitted:
+                        case Committed:
+                        case ReadyToExecute:
+                            status = CoordinateStatus.Done;
+                            setProgress(NoneExpected);
+                        case Done:
+                    }
+                }
 
-            if (!node.topology().hasEpoch(command.executeAt().epoch))
-                return false;
+                @Override
+                void run(Command command)
+                {
+                    setProgress(Investigating);
+                    switch (status)
+                    {
+                        default: throw new AssertionError();
+                        case NotWitnessed: // can't make progress if we 
haven't witnessed it yet
+                        case Committed: // can't make progress if we aren't 
yet ReadyToExecute
+                        case Done: // shouldn't be trying to make progress, as 
we're done
+                            throw new IllegalStateException();
+
+                        case Uncommitted:
+                        case ReadyToExecute:
+                        {
+                            if (status.isAtLeastCommitted() && 
command.durability().isDurable())
+                            {
+                                // must also be committed, as at the time of 
writing we do not guarantee dissemination of Commit
+                                // records to the home shard, so we only know 
the executeAt shards will have witnessed this
+                                // if the home shard is at an earlier phase, 
it must run recovery
+                                long epoch = command.executeAt().epoch;
+                                node.withEpoch(epoch, () -> debugInvestigating 
= FetchData.fetch(PreApplied.minKnown, node, txnId, command.route(), epoch, 
(success, fail) -> {
+                                    // should have found enough information to 
apply the result, but in case we did not reset progress
+                                    if (progress() == Investigating)
+                                        setProgress(Expected);
+                                }));
+                            }
+                            else
+                            {
+                                RoutingKey homeKey = command.homeKey();
+                                node.withEpoch(txnId.epoch, () -> {
+
+                                    Future<? extends Outcome> recover = 
node.maybeRecover(txnId, homeKey, command.route(), token);
+                                    recover.addCallback((success, fail) -> {
+                                        if (status.isAtMostReadyToExecute() && 
progress() == Investigating)
+                                        {
+                                            setProgress(Expected);
+                                            if (fail != null)
+                                                return;
+
+                                            ProgressToken token = 
success.asProgressToken();
+                                            // TODO: avoid returning null 
(need to change semantics here in this case, though, as Recover doesn't return 
CheckStatusOk)
+                                            if (token.durability.isDurable())
+                                            {
+                                                
commandStore.execute(contextFor(txnId), safeStore -> {
+                                                    Command cmd = 
safeStore.command(txnId);
+                                                    
cmd.setDurability(safeStore, token.durability, homeKey, null);
+                                                    
safeStore.progressLog().durable(txnId, cmd.maxUnseekables(), null);
+                                                
}).addCallback(commandStore.agent());
+                                            }
+
+                                            updateMax(token);
+                                        }
+                                    });
+
+                                    debugInvestigating = recover;
+                                });
+                            }
+                        }
+                    }
+                }
 
-            Topologies topology = 
node.topology().preciseEpochs(command.route(), command.txnId().epoch, 
command.executeAt().epoch);
-            notAwareOfDurability = topology.copyOfNodes();
-            notPersisted = topology.copyOfNodes();
-            if (whenReady != null)
-            {
-                whenReady.forEach(Runnable::run);
-                whenReady = null;
+                @Override
+                public String toString()
+                {
+                    return "{" + status + ',' + progress() + '}';
+                }
             }
 
-            return true;
-        }
-
-        private void maybeDone()
-        {
-            if (notAwareOfDurability.isEmpty())
+            // exists only on home shard
+            class DisseminateState extends Monitoring
             {
-                status = DisseminateStatus.Done;
-                progress = Done;
-            }
-        }
+                class CoordinateAwareness implements Callback<SimpleReply>
+                {
+                    @Override
+                    public void onSuccess(Id from, SimpleReply reply)
+                    {
+                        // TODO: callbacks should be associated with a 
commandStore for processing to avoid this
+                        commandStore.execute(PreLoadContext.empty(), ignore -> 
{
+                            notAwareOfDurability.remove(from);
+                            maybeDone();
+                        });
+                    }
 
-        void durableGlobal(Node node, Command command, @Nullable Set<Id> 
persistedOn)
-        {
-            if (status == DisseminateStatus.Done)
-                return;
+                    @Override
+                    public void onFailure(Id from, Throwable failure)
+                    {
+                    }
 
-            status = DisseminateStatus.Durable;
-            progress = Expected;
-            if (persistedOn == null)
-                return;
+                    @Override
+                    public void onCallbackFailure(Id from, Throwable failure)
+                    {
+                    }
+                }
 
-            whenReady(node, command, () -> {
-                notPersisted.removeAll(persistedOn);
-                notAwareOfDurability.removeAll(persistedOn);
-                maybeDone();
-            });
-        }
+                DisseminateStatus status = NotExecuted;
+                Set<Id> notAwareOfDurability; // TODO: use Agrona's IntHashSet 
as soon as Node.Id switches from long to int
+                Set<Id> notPersisted;         // TODO: use Agrona's IntHashSet 
as soon as Node.Id switches from long to int
 
-        void durableLocal(Node node)
-        {
-            if (status == DisseminateStatus.Done)
-                return;
+                List<Runnable> whenReady;
 
-            status = DisseminateStatus.Durable;
-            progress = Expected;
+                CoordinateAwareness investigating;
 
-            whenReady(() -> {
-                notPersisted.remove(node.id());
-                notAwareOfDurability.remove(node.id());
-                maybeDone();
-            });
-        }
+                private void whenReady(Node node, Command command, Runnable 
runnable)
+                {
+                    if (notAwareOfDurability != null || maybeReady(node, 
command))
+                    {
+                        runnable.run();
+                    }
+                    else
+                    {
+                        if (whenReady == null)
+                            whenReady = new ArrayList<>();
+                        whenReady.add(runnable);
+                    }
+                }
 
-        void update(Node node, TxnId txnId, Command command)
-        {
-            switch (status)
-            {
-                default: throw new IllegalStateException();
-                case NotExecuted:
-                case Done:
-                    return;
-                case Durable:
-            }
+                private void whenReady(Runnable runnable)
+                {
+                    if (notAwareOfDurability != null)
+                    {
+                        runnable.run();
+                    }
+                    else
+                    {
+                        if (whenReady == null)
+                            whenReady = new ArrayList<>();
+                        whenReady.add(runnable);
+                    }
+                }
 
-            if (notAwareOfDurability == null && !maybeReady(node, command))
-                return;
+                // must know the epoch information, and have a valid Route
+                private boolean maybeReady(Node node, Command command)
+                {
+                    if (!command.status().hasBeen(Status.PreCommitted))
+                        return false;
 
-            if (progress != NoProgress)
-            {
-                progress = advance(progress);
-                return;
-            }
+                    if (!isFullRoute(command.route()))
+                        return false;
 
-            progress = Investigating;
-            if (notAwareOfDurability.isEmpty())
-            {
-                // TODO: also track actual durability
-                status = DisseminateStatus.Done;
-                progress = Done;
-                return;
-            }
+                    if (!node.topology().hasEpoch(command.executeAt().epoch))
+                        return false;
 
-            FullRoute<?> route = Route.castToFullRoute(command.route());
-            Timestamp executeAt = command.executeAt();
-            investigating = new CoordinateAwareness();
-            Topologies topologies = node.topology().preciseEpochs(route, 
txnId.epoch, executeAt.epoch);
-            node.send(notAwareOfDurability, to -> new InformDurable(to, 
topologies, route, txnId, executeAt, Durable), investigating);
-        }
+                    Topologies topology = 
node.topology().preciseEpochs(command.route(), command.txnId().epoch, 
command.executeAt().epoch);
+                    notAwareOfDurability = topology.copyOfNodes();
+                    notPersisted = topology.copyOfNodes();
+                    if (whenReady != null)
+                    {
+                        whenReady.forEach(Runnable::run);
+                        whenReady = null;
+                    }
 
-        @Override
-        public String toString()
-        {
-            return "{" + status + ',' + progress + '}';
-        }
-    }
+                    return true;
+                }
 
-    static class BlockingState
-    {
-        Known blockedUntil = Nothing;
-        Progress progress = NoneExpected;
+                private void maybeDone()
+                {
+                    if (notAwareOfDurability.isEmpty())
+                    {
+                        status = DisseminateStatus.Done;
+                        setProgress(Done);
+                    }
+                }
 
-        Unseekables<?, ?> blockedOn;
+                void durableGlobal(Node node, Command command, @Nullable 
Set<Id> persistedOn)
+                {
+                    if (status == DisseminateStatus.Done)
+                        return;
+
+                    status = DisseminateStatus.Durable;
+                    setProgress(Expected);
+                    if (persistedOn == null)
+                        return;
+
+                    whenReady(node, command, () -> {
+                        notPersisted.removeAll(persistedOn);
+                        notAwareOfDurability.removeAll(persistedOn);
+                        maybeDone();
+                    });
+                }
 
-        Object debugInvestigating;
+                void durableLocal(Node node)
+                {
+                    if (status == DisseminateStatus.Done)
+                        return;
 
-        void recordBlocking(Known blockedUntil, Unseekables<?, ?> blockedOn)
-        {
-            Invariants.checkState(!blockedOn.isEmpty());
-            if (this.blockedOn == null) this.blockedOn = blockedOn;
-            else this.blockedOn = Unseekables.merge(this.blockedOn, 
(Unseekables)blockedOn);
-            if (!blockedUntil.isSatisfiedBy(this.blockedUntil))
-            {
-                this.blockedUntil = this.blockedUntil.merge(blockedUntil);
-                progress = Expected;
-            }
-        }
+                    status = DisseminateStatus.Durable;
+                    setProgress(Expected);
 
-        void record(Known known)
-        {
-            if (blockedUntil.isSatisfiedBy(known))
-                progress = NoneExpected;
-        }
+                    whenReady(() -> {
+                        notPersisted.remove(node.id());
+                        notAwareOfDurability.remove(node.id());
+                        maybeDone();
+                    });
+                }
 
-        void update(Node node, TxnId txnId, Command command)
-        {
-            if (progress != NoProgress)
-            {
-                progress = advance(progress);
-                return;
-            }
+                @Override
+                void run(Command command)
+                {
+                    switch (status)
+                    {
+                        default: throw new IllegalStateException();
+                        case NotExecuted:
+                        case Done:
+                            return;
+                        case Durable:
+                    }
 
-            if (command.has(blockedUntil))
-            {
-                progress = NoneExpected;
-                return;
-            }
+                    if (notAwareOfDurability == null && !maybeReady(node, 
command))
+                        return;
 
-            progress = Investigating;
-            // first make sure we have enough information to obtain the 
command locally
-            Timestamp executeAt = command.hasBeen(PreCommitted) ? 
command.executeAt() : null;
-            long srcEpoch = (executeAt != null ? executeAt : txnId).epoch;
-            // TODO: compute fromEpoch, the epoch we already have this txn 
replicated until
-            long toEpoch = Math.max(srcEpoch, node.topology().epoch());
-            Unseekables<?, ?> someKeys = unseekables(command);
+                    setProgress(Investigating);
+                    if (notAwareOfDurability.isEmpty())
+                    {
+                        // TODO: also track actual durability
+                        status = DisseminateStatus.Done;
+                        setProgress(Done);
+                        return;
+                    }
 
-            BiConsumer<Known, Throwable> callback = (success, fail) -> {
-                if (progress != Investigating)
-                    return;
+                    FullRoute<?> route = 
Route.castToFullRoute(command.route());
+                    Timestamp executeAt = command.executeAt();
+                    investigating = new CoordinateAwareness();
+                    Topologies topologies = 
node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
+                    node.send(notAwareOfDurability, to -> new 
InformDurable(to, topologies, route, txnId, executeAt, Durable), investigating);
+                }
 
-                progress = Expected;
-                if (fail == null)
+                @Override
+                public String toString()
                 {
-                    if (!success.isDefinitionKnown()) invalidate(node, txnId, 
someKeys);
-                    else record(success);
+                    return "{" + status + ',' + progress() + '}';
                 }
-            };
+            }
 
-            node.withEpoch(toEpoch, () -> {
-                debugInvestigating = FetchData.fetch(blockedUntil, node, 
txnId, someKeys, executeAt, toEpoch, callback);
-            });
-        }
+            class BlockingState extends Monitoring
+            {
+                Known blockedUntil = Nothing;
 
-        private Unseekables<?, ?> unseekables(Command command)
-        {
-            return Unseekables.merge((Route)command.route(), blockedOn);
-        }
+                Unseekables<?, ?> blockedOn;
 
-        private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> 
someKeys)
-        {
-            progress = Investigating;
-            // TODO (RangeTxns): This should be a Routable, or we should 
guarantee it is safe to operate on any key in the range
-            RoutingKey someKey = Route.isRoute(someKeys) ? 
(Route.castToRoute(someKeys)).homeKey() : 
someKeys.get(0).someIntersectingRoutingKey();
-            someKeys = someKeys.with(someKey);
-            debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, 
(success, fail) -> {
-                if (progress != Investigating)
-                    return;
-
-                progress = Expected;
-                if (fail == null && 
success.asProgressToken().durability.isDurable())
-                    progress = Done;
-            });
-        }
+                Object debugInvestigating;
 
-        public String toString()
-        {
-            return progress.toString();
-        }
-    }
+                void recordBlocking(Known blockedUntil, Unseekables<?, ?> 
blockedOn)
+                {
+                    Invariants.checkState(!blockedOn.isEmpty());
+                    if (this.blockedOn == null) this.blockedOn = blockedOn;
+                    else this.blockedOn = Unseekables.merge(this.blockedOn, 
(Unseekables)blockedOn);
+                    if (!blockedUntil.isSatisfiedBy(this.blockedUntil))
+                    {
+                        this.blockedUntil = 
this.blockedUntil.merge(blockedUntil);
+                        setProgress(Expected);
+                    }
+                }
 
-    enum NonHomeState
-    {
-        Unsafe, StillUnsafe, Investigating, Safe
-    }
+                void record(Known known)
+                {
+                    if (blockedUntil.isSatisfiedBy(known))
+                        setProgress(NoneExpected);
+                }
 
-    static class State
-    {
-        final TxnId txnId;
-        final CommandStore commandStore;
+                @Override
+                void run(Command command)
+                {
+                    if (command.has(blockedUntil))
+                    {
+                        setProgress(NoneExpected);
+                        return;
+                    }
 
-        CoordinateState coordinateState;
-        DisseminateState disseminateState;
-        NonHomeState nonHomeState;
-        BlockingState blockingState;
+                    setProgress(Investigating);
+                    // first make sure we have enough information to obtain 
the command locally
+                    Timestamp executeAt = command.hasBeen(PreCommitted) ? 
command.executeAt() : null;
+                    long srcEpoch = (executeAt != null ? executeAt : 
txnId).epoch;
+                    // TODO: compute fromEpoch, the epoch we already have this 
txn replicated until
+                    long toEpoch = Math.max(srcEpoch, node.topology().epoch());
+                    Unseekables<?, ?> someKeys = unseekables(command);
 
-        State(TxnId txnId, CommandStore commandStore)
-        {
-            this.txnId = txnId;
-            this.commandStore = commandStore;
-        }
+                    BiConsumer<Known, Throwable> callback = (success, fail) -> 
{
+                        if (progress() != Investigating)
+                            return;
 
-        void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, ?> 
unseekables)
-        {
-            Invariants.checkArgument(txnId.equals(this.txnId));
-            if (blockingState == null)
-                blockingState = new BlockingState();
-            blockingState.recordBlocking(waitingFor, unseekables);
-        }
+                        setProgress(Expected);
+                        if (fail == null)
+                        {
+                            if (!success.isDefinitionKnown()) invalidate(node, 
txnId, someKeys);
+                            else record(success);
+                        }
+                    };
 
-        void ensureAtLeast(NonHomeState ensureAtLeast)
-        {
-            if (nonHomeState == null || nonHomeState.compareTo(ensureAtLeast) 
< 0)
-                nonHomeState = ensureAtLeast;
-        }
+                    node.withEpoch(toEpoch, () -> {
+                        debugInvestigating = FetchData.fetch(blockedUntil, 
node, txnId, someKeys, executeAt, toEpoch, callback);
+                    });
+                }
 
-        CoordinateState local()
-        {
-            if (coordinateState == null)
-                coordinateState = new CoordinateState();
-            return coordinateState;
-        }
+                private Unseekables<?, ?> unseekables(Command command)
+                {
+                    return Unseekables.merge((Route)command.route(), 
blockedOn);
+                }
 
-        DisseminateState global()
-        {
-            if (disseminateState == null)
-                disseminateState = new DisseminateState();
-            return disseminateState;
-        }
+                private void invalidate(Node node, TxnId txnId, Unseekables<?, 
?> someKeys)
+                {
+                    setProgress(Investigating);
+                    // TODO (RangeTxns): This should be a Routable, or we 
should guarantee it is safe to operate on any key in the range
+                    RoutingKey someKey = Route.isRoute(someKeys) ? 
(Route.castToRoute(someKeys)).homeKey() : 
someKeys.get(0).someIntersectingRoutingKey();
+                    someKeys = someKeys.with(someKey);
+                    debugInvestigating = Invalidate.invalidate(node, txnId, 
someKeys, (success, fail) -> {
+                        if (progress() != Investigating)
+                            return;
 
-        void ensureAtLeast(Command command, CoordinateStatus newStatus, 
Progress newProgress)
-        {
-            local().ensureAtLeast(command, newStatus, newProgress);
-        }
+                        setProgress(Expected);
+                        if (fail == null && 
success.asProgressToken().durability.isDurable())
+                            setProgress(Done);
+                    });
+                }
 
-        void ensureAtLeast(TxnId txnId, RoutingKey homeKey, CoordinateStatus 
newStatus, Progress newProgress)
-        {
-            local().ensureAtLeast(newStatus, newProgress);
-        }
+                @Override
+                public String toString()
+                {
+                    return progress().toString();
+                }
+            }
 
-        void updateNonHome(Node node, Command command)
-        {
-            switch (nonHomeState)
+            class NonHomeState extends Monitoring
             {
-                default: throw new IllegalStateException();
-                case Safe:
-                case Investigating:
-                    break;
-                case Unsafe:
-                    nonHomeState = StillUnsafe;
-                    break;
-                case StillUnsafe:
+                NonHomeState()
+                {
+                    setProgress(Expected);
+                }
+
+                void setSafe()
+                {
+                    setProgress(Done);
+                }
+
+                @Override
+                void run(Command command)
+                {
                     // make sure a quorum of the home shard is aware of the 
transaction, so we can rely on it to ensure progress
                     Future<Void> inform = inform(node, txnId, 
command.homeKey());
                     inform.addCallback((success, fail) -> {
-                        if (nonHomeState == Safe)
+                        if (progress() == Done)
                             return;
 
-                        if (fail != null) nonHomeState = Unsafe;
-                        else nonHomeState = Safe;
+                        setProgress(fail != null ? Expected : Done);
                     });
-                    break;
+                }
+
+                @Override
+                public String toString()
+                {
+                    return progress() == Done ? "Safe" : "Unsafe";
+                }
             }
-        }
 
-        void update(Node node)
-        {
-            PreLoadContext context = contextFor(txnId);
-            commandStore.execute(context, safeStore -> {
-                Command command = safeStore.command(txnId);
-                if (blockingState != null)
-                    blockingState.update(node, txnId, command);
+            final TxnId txnId;
 
-                if (coordinateState != null)
-                    coordinateState.update(node, safeStore.commandStore(), 
txnId, command);
+            CoordinateState coordinateState;
+            DisseminateState disseminateState;
+            NonHomeState nonHomeState;
+            BlockingState blockingState;
 
-                if (disseminateState != null)
-                    disseminateState.update(node, txnId, command);
+            State(TxnId txnId)
+            {
+                this.txnId = txnId;
+            }
 
-                if (nonHomeState != null)
-                    updateNonHome(node, command);
-            }).addCallback(commandStore.agent());
-        }
+            void recordBlocking(TxnId txnId, Known waitingFor, Unseekables<?, 
?> routables)
+            {
+                Invariants.checkArgument(txnId.equals(this.txnId));
+                if (blockingState == null)
+                    blockingState = new BlockingState();
+                blockingState.recordBlocking(waitingFor, routables);
+            }
 
-        @Override
-        public String toString()
-        {
-            return coordinateState != null ? coordinateState.toString()
-                                           : nonHomeState != null
-                                       ? nonHomeState.toString()
-                                       : blockingState.toString();
-        }
-    }
+            CoordinateState local()
+            {
+                if (coordinateState == null)
+                    coordinateState = new CoordinateState();
+                return coordinateState;
+            }
 
-    final Node node;
-    final List<Instance> instances = new CopyOnWriteArrayList<>();
+            DisseminateState global()
+            {
+                if (disseminateState == null)
+                    disseminateState = new DisseminateState();
+                return disseminateState;
+            }
 
-    public SimpleProgressLog(Node node)
-    {
-        this.node = node;
-        node.scheduler().recurring(this, 200L, TimeUnit.MILLISECONDS);
-    }
+            void ensureAtLeast(Command command, CoordinateStatus newStatus, 
Progress newProgress)
+            {
+                local().ensureAtLeast(command, newStatus, newProgress);
+            }
+
+            void ensureAtLeast(CoordinateStatus newStatus, Progress 
newProgress)
+            {
+                local().ensureAtLeast(newStatus, newProgress);
+            }
+
+            void touchNonHomeUnsafe()
+            {
+                if (nonHomeState == null)
+                    nonHomeState = new NonHomeState();
+            }
+
+            void setSafe()
+            {
+                if (nonHomeState == null)
+                    nonHomeState = new NonHomeState();
+                nonHomeState.setSafe();
+            }
+
+            @Override
+            public String toString()
+            {
+                return coordinateState != null ? coordinateState.toString()
+                        : nonHomeState != null
+                        ? nonHomeState.toString()
+                        : blockingState.toString();
+            }
+        }
 
-    class Instance implements ProgressLog
-    {
         final CommandStore commandStore;
         final Map<TxnId, State> stateMap = new HashMap<>();
+        boolean isScheduled;
 
         Instance(CommandStore commandStore)
         {
             this.commandStore = commandStore;
-            instances.add(this);
         }
 
         State ensure(TxnId txnId)
         {
-            return stateMap.computeIfAbsent(txnId, id -> new State(id, 
commandStore));
+            return stateMap.computeIfAbsent(txnId, State::new);
         }
 
         State ensure(TxnId txnId, State state)
@@ -635,23 +642,21 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
             return state != null ? state : ensure(txnId);
         }
 
-        @Override
-        public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard 
shard)
-        {
-            if (shard.isHome())
-                ensure(txnId).ensureAtLeast(txnId, homeKey, Uncommitted, 
Expected);
-        }
-
-        @Override
-        public void preaccepted(Command command, ProgressShard shard)
+        private void ensureSafeOrAtLeast(Command command, ProgressShard shard, 
CoordinateStatus newStatus, Progress newProgress)
         {
             Invariants.checkState(shard != Unsure);
 
+            State state = null;
+            assert newStatus.isAtMostReadyToExecute();
+            if (newStatus.isAtLeastCommitted())
+                state = recordCommit(command.txnId());
+
             if (shard.isProgress())
             {
-                State state = ensure(command.txnId());
-                if (shard.isHome()) state.ensureAtLeast(command, Uncommitted, 
Expected);
-                else state.ensureAtLeast(NonHomeState.Unsafe);
+                state = ensure(command.txnId(), state);
+
+                if (shard.isHome()) state.ensureAtLeast(command, newStatus, 
newProgress);
+                else ensure(command.txnId()).setSafe();
             }
         }
 
@@ -671,21 +676,23 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
             return state;
         }
 
-        private void ensureSafeOrAtLeast(Command command, ProgressShard shard, 
CoordinateStatus newStatus, Progress newProgress)
+        @Override
+        public void unwitnessed(TxnId txnId, RoutingKey homeKey, ProgressShard 
shard)
         {
-            Invariants.checkState(shard != Unsure);
+            if (shard.isHome())
+                ensure(txnId).ensureAtLeast(Uncommitted, Expected);
+        }
 
-            State state = null;
-            assert newStatus.isAtMost(ReadyToExecute);
-            if (newStatus.isAtLeast(CoordinateStatus.Committed))
-                state = recordCommit(command.txnId());
+        @Override
+        public void preaccepted(Command command, ProgressShard shard)
+        {
+            Invariants.checkState(shard != Unsure);
 
             if (shard.isProgress())
             {
-                state = ensure(command.txnId(), state);
-
-                if (shard.isHome()) state.ensureAtLeast(command, newStatus, 
newProgress);
-                else ensure(command.txnId()).ensureAtLeast(Safe);
+                State state = ensure(command.txnId());
+                if (shard.isHome()) state.ensureAtLeast(command, Uncommitted, 
Expected);
+                else state.touchNonHomeUnsafe();
             }
         }
 
@@ -704,7 +711,7 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
         @Override
         public void readyToExecute(Command command, ProgressShard shard)
         {
-            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
+            ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected);
         }
 
         @Override
@@ -712,7 +719,7 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
         {
             recordApply(command.txnId());
             // this is the home shard's state ONLY, so we don't know it is 
fully durable locally
-            ensureSafeOrAtLeast(command, shard, 
CoordinateStatus.ReadyToExecute, Expected);
+            ensureSafeOrAtLeast(command, shard, ReadyToExecute, Expected);
         }
 
         @Override
@@ -728,7 +735,7 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
                 state = ensure(command.txnId(), state);
 
                 if (shard.isHome()) state.ensureAtLeast(command, 
CoordinateStatus.Done, Done);
-                else ensure(command.txnId()).ensureAtLeast(Safe);
+                else ensure(command.txnId()).setSafe();
             }
         }
 
@@ -753,43 +760,78 @@ public class SimpleProgressLog implements Runnable, 
ProgressLog.Factory
         public void durable(TxnId txnId, Unseekables<?, ?> unseekables, 
ProgressShard shard)
         {
             State state = ensure(txnId);
-            // TODO: we can probably simplify things by requiring (empty) 
Apply messages to be sent also to the coordinating topology
+            // TODO (progress consider-prerelease): we can probably simplify 
things by requiring (empty) Apply messages to be sent also to the coordinating 
topology
             state.recordBlocking(txnId, PreApplied.minKnown, unseekables);
         }
 
+        @Override
         public void waiting(TxnId blockedBy, Known blockedUntil, 
Unseekables<?, ?> blockedOn)
         {
-            // TODO (soon): forward to progress shard for processing (if known)
+            // TODO (perf+ consider-prerelease): consider triggering a 
preemption of existing coordinator (if any) in some circumstances;
+            //     today, an LWT can pre-empt more efficiently (i.e. 
instantly) a failed operation whereas Accord will
+            //     wait for some progress interval before taking over; there 
is probably some middle ground where we trigger
+            //     faster preemption once we're blocked on a transaction, 
while still offering some amount of time to complete.
+            // TODO (soon): forward to local progress shard for processing (if 
known)
             // TODO (soon): if we are co-located with the home shard, don't 
need to do anything unless we're in a
             //              later topology that wasn't covered by its 
coordination
             ensure(blockedBy).recordBlocking(blockedBy, blockedUntil, 
blockedOn);
         }
-    }
 
-    @Override
-    public void run()
-    {
-        for (Instance instance : instances)
+        @Override
+        public void addFirst(Monitoring add)
         {
-            // TODO: we want to be able to poll others about pending 
dependencies to check forward progress,
-            //       as we don't know all dependencies locally (or perhaps 
any, at execution time) so we may
-            //       begin expecting forward progress too early
-            new ArrayList<>(instance.stateMap.values()).forEach(state -> {
-                try
-                {
-                    state.update(node);
-                }
-                catch (Throwable t)
+            super.addFirst(add);
+            ensureScheduled();
+        }
+
+        @Override
+        public void addLast(Monitoring add)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        void ensureScheduled()
+        {
+            if (isScheduled)
+                return;
+
+            isScheduled = true;
+            node.scheduler().once(() -> 
commandStore.execute(PreLoadContext.empty(), ignore -> run()), 200L, 
TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public void run()
+        {
+            isScheduled = false;
+            try
+            {
+                for (Monitoring run : this)
                 {
-                    node.agent().onUncaughtException(t);
+                    if (run.shouldRun())
+                    {
+                        commandStore.execute(contextFor(run.txnId()), 
safeStore -> {
+                            run.run(safeStore.command(run.txnId()));
+                        });
+                    }
                 }
-            });
+            }
+            catch (Throwable t)
+            {
+                t.printStackTrace();
+            }
+            finally
+            {
+                if (!isEmpty())
+                    ensureScheduled();
+            }
         }
     }
 
     @Override
-    public ProgressLog create(CommandStore commandStore)
+    public Instance create(CommandStore commandStore)
     {
-        return new Instance(commandStore);
+        Instance instance = new Instance(commandStore);
+        instances.add(instance);
+        return instance;
     }
 }
diff --git 
a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java 
b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
index 39bac13..2d7f6b8 100644
--- a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
+++ b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
@@ -46,6 +46,6 @@ public class SizeOfIntersectionSorter implements 
TopologySorter
 
     private static int count(Node.Id node, ShardSelection shards, int offset, 
Topology topology)
     {
-        return topology.foldlIntOn(node, (i, shard, v) -> shard.get(i) ? v + 1 
: v, shards, offset, 0, 0);
+        return topology.foldlIntOn(node, (shard, v, index) -> shard.get(index) 
? v + 1 : v, shards, offset, 0, 0);
     }
 }
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index cf483a5..7b8cacb 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -1021,7 +1021,7 @@ public abstract class Command implements CommandListener, 
BiConsumer<SafeCommand
                 if (partialTxn() != null)
                 {
                     partialTxn = partialTxn.slice(allRanges, shard.isHome());
-                    Routables.foldlMissing((Seekables)partialTxn.keys(), 
partialTxn().keys(), (i, keyOrRange, p, v) -> {
+                    Routables.foldlMissing((Seekables)partialTxn.keys(), 
partialTxn().keys(), (keyOrRange, p, v, i) -> {
                         // TODO: duplicate application of ranges
                         safeStore.forEach(keyOrRange, allRanges, forKey -> 
forKey.register(this));
                         return v;
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index cecacba..87d39b2 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -191,7 +191,7 @@ public abstract class CommandStores<S extends CommandStore>
                     for (int i = Math.max(0, indexForEpoch(minEpoch)), maxi = 
indexForEpoch(maxEpoch); i <= maxi ; ++i)
                     {
                         // include every shard if we match a range
-                        accumulate = Routables.foldl((Ranges)keysOrRanges, 
ranges[i], (idx, k, p, a) -> p, terminalValue, accumulate, terminalValue);
+                        accumulate = Routables.foldl((Ranges)keysOrRanges, 
ranges[i], (k, p, a, idx) -> p, terminalValue, accumulate, terminalValue);
                     }
                     return accumulate;
                 }
@@ -208,7 +208,7 @@ public abstract class CommandStores<S extends CommandStore>
             return Integer.toUnsignedLong(key.routingHash()) % numShards;
         }
 
-        private static long addKeyIndex(int i, RoutableKey key, long 
numShards, long accumulate)
+        private static long addKeyIndex(RoutableKey key, long numShards, long 
accumulate, int i)
         {
             return accumulate | (1L << keyIndex(key, numShards));
         }
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java 
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 19cb9f1..8421c63 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -301,7 +301,7 @@ public class BeginRecovery extends 
TxnRequest<BeginRecovery.RecoverReply>
         {
             commandStore.forEach(keys, ranges, forKey -> {
                 // accepted txns with an earlier txnid that do not have our 
txnid as a dependency
-                /**
+                /*
                  * The idea here is to discover those transactions that have 
been Accepted without witnessing us
                  * and whom may not have adopted us as dependencies as 
responses to the Accept. Once we have
                  * reached a quorum for recovery any re-proposals will 
discover us. So we do not need to look
@@ -327,13 +327,13 @@ public class BeginRecovery extends 
TxnRequest<BeginRecovery.RecoverReply>
         try (Deps.OrderedBuilder builder = Deps.orderedBuilder(true))
         {
             commandStore.forEach(keys, ranges, forKey -> {
-                /**
+                /*
                  * The idea here is to discover those transactions that have 
been Committed and DID witness us
                  * so that we can remove these from the set of 
acceptedStartedBeforeAndDidNotWitness
                  * on other nodes, to minimise the number of transactions we 
try to wait for on recovery
                  */
                 builder.nextKey(forKey.key());
-                forKey.committedById().before(txnId, RorWs, WITH, txnId, 
ANY_STATUS, null)
+                forKey.committedById().before(txnId, RorWs, WITH, txnId, 
HAS_BEEN, Committed)
                         .forEach(builder::add);
             });
             return builder.build();
@@ -342,7 +342,7 @@ public class BeginRecovery extends 
TxnRequest<BeginRecovery.RecoverReply>
 
     private static Stream<? extends TxnIdWithExecuteAt> 
acceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId 
startedAfter, Ranges ranges, Seekables<?, ?> keys)
     {
-        /**
+        /*
          * The idea here is to discover those transactions that were started 
after us and have been Accepted
          * and did not witness us as part of their pre-accept round, as this 
means that we CANNOT have taken
          * the fast path. This is central to safe recovery, as if every 
transaction that executes later has
@@ -356,14 +356,14 @@ public class BeginRecovery extends 
TxnRequest<BeginRecovery.RecoverReply>
 
     private static Stream<TxnId> 
committedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId 
startedAfter, Ranges ranges, Seekables<?, ?> keys)
     {
-        /**
+        /*
          * The idea here is to discover those transactions that have been 
decided to execute after us
          * and did not witness us as part of their pre-accept or accept round, 
as this means that we CANNOT have
          * taken the fast path. This is central to safe recovery, as if every 
transaction that executes later has
          * witnessed us we are safe to propose the pre-accept timestamp 
regardless, whereas if any transaction
          * has not witnessed us we can safely invalidate it.
          */
-        return commandStore.mapReduce(keys, ranges, forKey -> 
forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, 
ANY_STATUS, null),
+        return commandStore.mapReduce(keys, ranges, forKey -> 
forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, 
HAS_BEEN, Committed),
                 Stream::concat, Stream.empty());
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/Defer.java 
b/accord-core/src/main/java/accord/messages/Defer.java
index c528c11..07a592b 100644
--- a/accord-core/src/main/java/accord/messages/Defer.java
+++ b/accord-core/src/main/java/accord/messages/Defer.java
@@ -6,8 +6,8 @@ import java.util.function.Function;
 import accord.local.*;
 import accord.local.Status.Known;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 
-import static accord.local.PreLoadContext.contextFor;
 import static accord.messages.Defer.Ready.Expired;
 import static accord.messages.Defer.Ready.No;
 import static accord.messages.Defer.Ready.Yes;
@@ -73,7 +73,8 @@ class Defer implements CommandListener
     @Override
     public PreLoadContext listenerPreLoadContext(TxnId caller)
     {
-        return contextFor(caller);
+        Invariants.checkState(caller.equals(request.txnId));
+        return request;
     }
 }
 
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java 
b/accord-core/src/main/java/accord/messages/InformDurable.java
index 66da6aa..3631003 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -52,6 +52,8 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
         {
             // we need to pick a progress log, but this node might not have 
participated in the coordination epoch
             // in this rare circumstance we simply pick a key to select some 
progress log to coordinate this
+            // TODO (now): We might not replicate either txnId.epoch OR 
executeAt.epoch, but some inbetween.
+            //             Do we need to receive this message in that case? If 
so, we need to account for this when selecting a progress key
             at = executeAt;
             progressKey = node.selectProgressKey(executeAt.epoch, scope, 
scope.homeKey());
             shard = Adhoc;
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java 
b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 47725fe..a089823 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -68,7 +68,7 @@ public abstract class AbstractKeys<K extends RoutableKey, KS 
extends Routables<K
 
     public final boolean containsAll(Routables<?, ?> keysOrRanges)
     {
-        return keysOrRanges.size() == Routables.foldl(keysOrRanges, this, (i, 
k, p, v) -> v + 1, 0, 0, 0);
+        return keysOrRanges.size() == Routables.foldl(keysOrRanges, this, (k, 
p, v, i) -> v + 1, 0, 0, 0);
     }
 
     @Override
@@ -152,12 +152,12 @@ public abstract class AbstractKeys<K extends RoutableKey, 
KS extends Routables<K
 
     public boolean any(Ranges ranges, Predicate<? super K> predicate)
     {
-        return 1 == foldl(ranges, (i1, key, i2, i3) -> predicate.test(key) ? 1 
: 0, 0, 0, 1);
+        return 1 == foldl(ranges, (key, p2, prev, index) -> 
predicate.test(key) ? 1 : 0, 0, 0, 1);
     }
 
     public boolean any(Predicate<? super K> predicate)
     {
-        return 1 == foldl((i, key, p, v) -> predicate.test(key) ? 1 : 0, 0, 0, 
1);
+        return 1 == foldl((key, p2, prev, index) -> predicate.test(key) ? 1 : 
0, 0, 0, 1);
     }
 
     public boolean none(Predicate<? super K> predicate)
@@ -182,7 +182,7 @@ public abstract class AbstractKeys<K extends RoutableKey, 
KS extends Routables<K
     @Inline
     public final void forEach(Ranges rs, Consumer<? super K> forEach)
     {
-        Routables.foldl(this, rs, (i, k, consumer) -> { consumer.accept(k); 
return consumer; }, forEach);
+        Routables.foldl(this, rs, (k, consumer, i) -> { consumer.accept(k); 
return consumer; }, forEach);
     }
 
     @Inline
@@ -196,7 +196,7 @@ public abstract class AbstractKeys<K extends RoutableKey, 
KS extends Routables<K
     {
         for (int i = 0; i < keys.length; i++)
         {
-            initialValue = fold.apply(i, keys[i], param, initialValue);
+            initialValue = fold.apply(keys[i], param, initialValue, i);
             if (terminalValue == initialValue)
                 return initialValue;
         }
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java 
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 7b5a24c..a377221 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -77,7 +77,7 @@ public abstract class AbstractRanges<RS extends 
Routables<Range, ?>> implements
     {
         if (this.isEmpty()) return that.isEmpty();
         if (that.isEmpty()) return true;
-        return Routables.rangeFoldl(that, this, (from, to, p, v) -> v + (to - 
from), 0, 0, 0) == that.size();
+        return Routables.rangeFoldl(that, this, (p, v, from, to) -> v + (to - 
from), 0, 0, 0) == that.size();
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java 
b/accord-core/src/main/java/accord/primitives/Deps.java
index fef69b9..6f9cc2c 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -35,7 +35,6 @@ import accord.utils.Invariants;
 import static accord.utils.ArrayBuffers.*;
 import static accord.utils.SortedArrays.*;
 import static accord.utils.SortedArrays.Search.FAST;
-import static accord.utils.Utils.listOf;
 
 /**
  * A collection of dependencies for a transaction, organised by the key the 
dependency is adopted via.
@@ -153,6 +152,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>>
 
             if (totalCount != keyOffset && !hasOrderedTxnId)
             {
+                // TODO: this allocates a significant amount of memory: would 
be preferable to be able to sort using a pre-defined scratch buffer
                 Arrays.sort(keyToTxnId, keyOffset, totalCount);
                 for (int i = keyOffset + 1 ; i < totalCount ; ++i)
                 {
@@ -218,7 +218,7 @@ public class Deps implements Iterable<Map.Entry<Key, TxnId>>
                 result[k] = keyCount + keyLimits[k];
                 int from = k == 0 ? 0 : keyLimits[k - 1];
                 int to = keyLimits[k];
-                offset = (int)SortedArrays.foldlIntersection(txnIds, 0, 
txnIdCount, keyToTxnId, from, to, (li, ri, key, p, v) -> {
+                offset = (int)SortedArrays.foldlIntersection(txnIds, 0, 
txnIdCount, keyToTxnId, from, to, (key, p, v, li, ri) -> {
                     result[(int)v] = li;
                     return v + 1;
                 }, keyCount, offset, -1);
@@ -885,63 +885,63 @@ public class Deps implements Iterable<Map.Entry<Key, 
TxnId>>
             return this;
 
         IntBuffers cache = ArrayBuffers.cachedInts();
-        int[] remapTxnIds = cache.getInts(txnIds.length);
-        int[] keyToTxnId = null;
+        TxnId[] oldTxnIds = txnIds;
+        int[] oldKeyToTxnId = keyToTxnId;
+        int[] remapTxnIds = cache.getInts(oldTxnIds.length);
+        int[] newKeyToTxnId = null;
+        TxnId[] newTxnIds;
         int o = 0;
         try
         {
-            TxnId[] txnIds; {
-                int count = 0;
-                for (int i = 0 ; i < this.txnIds.length ; ++i)
-                {
-                    if (remove.test(this.txnIds[i])) remapTxnIds[i] = -1;
-                    else remapTxnIds[i] = count++;
-                }
+            int count = 0;
+            for (int i = 0 ; i < oldTxnIds.length ; ++i)
+            {
+                if (remove.test(oldTxnIds[i])) remapTxnIds[i] = -1;
+                else remapTxnIds[i] = count++;
+            }
 
-                if (count == this.txnIds.length)
-                    return this;
+            if (count == oldTxnIds.length)
+                return this;
 
-                if (count == 0)
-                    return NONE;
+            if (count == 0)
+                return NONE;
 
-                txnIds = new TxnId[count];
-                for (int i = 0 ; i < this.txnIds.length ; ++i)
-                {
-                    if (remapTxnIds[i] >= 0)
-                        txnIds[remapTxnIds[i]] = this.txnIds[i];
-                }
+            newTxnIds = new TxnId[count];
+            for (int i = 0 ; i < oldTxnIds.length ; ++i)
+            {
+                if (remapTxnIds[i] >= 0)
+                    newTxnIds[remapTxnIds[i]] = oldTxnIds[i];
             }
 
-            keyToTxnId = cache.getInts(this.keyToTxnId.length);
+            newKeyToTxnId = cache.getInts(oldKeyToTxnId.length);
             int k = 0, i = keys.size();
             o = i;
-            while (i < this.keyToTxnId.length)
+            while (i < oldKeyToTxnId.length)
             {
-                while (this.keyToTxnId[k] == i)
-                    keyToTxnId[k++] = o;
+                while (oldKeyToTxnId[k] == i)
+                    newKeyToTxnId[k++] = o;
 
-                int remapped = remapTxnIds[this.keyToTxnId[i]];
+                int remapped = remapTxnIds[oldKeyToTxnId[i]];
                 if (remapped >= 0)
-                    keyToTxnId[o++] = remapped;
+                    newKeyToTxnId[o++] = remapped;
                 ++i;
             }
 
             while (k < keys.size())
-                keyToTxnId[k++] = o;
-
-            int[] result = cache.complete(keyToTxnId, o);
-            cache.discard(keyToTxnId, o);
-            return new Deps(keys, txnIds, result);
+                newKeyToTxnId[k++] = o;
         }
         catch (Throwable t)
         {
-            cache.forceDiscard(keyToTxnId, o);
+            cache.forceDiscard(newKeyToTxnId, o);
             throw t;
         }
         finally
         {
-            cache.forceDiscard(remapTxnIds, txnIds.length);
+            cache.forceDiscard(remapTxnIds, oldTxnIds.length);
         }
+
+        newKeyToTxnId = cache.completeAndDiscard(newKeyToTxnId, o);
+        return new Deps(keys, newTxnIds, newKeyToTxnId);
     }
 
     public boolean contains(TxnId txnId)
@@ -1054,7 +1054,7 @@ public class Deps implements Iterable<Map.Entry<Key, 
TxnId>>
 
     public void forEachOn(Ranges ranges, Predicate<Key> include, 
BiConsumer<Key, TxnId> forEach)
     {
-        Routables.foldl(keys, ranges, (index, key, value) -> {
+        Routables.foldl(keys, ranges, (key, value, index) -> {
             if (!include.test(key))
                 return null;
 
@@ -1082,7 +1082,7 @@ public class Deps implements Iterable<Map.Entry<Key, 
TxnId>>
         // does not rely on this ordering.
         for (int offset = 0 ; offset < txnIds.length ; offset += 64)
         {
-            long bitset = Routables.foldl(keys, ranges, (keyIndex, key, off, 
value) -> {
+            long bitset = Routables.foldl(keys, ranges, (key, off, value, 
keyIndex) -> {
                 if (!include.test(key))
                     return value;
 
@@ -1151,7 +1151,7 @@ public class Deps implements Iterable<Map.Entry<Key, 
TxnId>>
 
     public Collection<TxnId> txnIds()
     {
-        return listOf(txnIds);
+        return Arrays.asList(txnIds);
     }
 
     public List<TxnId> txnIds(Key key)
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java 
b/accord-core/src/main/java/accord/primitives/Routables.java
index 05e85ba..94d4baa 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -148,7 +148,7 @@ public interface Routables<K extends Routable, U extends 
Routables<K, ?>> extend
                 int nexti = valueIntersections.findLimit(is, i, ms, m);
                 while (i < nexti)
                 {
-                    initialValue = fold.apply(i, is.get(i), initialValue);
+                    initialValue = fold.apply(is.get(i), initialValue, i);
                     ++i;
                 }
             }
@@ -174,7 +174,7 @@ public interface Routables<K extends Routable, U extends 
Routables<K, ?>> extend
                 int nexti = valueIntersections.findLimit(is, i, ms, m);
                 while (i < nexti)
                 {
-                    initialValue = fold.apply(i, is.get(i), param, 
initialValue);
+                    initialValue = fold.apply(is.get(i), param, initialValue, 
i);
                     if (initialValue == terminalValue)
                         break done;
                     ++i;
@@ -199,7 +199,7 @@ public interface Routables<K extends Routable, U extends 
Routables<K, ?>> extend
                 int nexti = (int)(im);
                 while (i < nexti)
                 {
-                    initialValue = fold.apply(i, is.get(i), param, 
initialValue);
+                    initialValue = fold.apply(is.get(i), param, initialValue, 
i);
                     if (initialValue == terminalValue)
                         break done;
                     ++i;
@@ -227,7 +227,7 @@ public interface Routables<K extends Routable, U extends 
Routables<K, ?>> extend
                 m = (int)(kri >>> 32);
 
                 int nexti = valueIntersections.findLimit(is, i, ms, m);
-                initialValue = fold.apply(i, nexti, param, initialValue);
+                initialValue = fold.apply(param, initialValue, i, nexti);
                 if (initialValue == terminalValue)
                     break;
                 i = nexti;
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java 
b/accord-core/src/main/java/accord/primitives/Txn.java
index d475119..907bdaf 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -175,7 +175,7 @@ public interface Txn
     default Future<Data> read(SafeCommandStore safeStore, Command command)
     {
         Ranges ranges = safeStore.ranges().at(command.executeAt().epoch);
-        List<Future<Data>> futures = read().keys().foldl(ranges, (index, key, 
accumulate) -> {
+        List<Future<Data>> futures = read().keys().foldl(ranges, (key, 
accumulate, index) -> {
             if (!safeStore.commandStore().hashIntersects(key))
                 return accumulate;
 
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java 
b/accord-core/src/main/java/accord/primitives/Writes.java
index 2986dd0..b41d2bb 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -71,7 +71,7 @@ public class Writes
         if (ranges == null)
             return SUCCESS;
 
-        List<Future<Void>> futures = keys.foldl(ranges, (index, key, 
accumulate) -> {
+        List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, 
index) -> {
             if (safeStore.commandStore().hashIntersects(key))
                 accumulate.add(write.apply(key, safeStore, executeAt, 
safeStore.dataStore()));
             return accumulate;
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java 
b/accord-core/src/main/java/accord/topology/Topologies.java
index 4881f9e..f799193 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -65,7 +65,7 @@ public interface Topologies extends TopologySorter
     default void forEach(IndexedConsumer<Topology> consumer)
     {
         for (int i=0, mi=size(); i<mi; i++)
-            consumer.accept(i, get(i));
+            consumer.accept(get(i), i);
     }
 
     static boolean equals(Topologies t, Object o)
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index d7e93a7..277446f 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -27,6 +27,7 @@ import accord.api.RoutingKey;
 import accord.local.Node.Id;
 import accord.primitives.*;
 import accord.utils.*;
+import accord.utils.ArrayBuffers.IntBuffers;
 
 import static accord.utils.SortedArrays.Search.FLOOR;
 import static accord.utils.SortedArrays.exponentialSearch;
@@ -192,22 +193,22 @@ public class Topology
 
     public Topology forSelection(Unseekables<?, ?> select)
     {
-        return forSelection(select, (i, shard) -> true);
+        return forSelection(select, (ignore, index) -> true, null);
     }
 
-    public Topology forSelection(Unseekables<?, ?> select, 
IndexedPredicate<Shard> predicate)
+    public <P1> Topology forSelection(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param)
     {
-        return forSubset(subsetFor(select, predicate));
+        return forSubset(subsetFor(select, predicate, param));
     }
 
     public Topology forSelection(Unseekables<?, ?> select, Collection<Id> 
nodes)
     {
-        return forSelection(select, nodes, (i, shard) -> true);
+        return forSelection(select, nodes, (ignore, index) -> true, null);
     }
 
-    public Topology forSelection(Unseekables<?, ?> select, Collection<Id> 
nodes, IndexedPredicate<Shard> predicate)
+    public <P1> Topology forSelection(Unseekables<?, ?> select, Collection<Id> 
nodes, IndexedPredicate<P1> predicate, P1 param)
     {
-        return forSubset(subsetFor(select, predicate), nodes);
+        return forSubset(subsetFor(select, predicate, param), nodes);
     }
 
     private Topology forSubset(int[] newSubset)
@@ -236,66 +237,79 @@ public class Topology
         return new Topology(epoch, shards, ranges, nodeLookup, rangeSubset, 
newSubset);
     }
 
-    private int[] subsetFor(Unseekables<?, ?> select, IndexedPredicate<Shard> 
predicate)
+    private <P1> int[] subsetFor(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param)
     {
         int count = 0;
-        int[] newSubset = new int[Math.min(select.size(), 
subsetOfRanges.size())];
-        Unseekables<?, ?> as = select;
-        Ranges bs = subsetOfRanges;
-        int ai = 0, bi = 0;
-        // ailim tracks which ai have been included; since there may be 
multiple matches
-        // we cannot increment ai to avoid missing a match with a second bi
-        int ailim = 0;
-
-        if (subsetOfRanges == ranges)
+        IntBuffers cachedInts = ArrayBuffers.cachedInts();
+        int[] newSubset = cachedInts.getInts(Math.min(select.size(), 
subsetOfRanges.size()));
+        try
         {
-            while (true)
+            Routables<?, ?> as = select;
+            Ranges bs = subsetOfRanges;
+            int ai = 0, bi = 0;
+            // ailim tracks which ai have been included; since there may be 
multiple matches
+            // we cannot increment ai to avoid missing a match with a second bi
+            int ailim = 0;
+
+            if (subsetOfRanges == ranges)
             {
-                long abi = as.findNextIntersection(ai, bs, bi);
-                if (abi < 0)
+                while (true)
                 {
-                    if (ailim < select.size())
-                        throw new IllegalArgumentException("Range not found 
for " + select.get(ailim));
-                    break;
+                    long abi = as.findNextIntersection(ai, bs, bi);
+                    if (abi < 0)
+                    {
+                        if (ailim < as.size())
+                            throw new IllegalArgumentException("Range not 
found for " + as.get(ailim));
+                        break;
+                    }
+
+                    ai = (int)abi;
+                    if (ailim < ai)
+                        throw new IllegalArgumentException("Range not found 
for " + as.get(ailim));
+
+                    bi = (int)(abi >>> 32);
+                    if (predicate.test(param, bi))
+                        newSubset[count++] = bi;
+
+                    ailim = as.findNext(ai + 1, bs.get(bi), FLOOR);
+                    if (ailim < 0) ailim = -1 - ailim;
+                    else ailim++;
+                    ++bi;
                 }
+            }
+            else
+            {
+                while (true)
+                {
+                    long abi = as.findNextIntersection(ai, bs, bi);
+                    if (abi < 0)
+                        break;
 
-                bi = (int)(abi >>> 32);
-                if (ailim < (int)abi)
-                    throw new IllegalArgumentException("Range not found for " 
+ select.get(ailim));
-
-                if (predicate.test(bi, shards[bi]))
-                    newSubset[count++] = bi;
+                    bi = (int)(abi >>> 32);
+                    if (predicate.test(param, bi))
+                        newSubset[count++] = bi;
 
-                ai = (int)abi;
-                ailim = as.findNext(ai + 1, bs.get(bi), FLOOR);
-                if (ailim < 0) ailim = -1 - ailim;
-                else ailim++;
-                ++bi;
+                    ++bi;
+                }
             }
         }
-        else
+        catch (Throwable t)
         {
-            while (true)
-            {
-                long abi = as.findNextIntersection(ai, bs, bi);
-                if (abi < 0)
-                    break;
+            cachedInts.forceDiscard(newSubset, count);
+            throw t;
+        }
 
-                bi = (int)(abi >>> 32);
-                if (predicate.test(bi, shards[bi]))
-                    newSubset[count++] = bi;
+        return cachedInts.completeAndDiscard(newSubset, count);
+    }
 
-                ++bi;
-            }
-        }
-        if (count != newSubset.length)
-            newSubset = Arrays.copyOf(newSubset, count);
-        return newSubset;
+    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
Consumer<Id> nodes)
+    {
+        visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, null, nodes);
     }
 
-    public void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
IndexedPredicate<Shard> predicate, Consumer<Id> nodes)
+    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param, Consumer<Id> nodes)
     {
-        for (int shardIndex : subsetFor(select, predicate))
+        for (int shardIndex : subsetFor(select, predicate, param))
         {
             Shard shard = shards[shardIndex];
             for (Id id : shard.nodes)
@@ -318,7 +332,7 @@ public class Topology
             ai = (int)(abi);
             bi = (int)(abi >>> 32);
 
-            accumulator = function.apply(bi, shards[bi], accumulator);
+            accumulator = function.apply(shards[bi], accumulator, bi);
             ++bi;
         }
 
@@ -336,7 +350,7 @@ public class Topology
         {
             if (a[ai] == b[bi])
             {
-                consumer.accept(ai, shards[a[ai]]);
+                consumer.accept(shards[a[ai]], ai);
                 ++ai; ++bi;
             }
             else if (a[ai] < b[bi])
@@ -363,7 +377,7 @@ public class Topology
         {
             if (a[ai] == b[bi])
             {
-                O next = function.apply(offset + ai, p1, p2, p3);
+                O next = function.apply(p1, p2, p3, offset + ai);
                 initialValue = reduce.apply(initialValue, next);
                 ++ai; ++bi;
             }
@@ -394,7 +408,7 @@ public class Topology
         {
             if (a[ai] == b[bi])
             {
-                if (consumer.test(ai, shards[a[ai]]))
+                if (consumer.test(shards[a[ai]], ai))
                     ++count;
                 ++ai; ++bi;
             }
@@ -424,7 +438,7 @@ public class Topology
         {
             if (a[ai] == b[bi])
             {
-                initialValue = consumer.apply(offset + ai, param, 
initialValue);
+                initialValue = consumer.apply(param, initialValue, offset + 
ai);
                 if (terminalValue == initialValue)
                     return terminalValue;
                 ++ai; ++bi;
@@ -446,7 +460,7 @@ public class Topology
     public void forEach(IndexedConsumer<Shard> consumer)
     {
         for (int i = 0; i < supersetIndexes.length ; ++i)
-            consumer.accept(i, shards[supersetIndexes[i]]);
+            consumer.accept(shards[supersetIndexes[i]], i);
     }
 
     public int size()
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 2e7de96..ec1061f 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -113,7 +113,7 @@ public class TopologyManager implements 
ConfigurationService.Listener
                 return false;
             if (syncComplete)
                 return true;
-            Boolean result = global().foldl(intersect, (i, shard, acc) -> {
+            Boolean result = global().foldl(intersect, (shard, acc, i) -> {
                 if (acc == Boolean.FALSE)
                     return acc;
                 return syncTracker.get(i).hasReachedQuorum();
@@ -121,7 +121,7 @@ public class TopologyManager implements 
ConfigurationService.Listener
             return result == Boolean.TRUE;
         }
 
-        boolean shardIsUnsynced(int idx, Shard shard)
+        boolean shardIsUnsynced(int idx)
         {
             return !prevSynced || !syncTracker.get(idx).hasReachedQuorum();
         }
@@ -334,9 +334,9 @@ public class TopologyManager implements 
ConfigurationService.Listener
         {
             EpochState epochState = snapshot.epochs[i];
             if (epochState.epoch() < minEpoch)
-                epochState.global.visitNodeForKeysOnceOrMore(select, 
epochState::shardIsUnsynced, nodes::add);
+                epochState.global.visitNodeForKeysOnceOrMore(select, 
EpochState::shardIsUnsynced, epochState, nodes::add);
             else
-                epochState.global.visitNodeForKeysOnceOrMore(select, (i1, i2) 
-> true, nodes::add);
+                epochState.global.visitNodeForKeysOnceOrMore(select, 
nodes::add);
         }
 
         Topologies.Multi topologies = new Topologies.Multi(sorter, count);
@@ -344,9 +344,9 @@ public class TopologyManager implements 
ConfigurationService.Listener
         {
             EpochState epochState = snapshot.epochs[i];
             if (epochState.epoch() < minEpoch)
-                topologies.add(epochState.global.forSelection(select, nodes, 
epochState::shardIsUnsynced));
+                topologies.add(epochState.global.forSelection(select, nodes, 
EpochState::shardIsUnsynced, epochState));
             else
-                topologies.add(epochState.global.forSelection(select, nodes, 
(i1, i2) -> true));
+                topologies.add(epochState.global.forSelection(select, nodes, 
(ignore, idx) -> true, null));
         }
 
         return topologies;
@@ -362,7 +362,7 @@ public class TopologyManager implements 
ConfigurationService.Listener
         Set<Id> nodes = new LinkedHashSet<>();
         int count = (int)(1 + maxEpoch - minEpoch);
         for (int i = count - 1 ; i >= 0 ; --i)
-            snapshot.get(minEpoch + 
i).global().visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add);
+            snapshot.get(minEpoch + 
i).global().visitNodeForKeysOnceOrMore(keys, nodes::add);
 
         Topologies.Multi topologies = new Topologies.Multi(sorter, count);
         for (int i = count - 1 ; i >= 0 ; --i)
diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java 
b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
index b035e26..2dce966 100644
--- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java
+++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
@@ -117,6 +117,19 @@ public class ArrayBuffers
          */
         boolean discard(int[] buffer, int usedSize);
 
+        /**
+         * Equivalent to
+         *   int[] result = complete(buffer, usedSize);
+         *   discard(buffer, usedSize);
+         *   return result;
+         */
+        default int[] completeAndDiscard(int[] buffer, int usedSize)
+        {
+            int[] result = complete(buffer, usedSize);
+            discard(buffer, usedSize);
+            return result;
+        }
+
         /**
          * Indicate this buffer is definitely unused, and return it to a pool 
if possible
          * @return true if the buffer is discarded (and discard-able), false 
if it was retained
@@ -577,7 +590,6 @@ public class ArrayBuffers
                 return;
 
             savedInts = buffer;
-            return;
         }
     }
 
diff --git a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java 
b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
index 80fb7ec..03b96ce 100644
--- a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedBiFunction<T, U, R>
+public interface IndexedBiFunction<P1, P2, O>
 {
-    R apply(int i, T t, U u);
+    O apply(P1 p1, P2 p2, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedConsumer.java 
b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
index 75aa8d0..cfa2c1b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedConsumer<V>
+public interface IndexedConsumer<P1>
 {
-    void accept(int i, V v);
+    void accept(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFold.java 
b/accord-core/src/main/java/accord/utils/IndexedFold.java
index eec2223..079c16c 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFold.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFold.java
@@ -18,7 +18,6 @@
 
 package accord.utils;
 
-public interface IndexedFold<K, V>
+public interface IndexedFold<P1, Accumulate> extends IndexedBiFunction<P1, 
Accumulate, Accumulate>
 {
-    V apply(int index, K key, V value);
 }
\ No newline at end of file
diff --git 
a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java 
b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
index 6b3def1..0cd5a82 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedFoldIntersectToLong<K>
+public interface IndexedFoldIntersectToLong<P1>
 {
-    long apply(int leftIndex, int rightIndex, K key, long param, long prev);
+    long apply(P1 p1, long p2, long accumulate, int leftIndex, int rightIndex);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java 
b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
index e01f018..c36df26 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedFoldToLong<K>
+public interface IndexedFoldToLong<P1>
 {
-    long apply(int index, K key, long param, long prev);
+    long apply(P1 p1, long p2, long accumulate, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedFunction.java 
b/accord-core/src/main/java/accord/utils/IndexedFunction.java
index e4d662c..df7020e 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedFunction<T, R>
+public interface IndexedFunction<P1, O>
 {
-    R apply(int i, T t);
+    O apply(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java 
b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
index 3d35adc..272c53a 100644
--- a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedIntFunction<T>
+public interface IndexedIntFunction<P1>
 {
-    int apply(int i, T t, int v);
+    int apply(P1 p1, int p2, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java 
b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
index db40c45..68b555b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedPredicate.java
+++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedPredicate<V>
+public interface IndexedPredicate<P1>
 {
-    boolean test(int i, V v);
+    boolean test(P1 p1, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java 
b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
index abfc58e..6381e9c 100644
--- a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
@@ -20,5 +20,5 @@ package accord.utils;
 
 public interface IndexedRangeFoldToLong
 {
-    long apply(int from, int to, long param, long prev);
+    long apply(long p1, long p2, int fromIndex, int toIndex);
 }
\ No newline at end of file
diff --git 
a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java 
b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
new file mode 100644
index 0000000..8fca7eb
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
@@ -0,0 +1,6 @@
+package accord.utils;
+
+public interface IndexedRangeTriConsumer<P1, P2, P3>
+{
+    void accept(P1 p1, P2 p2, P3 p3, int fromIndex, int toIndex);
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java 
b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
new file mode 100644
index 0000000..418c69c
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
@@ -0,0 +1,7 @@
+package accord.utils;
+
+// TODO (now): migrate to utils, but must standardise on parameter order with 
index last
+public interface IndexedTriConsumer<P1, P2, P3>
+{
+    void accept(P1 p1, P2 p2, P3 p3, int index);
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java 
b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
index 3c722d5..22b2e48 100644
--- a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
@@ -18,7 +18,7 @@
 
 package accord.utils;
 
-public interface IndexedTriFunction<I1, I2, I3, O>
+public interface IndexedTriFunction<P1, P2, P3, V>
 {
-    O apply(int i0, I1 i1, I2 i2, I3 i3);
+    V apply(P1 p1, P2 p2, P3 p3, int index);
 }
diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java 
b/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java
new file mode 100644
index 0000000..eadcb92
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Spliterators.spliteratorUnknownSize;
+
+/**
+ * A simple intrusive double-linked list for maintaining a list of tasks,
+ * useful for invalidating queued ordered tasks
+ *
+ * TODO COPIED FROM CASSANDRA
+ */
+
+@SuppressWarnings("unchecked")
+public class IntrusiveLinkedList<O extends IntrusiveLinkedListNode> extends 
IntrusiveLinkedListNode implements Iterable<O>
+{
+    public IntrusiveLinkedList()
+    {
+        prev = next = this;
+    }
+
+    public void addFirst(O add)
+    {
+        if (add.next != null)
+            throw new IllegalStateException();
+        add(this, add, next);
+    }
+
+    public void addLast(O add)
+    {
+        if (add.next != null)
+            throw new IllegalStateException();
+        add(prev, add, this);
+    }
+
+    private void add(IntrusiveLinkedListNode after, IntrusiveLinkedListNode 
add, IntrusiveLinkedListNode before)
+    {
+        add.next = before;
+        add.prev = after;
+        before.prev = add;
+        after.next = add;
+    }
+
+    public O poll()
+    {
+        if (isEmpty())
+            return null;
+
+        IntrusiveLinkedListNode next = this.next;
+        next.remove();
+        return (O) next;
+    }
+
+    public boolean isEmpty()
+    {
+        return next == this;
+    }
+
+    public Iterator<O> iterator()
+    {
+        return new Iterator<O>()
+        {
+            IntrusiveLinkedListNode next = IntrusiveLinkedList.this.next;
+
+            @Override
+            public boolean hasNext()
+            {
+                return next != IntrusiveLinkedList.this;
+            }
+
+            @Override
+            public O next()
+            {
+                O result = (O)next;
+                if (result.next == null)
+                    throw new NullPointerException();
+                next = result.next;
+                return result;
+            }
+        };
+    }
+
+    public Stream<O> stream()
+    {
+        return StreamSupport.stream(spliteratorUnknownSize(iterator(), 
Spliterator.IMMUTABLE), false);
+    }
+}
+
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java 
b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
similarity index 66%
copy from accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
copy to accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
index e01f018..0270e28 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
@@ -18,7 +18,27 @@
 
 package accord.utils;
 
-public interface IndexedFoldToLong<K>
+/**
+ * TODO COPIED FROM CASSANDRA
+ */
+public abstract class IntrusiveLinkedListNode
 {
-    long apply(int index, K key, long param, long prev);
+    IntrusiveLinkedListNode prev;
+    IntrusiveLinkedListNode next;
+
+    protected boolean isFree()
+    {
+        return next == null;
+    }
+
+    protected void remove()
+    {
+        if (next != null)
+        {
+            prev.next = next;
+            next.prev = prev;
+            next = null;
+            prev = null;
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/utils/SortedArrays.java 
b/accord-core/src/main/java/accord/utils/SortedArrays.java
index 6068f9b..a0aa473 100644
--- a/accord-core/src/main/java/accord/utils/SortedArrays.java
+++ b/accord-core/src/main/java/accord/utils/SortedArrays.java
@@ -855,7 +855,7 @@ public class SortedArrays
             ai = (int)(abi);
             bi = (int)(abi >>> 32);
 
-            initialValue = fold.apply(ai, bi, as[ai], param, initialValue);
+            initialValue = fold.apply(as[ai], param, initialValue, ai, bi);
             if (initialValue == terminalValue)
                 break;
 
diff --git a/accord-core/src/test/java/accord/KeysTest.java 
b/accord-core/src/test/java/accord/KeysTest.java
index 808e506..d7867f2 100644
--- a/accord-core/src/test/java/accord/KeysTest.java
+++ b/accord-core/src/test/java/accord/KeysTest.java
@@ -26,7 +26,6 @@ import java.util.stream.IntStream;
 import accord.api.Key;
 import accord.impl.IntKey;
 import accord.impl.IntKey.Raw;
-import accord.impl.IntKey.Routing;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Keys;
@@ -126,22 +125,22 @@ public class KeysTest
     void foldlTest()
     {
         List<Key> keys = new ArrayList<>();
-        long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), 
(i, key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), 
(key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1);
         assertEquals(16, result);
         assertEquals(keys(250, 350), Keys.of(keys));
 
         keys.clear();
-        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (i, 
key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (key, 
p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1);
         assertEquals(3616, result);
         assertEquals(keys(150, 250, 350, 450), Keys.of(keys));
 
         keys.clear();
-        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (i, 
key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), 
(key, p2, v, i) -> { keys.add(key); return v * p2 + 1; }, 15, 0, -1);
         assertEquals(1, result);
         assertEquals(keys(550), Keys.of(keys));
 
         keys.clear();
-        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 
140), r(149, 151), r(560, 2000)), (i, key, p, v) -> { keys.add(key); return v * 
p + 1; }, 15, 0, -1);
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 
140), r(149, 151), r(560, 2000)), (key, p2, v, i) -> { keys.add(key); return v 
* p2 + 1; }, 15, 0, -1);
         assertEquals(1, result);
         assertEquals(keys(150), Keys.of(keys));
     }
@@ -232,21 +231,21 @@ public class KeysTest
         qt().forAll(keysGen()).check(list -> {
             Keys keys = Keys.of(list);
 
-            Assertions.assertEquals(keys.size(), 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, 
accum) -> accum + 1, 0));
-            Assertions.assertEquals(keys.size(), 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, 
ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE));
-            Assertions.assertEquals(keys.size(), keys.foldl((index, key, 
ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE));
+            Assertions.assertEquals(keys.size(), 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (key, accum, 
index) -> accum + 1, 0));
+            Assertions.assertEquals(keys.size(), 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (p1, ignore, 
accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE));
+            Assertions.assertEquals(keys.size(), keys.foldl((p1, ignore, 
accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE));
 
             // early termination
-            Assertions.assertEquals(1, 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (index, key, 
ignore, accum) -> accum + 1, -1, 0, 1));
-            Assertions.assertEquals(1, keys.foldl((index, key, ignore, accum) 
-> accum + 1, -1, 0, 1));
+            Assertions.assertEquals(1, 
keys.foldl(ranges(range(Integer.MIN_VALUE, Integer.MAX_VALUE)), (p1, ignore, 
accum, index) -> accum + 1, -1, 0, 1));
+            Assertions.assertEquals(1, keys.foldl((p1, ignore, accum, index) 
-> accum + 1, -1, 0, 1));
 
-            Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), 
(index, key, accum) -> accum + 1, 0));
-            Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), 
(index, key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE));
+            Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), 
(key, accum, index) -> accum + 1, 0));
+            Assertions.assertEquals(keys.size(), keys.foldl(ranges(keys), (p1, 
ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE));
 
             for (Key k : keys)
             {
-                Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (index, 
key, accum) -> accum + 1, 0));
-                Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (index, 
key, ignore, accum) -> accum + 1, -1, 0, Long.MAX_VALUE));
+                Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (key, 
accum, index) -> accum + 1, 0));
+                Assertions.assertEquals(1, keys.foldl(ranges(keys, k), (p1, 
ignore, accum, index) -> accum + 1, -1, 0, Long.MAX_VALUE));
             }
         });
     }
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 286f4b4..bc75e62 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
 
 import accord.impl.IntHashKey;
 import accord.impl.basic.Cluster;
@@ -269,7 +270,8 @@ public class BurnTest
     {
 //        Long overrideSeed = null;
         int count = 1;
-        Long overrideSeed = 1683848112394089134L;
+        Long overrideSeed = 188057951046487786L;
+        LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
         for (int i = 0 ; i < args.length ; i += 2)
         {
             switch (args[i])
@@ -282,11 +284,14 @@ public class BurnTest
                 case "-s":
                     overrideSeed = Long.parseLong(args[i + 1]);
                     count = 1;
+                    break;
+                case "--loop-seed":
+                    seedGenerator = new Random(Long.parseLong(args[i + 
1]))::nextLong;
             }
         }
         while (count-- > 0)
         {
-            run(overrideSeed != null ? overrideSeed : 
ThreadLocalRandom.current().nextLong());
+            run(overrideSeed != null ? overrideSeed : 
seedGenerator.getAsLong());
         }
     }
 
diff --git 
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java 
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index acbb3b3..fc569cf 100644
--- 
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++ 
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -53,7 +53,7 @@ public abstract class TrackerReconciler<ST extends 
ShardTracker, T extends Abstr
             RequestStatus newStatus = invoke(next, tracker, from);
             for (int i = 0 ; i < topologies().size() ; ++i)
             {
-                topologies().get(i).forEachOn(from, (si, s) -> {
+                topologies().get(i).forEachOn(from, (s, si) -> {
                     counts[si].compute(next, (ignore, cur) -> cur + 1);
                 });
             }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 1ea9210..9e2988e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -88,7 +88,8 @@ public class Cluster implements Scheduler
     private void add(Packet packet)
     {
         boolean isReply = packet.message instanceof Reply;
-        trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", packet);
+        if (trace.isTraceEnabled())
+            trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND", 
packet);
         if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
         else pending.add(packet);
     }
@@ -140,11 +141,14 @@ public class Cluster implements Scheduler
                              || !partitionSet.contains(deliver.src) && 
!partitionSet.contains(deliver.dst));
             if (drop)
             {
-                trace.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver);
+                if (trace.isTraceEnabled())
+                    trace.trace("{} DROP[{}] {}", clock++, on.epoch(), 
deliver);
                 return;
             }
 
-            trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
+            if (trace.isTraceEnabled())
+                trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
+
             if (deliver.message instanceof Reply)
             {
                 Reply reply = (Reply) deliver.message;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to