This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 256b35e2 Need to simulate Cassandra Journal in Accord BurnTest to 
detect issues earlier before they are seen in Cassandra (#87)
256b35e2 is described below

commit 256b35e27d170db9fcd8024d5678b4f6e9d3a956
Author: dcapwell <dcapw...@apache.org>
AuthorDate: Wed May 15 09:16:01 2024 -0700

    Need to simulate Cassandra Journal in Accord BurnTest to detect issues 
earlier before they are seen in Cassandra (#87)
    
    patch by Benedict Elliott Smith, David Capwell; reviewed by Benedict 
Elliott Smith, David Capwell for CASSANDRA-19618
---
 .../coordinate/AbstractCoordinatePreAccept.java    |   8 +-
 .../accord/coordinate/CoordinatePreAccept.java     |   2 +-
 .../accord/coordinate/CoordinationAdapter.java     |   5 +
 .../java/accord/impl/InMemoryCommandStore.java     |  26 +-
 .../main/java/accord/impl/InMemorySafeCommand.java |  28 ++
 .../src/main/java/accord/local/Bootstrap.java      |   5 +-
 .../src/main/java/accord/local/Command.java        |  32 +-
 .../src/main/java/accord/local/CommandStore.java   |   6 +
 .../src/main/java/accord/local/CommandsForKey.java |   5 +-
 .../main/java/accord/local/CommonAttributes.java   |   6 +
 .../main/java/accord/local/SafeCommandStore.java   |   2 +-
 .../main/java/accord/local/SerializerSupport.java  | 166 ++++++--
 .../src/main/java/accord/messages/Propagate.java   |   1 +
 .../src/main/java/accord/primitives/Routables.java |   6 +
 .../src/main/java/accord/utils/Invariants.java     |   5 +
 .../src/test/java/accord/burn/BurnTest.java        |  50 ++-
 .../src/test/java/accord/impl/MessageListener.java |  19 +
 .../src/test/java/accord/impl/basic/Cluster.java   | 146 ++++++-
 .../accord/impl/basic/DelayedCommandStores.java    | 107 ++++-
 .../src/test/java/accord/impl/basic/Journal.java   | 436 +++++++++++++++++++++
 .../src/test/java/accord/impl/list/ListRead.java   |  18 +
 .../src/test/java/accord/impl/list/ListResult.java |  35 ++
 .../src/test/java/accord/impl/list/ListStore.java  |  11 +
 .../src/test/java/accord/impl/list/ListWrite.java  |  27 ++
 .../src/test/java/accord/utils/AccordGens.java     |  44 +++
 accord-core/src/test/java/accord/utils/Gens.java   |  20 +-
 26 files changed, 1138 insertions(+), 78 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index 39083dbd..b5c5b49d 100644
--- 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -96,6 +96,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends 
SettableResult<T> imple
     }
 
     final Node node;
+    @Nullable
     final TxnId txnId;
     final FullRoute<?> route;
 
@@ -219,7 +220,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends 
SettableResult<T> imple
                 onNewEpochTopologyMismatch(mismatch);
                 return;
             }
-            topologies = node.topology().withUnsyncedEpochs(route, 
txnId.epoch(), latestEpoch);
+            topologies = node.topology().withUnsyncedEpochs(route, 
earliestEpoch(), latestEpoch);
             boolean equivalent = topologies.oldestEpoch() <= 
prevTopologies.currentEpoch();
             for (long epoch = topologies.currentEpoch() ; equivalent && epoch 
> prevTopologies.currentEpoch() ; --epoch)
                 equivalent = 
topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());
@@ -236,6 +237,11 @@ abstract class AbstractCoordinatePreAccept<T, R> extends 
SettableResult<T> imple
         });
     }
 
