This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 256b35e2 Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra (#87) 256b35e2 is described below commit 256b35e27d170db9fcd8024d5678b4f6e9d3a956 Author: dcapwell <dcapw...@apache.org> AuthorDate: Wed May 15 09:16:01 2024 -0700 Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra (#87) patch by Benedict Elliott Smith, David Capwell; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-19618 --- .../coordinate/AbstractCoordinatePreAccept.java | 8 +- .../accord/coordinate/CoordinatePreAccept.java | 2 +- .../accord/coordinate/CoordinationAdapter.java | 5 + .../java/accord/impl/InMemoryCommandStore.java | 26 +- .../main/java/accord/impl/InMemorySafeCommand.java | 28 ++ .../src/main/java/accord/local/Bootstrap.java | 5 +- .../src/main/java/accord/local/Command.java | 32 +- .../src/main/java/accord/local/CommandStore.java | 6 + .../src/main/java/accord/local/CommandsForKey.java | 5 +- .../main/java/accord/local/CommonAttributes.java | 6 + .../main/java/accord/local/SafeCommandStore.java | 2 +- .../main/java/accord/local/SerializerSupport.java | 166 ++++++-- .../src/main/java/accord/messages/Propagate.java | 1 + .../src/main/java/accord/primitives/Routables.java | 6 + .../src/main/java/accord/utils/Invariants.java | 5 + .../src/test/java/accord/burn/BurnTest.java | 50 ++- .../src/test/java/accord/impl/MessageListener.java | 19 + .../src/test/java/accord/impl/basic/Cluster.java | 146 ++++++- .../accord/impl/basic/DelayedCommandStores.java | 107 ++++- .../src/test/java/accord/impl/basic/Journal.java | 436 +++++++++++++++++++++ .../src/test/java/accord/impl/list/ListRead.java | 18 + .../src/test/java/accord/impl/list/ListResult.java | 35 ++ .../src/test/java/accord/impl/list/ListStore.java | 11 + .../src/test/java/accord/impl/list/ListWrite.java | 27 ++ .../src/test/java/accord/utils/AccordGens.java | 44 +++ accord-core/src/test/java/accord/utils/Gens.java | 20 +- 26 files changed, 1138 insertions(+), 78 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java index 39083dbd..b5c5b49d 100644 --- a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java +++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java @@ -96,6 +96,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> imple } final Node node; + @Nullable final TxnId txnId; final FullRoute<?> route; @@ -219,7 +220,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> imple onNewEpochTopologyMismatch(mismatch); return; } - topologies = node.topology().withUnsyncedEpochs(route, txnId.epoch(), latestEpoch); + topologies = node.topology().withUnsyncedEpochs(route, earliestEpoch(), latestEpoch); boolean equivalent = topologies.oldestEpoch() <= prevTopologies.currentEpoch(); for (long epoch = topologies.currentEpoch() ; equivalent && epoch > prevTopologies.currentEpoch() ; --epoch) equivalent = topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards()); @@ -236,6 +237,11 @@ abstract class AbstractCoordinatePreAccept<T, R> extends SettableResult<T> imple }); } + protected long earliestEpoch() + { + return txnId == null ? executeAtEpoch() : txnId.epoch(); + } + @Override public final void accept(T success, Throwable failure) { diff --git a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java index 5f25e41a..1c19fc60 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java @@ -152,7 +152,7 @@ abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, Pre void onPreAccepted(Topologies topologies) { Timestamp executeAt = foldl(oks, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE); - onPreAccepted(topologies, executeAt, oks); + node.withEpoch(executeAt.epoch(), () -> onPreAccepted(topologies, executeAt, oks)); } abstract void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk> oks); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java index 0b08dd2d..dc8de331 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java @@ -91,6 +91,11 @@ public interface CoordinationAdapter<R> public static <R> void stabilise(CoordinationAdapter<R> adapter, Node node, Topologies any, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback) { + if (!node.topology().hasEpoch(executeAt.epoch())) + { + node.withEpoch(executeAt.epoch(), () -> stabilise(adapter, node, any, route, ballot, txnId, txn, executeAt, deps, callback)); + return; + } Topologies coordinates = any.forEpochs(txnId.epoch(), txnId.epoch()); Topologies all; if (txnId.epoch() == executeAt.epoch()) all = coordinates; diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index e94bac11..5ee3f8d5 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -388,6 +388,8 @@ public abstract class InMemoryCommandStore extends CommandStore timestampsForKey.put(key, timestampsForKey((Key) key).createSafeReference()); } + protected void validateRead(Command current) {} + protected final InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges) { Map<TxnId, InMemorySafeCommand> commands = new HashMap<>(); @@ -395,6 +397,14 @@ public abstract class InMemoryCommandStore extends CommandStore Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey = new HashMap<>(); context.forEachId(txnId -> commands.put(txnId, lazyReference(txnId))); + for (InMemorySafeCommand safe : commands.values()) + { + GlobalCommand global = safe.unsafeGlobal(); + if (global == null) continue; + Command current = global.value(); + if (current == null) continue; + validateRead(current); + } for (Seekable seekable : context.keys()) { @@ -402,8 +412,18 @@ public abstract class InMemoryCommandStore extends CommandStore { case Key: RoutableKey key = (RoutableKey) seekable; - commandsForKey.put(key, commandsForKey((Key) key).createSafeReference()); - timestampsForKey.put(key, timestampsForKey((Key) key).createSafeReference()); + switch (context.keyHistory()) + { + case NONE: + continue; + case COMMANDS: + commandsForKey.put(key, commandsForKey((Key) key).createSafeReference()); + break; + case TIMESTAMPS: + timestampsForKey.put(key, timestampsForKey((Key) key).createSafeReference()); + break; + default: throw new UnsupportedOperationException("Unknown key history: " + context.keyHistory()); + } break; case Range: // load range cfks here @@ -633,7 +653,7 @@ public abstract class InMemoryCommandStore extends CommandStore public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeTimestampsForKey, InMemorySafeCommandsForKey> { private final InMemoryCommandStore commandStore; - private final Map<TxnId, InMemorySafeCommand> commands; + protected final Map<TxnId, InMemorySafeCommand> commands; private final Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey; private final Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey; private final RangesForEpoch ranges; diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java index 267d5692..946f255a 100644 --- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java +++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java @@ -18,8 +18,11 @@ package accord.impl; +import java.util.Objects; import java.util.function.Supplier; +import javax.annotation.Nullable; + import accord.impl.InMemoryCommandStore.GlobalCommand; import accord.local.Command; import accord.local.Listeners; @@ -30,9 +33,11 @@ import static accord.utils.Invariants.illegalState; public class InMemorySafeCommand extends SafeCommand implements SafeState<Command> { + private static final Object INIT = new Object(); private static final Supplier<GlobalCommand> INVALIDATED = () -> null; private Supplier<GlobalCommand> lazy; + private Object original = INIT; private GlobalCommand global; public InMemorySafeCommand(TxnId txnId, GlobalCommand global) @@ -54,10 +59,26 @@ public class InMemorySafeCommand extends SafeCommand implements SafeState<Comman return global.value(); } + public boolean isModified() + { + return original != INIT && !Objects.equals(original, global.value()); + } + + @Nullable + public Command original() + { + touch(); + if (!isModified()) + return global.value(); + return (Command) original; + } + @Override protected void set(Command update) { touch(); + if (original == INIT) + original = global.value(); global.value(update); } @@ -83,6 +104,7 @@ public class InMemorySafeCommand extends SafeCommand implements SafeState<Comman public void invalidate() { lazy = INVALIDATED; + original = INIT; } @Override @@ -107,4 +129,10 @@ public class InMemorySafeCommand extends SafeCommand implements SafeState<Comman touch(); return global; } + + @Nullable + GlobalCommand unsafeGlobal() + { + return global; + } } diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index d268812f..e478eb21 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -261,7 +261,10 @@ class Bootstrap else { // TODO (expected): first check to see if we are still relevant - node.agent().onFailedBootstrap("SafeToRead", state.ranges, () -> started(state, null), failure); + CommandStore store = CommandStore.current(); + node.agent().onFailedBootstrap("SafeToRead", state.ranges, () -> { + store.maybeExecuteImmediately(() -> started(state, null)); + }, failure); } } diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index f67c9663..6b9d71af 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -370,10 +370,10 @@ public abstract class Command implements CommonAttributes case DefinitionErased: case DefinitionUnknown: case NoOp: - Invariants.checkState(partialTxn == null); + Invariants.checkState(partialTxn == null, "partialTxn is defined"); break; case DefinitionKnown: - Invariants.checkState(partialTxn != null); + Invariants.checkState(partialTxn != null, "partialTxn is null"); break; } } @@ -418,8 +418,8 @@ public abstract class Command implements CommonAttributes { default: throw new AssertionError("Unhandled Outcome: " + known.outcome); case Apply: - Invariants.checkState(writes != null); - Invariants.checkState(result != null); + Invariants.checkState(writes != null, "Writes is null"); + Invariants.checkState(result != null, "Result is null"); break; case Invalidated: Invariants.checkState(validate.durability().isMaybeInvalidated()); @@ -427,8 +427,8 @@ public abstract class Command implements CommonAttributes Invariants.checkState(validate.durability() != Local); case Erased: case WasApply: - Invariants.checkState(writes == null); - Invariants.checkState(result == null); + Invariants.checkState(writes == null, "Writes exist"); + Invariants.checkState(result == null, "Results exist"); break; } } @@ -840,15 +840,18 @@ public abstract class Command implements CommonAttributes public static Truncated truncatedApply(CommonAttributes common, SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result) { - // TODO (now) !!! uncomment and fix -// Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps()); + Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps()); Durability durability = checkTruncatedApplyInvariants(common, saveStatus, executeAt); return validate(new Truncated(common.txnId(), saveStatus, durability, common.route(), executeAt, EMPTY, writes, result)); } - public static Truncated truncatedApply(CommonAttributes common, SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result, Timestamp dependencyExecutesAt) + public static Truncated truncatedApply(CommonAttributes common, SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result, @Nullable Timestamp dependencyExecutesAt) { - Invariants.checkArgument(common.txnId().kind().awaitsOnlyDeps()); + if (!common.txnId().kind().awaitsOnlyDeps()) + { + Invariants.checkState(dependencyExecutesAt == null); + return truncatedApply(common, saveStatus, executeAt, writes, result); + } Durability durability = checkTruncatedApplyInvariants(common, saveStatus, executeAt); return validate(new TruncatedAwaitsOnlyDeps(common.txnId(), saveStatus, durability, common.route(), executeAt, EMPTY, writes, result, dependencyExecutesAt)); } @@ -926,6 +929,10 @@ public abstract class Command implements CommonAttributes public static class TruncatedAwaitsOnlyDeps extends Truncated { + /** + * TODO (desired): Ideally we would not store this differently than we do for earlier states (where we encode in WaitingOn), but we also + * don't want to waste the space and complexity budget in earlier phases. Consider how to improve. + */ @Nullable final Timestamp executesAtLeast; public TruncatedAwaitsOnlyDeps(CommonAttributes commonAttributes, SaveStatus saveStatus, @Nullable Timestamp executeAt, @Nullable Writes writes, @Nullable Result result, @Nullable Timestamp executesAtLeast) @@ -963,7 +970,7 @@ public abstract class Command implements CommonAttributes public static class PreAccepted extends AbstractCommand { - private final Timestamp executeAt; + private final @Nullable Timestamp executeAt; private final PartialTxn partialTxn; private final @Nullable PartialDeps partialDeps; @@ -993,7 +1000,7 @@ public abstract class Command implements CommonAttributes if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; PreAccepted that = (PreAccepted) o; - return executeAt.equals(that.executeAt) + return Objects.equals(executeAt, that.executeAt) && Objects.equals(partialTxn, that.partialTxn) && Objects.equals(partialDeps, that.partialDeps); } @@ -1692,6 +1699,7 @@ public abstract class Command implements CommonAttributes static Command.Accepted acceptInvalidated(Command command, Ballot ballot) { SaveStatus saveStatus = SaveStatus.get(Status.AcceptedInvalidate, command.known()); + // TODO (desired): This should be NonNull, but AcceptedInvalidated is represented by Command.Accepted because there’s no acceptedOrCommitted register in NotDefined return validate(new Command.Accepted(command, saveStatus, ballot, command.executeAt(), command.partialTxn(), null, ballot)); } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index c79a2871..8e2f18eb 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -219,6 +219,12 @@ public abstract class CommandStore implements AgentExecutor public abstract boolean inStore(); + public void maybeExecuteImmediately(Runnable task) + { + if (inStore()) task.run(); + else execute(task); + } + public abstract AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer); public abstract <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply); diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java index 9e628b92..cba170c9 100644 --- a/accord-core/src/main/java/accord/local/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/CommandsForKey.java @@ -662,12 +662,13 @@ public class CommandsForKey implements CommandsSummary { if (newStatus.compareTo(cur.status) <= 0) { + // TODO (required): this validation is not safe for replay where we may have to "catch up" commands that are behind CFK // we can redundantly update the same transaction via notifyWaitingOnCommit since updates to CFK may be asynchronous // (particularly for invalidations). So we should expect that we might already represent the latest information for this transaction. // TODO (desired): consider only accepting this for Invalidation // TODO (desired): also clean-up special casing for AcceptedInvalidate, which exists because it currently has no effect on the CFK state // so it could be any of Transitively Known, Historic, PreAccept or Accept - Invariants.checkState(cur.status == newStatus || next.status() == Status.AcceptedInvalidate); + Invariants.checkState(cur.status == newStatus || next.status() == Status.AcceptedInvalidate, "Attempted update to CommandsForKey with %s, implying stale status; found %s", next, cur); if (!newStatus.hasInfo || next.acceptedOrCommitted().equals(prev.acceptedOrCommitted())) return this; } @@ -1220,7 +1221,7 @@ public class CommandsForKey implements CommandsSummary Key key = this.key; Keys keys = Keys.of(key); safeStore = safeStore; // make unsafe for compiler to permit in lambda - safeStore.commandStore().execute(PreLoadContext.contextFor(txnId, keys), safeStore0 -> { + safeStore.commandStore().execute(PreLoadContext.contextFor(txnId, keys, KeyHistory.COMMANDS), safeStore0 -> { SafeCommand safeCommand0 = safeStore0.get(txnId); safeCommand0.initialise(); Command command = safeCommand0.current(); diff --git a/accord-core/src/main/java/accord/local/CommonAttributes.java b/accord-core/src/main/java/accord/local/CommonAttributes.java index 9c180a9c..46c38588 100644 --- a/accord-core/src/main/java/accord/local/CommonAttributes.java +++ b/accord-core/src/main/java/accord/local/CommonAttributes.java @@ -136,6 +136,12 @@ public interface CommonAttributes return this; } + public Mutable removePartialTxn() + { + this.partialTxn = null; + return this; + } + @Override public PartialDeps partialDeps() { diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 8fdf127f..5c8e2834 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -246,7 +246,7 @@ public abstract class SafeCommandStore keys = updated.asCommitted().waitingOn.keys; // TODO (required): consider how execution works for transactions that await future deps and where the command store inherits additional keys in execution epoch Ranges ranges = ranges().allAt(updated.executeAt()); - PreLoadContext context = PreLoadContext.contextFor(txnId, keys); + PreLoadContext context = PreLoadContext.contextFor(txnId, keys, COMMANDS); // TODO (expected): execute immediately for any keys we already have loaded, and save only those we haven't for later if (canExecuteWith(context)) { diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java index 7ca59e00..5384a7e3 100644 --- a/accord-core/src/main/java/accord/local/SerializerSupport.java +++ b/accord-core/src/main/java/accord/local/SerializerSupport.java @@ -19,6 +19,8 @@ package accord.local; import java.util.Set; +import javax.annotation.Nullable; + import com.google.common.collect.ImmutableSet; import accord.api.Result; @@ -28,6 +30,7 @@ import accord.local.CommandStores.RangesForEpoch; import accord.local.CommonAttributes.Mutable; import accord.messages.Accept; import accord.messages.Apply; +import accord.messages.ApplyThenWaitUntilApplied; import accord.messages.BeginRecovery; import accord.messages.Commit; import accord.messages.MessageType; @@ -38,18 +41,21 @@ import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Ranges; import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.Invariants; import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; import static accord.messages.MessageType.APPLY_MINIMAL_REQ; +import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ; import static accord.messages.MessageType.BEGIN_RECOVER_REQ; import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ; import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ; import static accord.messages.MessageType.PRE_ACCEPT_REQ; import static accord.messages.MessageType.PROPAGATE_APPLY_MSG; -import static accord.messages.MessageType.PROPAGATE_STABLE_MSG; +import static accord.messages.MessageType.PROPAGATE_OTHER_MSG; import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; +import static accord.messages.MessageType.PROPAGATE_STABLE_MSG; import static accord.messages.MessageType.STABLE_FAST_PATH_REQ; import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ; @@ -60,15 +66,40 @@ import static accord.utils.Invariants.illegalState; @VisibleForImplementation public class SerializerSupport { + private static final Set<MessageType> PRE_ACCEPT_TYPES = + ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG); + + private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES = + ImmutableSet.<MessageType>builder() + .addAll(PRE_ACCEPT_TYPES) + .add(COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ) + .build(); + + private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES = + ImmutableSet.<MessageType>builder() + .addAll(PRE_ACCEPT_COMMIT_TYPES) + .add(STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG) + .build(); + + private static final Set<MessageType> APPLY_TYPES = + ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG, APPLY_THEN_WAIT_UNTIL_APPLIED_REQ); + + private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES = + ImmutableSet.<MessageType>builder() + .addAll(PRE_ACCEPT_STABLE_TYPES) + .addAll(APPLY_TYPES) + .build(); + /** * Reconstructs Command from register values and protocol messages. */ - public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) + public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) { switch (status.status) { case NotDefined: - return Command.NotDefined.notDefined(attrs, promised); + return status == SaveStatus.Uninitialised ? Command.NotDefined.uninitialised(attrs.txnId()) + : Command.NotDefined.notDefined(attrs, promised); case PreAccepted: return preAccepted(rangesForEpoch, attrs, executeAt, promised, messageProvider); case AcceptedInvalidate: @@ -83,15 +114,12 @@ public class SerializerSupport return executed(rangesForEpoch, attrs, status, executeAt, promised, accepted, waitingOnProvider, messageProvider); case Truncated: case Invalidated: - return truncated(attrs, status, executeAt, messageProvider); + return truncated(attrs, status, executeAt, executesAtLeast, messageProvider); default: throw new IllegalStateException(); } } - private static final Set<MessageType> PRE_ACCEPT_TYPES = - ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG); - private static Command.PreAccepted preAccepted(RangesForEpoch rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider) { Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES); @@ -118,24 +146,12 @@ public class SerializerSupport return Command.Accepted.accepted(attrs, status, executeAt, promised, accepted); } - private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES = - ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ); - - private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES = - ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, - COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG); - private static Command.Committed committed(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) { attrs = extract(rangesForEpoch, status, accepted, messageProvider, (attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs); return Command.Committed.committed(attrs, status, executeAt, promised, accepted, waitingOnProvider.provide(attrs.partialDeps())); } - private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES = - ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG, - COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ, STABLE_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, PROPAGATE_STABLE_MSG, - APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); - private static Command.Executed executed(RangesForEpoch rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider) { return extract(rangesForEpoch, status, accepted, messageProvider, (attrs0, txn, deps, writes, result) -> { @@ -146,10 +162,7 @@ public class SerializerSupport }, attrs); } - private static final Set<MessageType> APPLY_TYPES = - ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG); - - private static Command.Truncated truncated(Mutable attrs, SaveStatus status, Timestamp executeAt, MessageProvider messageProvider) + private static Command.Truncated truncated(Mutable attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp executesAtLeast, MessageProvider messageProvider) { Writes writes = null; Result result = null; @@ -162,15 +175,15 @@ public class SerializerSupport case TruncatedApplyWithDeps: Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES); checkState(!witnessed.isEmpty()); - if (witnessed.contains(APPLY_MINIMAL_REQ)) + if (witnessed.contains(APPLY_MAXIMAL_REQ)) { - Apply apply = messageProvider.applyMinimal(); + Apply apply = messageProvider.applyMaximal(); writes = apply.writes; result = apply.result; } - if (witnessed.contains(APPLY_MAXIMAL_REQ)) + else if (witnessed.contains(APPLY_MINIMAL_REQ)) { - Apply apply = messageProvider.applyMaximal(); + Apply apply = messageProvider.applyMinimal(); writes = apply.writes; result = apply.result; } @@ -180,8 +193,18 @@ public class SerializerSupport writes = propagate.writes; result = propagate.result; } + else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ)) + { + ApplyThenWaitUntilApplied apply = messageProvider.applyThenWaitUntilApplied(); + writes = apply.writes; + result = apply.result; + } + else + { + throw new UnsupportedOperationException("Unhandled types: " + witnessed); + } case TruncatedApply: - return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result); + return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, executesAtLeast); case ErasedOrInvalidated: return Command.Truncated.erasedOrInvalidated(attrs.txnId(), attrs.durability(), attrs.route()); case Erased: @@ -231,6 +254,7 @@ public class SerializerSupport case AcceptedInvalidate: case Accepted: case PreCommitted: + { PartialTxn txn = null; PartialDeps deps = null; @@ -247,7 +271,7 @@ public class SerializerSupport deps = slicePartialDeps(rangesForEpoch, accept); } return withContents.apply(param, txn, deps, null, null); - + } case Committed: { witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES); @@ -285,14 +309,25 @@ public class SerializerSupport else { checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), "Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); - if (witnessed.contains(COMMIT_SLOW_PATH_REQ)) + if (witnessed.contains(COMMIT_MAXIMAL_REQ)) + { + commit = messageProvider.commitMaximal(); + } + else if (witnessed.contains(COMMIT_SLOW_PATH_REQ)) { commit = messageProvider.commitSlowPath(); } + else if (witnessed.contains(PRE_ACCEPT_REQ) || witnessed.contains(BEGIN_RECOVER_REQ) || witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG)) + { + Commit slowPath = messageProvider.stableSlowPath(); + Ranges ranges = rangesForEpoch.allBetween(slowPath.txnId.epoch(), slowPath.executeAt.epoch()); + PartialTxn txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider).slice(ranges, true); + PartialDeps deps = slowPath.partialDeps.slice(ranges); + return withContents.apply(param, txn, deps, null, null); + } else { - checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), "Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); - commit = messageProvider.commitMaximal(); + throw illegalState("Unable to find COMMIT_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); } } @@ -309,14 +344,14 @@ public class SerializerSupport Ranges ranges = rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch()); return withContents.apply(param, apply.txn.slice(ranges, true), apply.deps.slice(ranges), apply.writes, apply.result); } - else if (witnessed.contains(PROPAGATE_APPLY_MSG)) + else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ)) { - Propagate propagate = messageProvider.propagateApply(); - return sliceAndApply(rangesForEpoch, propagate, withContents, param, propagate.writes, propagate.result); + ApplyThenWaitUntilApplied apply = messageProvider.applyThenWaitUntilApplied(); + Ranges ranges = rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch()); + return withContents.apply(param, apply.txn.slice(ranges, true), apply.deps.slice(ranges), apply.writes, apply.result); } - else + else if (witnessed.contains(APPLY_MINIMAL_REQ)) { - checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable to find APPLY_MINIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider)); Apply apply = messageProvider.applyMinimal(); Commit commit; if (witnessed.contains(STABLE_MAXIMAL_REQ)) @@ -326,7 +361,18 @@ public class SerializerSupport else if (witnessed.contains(PROPAGATE_STABLE_MSG)) { Propagate propagate = messageProvider.propagateStable(); - return withContents.apply(param, propagate.partialTxn, propagate.stableDeps, apply.writes, apply.result); + var ranges = propagate.committedExecuteAt == null ? rangesForEpoch.allAt(propagate.txnId) : rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt); + return withContents.apply(param, propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), apply.writes, apply.result); + } + else if (witnessed.contains(PROPAGATE_APPLY_MSG)) + { + Propagate propagate = messageProvider.propagateApply(); + var ranges = propagate.committedExecuteAt == null ? rangesForEpoch.allAt(propagate.txnId) : rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt); + return withContents.apply(param, propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), apply.writes, apply.result); + } + else if (witnessed.contains(COMMIT_MAXIMAL_REQ)) + { + commit = messageProvider.commitMaximal(); } else if (witnessed.contains(COMMIT_SLOW_PATH_REQ)) { @@ -336,6 +382,12 @@ public class SerializerSupport { commit = messageProvider.stableFastPath(); } + else if (witnessed.contains(PRE_ACCEPT_REQ) || witnessed.contains(BEGIN_RECOVER_REQ) || witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG)) + { + PartialTxn txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider); + Ranges ranges = rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch()); + return withContents.apply(param, txn.slice(ranges, true), apply.deps.slice(ranges), apply.writes, apply.result); + } else { throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus; witnessed " + witnessed); @@ -343,6 +395,36 @@ public class SerializerSupport return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, apply.writes, apply.result); } + else if (witnessed.contains(PROPAGATE_APPLY_MSG)) + { + Propagate propagate = messageProvider.propagateApply(); + Invariants.nonNull(propagate.partialTxn, "Unable to find partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider)); + Invariants.nonNull(propagate.stableDeps, "Unable to find stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider)); + return sliceAndApply(rangesForEpoch, propagate, withContents, param, propagate.writes, propagate.result); + } + else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG)) + { + // once propgate runs locally it merges the local state with the remote state, which may make this go from PRE_ACCEPT to PRE_APPLIED! + Propagate propagate = messageProvider.propagatePreAccept(); + Invariants.nonNull(propagate.partialTxn, "Unable to find partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider)); + Invariants.nonNull(propagate.stableDeps, "Unable to find stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider)); + + var ranges = propagate.committedExecuteAt == null ? rangesForEpoch.allAt(propagate.txnId) : rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt); + return withContents.apply(param, propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), propagate.writes, propagate.result); + } + else if (witnessed.contains(PROPAGATE_OTHER_MSG)) + { + // the txn/deps may have been erased, won't always be here... + Propagate propagate = messageProvider.propagateOther(); + var ranges = propagate.committedExecuteAt == null ? rangesForEpoch.allAt(propagate.txnId) : rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt); + PartialTxn txn = propagate.partialTxn == null ? null : propagate.partialTxn.slice(ranges, true); + PartialDeps deps = propagate.stableDeps == null ? null : propagate.stableDeps.slice(ranges); + return withContents.apply(param, txn, deps, propagate.writes, propagate.result); + } + else + { + throw illegalState("Unable to find messages that lead to PreApplied state; txn_id=%s, witnessed %s", messageProvider.txnId(), new LoggedMessageProvider(messageProvider)); + } } case NotDefined: @@ -406,6 +488,8 @@ public class SerializerSupport PartialTxn preAcceptedPartialTxn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider); if (partialTxn == null || partialTxn.keys().size() == 0) partialTxn = preAcceptedPartialTxn; else partialTxn = merge(preAcceptedPartialTxn, partialTxn); + if (partialTxn == null && witnessed.contains(COMMIT_MAXIMAL_REQ)) + partialTxn = messageProvider.commitMaximal().partialTxn; case StableWithTxnAndDeps: case CommitWithTxn: } @@ -421,6 +505,7 @@ public class SerializerSupport // TODO (required): randomised testing that we always restore the exact same state public interface MessageProvider { + TxnId txnId(); Set<MessageType> test(Set<MessageType> messages); Set<MessageType> all(); @@ -437,6 +522,7 @@ public class SerializerSupport Commit commitMaximal(); Commit stableFastPath(); + Commit stableSlowPath(); Commit stableMaximal(); @@ -447,6 +533,10 @@ public class SerializerSupport Apply applyMaximal(); Propagate propagateApply(); + + Propagate propagateOther(); + + ApplyThenWaitUntilApplied applyThenWaitUntilApplied(); } private static class LoggedMessageProvider diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index 49572c55..351c636e 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -513,6 +513,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> if (toEpoch >= committedExecuteAt.epoch()) return MessageType.PROPAGATE_APPLY_MSG; case Committed: + case Stable: return MessageType.PROPAGATE_STABLE_MSG; case PreCommitted: if (!achieved.definition.isKnown()) diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java index 9e7e53e7..b37bcc91 100644 --- a/accord-core/src/main/java/accord/primitives/Routables.java +++ b/accord-core/src/main/java/accord/primitives/Routables.java @@ -24,6 +24,8 @@ import accord.utils.*; import net.nicoulaj.compilecommand.annotations.Inline; import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static accord.utils.SortedArrays.Search.FLOOR; @@ -46,6 +48,10 @@ public interface Routables<K extends Routable> extends Iterable<K> int size(); boolean isEmpty(); + default Stream<K> stream() + { + return StreamSupport.stream(spliterator(), false); + } boolean intersects(AbstractRanges ranges); boolean intersects(AbstractKeys<?> keys); default boolean intersects(Routables<?> routables) diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java index d1061465..842ed5d3 100644 --- a/accord-core/src/main/java/accord/utils/Invariants.java +++ b/accord-core/src/main/java/accord/utils/Invariants.java @@ -50,6 +50,11 @@ public class Invariants throw createIllegalState(msg); } + public static IllegalStateException illegalState(String fmt, Object... args) + { + return illegalState(format(fmt, args)); + } + public static IllegalStateException illegalState() { throw illegalState(null); diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 47240516..447b80b0 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -50,12 +51,15 @@ import accord.impl.MessageListener; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.Utils; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import accord.verify.CompositeVerifier; import accord.verify.ElleVerifier; import accord.verify.StrictSerializabilityVerifier; import accord.verify.Verifier; + import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -540,10 +544,50 @@ public class BurnTest } @Test - @Timeout(value = 3, unit = TimeUnit.MINUTES) public void testOne() { - run(1L, 1000); + run(System.nanoTime()); + } + + private static void run(long seed) + { + Duration timeout = Duration.ofMinutes(3); + Runnable fn = () -> run(seed, 1000); + AsyncResult.Settable<?> promise = AsyncResults.settable(); + Thread t = new Thread(() -> { + try + { + fn.run(); + promise.setSuccess(null); + } + catch (Throwable e) + { + promise.setFailure(e); + } + }); + t.setName("BurnTest with timeout"); + t.setDaemon(true); + try + { + t.start(); + AsyncChains.getBlocking(promise, timeout.toNanos(), TimeUnit.NANOSECONDS); + } + catch (Throwable thrown) + { + Throwable cause = thrown; + if (cause instanceof ExecutionException) + cause = cause.getCause(); + if (cause instanceof InterruptedException || cause instanceof TimeoutException) + t.interrupt(); + if (cause instanceof TimeoutException) + { + TimeoutException override = new TimeoutException("test did not complete within " + timeout); + override.setStackTrace(new StackTraceElement[0]); + cause = override; + } + logger.error("Exception running burn test for seed {}:", seed, t); + throw SimulationException.wrap(seed, cause); + } } private static void run(long seed, int operations) diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java b/accord-core/src/test/java/accord/impl/MessageListener.java index 71bbc0ec..6be9f8de 100644 --- a/accord-core/src/test/java/accord/impl/MessageListener.java +++ b/accord-core/src/test/java/accord/impl/MessageListener.java @@ -28,6 +28,8 @@ import accord.messages.SimpleReply; import accord.messages.TxnRequest; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.topology.Topology; + import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; @@ -49,6 +51,7 @@ public interface MessageListener void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id, Message message); void onClientAction(ClientAction action, Node.Id from, TxnId id, Object message); + void onTopologyChange(Topology topology); static MessageListener get() { @@ -71,6 +74,12 @@ public interface MessageListener { } + + @Override + public void onTopologyChange(Topology topology) + { + + } } class DebugListener implements MessageListener @@ -117,6 +126,16 @@ public interface MessageListener } } + private Topology previous = null; + + @Override + public void onTopologyChange(Topology topology) + { + if (previous != null) + logger.debug("Topology Change {} -> {}", previous.epoch(), topology.epoch()); + previous = topology; + } + private static Object normalizeClientMessage(Object o) { if (o instanceof Throwable) diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index b949c5ab..da5851f8 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -41,6 +41,7 @@ import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.BarrierType; import accord.api.MessageSink; import accord.api.Scheduler; import accord.burn.BurnTestConfigurationService; @@ -48,7 +49,12 @@ import accord.burn.TopologyUpdates; import accord.burn.random.FrequentLargeRange; import accord.config.LocalConfig; import accord.config.MutableLocalConfig; +import accord.coordinate.Barrier; import accord.coordinate.CoordinationAdapter; +import accord.coordinate.Exhausted; +import accord.coordinate.Invalidated; +import accord.coordinate.Preempted; +import accord.coordinate.Timeout; import accord.impl.CoordinateDurabilityScheduling; import accord.impl.MessageListener; import accord.impl.PrefixedIntHashKey; @@ -61,16 +67,20 @@ import accord.local.Node.Id; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.ShardDistributor; -import accord.messages.LocalRequest; import accord.messages.Message; import accord.messages.MessageType; import accord.messages.Reply; import accord.messages.Request; import accord.messages.SafeCallback; +import accord.primitives.Keys; +import accord.primitives.Range; import accord.primitives.Ranges; +import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.topology.Topology; import accord.topology.TopologyRandomizer; +import accord.utils.Gens; +import accord.utils.Invariants; import accord.utils.RandomSource; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; @@ -79,6 +89,9 @@ import static accord.impl.basic.Cluster.OverrideLinksKind.NONE; import static accord.impl.basic.Cluster.OverrideLinksKind.RANDOM_BIDIRECTIONAL; import static accord.impl.basic.NodeSink.Action.DELIVER; import static accord.impl.basic.NodeSink.Action.DROP; +import static accord.utils.AccordGens.keysInsideRanges; +import static accord.utils.AccordGens.rangeInsideRange; +import static accord.utils.Gens.mixedDistribution; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -127,6 +140,7 @@ public class Cluster implements Scheduler final RandomSource random; final LinkConfig linkConfig; final Function<Id, Node> lookup; + final Function<Id, Journal> journalLookup; final PendingQueue pending; final Runnable checkFailures; final List<Runnable> onDone = new ArrayList<>(); @@ -137,13 +151,14 @@ public class Cluster implements Scheduler int recurring; BiFunction<Id, Id, Link> links; - public Cluster(RandomSource random, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, Node> lookup, IntSupplier rf, Consumer<Packet> responseSink) + public Cluster(RandomSource random, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id, Node> lookup, Function<Id, Journal> journalLookup, IntSupplier rf, Consumer<Packet> responseSink) { this.random = random; this.messageListener = messageListener; this.pending = queueSupplier.get(); this.checkFailures = checkFailures; this.lookup = lookup; + this.journalLookup = journalLookup; this.responseSink = responseSink; this.linkConfig = defaultLinkConfig(random, rf); this.links = linkConfig.defaultLinks; @@ -216,7 +231,7 @@ public class Cluster implements Scheduler else callback.success(deliver.src, reply); } } - else on.receive((Request) deliver.message, deliver.src, deliver); + else journalLookup.apply(deliver.dst).handle((Request) deliver.message, deliver.src, deliver); } else { @@ -269,10 +284,11 @@ public class Cluster implements Scheduler Topology topology = topologyFactory.toTopology(nodes); Map<Id, Node> nodeMap = new LinkedHashMap<>(); Map<Id, AgentExecutor> executorMap = new LinkedHashMap<>(); + Map<Id, Journal> journalMap = new LinkedHashMap<>(); try { RandomSource random = randomSupplier.get(); - Cluster sinks = new Cluster(randomSupplier.get(), messageListener, queueSupplier, checkFailures, nodeMap::get, () -> topologyFactory.rf, responseSink); + Cluster sinks = new Cluster(randomSupplier.get(), messageListener, queueSupplier, checkFailures, nodeMap::get, journalMap::get, () -> topologyFactory.rf, responseSink); TopologyUpdates topologyUpdates = new TopologyUpdates(executorMap::get); TopologyRandomizer.Listener schemaApply = t -> { for (Node node : nodeMap.values()) @@ -280,9 +296,11 @@ public class Cluster implements Scheduler ListStore store = (ListStore) node.commandStores().dataStore(); store.onTopologyUpdate(node, t); } + messageListener.onTopologyChange(t); }; TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get, schemaApply); List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>(); + List<Service> services = new ArrayList<>(); for (Id id : nodes) { MessageSink messageSink = sinks.create(id, randomSupplier.get()); @@ -291,13 +309,15 @@ public class Cluster implements Scheduler BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) -> configRandomizer.onStale(id, sinceAtLeast, ranges); AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, onStale); executorMap.put(id, nodeExecutor); + Journal journal = new Journal(messageListener); + journalMap.put(id, journal); BurnTestConfigurationService configService = new BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, nodeMap::get, topologyUpdates); - BooleanSupplier isLoadedCheck = random.biasedUniformBools(0.5f); - Node node = new Node(id, messageSink, LocalRequest::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), + BooleanSupplier isLoadedCheck = Gens.supplier(Gens.bools().mixedDistribution().next(random), random); + Node node = new Node(id, messageSink, journal, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), () -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()), nodeExecutor.agent(), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck), new CoordinationAdapter.DefaultFactory(), + SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(), localConfig); CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node); // TODO (desired): randomise @@ -306,6 +326,7 @@ public class Cluster implements Scheduler durabilityScheduling.add(durability); nodeMap.put(id, node); durabilityScheduling.add(new CoordinateDurabilityScheduling(node)); + services.add(new BarrierService(node, randomSupplier.get())); } Runnable updateDurabilityRate; @@ -328,6 +349,7 @@ public class Cluster implements Scheduler schemaApply.onUpdate(topology); // startup + journalMap.entrySet().forEach(e -> e.getValue().start(nodeMap.get(e.getKey()))); AsyncResult<?> startup = AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult(); while (sinks.processPending()); Assertions.assertTrue(startup.isDone()); @@ -341,10 +363,12 @@ public class Cluster implements Scheduler Scheduled reconfigure = sinks.recurring(configRandomizer::maybeUpdateTopology, 1, SECONDS); durabilityScheduling.forEach(CoordinateDurabilityScheduling::start); + services.forEach(Service::start); noMoreWorkSignal.accept(() -> { reconfigure.cancel(); durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop); + services.forEach(Service::close); }); readySignal.accept(nodeMap); @@ -357,6 +381,7 @@ public class Cluster implements Scheduler chaos.cancel(); reconfigure.cancel(); durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop); + services.forEach(Service::close); sinks.links = sinks.linkConfig.defaultLinks; // give progress log et al a chance to finish @@ -379,10 +404,117 @@ public class Cluster implements Scheduler } finally { + journalMap.values().forEach(Journal::shutdown); nodeMap.values().forEach(Node::shutdown); } } + private interface Service extends AutoCloseable + { + void start(); + @Override + void close(); + } + + private static abstract class AbstractService implements Service, Runnable + { + protected final Node node; + protected final RandomSource rs; + private Scheduled scheduled; + + protected AbstractService(Node node, RandomSource rs) + { + this.node = node; + this.rs = rs; + } + + @Override + public void start() + { + Invariants.checkState(scheduled == null, "Start already called..."); + this.scheduled = node.scheduler().recurring(this, 1, SECONDS); + } + + protected abstract void doRun() throws Exception; + + @Override + public final void run() + { + try + { + doRun(); + } + catch (Throwable t) + { + node.agent().onUncaughtException(t); + } + } + + @Override + public void close() + { + if (scheduled != null) + { + scheduled.cancel(); + scheduled = null; + } + } + } + + private static class BarrierService extends AbstractService + { + private final Supplier<BarrierType> typeSupplier; + private final Supplier<Boolean> includeRangeSupplier; + private final Supplier<Boolean> wholeOrPartialSupplier; + + private BarrierService(Node node, RandomSource rs) + { + super(node, rs); + this.typeSupplier = mixedDistribution(BarrierType.values()).next(rs).asSupplier(rs); + this.includeRangeSupplier = Gens.bools().mixedDistribution().next(rs).asSupplier(rs); + this.wholeOrPartialSupplier = Gens.bools().mixedDistribution().next(rs).asSupplier(rs); + } + + @Override + public void doRun() + { + Topology current = node.topology().current(); + Ranges ranges = current.rangesForNode(node.id()); + if (ranges.isEmpty()) + return; + BarrierType type = typeSupplier.get(); + if (type == BarrierType.local) + { + run(node, Keys.of(keysInsideRanges(ranges).next(rs)), current.epoch(), type); + } + else + { + List<Range> subset = new ArrayList<>(); + for (Range range : ranges) + { + if (includeRangeSupplier.get()) + subset.add(wholeOrPartialSupplier.get() ? range : rangeInsideRange(range).next(rs)); + } + if (subset.isEmpty()) + return; + run(node, Ranges.of(subset.toArray(Range[]::new)), current.epoch(), type); + } + } + + private <S extends Seekables<?, ?>> void run(Node node, S keysOrRanges, long epoch, BarrierType type) + { + Barrier.barrier(node, keysOrRanges, epoch, type).begin((s, f) -> { + if (f != null) + { + // ignore specific errors + if (f instanceof Invalidated || f instanceof Timeout || f instanceof Preempted || f instanceof Exhausted) + return; + node.agent().onUncaughtException(f); + } + }); + } + } + private static BiFunction<Id, Id, Link> partition(List<Id> nodes, RandomSource random, int rf, BiFunction<Id, Id, Link> up) { Collections.shuffle(nodes, random.asJdkRandom()); diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index b33edcc0..ea97c5e0 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -19,6 +19,8 @@ package accord.impl.basic; import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; import java.util.function.BooleanSupplier; @@ -31,16 +33,26 @@ import accord.api.DataStore; import accord.api.ProgressLog; import accord.impl.InMemoryCommandStore; import accord.impl.InMemoryCommandStores; +import accord.impl.InMemorySafeCommand; +import accord.impl.InMemorySafeCommandsForKey; +import accord.impl.InMemorySafeTimestampsForKey; import accord.impl.PrefixedIntHashKey; import accord.impl.basic.TaskExecutorService.Task; +import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.CommonAttributes; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.PreLoadContext; import accord.local.SafeCommandStore; +import accord.local.SerializerSupport; import accord.local.ShardDistributor; +import accord.messages.Message; import accord.primitives.Range; +import accord.primitives.RoutableKey; +import accord.primitives.Txn; +import accord.primitives.TxnId; import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.RandomSource; @@ -49,15 +61,15 @@ import accord.utils.async.AsyncChains; public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { - private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck) + private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck, Journal journal) { - super(time, agent, store, random, shardDistributor, progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck)); + super(time, agent, store, random, shardDistributor, progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal)); } - public static CommandStores.Factory factory(PendingQueue pending, BooleanSupplier isLoadedCheck) + public static CommandStores.Factory factory(PendingQueue pending, BooleanSupplier isLoadedCheck, Journal journal) { return (time, agent, store, random, shardDistributor, progressLogFactory) -> - new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck); + new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal); } @Override @@ -101,12 +113,56 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private final SimulatedDelayedExecutorService executor; private final Queue<Task<?>> pending = new LinkedList<>(); private final BooleanSupplier isLoadedCheck; + private final Journal journal; - public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck) + public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) { super(id, time, agent, store, progressLogFactory, epochUpdateHolder); this.executor = executor; this.isLoadedCheck = isLoadedCheck; + this.journal = journal; + } + + @Override + protected void validateRead(Command current) + { + // "loading" the command doesn't make sense as we don't "store" the command... + if (current.txnId().kind() == Txn.Kind.EphemeralRead) + return; + //TODO (correctness): these type of txn must be durable but currently they are not... should make sure this is plugged into the C* journal properly for reply + if (current.txnId().kind() == Txn.Kind.LocalOnly) + return; + Command.WaitingOn waitingOn = null; + if (current.isStable() && !current.isTruncated()) + waitingOn = current.asCommitted().waitingOn; + SerializerSupport.MessageProvider messages = journal.makeMessageProvider(current.txnId()); + Command.WaitingOn finalWaitingOn = waitingOn; + CommonAttributes.Mutable mutable = current.mutable(); + mutable.partialDeps(null).removePartialTxn(); + Command reconstructed; + try + { + reconstructed = SerializerSupport.reconstruct(unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, messages); + } + catch (IllegalStateException t) + { + //TODO (correctness): journal doesn’t guarantee we pick the same records we used to state transition + // Journal stores a list of messages it saw in some order it defines, but when reconstructing a command we don't actually know what messages were used, this could + // lead to a case where deps mismatch, so ignoring this for now + if (t.getMessage() != null && t.getMessage().startsWith("Deps do not match; expected")) + return; + throw t; + } + //TODO (correctness): journal doesn’t guarantee we pick the same records we used to state transition + if (current.partialDeps() != null && !current.partialDeps().rangeDeps.equals(reconstructed.partialDeps().rangeDeps)) + return; + // for some reasons scope doesn't alaways match, this might be due to journal... what sucks is that this can also be a bug in the extract, so its + // hard to figure out what happened. + if (current.partialDeps() != null && !current.partialDeps().equals(reconstructed.partialDeps())) + return; + if (current.isCommitted() && !current.isTruncated() && !Objects.equals(current.asCommitted().waitingOn(), reconstructed.asCommitted().waitingOn())) + return; +// Invariants.checkState(current.equals(reconstructed), "Commands did not match: expected %s, given %s", current, reconstructed); } @Override @@ -115,9 +171,9 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return isLoadedCheck.getAsBoolean(); } - private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck) + private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) { - return (id, time, agent, store, progressLogFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, rangesForEpoch, executor, isLoadedCheck); + return (id, time, agent, store, progressLogFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, rangesForEpoch, executor, isLoadedCheck, journal); } @Override @@ -190,5 +246,42 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { } + + @Override + protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys) + { + return new DelayedSafeStore(this, ranges, context, commands, timestampsForKey, commandsForKeys); + } + } + + public static class DelayedSafeStore extends InMemoryCommandStore.InMemorySafeStore + { + private final DelayedCommandStore commandStore; + public DelayedSafeStore(DelayedCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey) + { + super(commandStore, ranges, context, commands, timestampsForKey, commandsForKey); + this.commandStore = commandStore; + } + + @Override + public void postExecute() + { + if (context instanceof Message) + { + Message m = (Message) context; + if (m.type() != null && !m.type().hasSideEffects()) + { + // double check there are no modifications + commands.entrySet().forEach(e -> { + InMemorySafeCommand safe = e.getValue(); + if (!safe.isModified()) return; + commandStore.validateRead(safe.current()); + Command original = safe.original(); + if (original != null) + commandStore.validateRead(original); + }); + } + } + } } } diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java new file mode 100644 index 00000000..7c0217b5 --- /dev/null +++ b/accord-core/src/test/java/accord/impl/basic/Journal.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.basic; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import accord.impl.MessageListener; +import accord.local.Node; +import accord.local.SerializerSupport; +import accord.messages.AbstractEpochRequest; +import accord.messages.Accept; +import accord.messages.Apply; +import accord.messages.ApplyThenWaitUntilApplied; +import accord.messages.BeginRecovery; +import accord.messages.Commit; +import accord.messages.LocalRequest; +import accord.messages.Message; +import accord.messages.MessageType; +import accord.messages.PreAccept; +import accord.messages.Propagate; +import accord.messages.ReplyContext; +import accord.messages.Request; +import accord.messages.TxnRequest; +import accord.primitives.Ballot; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.collections.LongArrayList; + +import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ; +import static accord.messages.MessageType.ACCEPT_REQ; +import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; +import static accord.messages.MessageType.APPLY_MINIMAL_REQ; +import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ; +import static accord.messages.MessageType.BEGIN_INVALIDATE_REQ; +import static accord.messages.MessageType.BEGIN_RECOVER_REQ; +import static accord.messages.MessageType.COMMIT_INVALIDATE_REQ; +import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ; +import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ; +import static accord.messages.MessageType.INFORM_DURABLE_REQ; +import static accord.messages.MessageType.INFORM_OF_TXN_REQ; +import static accord.messages.MessageType.PRE_ACCEPT_REQ; +import static accord.messages.MessageType.PROPAGATE_APPLY_MSG; +import static accord.messages.MessageType.PROPAGATE_OTHER_MSG; +import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; +import static accord.messages.MessageType.PROPAGATE_STABLE_MSG; +import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ; +import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ; +import static accord.messages.MessageType.STABLE_FAST_PATH_REQ; +import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; +import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ; + +public class Journal implements LocalRequest.Handler, Runnable +{ + private static final TxnIdProvider EPOCH = msg -> ((AbstractEpochRequest<?>) msg).txnId; + private static final TxnIdProvider TXN = msg -> ((TxnRequest<?>) msg).txnId; + private static final TxnIdProvider LOCAL = msg -> ((LocalRequest<?>) msg).primaryTxnId(); + private static final TxnIdProvider INVL = msg -> ((Commit.Invalidate) msg).primaryTxnId(); + private static final Map<MessageType, TxnIdProvider> typeToProvider = ImmutableMap.<MessageType, TxnIdProvider>builder() + .put(PRE_ACCEPT_REQ, TXN) + .put(ACCEPT_REQ, TXN) + .put(ACCEPT_INVALIDATE_REQ, EPOCH) + .put(COMMIT_SLOW_PATH_REQ, TXN) + .put(COMMIT_MAXIMAL_REQ, TXN) + .put(STABLE_FAST_PATH_REQ, TXN) + .put(STABLE_SLOW_PATH_REQ, TXN) + .put(STABLE_MAXIMAL_REQ, TXN) + .put(COMMIT_INVALIDATE_REQ, INVL) + .put(APPLY_MINIMAL_REQ, TXN) + .put(APPLY_MAXIMAL_REQ, TXN) + .put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, EPOCH) + .put(BEGIN_RECOVER_REQ, TXN) + .put(BEGIN_INVALIDATE_REQ, EPOCH) + .put(INFORM_OF_TXN_REQ, EPOCH) + .put(INFORM_DURABLE_REQ, TXN) + .put(SET_SHARD_DURABLE_REQ, EPOCH) + .put(SET_GLOBALLY_DURABLE_REQ, EPOCH) + .put(PROPAGATE_PRE_ACCEPT_MSG, LOCAL) + .put(PROPAGATE_STABLE_MSG, LOCAL) + .put(PROPAGATE_APPLY_MSG, LOCAL) + .put(PROPAGATE_OTHER_MSG, LOCAL) + .build(); + + private final Queue<RequestContext> unframedRequests = new ArrayDeque<>(); + private final LongArrayList waitForEpochs = new LongArrayList(); + private final Long2ObjectHashMap<ArrayList<RequestContext>> delayedRequests = new Long2ObjectHashMap<>(); + private final Map<TxnId, Map<MessageType, Message>> writes = new HashMap<>(); + private final MessageListener messageListener; + private Node node; + + public Journal(MessageListener messageListener) + { + this.messageListener = messageListener; + } + + public void start(Node node) + { + this.node = node; + node.scheduler().recurring(this, 1, TimeUnit.MILLISECONDS); + } + + public void shutdown() + { + this.node = null; + } + + @Override + public void handle(LocalRequest<?> message, Node node) + { + messageListener.onMessage(NodeSink.Action.DELIVER, node.id(), node.id(), -1, message); + if (message.type().hasSideEffects()) + { + // enqueue + unframedRequests.add(new RequestContext(message, () -> node.scheduler().now(() -> message.process(node)))); + return; + } + message.process(node); + } + + public void handle(Request request, Node.Id from, ReplyContext replyContext) + { + if (request.type() != null && request.type().hasSideEffects()) + { + // enqueue + unframedRequests.add(new RequestContext(request, () -> node.receive(request, from, replyContext))); + return; + } + node.receive(request, from, replyContext); + } + + private void save(Message request) + { + MessageType type = request.type(); + TxnIdProvider provider = typeToProvider.get(type); + Invariants.nonNull(provider, "Unknown type %s: %s", type, request); + TxnId txnId = provider.txnId(request); + writes.computeIfAbsent(txnId, ignore -> new Testing()).put(type, request); + } + + public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId) + { + return new MessageProvider(txnId, writes.getOrDefault(txnId, Map.of())); + } + + private static class Testing extends LinkedHashMap<MessageType, Message> + { + public Map<MessageType, List<Message>> history() + { + LinkedHashMap<MessageType, List<Message>> history = new LinkedHashMap<>(); + for (MessageType k : keySet()) + { + Object current = super.get(k); + history.put(k, current instanceof List ? (List<Message>) current : Collections.singletonList((Message) current)); + } + return history; + } + + @Override + public Message get(Object key) + { + Object current = super.get(key); + if (current == null || current instanceof Message) + return (Message) current; + List<Message> messages = (List<Message>) current; + return messages.get(messages.size() - 1); + } + + @Override + public Message put(MessageType key, Message value) + { + Object current = super.get(key); + if (current == null) + return super.put(key, value); + else if (current instanceof List) + { + List<Message> list = (List<Message>) current; + list.add(value); + return list.get(list.size() - 2); + } + else + { + List<Message> messages = new ArrayList<>(); + messages.add((Message) current); + messages.add(value); + super.put(key, value); + return (Message) current; + } + } + } + + @Override + public void run() + { + if (this.node == null) + return; + try + { + doRun(); + } + catch (Throwable t) + { + node.agent().onUncaughtException(t); + } + } + + private void doRun() + { + ArrayList<RequestContext> requests = null; + // check to see if any pending epochs are in + waitForEpochs.sort(null); + for (int i = 0; i < waitForEpochs.size(); i++) + { + long waitForEpoch = waitForEpochs.getLong(i); + if (!node.topology().hasEpoch(waitForEpoch)) + break; + List<RequestContext> delayed = delayedRequests.remove(waitForEpoch); + if (null == requests) requests = new ArrayList<>(delayed.size()); + requests.addAll(delayed); + } + waitForEpochs.removeIfLong(epoch -> !delayedRequests.containsKey(epoch)); + + // for anything queued, put into the pending epochs or schedule + RequestContext request; + while (null != (request = unframedRequests.poll())) + { + long waitForEpoch = request.waitForEpoch; + if (waitForEpoch != 0 && !node.topology().hasEpoch(waitForEpoch)) + { + delayedRequests.computeIfAbsent(waitForEpoch, ignore -> new ArrayList<>()).add(request); + if (!waitForEpochs.containsLong(waitForEpoch)) + waitForEpochs.addLong(waitForEpoch); + } + else + { + if (null == requests) requests = new ArrayList<>(); + requests.add(request); + } + } + + // schedule + if (requests != null) + { + requests.forEach(r -> save(r.message)); // save in batches to simulate journal more... + requests.forEach(Runnable::run); + } + } + + @FunctionalInterface + interface TxnIdProvider + { + TxnId txnId(Message message); + } + + private static class RequestContext implements Runnable + { + final long waitForEpoch; + final Message message; + final Runnable fn; + + protected RequestContext(Request request, Runnable fn) + { + this.waitForEpoch = request.waitForEpoch(); + this.message = request; + this.fn = fn; + } + + @Override + public void run() + { + fn.run(); + } + } + + public static class MessageProvider implements SerializerSupport.MessageProvider + { + public final TxnId txnId; + private final Map<MessageType, Message> writes; + + public MessageProvider(TxnId txnId, Map<MessageType, Message> writes) + { + this.txnId = txnId; + this.writes = writes; + } + + @Override + public TxnId txnId() + { + return txnId; + } + + @Override + public Set<MessageType> test(Set<MessageType> messages) + { + return Sets.intersection(writes.keySet(), messages); + } + + @Override + public Set<MessageType> all() + { + return writes.keySet(); + } + + public Map<MessageType, Message> allMessages() + { + var all = all(); + Map<MessageType, Message> map = Maps.newHashMapWithExpectedSize(all.size()); + for (MessageType messageType : all) + map.put(messageType, get(messageType)); + return map; + } + + public <T extends Message> T get(MessageType type) + { + return (T) writes.get(type); + } + + @Override + public PreAccept preAccept() + { + return get(PRE_ACCEPT_REQ); + } + + @Override + public BeginRecovery beginRecover() + { + return get(BEGIN_RECOVER_REQ); + } + + @Override + public Propagate propagatePreAccept() + { + return get(PROPAGATE_PRE_ACCEPT_MSG); + } + + @Override + public Accept accept(Ballot ballot) + { + return get(ACCEPT_REQ); + } + + @Override + public Commit commitSlowPath() + { + return get(COMMIT_SLOW_PATH_REQ); + } + + @Override + public Commit commitMaximal() + { + return get(COMMIT_MAXIMAL_REQ); + } + + @Override + public Commit stableFastPath() + { + return get(STABLE_FAST_PATH_REQ); + } + + @Override + public Commit stableSlowPath() + { + return get(STABLE_SLOW_PATH_REQ); + } + + @Override + public Commit stableMaximal() + { + return get(STABLE_MAXIMAL_REQ); + } + + @Override + public Propagate propagateStable() + { + return get(PROPAGATE_STABLE_MSG); + } + + @Override + public Apply applyMinimal() + { + return get(APPLY_MINIMAL_REQ); + } + + @Override + public Apply applyMaximal() + { + return get(APPLY_MAXIMAL_REQ); + } + + @Override + public Propagate propagateApply() + { + return get(PROPAGATE_APPLY_MSG); + } + + @Override + public Propagate propagateOther() + { + return get(PROPAGATE_OTHER_MSG); + } + + @Override + public ApplyThenWaitUntilApplied applyThenWaitUntilApplied() + { + return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ); + } + } +} diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java index d425bd34..d6ce5f22 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRead.java +++ b/accord-core/src/test/java/accord/impl/list/ListRead.java @@ -19,6 +19,7 @@ package accord.impl.list; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import accord.local.SafeCommandStore; @@ -107,6 +108,23 @@ public class ListRead implements Read return new ListRead(executor, isEphemeralRead, ((Seekables) userReadKeys).with(((ListRead)other).userReadKeys), ((Seekables)keys).with(((ListRead)other).keys)); } + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ListRead listRead = (ListRead) o; + return isEphemeralRead == listRead.isEphemeralRead + && Objects.equals(userReadKeys, listRead.userReadKeys) + && Objects.equals(keys, listRead.keys); + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } + @Override public String toString() { diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java index 5e8cb634..c9da1719 100644 --- a/accord-core/src/test/java/accord/impl/list/ListResult.java +++ b/accord-core/src/test/java/accord/impl/list/ListResult.java @@ -19,6 +19,7 @@ package accord.impl.list; import java.util.Arrays; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -113,6 +114,40 @@ public class ListResult implements Result, Reply return status; } + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ListResult that = (ListResult) o; + return requestId == that.requestId + && Objects.equals(client, that.client) + && Objects.equals(txnId, that.txnId) + && Objects.equals(readKeys, that.readKeys) + && Objects.equals(responseKeys, that.responseKeys) + && equals(read, that.read) + && Objects.equals(update, that.update) + && status == that.status; + } + + private static boolean equals(int[][] a, int[][] b) + { + if (a == b) return true; + if (a == null || b == null) return false; + if (a.length != b.length) return false; + for (int i = 0; i < a.length; i++) + { + if (!Arrays.equals(a[i], b[i])) return false; + } + return true; + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } + @Override public String toString() { diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index d669fd8d..2e31cbec 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -47,6 +47,7 @@ import accord.messages.WaitUntilApplied; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.RoutableKey; +import accord.primitives.Seekable; import accord.primitives.SyncPoint; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -257,6 +258,16 @@ public class ListStore implements DataStore return String.format("(%s -> %s)", sp.syncId, sp.keysOrRanges); } + public String historySeekable(Seekable o) + { + switch (o.domain()) + { + case Key: return history(o.asKey()); + case Range: return history(Ranges.single(o.asRange())); + default: throw new IllegalArgumentException("Unknown domain: " + o.domain() + ", input=" + o); + } + } + private String history(Ranges ranges) { return history("range", ranges, other -> other.intersects(ranges)); diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java index 1b9d0fd4..1ff9e7ee 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java @@ -23,6 +23,8 @@ import java.util.TreeMap; import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.collect.Sets; + import accord.impl.*; import accord.primitives.*; import org.slf4j.Logger; @@ -64,6 +66,31 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write }); } + @Override + public boolean equals(Object o) + { + if (o == this) return true; + if (!(o instanceof ListWrite)) return false; + ListWrite other = (ListWrite) o; + // Can not rely on Map.equals as our value is an array: (new int[] {2}).equals(new int[] {2}) == false! + if (!Sets.difference(keySet(), other.keySet()).isEmpty() + || !Sets.difference(other.keySet(), keySet()).isEmpty()) + return false; + // keys match + for (Key k : keySet()) + { + if (!Arrays.equals(get(k), other.get(k))) + return false; + } + return true; + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } + @Override public String toString() { diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java b/accord-core/src/test/java/accord/utils/AccordGens.java index 971c70e7..3247ebb1 100644 --- a/accord-core/src/test/java/accord/utils/AccordGens.java +++ b/accord-core/src/test/java/accord/utils/AccordGens.java @@ -213,6 +213,17 @@ public class AccordGens }; } + public static Gen<Key> keysInsideRanges(Ranges ranges) + { + Invariants.checkArgument(!ranges.isEmpty(), "Ranges empty"); + RoutingKey sample = ranges.get(0).end(); + if (sample instanceof PrefixedIntHashKey) + return prefixedIntHashKeyInsideRanges(ranges); + if (sample instanceof IntKey.Routing) + return intKeysInsideRanges(ranges); + throw new IllegalArgumentException("Unsupported key type " + sample.getClass() + "; supported = PrefixedIntHashKey, IntKey"); + } + public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen) { return keyDeps(keyGen, txnIds()); @@ -374,6 +385,39 @@ public class AccordGens return ranges(sizeGen, keyGen, (ignore, a, b) -> factory.apply(a, b)); } + public static Gen<Range> rangeInsideRange(Range range) + { + if (range.end() instanceof PrefixedIntHashKey) + return prefixedIntHashKeyRangeInsideRange(range); + throw new IllegalArgumentException("Unsupported type: " + range.start().getClass()); + } + + public static Gen<Range> prefixedIntHashKeyRangeInsideRange(Range range) + { + if (!(range.end() instanceof PrefixedIntHashKey)) + throw new IllegalArgumentException("Only PrefixedIntHashKey supported; saw " + range.end().getClass()); + PrefixedIntHashKey start = (PrefixedIntHashKey) range.start(); + PrefixedIntHashKey end = (PrefixedIntHashKey) range.end(); + if (start.hash + 1 == end.hash) + { + // range is of size 1, so can not split into a smaller range... + return ignore -> range; + } + return rs -> { + int a = rs.nextInt(start.hash, end.hash); + int b = rs.nextInt(start.hash, end.hash); + while (a == b) + b = rs.nextInt(start.hash, end.hash); + if (a > b) + { + int tmp = a; + a = b; + b = tmp; + } + return PrefixedIntHashKey.range(start.prefix, a, b); + }; + } + public static Gen<Ranges> prefixedIntHashKeyRanges(int numNodes, int rf) { return rs -> { diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index 3723fc60..fe44a81b 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Objects; import java.util.Set; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -55,13 +56,23 @@ public class Gens { return ignore -> constant.get(); } - public static <T> Gen<T> oneOf(Gen<T>... gens) + public static <T> Gen<T> oneOf(Gen<? extends T>... gens) { + switch (gens.length) + { + case 0: throw new IllegalArgumentException("Unable to select oneOf an empty list"); + case 1: return (Gen<T>) gens[0]; + } return oneOf(Arrays.asList(gens)); } - public static <T> Gen<T> oneOf(List<Gen<T>> gens) + public static <T> Gen<T> oneOf(List<Gen<? extends T>> gens) { + switch (gens.size()) + { + case 0: throw new IllegalArgumentException("Unable to select oneOf an empty list"); + case 1: return (Gen<T>) gens.get(0); + } return rs -> rs.pick(gens).next(rs); } @@ -465,6 +476,11 @@ public class Gens { return new StringDSL(); } + public static BooleanSupplier supplier(Gen<Boolean> gen, RandomSource rs) + { + return () -> gen.next(rs); + } + public static class BooleanDSL { public Gen<Boolean> all() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org