+    protected long earliestEpoch()
+    {
+        return txnId == null ? executeAtEpoch() : txnId.epoch();
+    }
+
     @Override
     public final void accept(T success, Throwable failure)
     {
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
index 5f25e41a..1c19fc60 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
@@ -152,7 +152,7 @@ abstract class CoordinatePreAccept<T> extends 
AbstractCoordinatePreAccept<T, Pre
     void onPreAccepted(Topologies topologies)
     {
         Timestamp executeAt = foldl(oks, (ok, prev) -> 
mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
-        onPreAccepted(topologies, executeAt, oks);
+        node.withEpoch(executeAt.epoch(), () -> onPreAccepted(topologies, 
executeAt, oks));
     }
 
     abstract void onPreAccepted(Topologies topologies, Timestamp executeAt, 
List<PreAcceptOk> oks);
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java 
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 0b08dd2d..dc8de331 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -91,6 +91,11 @@ public interface CoordinationAdapter<R>
 
         public static <R> void stabilise(CoordinationAdapter<R> adapter, Node 
node, Topologies any, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, 
Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
         {
+            if (!node.topology().hasEpoch(executeAt.epoch()))
+            {
+                node.withEpoch(executeAt.epoch(), () -> stabilise(adapter, 
node, any, route, ballot, txnId, txn, executeAt, deps, callback));
+                return;
+            }
             Topologies coordinates = any.forEpochs(txnId.epoch(), 
txnId.epoch());
             Topologies all;
             if (txnId.epoch() == executeAt.epoch()) all = coordinates;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e94bac11..5ee3f8d5 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -388,6 +388,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         timestampsForKey.put(key, timestampsForKey((Key) 
key).createSafeReference());
     }
 
+    protected void validateRead(Command current) {}
+
     protected final InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges)
     {
         Map<TxnId, InMemorySafeCommand> commands = new HashMap<>();
@@ -395,6 +397,14 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey = new 
HashMap<>();
 
         context.forEachId(txnId -> commands.put(txnId, lazyReference(txnId)));
+        for (InMemorySafeCommand safe : commands.values())
+        {
+            GlobalCommand global = safe.unsafeGlobal();
+            if (global == null) continue;
+            Command current = global.value();
+            if (current == null) continue;
+            validateRead(current);
+        }
 
         for (Seekable seekable : context.keys())
         {
@@ -402,8 +412,18 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             {
                 case Key:
                     RoutableKey key = (RoutableKey) seekable;
-                    commandsForKey.put(key, commandsForKey((Key) 
key).createSafeReference());
-                    timestampsForKey.put(key, timestampsForKey((Key) 
key).createSafeReference());
+                    switch (context.keyHistory())
+                    {
+                        case NONE:
+                            continue;
+                        case COMMANDS:
+                            commandsForKey.put(key, commandsForKey((Key) 
key).createSafeReference());
+                            break;
+                        case TIMESTAMPS:
+                            timestampsForKey.put(key, timestampsForKey((Key) 
key).createSafeReference());
+                            break;
+                        default: throw new 
UnsupportedOperationException("Unknown key history: " + context.keyHistory());
+                    }
                     break;
                 case Range:
                     // load range cfks here
@@ -633,7 +653,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     public static class InMemorySafeStore extends 
AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeTimestampsForKey, 
InMemorySafeCommandsForKey>
     {
         private final InMemoryCommandStore commandStore;
-        private final Map<TxnId, InMemorySafeCommand> commands;
+        protected final Map<TxnId, InMemorySafeCommand> commands;
         private final Map<RoutableKey, InMemorySafeTimestampsForKey> 
timestampsForKey;
         private final Map<RoutableKey, InMemorySafeCommandsForKey> 
commandsForKey;
         private final RangesForEpoch ranges;
diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java 
b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
index 267d5692..946f255a 100644
--- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
@@ -18,8 +18,11 @@
 
 package accord.impl;
 
+import java.util.Objects;
 import java.util.function.Supplier;
 
+import javax.annotation.Nullable;
+
 import accord.impl.InMemoryCommandStore.GlobalCommand;
 import accord.local.Command;
 import accord.local.Listeners;
@@ -30,9 +33,11 @@ import static accord.utils.Invariants.illegalState;
 
 public class InMemorySafeCommand extends SafeCommand implements 
SafeState<Command>
 {
+    private static final Object INIT = new Object();
     private static final Supplier<GlobalCommand> INVALIDATED = () -> null;
 
     private Supplier<GlobalCommand> lazy;
+    private Object original = INIT;
     private GlobalCommand global;
 
     public InMemorySafeCommand(TxnId txnId, GlobalCommand global)
@@ -54,10 +59,26 @@ public class InMemorySafeCommand extends SafeCommand 
implements SafeState<Comman
         return global.value();
     }
 
+    public boolean isModified()
+    {
+        return original != INIT && !Objects.equals(original, global.value());
+    }
+
+    @Nullable
+    public Command original()
+    {
+        touch();
+        if (!isModified())
+            return global.value();
+        return (Command) original;
+    }
+
     @Override
     protected void set(Command update)
     {
         touch();
+        if (original == INIT)
+            original = global.value();
         global.value(update);
     }
 
@@ -83,6 +104,7 @@ public class InMemorySafeCommand extends SafeCommand 
implements SafeState<Comman
     public void invalidate()
     {
         lazy = INVALIDATED;
+        original = INIT;
     }
 
     @Override
@@ -107,4 +129,10 @@ public class InMemorySafeCommand extends SafeCommand 
implements SafeState<Comman
         touch();
         return global;
     }
+
+    @Nullable
+    GlobalCommand unsafeGlobal()
+    {
+        return global;
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index d268812f..e478eb21 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -261,7 +261,10 @@ class Bootstrap
             else
             {
                 // TODO (expected): first check to see if we are still relevant
-                node.agent().onFailedBootstrap("SafeToRead", state.ranges, () 
-> started(state, null), failure);
+                CommandStore store = CommandStore.current();
+                node.agent().onFailedBootstrap("SafeToRead", state.ranges, () 
-> {
+                    store.maybeExecuteImmediately(() -> started(state, null));
+                }, failure);
             }
         }
 
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index f67c9663..6b9d71af 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -370,10 +370,10 @@ public abstract class Command implements CommonAttributes
                     case DefinitionErased:
                     case DefinitionUnknown:
                     case NoOp:
-                        Invariants.checkState(partialTxn == null);
+                        Invariants.checkState(partialTxn == null, "partialTxn 
is defined");
                         break;
                     case DefinitionKnown:
-                        Invariants.checkState(partialTxn != null);
+                        Invariants.checkState(partialTxn != null, "partialTxn 
is null");
                         break;
                 }
             }
@@ -418,8 +418,8 @@ public abstract class Command implements CommonAttributes
                 {
                     default: throw new AssertionError("Unhandled Outcome: " + 
known.outcome);
                     case Apply:
-                        Invariants.checkState(writes != null);
-                        Invariants.checkState(result != null);
+                        Invariants.checkState(writes != null, "Writes is 
null");
+                        Invariants.checkState(result != null, "Result is 
null");
                         break;
                     case Invalidated:
                         
Invariants.checkState(validate.durability().isMaybeInvalidated());
@@ -427,8 +427,8 @@ public abstract class Command implements CommonAttributes
                         Invariants.checkState(validate.durability() != Local);
                     case Erased:
                     case WasApply:
-                        Invariants.checkState(writes == null);
-                        Invariants.checkState(result == null);
+                        Invariants.checkState(writes == null, "Writes exist");
+                        Invariants.checkState(result == null, "Results exist");
                         break;
                 }
             }
@@ -840,15 +840,18 @@ public abstract class Command implements CommonAttributes
 
         public static Truncated truncatedApply(CommonAttributes common, 
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result)
         {
-            // TODO (now) !!! uncomment and fix
-//            
Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps());
+            Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps());
             Durability durability = checkTruncatedApplyInvariants(common, 
saveStatus, executeAt);
             return validate(new Truncated(common.txnId(), saveStatus, 
durability, common.route(), executeAt, EMPTY, writes, result));
         }
 
-        public static Truncated truncatedApply(CommonAttributes common, 
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result, 
Timestamp dependencyExecutesAt)
+        public static Truncated truncatedApply(CommonAttributes common, 
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result, 
@Nullable Timestamp dependencyExecutesAt)
         {
-            Invariants.checkArgument(common.txnId().kind().awaitsOnlyDeps());
+            if (!common.txnId().kind().awaitsOnlyDeps())
+            {
+                Invariants.checkState(dependencyExecutesAt == null);
+                return truncatedApply(common, saveStatus, executeAt, writes, 
result);
+            }
             Durability durability = checkTruncatedApplyInvariants(common, 
saveStatus, executeAt);
             return validate(new TruncatedAwaitsOnlyDeps(common.txnId(), 
saveStatus, durability, common.route(), executeAt, EMPTY, writes, result, 
dependencyExecutesAt));
         }
@@ -926,6 +929,10 @@ public abstract class Command implements CommonAttributes
 
     public static class TruncatedAwaitsOnlyDeps extends Truncated
     {
+        /**
+         * TODO (desired): Ideally we would not store this differently than we 
do for earlier states (where we encode in WaitingOn), but we also
+         *  don't want to waste the space and complexity budget in earlier 
phases. Consider how to improve.
+         */
         @Nullable final Timestamp executesAtLeast;
 
         public TruncatedAwaitsOnlyDeps(CommonAttributes commonAttributes, 
SaveStatus saveStatus, @Nullable Timestamp executeAt, @Nullable Writes writes, 
@Nullable Result result, @Nullable Timestamp executesAtLeast)
@@ -963,7 +970,7 @@ public abstract class Command implements CommonAttributes
 
     public static class PreAccepted extends AbstractCommand
     {
-        private final Timestamp executeAt;
+        private final @Nullable Timestamp executeAt;
         private final PartialTxn partialTxn;
         private final @Nullable PartialDeps partialDeps;
 
@@ -993,7 +1000,7 @@ public abstract class Command implements CommonAttributes
             if (o == null || getClass() != o.getClass()) return false;
             if (!super.equals(o)) return false;
             PreAccepted that = (PreAccepted) o;
-            return executeAt.equals(that.executeAt)
+            return Objects.equals(executeAt, that.executeAt)
                     && Objects.equals(partialTxn, that.partialTxn)
                     && Objects.equals(partialDeps, that.partialDeps);
         }
@@ -1692,6 +1699,7 @@ public abstract class Command implements CommonAttributes
     static Command.Accepted acceptInvalidated(Command command, Ballot ballot)
     {
         SaveStatus saveStatus = SaveStatus.get(Status.AcceptedInvalidate, 
command.known());
+        // TODO (desired): This should be NonNull, but AcceptedInvalidated is 
represented by Command.Accepted because there’s no acceptedOrCommitted register 
in NotDefined
         return validate(new Command.Accepted(command, saveStatus, ballot, 
command.executeAt(), command.partialTxn(), null, ballot));
     }
 
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index c79a2871..8e2f18eb 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -219,6 +219,12 @@ public abstract class CommandStore implements AgentExecutor
 
     public abstract boolean inStore();
 
+    public void maybeExecuteImmediately(Runnable task)
+    {
+        if (inStore()) task.run();
+        else           execute(task);
+    }
+
     public abstract AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer);
 
     public abstract <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply);
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 9e628b92..cba170c9 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -662,12 +662,13 @@ public class CommandsForKey implements CommandsSummary
             {
                 if (newStatus.compareTo(cur.status) <= 0)
                 {
+                    // TODO (required): this validation is not safe for replay 
where we may have to "catch up" commands that are behind CFK
                     // we can redundantly update the same transaction via 
notifyWaitingOnCommit since updates to CFK may be asynchronous
                     // (particularly for invalidations). So we should expect 
that we might already represent the latest information for this transaction.
                     // TODO (desired): consider only accepting this for 
Invalidation
                     // TODO (desired): also clean-up special casing for 
AcceptedInvalidate, which exists because it currently has no effect on the CFK 
state
                     //    so it could be any of Transitively Known, Historic, 
PreAccept or Accept
-                    Invariants.checkState(cur.status == newStatus || 
next.status() == Status.AcceptedInvalidate);
+                    Invariants.checkState(cur.status == newStatus || 
next.status() == Status.AcceptedInvalidate, "Attempted update to CommandsForKey 
with %s, implying stale status; found %s", next, cur);
                     if (!newStatus.hasInfo || 
next.acceptedOrCommitted().equals(prev.acceptedOrCommitted()))
                         return this;
                 }
@@ -1220,7 +1221,7 @@ public class CommandsForKey implements CommandsSummary
             Key key = this.key;
             Keys keys = Keys.of(key);
             safeStore = safeStore; // make unsafe for compiler to permit in 
lambda
-            safeStore.commandStore().execute(PreLoadContext.contextFor(txnId, 
keys), safeStore0 -> {
+            safeStore.commandStore().execute(PreLoadContext.contextFor(txnId, 
keys, KeyHistory.COMMANDS), safeStore0 -> {
                 SafeCommand safeCommand0 = safeStore0.get(txnId);
                 safeCommand0.initialise();
                 Command command = safeCommand0.current();
diff --git a/accord-core/src/main/java/accord/local/CommonAttributes.java 
b/accord-core/src/main/java/accord/local/CommonAttributes.java
index 9c180a9c..46c38588 100644
--- a/accord-core/src/main/java/accord/local/CommonAttributes.java
+++ b/accord-core/src/main/java/accord/local/CommonAttributes.java
@@ -136,6 +136,12 @@ public interface CommonAttributes
             return this;
         }
 
+        public Mutable removePartialTxn()
+        {
+            this.partialTxn = null;
+            return this;
+        }
+
         @Override
         public PartialDeps partialDeps()
         {
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 8fdf127f..5c8e2834 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -246,7 +246,7 @@ public abstract class SafeCommandStore
             keys = updated.asCommitted().waitingOn.keys;
             // TODO (required): consider how execution works for transactions 
that await future deps and where the command store inherits additional keys in 
execution epoch
             Ranges ranges = ranges().allAt(updated.executeAt());
-            PreLoadContext context = PreLoadContext.contextFor(txnId, keys);
+            PreLoadContext context = PreLoadContext.contextFor(txnId, keys, 
COMMANDS);
             // TODO (expected): execute immediately for any keys we already 
have loaded, and save only those we haven't for later
             if (canExecuteWith(context))
             {
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java 
b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 7ca59e00..5384a7e3 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -19,6 +19,8 @@ package accord.local;
 
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.ImmutableSet;
 
 import accord.api.Result;
@@ -28,6 +30,7 @@ import accord.local.CommandStores.RangesForEpoch;
 import accord.local.CommonAttributes.Mutable;
 import accord.messages.Accept;
 import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
 import accord.messages.BeginRecovery;
 import accord.messages.Commit;
 import accord.messages.MessageType;
@@ -38,18 +41,21 @@ import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
 
 import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
 import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
 import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
 import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
 import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
 import static accord.messages.MessageType.PRE_ACCEPT_REQ;
 import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
+import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
 import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
 import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
 import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
 import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
@@ -60,15 +66,40 @@ import static accord.utils.Invariants.illegalState;
 @VisibleForImplementation
 public class SerializerSupport
 {
+    private static final Set<MessageType> PRE_ACCEPT_TYPES =
+    ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG);
+
+    private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
+    ImmutableSet.<MessageType>builder()
+                .addAll(PRE_ACCEPT_TYPES)
+                .add(COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ)
+                .build();
+
+    private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
+    ImmutableSet.<MessageType>builder()
+                .addAll(PRE_ACCEPT_COMMIT_TYPES)
+                .add(STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, 
STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG)
+                .build();
+
+    private static final Set<MessageType> APPLY_TYPES =
+    ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG, 
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+
+    private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
+    ImmutableSet.<MessageType>builder()
+                .addAll(PRE_ACCEPT_STABLE_TYPES)
+                .addAll(APPLY_TYPES)
+                .build();
+
     /**
      * Reconstructs Command from register values and protocol messages.
      */
-    public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable 
attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+    public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable 
attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp 
executesAtLeast, Ballot promised, Ballot accepted, WaitingOnProvider 
waitingOnProvider, MessageProvider messageProvider)
     {
         switch (status.status)
         {
             case NotDefined:
-                return Command.NotDefined.notDefined(attrs, promised);
+                return status == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                          : 
Command.NotDefined.notDefined(attrs, promised);
             case PreAccepted:
                 return preAccepted(rangesForEpoch, attrs, executeAt, promised, 
messageProvider);
             case AcceptedInvalidate:
@@ -83,15 +114,12 @@ public class SerializerSupport
                 return executed(rangesForEpoch, attrs, status, executeAt, 
promised, accepted, waitingOnProvider, messageProvider);
             case Truncated:
             case Invalidated:
-                return truncated(attrs, status, executeAt, messageProvider);
+                return truncated(attrs, status, executeAt, executesAtLeast, 
messageProvider);
             default:
                 throw new IllegalStateException();
         }
     }
 
-    private static final Set<MessageType> PRE_ACCEPT_TYPES =
-        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG);
-
     private static Command.PreAccepted preAccepted(RangesForEpoch 
rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, 
MessageProvider messageProvider)
     {
         Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
@@ -118,24 +146,12 @@ public class SerializerSupport
         return Command.Accepted.accepted(attrs, status, executeAt, promised, 
accepted);
     }
 
-    private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
-        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG, COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ);
-
-    private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
-        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG,
-                        COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, 
STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, STABLE_MAXIMAL_REQ, 
PROPAGATE_STABLE_MSG);
-
     private static Command.Committed committed(RangesForEpoch rangesForEpoch, 
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
     {
         attrs = extract(rangesForEpoch, status, accepted, messageProvider, 
(attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs);
         return Command.Committed.committed(attrs, status, executeAt, promised, 
accepted, waitingOnProvider.provide(attrs.partialDeps()));
     }
 
-    private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
-        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG,
-                        COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, 
STABLE_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, PROPAGATE_STABLE_MSG,
-                        APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, 
PROPAGATE_APPLY_MSG);
-
     private static Command.Executed executed(RangesForEpoch rangesForEpoch, 
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
     {
         return extract(rangesForEpoch, status, accepted, messageProvider, 
(attrs0, txn, deps, writes, result) -> {
@@ -146,10 +162,7 @@ public class SerializerSupport
         }, attrs);
     }
 
-    private static final Set<MessageType> APPLY_TYPES =
-            ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, 
PROPAGATE_APPLY_MSG);
-
-    private static Command.Truncated truncated(Mutable attrs, SaveStatus 
status, Timestamp executeAt, MessageProvider messageProvider)
+    private static Command.Truncated truncated(Mutable attrs, SaveStatus 
status, Timestamp executeAt, @Nullable Timestamp executesAtLeast, 
MessageProvider messageProvider)
     {
         Writes writes = null;
         Result result = null;
@@ -162,15 +175,15 @@ public class SerializerSupport
             case TruncatedApplyWithDeps:
                 Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
                 checkState(!witnessed.isEmpty());
-                if (witnessed.contains(APPLY_MINIMAL_REQ))
+                if (witnessed.contains(APPLY_MAXIMAL_REQ))
                 {
-                    Apply apply = messageProvider.applyMinimal();
+                    Apply apply = messageProvider.applyMaximal();
                     writes = apply.writes;
                     result = apply.result;
                 }
-                if (witnessed.contains(APPLY_MAXIMAL_REQ))
+                else if (witnessed.contains(APPLY_MINIMAL_REQ))
                 {
-                    Apply apply = messageProvider.applyMaximal();
+                    Apply apply = messageProvider.applyMinimal();
                     writes = apply.writes;
                     result = apply.result;
                 }
@@ -180,8 +193,18 @@ public class SerializerSupport
                     writes = propagate.writes;
                     result = propagate.result;
                 }
+                else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
+                {
+                    ApplyThenWaitUntilApplied apply = 
messageProvider.applyThenWaitUntilApplied();
+                    writes = apply.writes;
+                    result = apply.result;
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Unhandled types: 
" + witnessed);
+                }
             case TruncatedApply:
-                return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result);
+                return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, executesAtLeast);
             case ErasedOrInvalidated:
                 return Command.Truncated.erasedOrInvalidated(attrs.txnId(), 
attrs.durability(), attrs.route());
             case Erased:
@@ -231,6 +254,7 @@ public class SerializerSupport
             case AcceptedInvalidate:
             case Accepted:
             case PreCommitted:
+            {
                 PartialTxn txn = null;
                 PartialDeps deps = null;
 
@@ -247,7 +271,7 @@ public class SerializerSupport
                     deps = slicePartialDeps(rangesForEpoch, accept);
                 }
                 return withContents.apply(param, txn, deps, null, null);
-
+            }
             case Committed:
             {
                 witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
@@ -285,14 +309,25 @@ public class SerializerSupport
                 else
                 {
                     checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), 
"Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
-                    if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
+                    if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+                    {
+                        commit = messageProvider.commitMaximal();
+                    }
+                    else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
                     {
                         commit = messageProvider.commitSlowPath();
                     }
+                    else if (witnessed.contains(PRE_ACCEPT_REQ) || 
witnessed.contains(BEGIN_RECOVER_REQ) || 
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+                    {
+                        Commit slowPath = messageProvider.stableSlowPath();
+                        Ranges ranges = 
rangesForEpoch.allBetween(slowPath.txnId.epoch(), slowPath.executeAt.epoch());
+                        PartialTxn txn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, 
messageProvider).slice(ranges, true);
+                        PartialDeps deps = slowPath.partialDeps.slice(ranges);
+                        return withContents.apply(param, txn, deps, null, 
null);
+                    }
                     else
                     {
-                        checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), 
"Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
-                        commit = messageProvider.commitMaximal();
+                        throw illegalState("Unable to find 
COMMIT_SLOW_PATH_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
                     }
                 }
 
@@ -309,14 +344,14 @@ public class SerializerSupport
                     Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
                     return withContents.apply(param, apply.txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
                 }
-                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+                else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
                 {
-                    Propagate propagate = messageProvider.propagateApply();
-                    return sliceAndApply(rangesForEpoch, propagate, 
withContents, param, propagate.writes, propagate.result);
+                    ApplyThenWaitUntilApplied apply = 
messageProvider.applyThenWaitUntilApplied();
+                    Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
+                    return withContents.apply(param, apply.txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
                 }
-                else
+                else if (witnessed.contains(APPLY_MINIMAL_REQ))
                 {
-                    checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable 
to find APPLY_MINIMAL_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
                     Apply apply = messageProvider.applyMinimal();
                     Commit commit;
                     if (witnessed.contains(STABLE_MAXIMAL_REQ))
@@ -326,7 +361,18 @@ public class SerializerSupport
                     else if (witnessed.contains(PROPAGATE_STABLE_MSG))
                     {
                         Propagate propagate = 
messageProvider.propagateStable();
-                        return withContents.apply(param, propagate.partialTxn, 
propagate.stableDeps, apply.writes, apply.result);
+                        var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+                        return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
apply.writes, apply.result);
+                    }
+                    else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+                    {
+                        Propagate propagate = messageProvider.propagateApply();
+                        var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+                        return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
apply.writes, apply.result);
+                    }
+                    else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+                    {
+                        commit = messageProvider.commitMaximal();
                     }
                     else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
                     {
@@ -336,6 +382,12 @@ public class SerializerSupport
                     {
                         commit = messageProvider.stableFastPath();
                     }
+                    else if (witnessed.contains(PRE_ACCEPT_REQ) || 
witnessed.contains(BEGIN_RECOVER_REQ) || 
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+                    {
+                        PartialTxn txn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
+                        Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
+                        return withContents.apply(param, txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
+                    }
                     else
                     {
                         throw illegalState("Invalid state: insufficient stable 
or commit messages found to reconstruct PreApplied or greater SaveStatus; 
witnessed " + witnessed);
@@ -343,6 +395,36 @@ public class SerializerSupport
 
                     return sliceAndApply(rangesForEpoch, messageProvider, 
witnessed, commit, withContents, param, apply.writes, apply.result);
                 }
+                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+                {
+                    Propagate propagate = messageProvider.propagateApply();
+                    Invariants.nonNull(propagate.partialTxn, "Unable to find 
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
+                    Invariants.nonNull(propagate.stableDeps, "Unable to find 
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
+                    return sliceAndApply(rangesForEpoch, propagate, 
withContents, param, propagate.writes, propagate.result);
+                }
+                else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+                {
+                    // once propgate runs locally it merges the local state 
with the remote state, which may make this go from PRE_ACCEPT to PRE_APPLIED!
+                    Propagate propagate = messageProvider.propagatePreAccept();
+                    Invariants.nonNull(propagate.partialTxn, "Unable to find 
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
+                    Invariants.nonNull(propagate.stableDeps, "Unable to find 
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
+
+                    var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+                    return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
propagate.writes, propagate.result);
+                }
+                else if (witnessed.contains(PROPAGATE_OTHER_MSG))
+                {
+                    // the txn/deps may have been erased, won't always be 
here...
+                    Propagate propagate = messageProvider.propagateOther();
+                    var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+                    PartialTxn txn = propagate.partialTxn == null ? null : 
propagate.partialTxn.slice(ranges, true);
+                    PartialDeps deps = propagate.stableDeps == null ? null : 
propagate.stableDeps.slice(ranges);
+                    return withContents.apply(param, txn, deps, 
propagate.writes, propagate.result);
+                }
+                else
+                {
+                    throw illegalState("Unable to find messages that lead to 
PreApplied state; txn_id=%s, witnessed %s", messageProvider.txnId(), new 
LoggedMessageProvider(messageProvider));
+                }
             }
 
             case NotDefined:
@@ -406,6 +488,8 @@ public class SerializerSupport
                 PartialTxn preAcceptedPartialTxn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
                 if (partialTxn == null || partialTxn.keys().size() == 0) 
partialTxn = preAcceptedPartialTxn;
                 else partialTxn = merge(preAcceptedPartialTxn, partialTxn);
+                if (partialTxn == null && 
witnessed.contains(COMMIT_MAXIMAL_REQ))
+                    partialTxn = messageProvider.commitMaximal().partialTxn;
             case StableWithTxnAndDeps:
             case CommitWithTxn:
         }
@@ -421,6 +505,7 @@ public class SerializerSupport
     // TODO (required): randomised testing that we always restore the exact 
same state
     public interface MessageProvider
     {
+        TxnId txnId();
         Set<MessageType> test(Set<MessageType> messages);
         Set<MessageType> all();
 
@@ -437,6 +522,7 @@ public class SerializerSupport
         Commit commitMaximal();
 
         Commit stableFastPath();
+        Commit stableSlowPath();
 
         Commit stableMaximal();
 
@@ -447,6 +533,10 @@ public class SerializerSupport
         Apply applyMaximal();
 
         Propagate propagateApply();
+
+        Propagate propagateOther();
+
+        ApplyThenWaitUntilApplied applyThenWaitUntilApplied();
     }
 
     private static class LoggedMessageProvider
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index 49572c55..351c636e 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -513,6 +513,7 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
                 if (toEpoch >= committedExecuteAt.epoch())
                     return MessageType.PROPAGATE_APPLY_MSG;
             case Committed:
+            case Stable:
                 return MessageType.PROPAGATE_STABLE_MSG;
             case PreCommitted:
                 if (!achieved.definition.isKnown())
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java 
b/accord-core/src/main/java/accord/primitives/Routables.java
index 9e7e53e7..b37bcc91 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -24,6 +24,8 @@ import accord.utils.*;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static accord.utils.SortedArrays.Search.FLOOR;
 
@@ -46,6 +48,10 @@ public interface Routables<K extends Routable> extends 
Iterable<K>
     int size();
 
     boolean isEmpty();
+    default Stream<K> stream()
+    {
+        return StreamSupport.stream(spliterator(), false);
+    }
     boolean intersects(AbstractRanges ranges);
     boolean intersects(AbstractKeys<?> keys);
     default boolean intersects(Routables<?> routables)
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index d1061465..842ed5d3 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -50,6 +50,11 @@ public class Invariants
          throw createIllegalState(msg);
     }
 
+    public static IllegalStateException illegalState(String fmt, Object... 
args)
+    {
+        return illegalState(format(fmt, args));
+    }
+
     public static IllegalStateException illegalState()
     {
         throw illegalState(null);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 47240516..447b80b0 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
@@ -50,12 +51,15 @@ import accord.impl.MessageListener;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.Utils;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import accord.verify.CompositeVerifier;
 import accord.verify.ElleVerifier;
 import accord.verify.StrictSerializabilityVerifier;
 import accord.verify.Verifier;
+
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -540,10 +544,50 @@ public class BurnTest
     }
 
     @Test
-    @Timeout(value = 3, unit = TimeUnit.MINUTES)
     public void testOne()
     {
-        run(1L, 1000);
+        run(System.nanoTime());
+    }
+
+    private static void run(long seed)
+    {
+        Duration timeout = Duration.ofMinutes(3);
+        Runnable fn = () -> run(seed, 1000);
+        AsyncResult.Settable<?> promise = AsyncResults.settable();
+        Thread t = new Thread(() -> {
+            try
+            {
+                fn.run();
+                promise.setSuccess(null);
+            }
+            catch (Throwable e)
+            {
+                promise.setFailure(e);
+            }
+        });
+        t.setName("BurnTest with timeout");
+        t.setDaemon(true);
+        try
+        {
+            t.start();
+            AsyncChains.getBlocking(promise, timeout.toNanos(), 
TimeUnit.NANOSECONDS);
+        }
+        catch (Throwable thrown)
+        {
+            Throwable cause = thrown;
+            if (cause instanceof ExecutionException)
+                cause = cause.getCause();
+            if (cause instanceof InterruptedException || cause instanceof 
TimeoutException)
+                t.interrupt();
+            if (cause instanceof TimeoutException)
+            {
+                TimeoutException override = new TimeoutException("test did not 
complete within " + timeout);
+                override.setStackTrace(new StackTraceElement[0]);
+                cause = override;
+            }
+            logger.error("Exception running burn test for seed {}:", seed, t);
+            throw SimulationException.wrap(seed, cause);
+        }
     }
 
     private static void run(long seed, int operations)
diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java 
b/accord-core/src/test/java/accord/impl/MessageListener.java
index 71bbc0ec..6be9f8de 100644
--- a/accord-core/src/test/java/accord/impl/MessageListener.java
+++ b/accord-core/src/test/java/accord/impl/MessageListener.java
@@ -28,6 +28,8 @@ import accord.messages.SimpleReply;
 import accord.messages.TxnRequest;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.topology.Topology;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
@@ -49,6 +51,7 @@ public interface MessageListener
     void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, 
Message message);
 
     void onClientAction(ClientAction action, Node.Id from, TxnId id, Object 
message);
+    void onTopologyChange(Topology topology);
 
     static MessageListener get()
     {
@@ -71,6 +74,12 @@ public interface MessageListener
         {
 
         }
+
+        @Override
+        public void onTopologyChange(Topology topology)
+        {
+
+        }
     }
 
     class DebugListener implements MessageListener
@@ -117,6 +126,16 @@ public interface MessageListener
             }
         }
 
+        private Topology previous = null;
+
+        @Override
+        public void onTopologyChange(Topology topology)
+        {
+            if (previous != null)
+                logger.debug("Topology Change {} -> {}", previous.epoch(), 
topology.epoch());
+            previous = topology;
+        }
+
         private static Object normalizeClientMessage(Object o)
         {
             if (o instanceof Throwable)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index b949c5ab..da5851f8 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.BarrierType;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.burn.BurnTestConfigurationService;
@@ -48,7 +49,12 @@ import accord.burn.TopologyUpdates;
 import accord.burn.random.FrequentLargeRange;
 import accord.config.LocalConfig;
 import accord.config.MutableLocalConfig;
+import accord.coordinate.Barrier;
 import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.Exhausted;
+import accord.coordinate.Invalidated;
+import accord.coordinate.Preempted;
+import accord.coordinate.Timeout;
 import accord.impl.CoordinateDurabilityScheduling;
 import accord.impl.MessageListener;
 import accord.impl.PrefixedIntHashKey;
@@ -61,16 +67,20 @@ import accord.local.Node.Id;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
-import accord.messages.LocalRequest;
 import accord.messages.Message;
 import accord.messages.MessageType;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
+import accord.primitives.Keys;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.topology.Topology;
 import accord.topology.TopologyRandomizer;
+import accord.utils.Gens;
+import accord.utils.Invariants;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
@@ -79,6 +89,9 @@ import static 
accord.impl.basic.Cluster.OverrideLinksKind.NONE;
 import static accord.impl.basic.Cluster.OverrideLinksKind.RANDOM_BIDIRECTIONAL;
 import static accord.impl.basic.NodeSink.Action.DELIVER;
 import static accord.impl.basic.NodeSink.Action.DROP;
+import static accord.utils.AccordGens.keysInsideRanges;
+import static accord.utils.AccordGens.rangeInsideRange;
+import static accord.utils.Gens.mixedDistribution;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -127,6 +140,7 @@ public class Cluster implements Scheduler
     final RandomSource random;
     final LinkConfig linkConfig;
     final Function<Id, Node> lookup;
+    final Function<Id, Journal> journalLookup;
     final PendingQueue pending;
     final Runnable checkFailures;
     final List<Runnable> onDone = new ArrayList<>();
@@ -137,13 +151,14 @@ public class Cluster implements Scheduler
     int recurring;
     BiFunction<Id, Id, Link> links;
 
-    public Cluster(RandomSource random, MessageListener messageListener, 
Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, 
Node> lookup, IntSupplier rf, Consumer<Packet> responseSink)
+    public Cluster(RandomSource random, MessageListener messageListener, 
Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, 
Node> lookup, Function<Id, Journal> journalLookup, IntSupplier rf, 
Consumer<Packet> responseSink)
     {
         this.random = random;
         this.messageListener = messageListener;
         this.pending = queueSupplier.get();
         this.checkFailures = checkFailures;
         this.lookup = lookup;
+        this.journalLookup = journalLookup;
         this.responseSink = responseSink;
         this.linkConfig = defaultLinkConfig(random, rf);
         this.links = linkConfig.defaultLinks;
@@ -216,7 +231,7 @@ public class Cluster implements Scheduler
                     else callback.success(deliver.src, reply);
                 }
             }
-            else on.receive((Request) deliver.message, deliver.src, deliver);
+            else journalLookup.apply(deliver.dst).handle((Request) 
deliver.message, deliver.src, deliver);
         }
         else
         {
@@ -269,10 +284,11 @@ public class Cluster implements Scheduler
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> nodeMap = new LinkedHashMap<>();
         Map<Id, AgentExecutor> executorMap = new LinkedHashMap<>();
+        Map<Id, Journal> journalMap = new LinkedHashMap<>();
         try
         {
             RandomSource random = randomSupplier.get();
-            Cluster sinks = new Cluster(randomSupplier.get(), messageListener, 
queueSupplier, checkFailures, nodeMap::get, () -> topologyFactory.rf, 
responseSink);
+            Cluster sinks = new Cluster(randomSupplier.get(), messageListener, 
queueSupplier, checkFailures, nodeMap::get, journalMap::get, () -> 
topologyFactory.rf, responseSink);
             TopologyUpdates topologyUpdates = new 
TopologyUpdates(executorMap::get);
             TopologyRandomizer.Listener schemaApply = t -> {
                 for (Node node : nodeMap.values())
@@ -280,9 +296,11 @@ public class Cluster implements Scheduler
                     ListStore store = (ListStore) 
node.commandStores().dataStore();
                     store.onTopologyUpdate(node, t);
                 }
+                messageListener.onTopologyChange(t);
             };
             TopologyRandomizer configRandomizer = new 
TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get, 
schemaApply);
             List<CoordinateDurabilityScheduling> durabilityScheduling = new 
ArrayList<>();
+            List<Service> services = new ArrayList<>();
             for (Id id : nodes)
             {
                 MessageSink messageSink = sinks.create(id, 
randomSupplier.get());
@@ -291,13 +309,15 @@ public class Cluster implements Scheduler
                 BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) 
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
                 AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, 
onStale);
                 executorMap.put(id, nodeExecutor);
+                Journal journal = new Journal(messageListener);
+                journalMap.put(id, journal);
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
-                BooleanSupplier isLoadedCheck = 
random.biasedUniformBools(0.5f);
-                Node node = new Node(id, messageSink, LocalRequest::process, 
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, 
nowSupplier),
+                BooleanSupplier isLoadedCheck = 
Gens.supplier(Gens.bools().mixedDistribution().next(random), random);
+                Node node = new Node(id, messageSink, journal, configService, 
nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
                                      () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()),
                                      nodeExecutor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending, isLoadedCheck), new 
CoordinationAdapter.DefaultFactory(),
+                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new 
CoordinationAdapter.DefaultFactory(),
                                      localConfig);
                 CoordinateDurabilityScheduling durability = new 
CoordinateDurabilityScheduling(node);
                 // TODO (desired): randomise
@@ -306,6 +326,7 @@ public class Cluster implements Scheduler
                 durabilityScheduling.add(durability);
                 nodeMap.put(id, node);
                 durabilityScheduling.add(new 
CoordinateDurabilityScheduling(node));
+                services.add(new BarrierService(node, randomSupplier.get()));
             }
 
             Runnable updateDurabilityRate;
@@ -328,6 +349,7 @@ public class Cluster implements Scheduler
             schemaApply.onUpdate(topology);
 
             // startup
+            journalMap.entrySet().forEach(e -> 
e.getValue().start(nodeMap.get(e.getKey())));
             AsyncResult<?> startup = 
AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()),
 (a, b) -> null).beginAsResult();
             while (sinks.processPending());
             Assertions.assertTrue(startup.isDone());
@@ -341,10 +363,12 @@ public class Cluster implements Scheduler
 
             Scheduled reconfigure = 
sinks.recurring(configRandomizer::maybeUpdateTopology, 1, SECONDS);
             
durabilityScheduling.forEach(CoordinateDurabilityScheduling::start);
+            services.forEach(Service::start);
 
             noMoreWorkSignal.accept(() -> {
                 reconfigure.cancel();
                 
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
+                services.forEach(Service::close);
             });
             readySignal.accept(nodeMap);
 
@@ -357,6 +381,7 @@ public class Cluster implements Scheduler
             chaos.cancel();
             reconfigure.cancel();
             durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
+            services.forEach(Service::close);
             sinks.links = sinks.linkConfig.defaultLinks;
 
             // give progress log et al a chance to finish
@@ -379,10 +404,117 @@ public class Cluster implements Scheduler
         }
         finally
         {
+            journalMap.values().forEach(Journal::shutdown);
             nodeMap.values().forEach(Node::shutdown);
         }
     }
 
+    private interface Service extends AutoCloseable
+    {
+        void start();
+        @Override
+        void close();
+    }
+
+    private static abstract class AbstractService implements Service, Runnable
+    {
+        protected final Node node;
+        protected final RandomSource rs;
+        private Scheduled scheduled;
+
+        protected AbstractService(Node node, RandomSource rs)
+        {
+            this.node = node;
+            this.rs = rs;
+        }
+
+        @Override
+        public void start()
+        {
+            Invariants.checkState(scheduled == null, "Start already 
called...");
+            this.scheduled = node.scheduler().recurring(this, 1, SECONDS);
+        }
+
+        protected abstract void doRun() throws Exception;
+
+        @Override
+        public final void run()
+        {
+            try
+            {
+                doRun();
+            }
+            catch (Throwable t)
+            {
+                node.agent().onUncaughtException(t);
+            }
+        }
+
+        @Override
+        public void close()
+        {
+            if (scheduled != null)
+            {
+                scheduled.cancel();
+                scheduled = null;
+            }
+        }
+    }
+
+    private static class BarrierService extends AbstractService
+    {
+        private final Supplier<BarrierType> typeSupplier;
+        private final Supplier<Boolean> includeRangeSupplier;
+        private final Supplier<Boolean> wholeOrPartialSupplier;
+
+        private BarrierService(Node node, RandomSource rs)
+        {
+            super(node, rs);
+            this.typeSupplier = 
mixedDistribution(BarrierType.values()).next(rs).asSupplier(rs);
+            this.includeRangeSupplier = 
Gens.bools().mixedDistribution().next(rs).asSupplier(rs);
+            this.wholeOrPartialSupplier = 
Gens.bools().mixedDistribution().next(rs).asSupplier(rs);
+        }
+
+        @Override
+        public void doRun()
+        {
+            Topology current = node.topology().current();
+            Ranges ranges = current.rangesForNode(node.id());
+            if (ranges.isEmpty())
+                return;
+            BarrierType type = typeSupplier.get();
+            if (type == BarrierType.local)
+            {
+                run(node, Keys.of(keysInsideRanges(ranges).next(rs)), 
current.epoch(), type);
+            }
+            else
+            {
+                List<Range> subset = new ArrayList<>();
+                for (Range range : ranges)
+                {
+                    if (includeRangeSupplier.get())
+                        subset.add(wholeOrPartialSupplier.get() ? range : 
rangeInsideRange(range).next(rs));
+                }
+                if (subset.isEmpty())
+                    return;
+                run(node, Ranges.of(subset.toArray(Range[]::new)), 
current.epoch(), type);
+            }
+        }
+
+        private <S extends Seekables<?, ?>> void run(Node node, S 
keysOrRanges, long epoch, BarrierType type)
+        {
+            Barrier.barrier(node, keysOrRanges, epoch, type).begin((s, f) -> {
+                if (f != null)
+                {
+                    // ignore specific errors
+                    if (f instanceof Invalidated || f instanceof Timeout || f 
instanceof Preempted || f instanceof Exhausted)
+                        return;
+                    node.agent().onUncaughtException(f);
+                }
+            });
+        }
+    }
+
     private static BiFunction<Id, Id, Link> partition(List<Id> nodes, 
RandomSource random, int rf, BiFunction<Id, Id, Link> up)
     {
         Collections.shuffle(nodes, random.asJdkRandom());
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index b33edcc0..ea97c5e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -19,6 +19,8 @@
 package accord.impl.basic;
 
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.function.BooleanSupplier;
@@ -31,16 +33,26 @@ import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
+import accord.impl.InMemorySafeCommand;
+import accord.impl.InMemorySafeCommandsForKey;
+import accord.impl.InMemorySafeTimestampsForKey;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.basic.TaskExecutorService.Task;
+import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
+import accord.local.CommonAttributes;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.local.SerializerSupport;
 import accord.local.ShardDistributor;
+import accord.messages.Message;
 import accord.primitives.Range;
+import accord.primitives.RoutableKey;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
@@ -49,15 +61,15 @@ import accord.utils.async.AsyncChains;
 
 public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
 {
-    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService 
executorService, BooleanSupplier isLoadedCheck)
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService 
executorService, BooleanSupplier isLoadedCheck, Journal journal)
     {
-        super(time, agent, store, random, shardDistributor, 
progressLogFactory, DelayedCommandStore.factory(executorService, 
isLoadedCheck));
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, 
journal));
     }
 
-    public static CommandStores.Factory factory(PendingQueue pending, 
BooleanSupplier isLoadedCheck)
+    public static CommandStores.Factory factory(PendingQueue pending, 
BooleanSupplier isLoadedCheck, Journal journal)
     {
         return (time, agent, store, random, shardDistributor, 
progressLogFactory) ->
-               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck);
+               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal);
     }
 
     @Override
@@ -101,12 +113,56 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         private final SimulatedDelayedExecutorService executor;
         private final Queue<Task<?>> pending = new LinkedList<>();
         private final BooleanSupplier isLoadedCheck;
+        private final Journal journal;
 
-        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder 
epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier 
isLoadedCheck)
+        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder 
epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier 
isLoadedCheck, Journal journal)
         {
             super(id, time, agent, store, progressLogFactory, 
epochUpdateHolder);
             this.executor = executor;
             this.isLoadedCheck = isLoadedCheck;
+            this.journal = journal;
+        }
+
+        @Override
+        protected void validateRead(Command current)
+        {
+            // "loading" the command doesn't make sense as we don't "store" 
the command...
+            if (current.txnId().kind() == Txn.Kind.EphemeralRead)
+                return;
+            //TODO (correctness): these type of txn must be durable but 
currently they are not... should make sure this is plugged into the C* journal 
properly for reply
+            if (current.txnId().kind() == Txn.Kind.LocalOnly)
+                return;
+            Command.WaitingOn waitingOn = null;
+            if (current.isStable() && !current.isTruncated())
+                waitingOn = current.asCommitted().waitingOn;
+            SerializerSupport.MessageProvider messages = 
journal.makeMessageProvider(current.txnId());
+            Command.WaitingOn finalWaitingOn = waitingOn;
+            CommonAttributes.Mutable mutable = current.mutable();
+            mutable.partialDeps(null).removePartialTxn();
+            Command reconstructed;
+            try
+            {
+                reconstructed = 
SerializerSupport.reconstruct(unsafeRangesForEpoch(), mutable, 
current.saveStatus(), current.executeAt(), 
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, 
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, 
messages);
+            }
+            catch (IllegalStateException t)
+            {
+                //TODO (correctness): journal doesn’t guarantee we pick the 
same records we used to state transition
+                // Journal stores a list of messages it saw in some order it 
defines, but when reconstructing a command we don't actually know what messages 
were used, this could
+                // lead to a case where deps mismatch, so ignoring this for now
+                if (t.getMessage() != null && t.getMessage().startsWith("Deps 
do not match; expected"))
+                    return;
+                throw t;
+            }
+            //TODO (correctness): journal doesn’t guarantee we pick the same 
records we used to state transition
+            if (current.partialDeps() != null && 
!current.partialDeps().rangeDeps.equals(reconstructed.partialDeps().rangeDeps))
+                return;
+            // for some reasons scope doesn't alaways match, this might be due 
to journal... what sucks is that this can also be a bug in the extract, so its
+            // hard to figure out what happened.
+            if (current.partialDeps() != null && 
!current.partialDeps().equals(reconstructed.partialDeps()))
+                return;
+            if (current.isCommitted() && !current.isTruncated() && 
!Objects.equals(current.asCommitted().waitingOn(), 
reconstructed.asCommitted().waitingOn()))
+                return;
+//            Invariants.checkState(current.equals(reconstructed), "Commands 
did not match: expected %s, given %s", current, reconstructed);
         }
 
         @Override
@@ -115,9 +171,9 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             return isLoadedCheck.getAsBoolean();
         }
 
-        private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck)
+        private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor, BooleanSupplier 
isLoadedCheck, Journal journal)
         {
-            return (id, time, agent, store, progressLogFactory, 
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, 
progressLogFactory, rangesForEpoch, executor, isLoadedCheck);
+            return (id, time, agent, store, progressLogFactory, 
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, 
progressLogFactory, rangesForEpoch, executor, isLoadedCheck, journal);
         }
 
         @Override
@@ -190,5 +246,42 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         {
 
         }
+
+        @Override
+        protected InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, 
Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, 
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
+        {
+            return new DelayedSafeStore(this, ranges, context, commands, 
timestampsForKey, commandsForKeys);
+        }
+    }
+
+    public static class DelayedSafeStore extends 
InMemoryCommandStore.InMemorySafeStore
+    {
+        private final DelayedCommandStore commandStore;
+        public DelayedSafeStore(DelayedCommandStore commandStore, 
RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> 
commands, Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, 
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
+        {
+            super(commandStore, ranges, context, commands, timestampsForKey, 
commandsForKey);
+            this.commandStore = commandStore;
+        }
+
+        @Override
+        public void postExecute()
+        {
+            if (context instanceof Message)
+            {
+                Message m = (Message) context;
+                if (m.type() != null && !m.type().hasSideEffects())
+                {
+                    // double check there are no modifications
+                    commands.entrySet().forEach(e -> {
+                        InMemorySafeCommand safe = e.getValue();
+                        if (!safe.isModified()) return;
+                        commandStore.validateRead(safe.current());
+                        Command original = safe.original();
+                        if (original != null)
+                            commandStore.validateRead(original);
+                    });
+                }
+            }
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java 
b/accord-core/src/test/java/accord/impl/basic/Journal.java
new file mode 100644
index 00000000..7c0217b5
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import accord.impl.MessageListener;
+import accord.local.Node;
+import accord.local.SerializerSupport;
+import accord.messages.AbstractEpochRequest;
+import accord.messages.Accept;
+import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
+import accord.messages.BeginRecovery;
+import accord.messages.Commit;
+import accord.messages.LocalRequest;
+import accord.messages.Message;
+import accord.messages.MessageType;
+import accord.messages.PreAccept;
+import accord.messages.Propagate;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.primitives.Ballot;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+
+import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
+import static accord.messages.MessageType.ACCEPT_REQ;
+import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
+import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
+import static accord.messages.MessageType.BEGIN_INVALIDATE_REQ;
+import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
+import static accord.messages.MessageType.COMMIT_INVALIDATE_REQ;
+import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
+import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
+import static accord.messages.MessageType.INFORM_DURABLE_REQ;
+import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
+import static accord.messages.MessageType.PRE_ACCEPT_REQ;
+import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
+import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
+import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
+import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
+import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
+import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
+import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
+import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
+
+public class Journal implements LocalRequest.Handler, Runnable
+{
+    private static final TxnIdProvider EPOCH = msg -> 
((AbstractEpochRequest<?>) msg).txnId;
+    private static final TxnIdProvider TXN   = msg -> ((TxnRequest<?>) 
msg).txnId;
+    private static final TxnIdProvider LOCAL = msg -> ((LocalRequest<?>) 
msg).primaryTxnId();
+    private static final TxnIdProvider INVL  = msg -> ((Commit.Invalidate) 
msg).primaryTxnId();
+    private static final Map<MessageType, TxnIdProvider> typeToProvider = 
ImmutableMap.<MessageType, TxnIdProvider>builder()
+                                                                               
       .put(PRE_ACCEPT_REQ, TXN)
+                                                                               
       .put(ACCEPT_REQ, TXN)
+                                                                               
       .put(ACCEPT_INVALIDATE_REQ, EPOCH)
+                                                                               
       .put(COMMIT_SLOW_PATH_REQ, TXN)
+                                                                               
       .put(COMMIT_MAXIMAL_REQ, TXN)
+                                                                               
       .put(STABLE_FAST_PATH_REQ, TXN)
+                                                                               
       .put(STABLE_SLOW_PATH_REQ, TXN)
+                                                                               
       .put(STABLE_MAXIMAL_REQ, TXN)
+                                                                               
       .put(COMMIT_INVALIDATE_REQ, INVL)
+                                                                               
       .put(APPLY_MINIMAL_REQ, TXN)
+                                                                               
       .put(APPLY_MAXIMAL_REQ, TXN)
+                                                                               
       .put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, EPOCH)
+                                                                               
       .put(BEGIN_RECOVER_REQ, TXN)
+                                                                               
       .put(BEGIN_INVALIDATE_REQ, EPOCH)
+                                                                               
       .put(INFORM_OF_TXN_REQ, EPOCH)
+                                                                               
       .put(INFORM_DURABLE_REQ, TXN)
+                                                                               
       .put(SET_SHARD_DURABLE_REQ, EPOCH)
+                                                                               
       .put(SET_GLOBALLY_DURABLE_REQ, EPOCH)
+                                                                               
       .put(PROPAGATE_PRE_ACCEPT_MSG, LOCAL)
+                                                                               
       .put(PROPAGATE_STABLE_MSG, LOCAL)
+                                                                               
       .put(PROPAGATE_APPLY_MSG, LOCAL)
+                                                                               
       .put(PROPAGATE_OTHER_MSG, LOCAL)
+                                                                               
       .build();
+
+    private final Queue<RequestContext> unframedRequests = new ArrayDeque<>();
+    private final LongArrayList waitForEpochs = new LongArrayList();
+    private final Long2ObjectHashMap<ArrayList<RequestContext>> 
delayedRequests = new Long2ObjectHashMap<>();
+    private final Map<TxnId, Map<MessageType, Message>> writes = new 
HashMap<>();
+    private final MessageListener messageListener;
+    private Node node;
+
+    public Journal(MessageListener messageListener)
+    {
+        this.messageListener = messageListener;
+    }
+
+    public void start(Node node)
+    {
+        this.node = node;
+        node.scheduler().recurring(this, 1, TimeUnit.MILLISECONDS);
+    }
+
+    public void shutdown()
+    {
+        this.node = null;
+    }
+    
+    @Override
+    public void handle(LocalRequest<?> message, Node node)
+    {
+        messageListener.onMessage(NodeSink.Action.DELIVER, node.id(), 
node.id(), -1, message);
+        if (message.type().hasSideEffects())
+        {
+            // enqueue
+            unframedRequests.add(new RequestContext(message, () -> 
node.scheduler().now(() -> message.process(node))));
+            return;
+        }
+        message.process(node);
+    }
+
+    public void handle(Request request, Node.Id from, ReplyContext 
replyContext)
+    {
+        if (request.type() != null && request.type().hasSideEffects())
+        {
+            // enqueue
+            unframedRequests.add(new RequestContext(request, () -> 
node.receive(request, from, replyContext)));
+            return;
+        }
+        node.receive(request, from, replyContext);
+    }
+
+    private void save(Message request)
+    {
+        MessageType type = request.type();
+        TxnIdProvider provider = typeToProvider.get(type);
+        Invariants.nonNull(provider, "Unknown type %s: %s", type, request);
+        TxnId txnId = provider.txnId(request);
+        writes.computeIfAbsent(txnId, ignore -> new Testing()).put(type, 
request);
+    }
+
+    public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId)
+    {
+        return new MessageProvider(txnId, writes.getOrDefault(txnId, 
Map.of()));
+    }
+
+    private static class Testing extends LinkedHashMap<MessageType, Message>
+    {
+        public Map<MessageType, List<Message>> history()
+        {
+            LinkedHashMap<MessageType, List<Message>> history = new 
LinkedHashMap<>();
+            for (MessageType k : keySet())
+            {
+                Object current = super.get(k);
+                history.put(k, current instanceof List ? (List<Message>) 
current : Collections.singletonList((Message) current));
+            }
+            return history;
+        }
+
+        @Override
+        public Message get(Object key)
+        {
+            Object current = super.get(key);
+            if (current == null || current instanceof Message)
+                return (Message) current;
+            List<Message> messages = (List<Message>) current;
+            return messages.get(messages.size() - 1);
+        }
+
+        @Override
+        public Message put(MessageType key, Message value)
+        {
+            Object current = super.get(key);
+            if (current == null)
+                return super.put(key, value);
+            else if (current instanceof List)
+            {
+                List<Message> list = (List<Message>) current;
+                list.add(value);
+                return list.get(list.size() - 2);
+            }
+            else
+            {
+                List<Message> messages = new ArrayList<>();
+                messages.add((Message) current);
+                messages.add(value);
+                super.put(key, value);
+                return (Message) current;
+            }
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        if (this.node == null)
+            return;
+        try
+        {
+            doRun();
+        }
+        catch (Throwable t)
+        {
+            node.agent().onUncaughtException(t);
+        }
+    }
+
+    private void doRun()
+    {
+        ArrayList<RequestContext> requests = null;
+        // check to see if any pending epochs are in
+        waitForEpochs.sort(null);
+        for (int i = 0; i < waitForEpochs.size(); i++)
+        {
+            long waitForEpoch = waitForEpochs.getLong(i);
+            if (!node.topology().hasEpoch(waitForEpoch))
+                break;
+            List<RequestContext> delayed = 
delayedRequests.remove(waitForEpoch);
+            if (null == requests) requests = new ArrayList<>(delayed.size());
+            requests.addAll(delayed);
+        }
+        waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+        
+        // for anything queued, put into the pending epochs or schedule
+        RequestContext request;
+        while (null != (request = unframedRequests.poll()))
+        {
+            long waitForEpoch = request.waitForEpoch;
+            if (waitForEpoch != 0 && !node.topology().hasEpoch(waitForEpoch))
+            {
+                delayedRequests.computeIfAbsent(waitForEpoch, ignore -> new 
ArrayList<>()).add(request);
+                if (!waitForEpochs.containsLong(waitForEpoch))
+                    waitForEpochs.addLong(waitForEpoch);
+            }
+            else
+            {
+                if (null == requests) requests = new ArrayList<>();
+                requests.add(request);
+            }
+        }
+        
+        // schedule
+        if (requests != null)
+        {
+            requests.forEach(r -> save(r.message)); // save in batches to 
simulate journal more...
+            requests.forEach(Runnable::run);
+        }
+    }
+
+    @FunctionalInterface
+    interface TxnIdProvider
+    {
+        TxnId txnId(Message message);
+    }
+
+    private static class RequestContext implements Runnable
+    {
+        final long waitForEpoch;
+        final Message message;
+        final Runnable fn;
+
+        protected RequestContext(Request request, Runnable fn)
+        {
+            this.waitForEpoch = request.waitForEpoch();
+            this.message = request;
+            this.fn = fn;
+        }
+
+        @Override
+        public void run()
+        {
+            fn.run();
+        }
+    }
+
+    public static class MessageProvider implements 
SerializerSupport.MessageProvider
+    {
+        public final TxnId txnId;
+        private final Map<MessageType, Message> writes;
+
+        public MessageProvider(TxnId txnId, Map<MessageType, Message> writes)
+        {
+            this.txnId = txnId;
+            this.writes = writes;
+        }
+
+        @Override
+        public TxnId txnId()
+        {
+            return txnId;
+        }
+
+        @Override
+        public Set<MessageType> test(Set<MessageType> messages)
+        {
+            return Sets.intersection(writes.keySet(), messages);
+        }
+
+        @Override
+        public Set<MessageType> all()
+        {
+            return writes.keySet();
+        }
+
+        public Map<MessageType, Message> allMessages()
+        {
+            var all = all();
+            Map<MessageType, Message> map = 
Maps.newHashMapWithExpectedSize(all.size());
+            for (MessageType messageType : all)
+                map.put(messageType, get(messageType));
+            return map;
+        }
+
+        public  <T extends Message> T get(MessageType type)
+        {
+            return (T) writes.get(type);
+        }
+
+        @Override
+        public PreAccept preAccept()
+        {
+            return get(PRE_ACCEPT_REQ);
+        }
+
+        @Override
+        public BeginRecovery beginRecover()
+        {
+            return get(BEGIN_RECOVER_REQ);
+        }
+
+        @Override
+        public Propagate propagatePreAccept()
+        {
+            return get(PROPAGATE_PRE_ACCEPT_MSG);
+        }
+
+        @Override
+        public Accept accept(Ballot ballot)
+        {
+            return get(ACCEPT_REQ);
+        }
+
+        @Override
+        public Commit commitSlowPath()
+        {
+            return get(COMMIT_SLOW_PATH_REQ);
+        }
+
+        @Override
+        public Commit commitMaximal()
+        {
+            return get(COMMIT_MAXIMAL_REQ);
+        }
+
+        @Override
+        public Commit stableFastPath()
+        {
+            return get(STABLE_FAST_PATH_REQ);
+        }
+
+        @Override
+        public Commit stableSlowPath()
+        {
+            return get(STABLE_SLOW_PATH_REQ);
+        }
+
+        @Override
+        public Commit stableMaximal()
+        {
+            return get(STABLE_MAXIMAL_REQ);
+        }
+
+        @Override
+        public Propagate propagateStable()
+        {
+            return get(PROPAGATE_STABLE_MSG);
+        }
+
+        @Override
+        public Apply applyMinimal()
+        {
+            return get(APPLY_MINIMAL_REQ);
+        }
+
+        @Override
+        public Apply applyMaximal()
+        {
+            return get(APPLY_MAXIMAL_REQ);
+        }
+
+        @Override
+        public Propagate propagateApply()
+        {
+            return get(PROPAGATE_APPLY_MSG);
+        }
+
+        @Override
+        public Propagate propagateOther()
+        {
+            return get(PROPAGATE_OTHER_MSG);
+        }
+
+        @Override
+        public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+        {
+            return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+        }
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java 
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index d425bd34..d6ce5f22 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -19,6 +19,7 @@
 package accord.impl.list;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 
 import accord.local.SafeCommandStore;
@@ -107,6 +108,23 @@ public class ListRead implements Read
         return new ListRead(executor, isEphemeralRead, ((Seekables) 
userReadKeys).with(((ListRead)other).userReadKeys), 
((Seekables)keys).with(((ListRead)other).keys));
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ListRead listRead = (ListRead) o;
+        return isEphemeralRead == listRead.isEphemeralRead
+               && Objects.equals(userReadKeys, listRead.userReadKeys)
+               && Objects.equals(keys, listRead.keys);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public String toString()
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java 
b/accord-core/src/test/java/accord/impl/list/ListResult.java
index 5e8cb634..c9da1719 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -19,6 +19,7 @@
 package accord.impl.list;
 
 import java.util.Arrays;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -113,6 +114,40 @@ public class ListResult implements Result, Reply
         return status;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ListResult that = (ListResult) o;
+        return requestId == that.requestId
+               && Objects.equals(client, that.client)
+               && Objects.equals(txnId, that.txnId)
+               && Objects.equals(readKeys, that.readKeys)
+               && Objects.equals(responseKeys, that.responseKeys)
+               && equals(read, that.read)
+               && Objects.equals(update, that.update)
+               && status == that.status;
+    }
+
+    private static boolean equals(int[][] a, int[][] b)
+    {
+        if (a == b) return true;
+        if (a == null || b == null) return false;
+        if (a.length != b.length) return false;
+        for (int i = 0; i < a.length; i++)
+        {
+            if (!Arrays.equals(a[i], b[i])) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public String toString()
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index d669fd8d..2e31cbec 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -47,6 +47,7 @@ import accord.messages.WaitUntilApplied;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -257,6 +258,16 @@ public class ListStore implements DataStore
         return String.format("(%s -> %s)", sp.syncId, sp.keysOrRanges);
     }
 
+    public String historySeekable(Seekable o)
+    {
+        switch (o.domain())
+        {
+            case Key: return history(o.asKey());
+            case Range: return history(Ranges.single(o.asRange()));
+            default: throw new IllegalArgumentException("Unknown domain: " + 
o.domain() + ", input=" + o);
+        }
+    }
+
     private String history(Ranges ranges)
     {
         return history("range", ranges, other -> other.intersects(ranges));
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java 
b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 1b9d0fd4..1ff9e7ee 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -23,6 +23,8 @@ import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
+
 import accord.impl.*;
 import accord.primitives.*;
 import org.slf4j.Logger;
@@ -64,6 +66,31 @@ public class ListWrite extends TreeMap<Key, int[]> 
implements Write
         });
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this) return true;
+        if (!(o instanceof ListWrite)) return false;
+        ListWrite other = (ListWrite) o;
+        // Can not rely on Map.equals as our value is an array: (new int[] 
{2}).equals(new int[] {2}) == false!
+        if (!Sets.difference(keySet(), other.keySet()).isEmpty()
+            || !Sets.difference(other.keySet(), keySet()).isEmpty())
+            return false;
+        // keys match
+        for (Key k : keySet())
+        {
+            if (!Arrays.equals(get(k), other.get(k)))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public String toString()
     {
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java 
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 971c70e7..3247ebb1 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -213,6 +213,17 @@ public class AccordGens
         };
     }
 
+    public static Gen<Key> keysInsideRanges(Ranges ranges)
+    {
+        Invariants.checkArgument(!ranges.isEmpty(), "Ranges empty");
+        RoutingKey sample = ranges.get(0).end();
+        if (sample instanceof PrefixedIntHashKey)
+            return prefixedIntHashKeyInsideRanges(ranges);
+        if (sample instanceof IntKey.Routing)
+            return intKeysInsideRanges(ranges);
+        throw new IllegalArgumentException("Unsupported key type " + 
sample.getClass() + "; supported = PrefixedIntHashKey, IntKey");
+    }
+
     public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen)
     {
         return keyDeps(keyGen, txnIds());
@@ -374,6 +385,39 @@ public class AccordGens
         return ranges(sizeGen, keyGen, (ignore, a, b) -> factory.apply(a, b));
     }
 
+    public static Gen<Range> rangeInsideRange(Range range)
+    {
+        if (range.end() instanceof PrefixedIntHashKey)
+            return prefixedIntHashKeyRangeInsideRange(range);
+        throw new IllegalArgumentException("Unsupported type: " + 
range.start().getClass());
+    }
+
+    public static Gen<Range> prefixedIntHashKeyRangeInsideRange(Range range)
+    {
+        if (!(range.end() instanceof PrefixedIntHashKey))
+            throw new IllegalArgumentException("Only PrefixedIntHashKey 
supported; saw " + range.end().getClass());
+        PrefixedIntHashKey start = (PrefixedIntHashKey) range.start();
+        PrefixedIntHashKey end = (PrefixedIntHashKey) range.end();
+        if (start.hash + 1 == end.hash)
+        {
+            // range is of size 1, so can not split into a smaller range...
+            return ignore -> range;
+        }
+        return rs -> {
+            int a = rs.nextInt(start.hash, end.hash);
+            int b = rs.nextInt(start.hash, end.hash);
+            while (a == b)
+                b = rs.nextInt(start.hash, end.hash);
+            if (a > b)
+            {
+                int tmp = a;
+                a = b;
+                b = tmp;
+            }
+            return PrefixedIntHashKey.range(start.prefix, a, b);
+        };
+    }
+
     public static Gen<Ranges> prefixedIntHashKeyRanges(int numNodes, int rf)
     {
         return rs -> {
diff --git a/accord-core/src/test/java/accord/utils/Gens.java 
b/accord-core/src/test/java/accord/utils/Gens.java
index 3723fc60..fe44a81b 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -55,13 +56,23 @@ public class Gens {
         return ignore -> constant.get();
     }
 
-    public static <T> Gen<T> oneOf(Gen<T>... gens)
+    public static <T> Gen<T> oneOf(Gen<? extends T>... gens)
     {
+        switch (gens.length)
+        {
+            case 0: throw new IllegalArgumentException("Unable to select oneOf 
an empty list");
+            case 1: return (Gen<T>) gens[0];
+        }
         return oneOf(Arrays.asList(gens));
     }
 
-    public static <T> Gen<T> oneOf(List<Gen<T>> gens)
+    public static <T> Gen<T> oneOf(List<Gen<? extends T>> gens)
     {
+        switch (gens.size())
+        {
+            case 0: throw new IllegalArgumentException("Unable to select oneOf 
an empty list");
+            case 1: return (Gen<T>) gens.get(0);
+        }
         return rs -> rs.pick(gens).next(rs);
     }
 
@@ -465,6 +476,11 @@ public class Gens {
         return new StringDSL();
     }
 
+    public static BooleanSupplier supplier(Gen<Boolean> gen, RandomSource rs)
+    {
+        return () -> gen.next(rs);
+    }
+
     public static class BooleanDSL
     {
         public Gen<Boolean> all()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to