This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 29164d49b7e0975a243db434d081c69b273196a6 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Fri Nov 24 12:47:09 2023 +0000 Various fixes accept misses deps in newer epoch new IllegalStateException -> illegalState() fixup Fix bad propagation when find a single Erased response wip: fix partially bootstrapped key dep fix partially bootstrapped key dep fixup fix deps calculation for sync point and transactions fix inverted condition for recovery Do not adopt Erased unless globally durably decided add TODOs add TODOs use stronger guarantees to invalidate local commands use stronger guarantees to invalidate local commands comments fix redundant dependency purging (again) --- .../main/java/accord/api/ConfigurationService.java | 2 +- .../src/main/java/accord/coordinate/Barrier.java | 3 +- .../main/java/accord/coordinate/BlockOnDeps.java | 7 +- .../main/java/accord/coordinate/CheckShards.java | 4 +- .../java/accord/coordinate/FetchCoordinator.java | 3 +- .../src/main/java/accord/coordinate/Infer.java | 9 +- .../main/java/accord/coordinate/Invalidate.java | 3 +- .../main/java/accord/coordinate/MaybeRecover.java | 8 ++ .../java/accord/coordinate/ProposeAndExecute.java | 3 +- .../java/accord/coordinate/ProposeSyncPoint.java | 4 +- .../src/main/java/accord/coordinate/Recover.java | 5 +- .../java/accord/coordinate/RecoverWithRoute.java | 5 +- .../accord/coordinate/RecoverWithSomeRoute.java | 3 +- .../main/java/accord/coordinate/TxnExecute.java | 3 +- .../accord/coordinate/tracking/ReadTracker.java | 5 +- .../java/accord/impl/AbstractSafeCommandStore.java | 5 +- .../main/java/accord/impl/ErasedSafeCommand.java | 8 +- .../java/accord/impl/InMemoryCommandStore.java | 20 +-- .../main/java/accord/impl/InMemorySafeCommand.java | 4 +- .../main/java/accord/impl/SimpleProgressLog.java | 5 +- .../src/main/java/accord/local/Bootstrap.java | 3 +- .../src/main/java/accord/local/Command.java | 10 +- .../src/main/java/accord/local/CommandStore.java | 148 +++++++++++++-------- .../src/main/java/accord/local/CommandStores.java | 5 +- .../src/main/java/accord/local/Commands.java | 15 ++- accord-core/src/main/java/accord/local/Node.java | 7 +- .../main/java/accord/local/RedundantStatus.java | 3 +- .../main/java/accord/local/SafeCommandStore.java | 8 +- .../src/main/java/accord/local/SaveStatus.java | 11 +- .../main/java/accord/local/SerializerSupport.java | 7 +- accord-core/src/main/java/accord/local/Status.java | 4 +- .../main/java/accord/messages/AbstractExecute.java | 5 +- .../src/main/java/accord/messages/Accept.java | 2 +- .../accord/messages/ApplyThenWaitUntilApplied.java | 6 +- .../main/java/accord/messages/BeginRecovery.java | 7 +- .../src/main/java/accord/messages/PreAccept.java | 7 +- .../src/main/java/accord/messages/Propagate.java | 55 ++++---- .../main/java/accord/primitives/AbstractKeys.java | 14 ++ .../src/main/java/accord/primitives/KeyDeps.java | 87 ++++++++---- .../src/main/java/accord/primitives/Keys.java | 5 + .../main/java/accord/primitives/PartialTxn.java | 6 +- .../src/main/java/accord/primitives/Range.java | 3 +- .../src/main/java/accord/primitives/RangeDeps.java | 1 + .../src/main/java/accord/topology/Topologies.java | 4 +- .../src/main/java/accord/topology/Topology.java | 3 +- .../main/java/accord/topology/TopologyManager.java | 3 +- .../src/main/java/accord/utils/ArrayBuffers.java | 4 +- .../src/main/java/accord/utils/Invariants.java | 6 +- .../java/accord/utils/ReducingIntervalMap.java | 4 +- .../main/java/accord/utils/RelationMultiMap.java | 16 ++- .../src/main/java/accord/utils/SortedArrays.java | 7 +- .../java/accord/utils/ThreadPoolScheduler.java | 4 +- .../main/java/accord/utils/async/AsyncChains.java | 6 +- .../main/java/accord/utils/async/AsyncResult.java | 6 +- .../main/java/accord/utils/async/AsyncResults.java | 8 +- .../accord/burn/random/FrequentLargeRange.java | 6 +- .../src/test/java/accord/impl/list/ListAgent.java | 1 - .../test/java/accord/impl/list/ListRequest.java | 7 +- .../src/test/java/accord/impl/list/ListResult.java | 2 + .../test/java/accord/local/CheckedCommands.java | 10 +- .../test/java/accord/messages/ReadDataTest.java | 3 +- .../java/accord/utils/async/AsyncChainsTest.java | 9 +- .../src/test/java/accord/verify/ElleVerifier.java | 3 + .../accord/verify/SerializabilityVerifier.java | 6 +- .../verify/StrictSerializabilityVerifier.java | 5 +- .../src/main/java/accord/maelstrom/Body.java | 3 +- .../src/main/java/accord/maelstrom/Json.java | 4 +- .../main/java/accord/maelstrom/MaelstromReply.java | 4 +- .../java/accord/maelstrom/MaelstromRequest.java | 4 +- .../java/accord/maelstrom/MaelstromResult.java | 4 +- .../src/main/java/accord/maelstrom/Packet.java | 4 +- 71 files changed, 443 insertions(+), 231 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index 6ad9f20..49dd8d6 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -112,7 +112,7 @@ public interface ConfigurationService * Informs listeners of new topology. This is guaranteed to be called sequentially for each epoch after * the initial topology returned by `currentTopology` on startup. * - * TODO (required): document what this Future represents, or maybe refactor it away - only used for testing + * TODO (desired): document what this Future represents, or maybe refactor it away - only used for testing */ AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync); diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java index 5fb1966..f9d94a6 100644 --- a/accord-core/src/main/java/accord/coordinate/Barrier.java +++ b/accord-core/src/main/java/accord/coordinate/Barrier.java @@ -44,6 +44,7 @@ import static accord.local.PreLoadContext.contextFor; import static accord.primitives.Txn.Kind.Kinds.Any; import static accord.utils.Invariants.checkArgument; import static accord.utils.Invariants.checkState; +import static accord.utils.Invariants.illegalState; /** * Local or global barriers that return a result once all transactions have their side effects visible. @@ -284,7 +285,7 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes @Override public BarrierTxn reduce(BarrierTxn o1, BarrierTxn o2) { - throw new IllegalStateException("Should not be possible to find multiple transactions"); + throw illegalState("Should not be possible to find multiple transactions"); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java index f8b030c..475fd8c 100644 --- a/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java +++ b/accord-core/src/main/java/accord/coordinate/BlockOnDeps.java @@ -39,6 +39,7 @@ import accord.primitives.TxnId; import accord.topology.Topologies; import static accord.coordinate.tracking.RequestStatus.Failed; +import static accord.utils.Invariants.illegalState; /** * Block on deps at quorum for a sync point transaction, and then move the transaction to the applied state @@ -102,16 +103,16 @@ public class BlockOnDeps implements Callback<ReadReply> ReadNack nack = (ReadNack) reply; switch (nack) { - default: throw new IllegalStateException(); + default: throw illegalState(); case Redundant: // WaitUntilApplied only sends Redundant on truncation which implies durable and applied isDone = true; callback.accept(txn.result(txnId, txnId, null), null); break; case NotCommitted: - throw new IllegalStateException("Received `NotCommitted` response after sending maximal commit as part of `BlockOnDeps`"); + throw illegalState("Received `NotCommitted` response after sending maximal commit as part of `BlockOnDeps`"); case Invalid: - onFailure(from, new IllegalStateException("Submitted a read command to a replica that did not own the range")); + onFailure(from, illegalState("Submitted a read command to a replica that did not own the range")); break; } } diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java index 656bab7..e612baa 100644 --- a/accord-core/src/main/java/accord/coordinate/CheckShards.java +++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java @@ -27,6 +27,8 @@ import accord.messages.CheckStatus.IncludeInfo; import accord.primitives.*; import accord.topology.Topologies; +import static accord.utils.Invariants.illegalState; + /** * A result of null indicates the transaction is globally persistent * A result of CheckStatusOk indicates the maximum status found for the transaction, which may be used to assess progress @@ -102,7 +104,7 @@ public abstract class CheckShards<U extends Unseekables<?>> extends ReadCoordina { default: throw new AssertionError(String.format("Unexpected status: %s", reply)); case NotOwned: - finishOnFailure(new IllegalStateException("Submitted command to a replica that did not own the range"), true); + finishOnFailure(illegalState("Submitted command to a replica that did not own the range"), true); return Action.Aborted; } } diff --git a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java index 707a329..9b9ec06 100644 --- a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java +++ b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java @@ -33,6 +33,7 @@ import accord.utils.Invariants; import static accord.primitives.Routables.Slice.Minimal; import static accord.utils.Invariants.checkArgument; +import static accord.utils.Invariants.illegalState; /** * This abstract coordinator is to be extended by implementations. @@ -244,7 +245,7 @@ public abstract class FetchCoordinator protected StartingRangeFetch starting(Node.Id to, Ranges ranges) { if (isDone) - throw new IllegalStateException(); + throw illegalState(); // TODO (required, consider): should we cancel those we have superseded? return fetchRanges.starting(ranges); diff --git a/accord-core/src/main/java/accord/coordinate/Infer.java b/accord-core/src/main/java/accord/coordinate/Infer.java index d1fca2d..7b1956a 100644 --- a/accord-core/src/main/java/accord/coordinate/Infer.java +++ b/accord-core/src/main/java/accord/coordinate/Infer.java @@ -318,11 +318,6 @@ public class Infer return InvalidIfNot.NotKnownToBeInvalid; } - public enum YesNoMaybe - { - No, Maybe, Yes - } - public static boolean safeToCleanup(SafeCommandStore safeStore, Command command, Route<?> fetchedWith, @Nullable Timestamp executeAt) { Invariants.checkArgument(fetchedWith != null || command.route() != null); @@ -333,7 +328,9 @@ public class Infer Route<?> route = command.route(); if (route == null) route = fetchedWith; - // TODO (required): is it safe to cleanup without an executeAt? + // TODO (required): is it safe to cleanup without an executeAt? We don't know for sure which ranges it might participate in. + // We can infer the upper bound of execution by the "execution" of any ExclusiveSyncPoint used to infer the invalidation. + // We should begin evaluating and tracking this. executeAt = command.executeAtIfKnown(Timestamp.nonNullOrMax(executeAt, txnId)); Ranges coordinateRanges = safeStore.ranges().coordinates(txnId); Ranges acceptRanges = executeAt.epoch() == txnId.epoch() ? coordinateRanges : safeStore.ranges().allBetween(txnId, executeAt); diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java index d6b4e16..068d590 100644 --- a/accord-core/src/main/java/accord/coordinate/Invalidate.java +++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java @@ -47,6 +47,7 @@ import static accord.local.Status.Accepted; import static accord.local.Status.PreAccepted; import static accord.primitives.ProgressToken.INVALIDATED; import static accord.primitives.ProgressToken.TRUNCATED; +import static accord.utils.Invariants.illegalState; public class Invalidate implements Callback<InvalidateReply> { @@ -155,7 +156,7 @@ public class Invalidate implements Callback<InvalidateReply> switch (maxReply.status) { - default: throw new IllegalStateException(); + default: throw illegalState(); case Truncated: isDone = true; callback.accept(TRUNCATED, null); diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index 48c3754..df26168 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -85,6 +85,12 @@ public class MaybeRecover extends CheckShards<Route<?>> { default: throw new AssertionError(); case Unknown: + // TODO (required): ErasedOrInvalidated takes Unknown here. This is probably wrong, consider it more carefully. + // Specifically, we should probably not propose invalidation if it's possible the command has been Erased. + // We should probably introduce a MaybeErased Outcome. + // If we only have a partial route, and all end-points we contact are MaybeErased, we are either stale or + // we have enough local information to know the command's outcome is durable. + // If any shard is not MaybeErased but is also Unknown, then we are safe to Invalidate. if (known.canProposeInvalidation() && !Route.isFullRoute(full.route)) { // for correctness reasons, we have not necessarily preempted the initial pre-accept round and @@ -116,6 +122,8 @@ public class MaybeRecover extends CheckShards<Route<?>> break; case Erased: + // TODO (required): this isn't valid. This is either an invalidated command or we're stale. Most likely the latter. + // this is because Erased is only permitted to be adopted when every shard has made the command durable. Invariants.checkState(!full.knownFor(route.participants()).isInvalidated()); safeEraseAndCallback(node, txnId, someRoute, full.toProgressToken(), callback); } diff --git a/accord-core/src/main/java/accord/coordinate/ProposeAndExecute.java b/accord-core/src/main/java/accord/coordinate/ProposeAndExecute.java index b9687ff..0163c8e 100644 --- a/accord-core/src/main/java/accord/coordinate/ProposeAndExecute.java +++ b/accord-core/src/main/java/accord/coordinate/ProposeAndExecute.java @@ -54,7 +54,8 @@ class ProposeAndExecute extends Propose<Result> @Override void onAccepted() { - Deps deps = Deps.merge(acceptOks, ok -> ok.deps); + // TODO (required): disable merging with original deps by flag, and confirm fault detected + Deps deps = this.deps.with(Deps.merge(acceptOks, ok -> ok.deps)); Execute.execute(node, txnId, txn, route, executeAt, deps, callback); } } diff --git a/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java index 66db85e..e01af73 100644 --- a/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ProposeSyncPoint.java @@ -70,11 +70,13 @@ public class ProposeSyncPoint<S extends Seekables<?, ?>> extends Propose<SyncPoi @Override void onAccepted() { + // TODO (required): disable merging with original deps by flag, and confirm fault detected + Deps deps = this.deps.with(Deps.merge(acceptOks, ok -> ok.deps)); if (txnId.rw() == ExclusiveSyncPoint) { Apply.sendMaximal(node, txnId, route, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null)); node.configService().reportEpochClosed((Ranges)keysOrRanges, txnId.epoch()); - callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, (FullRangeRoute) route, true), null); + callback.accept(new SyncPoint<S>(txnId, deps, keysOrRanges, route, true), null); } else { diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index 25a7ea4..d76d336 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -63,6 +63,7 @@ import static accord.coordinate.tracking.RequestStatus.Success; import static accord.local.Status.Committed; import static accord.messages.BeginRecovery.RecoverOk.maxAcceptedOrLater; import static accord.utils.Invariants.debug; +import static accord.utils.Invariants.illegalState; // TODO (low priority, cleanup): rename to Recover (verb); rename Recover message to not clash public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throwable> @@ -237,7 +238,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw Timestamp executeAt = acceptOrCommit.executeAt; switch (acceptOrCommit.status) { - default: throw new IllegalStateException("Unknown status: " + acceptOrCommit.status); + default: throw illegalState("Unknown status: " + acceptOrCommit.status); case Truncated: callback.accept(ProgressToken.TRUNCATED, null); return; @@ -317,7 +318,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw case NotDefined: case PreAccepted: - throw new IllegalStateException("Should only be possible to have Accepted or later commands"); + throw illegalState("Should only be possible to have Accepted or later commands"); } } diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java index 32a0797..0259a7c 100644 --- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java +++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java @@ -50,6 +50,7 @@ import static accord.primitives.ProgressToken.APPLIED; import static accord.primitives.ProgressToken.INVALIDATED; import static accord.primitives.ProgressToken.TRUNCATED; import static accord.primitives.Route.castToFullRoute; +import static accord.utils.Invariants.illegalState; public class RecoverWithRoute extends CheckShards<FullRoute<?>> { @@ -156,7 +157,7 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> { // TODO (required): this logic should be put in Infer, alongside any similar inferences in Recover if (witnessedByInvalidation != null && witnessedByInvalidation.compareTo(Status.PreAccepted) > 0) - throw new IllegalStateException("We previously invalidated, finding a status that should be recoverable"); + throw illegalState("We previously invalidated, finding a status that should be recoverable"); Invalidate.invalidate(node, txnId, route, witnessedByInvalidation != null, callback); } else @@ -222,7 +223,7 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> case Invalidated: if (witnessedByInvalidation != null && witnessedByInvalidation.hasBeen(Status.PreCommitted)) - throw new IllegalStateException("We previously invalidated, finding a status that should be recoverable"); + throw illegalState("We previously invalidated, finding a status that should be recoverable"); Propagate.propagate(node, txnId, sourceEpoch, success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f)); break; diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithSomeRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithSomeRoute.java index a5d4883..c2a2ccb 100644 --- a/accord-core/src/main/java/accord/coordinate/RecoverWithSomeRoute.java +++ b/accord-core/src/main/java/accord/coordinate/RecoverWithSomeRoute.java @@ -31,6 +31,7 @@ import accord.utils.Invariants; import static accord.primitives.Route.castToFullRoute; import static accord.primitives.Route.isFullRoute; +import static accord.utils.Invariants.illegalState; /** * A result of null indicates the transaction is globally persistent @@ -96,7 +97,7 @@ public class RecoverWithSomeRoute extends CheckShards<Route<?>> implements BiCon else { if (witnessedByInvalidation != null && witnessedByInvalidation.compareTo(Status.PreAccepted) > 0) - throw new IllegalStateException("We previously invalidated, finding a status that should be recoverable"); + throw illegalState("We previously invalidated, finding a status that should be recoverable"); Invalidate.invalidate(node, txnId, route, true, callback); } } diff --git a/accord-core/src/main/java/accord/coordinate/TxnExecute.java b/accord-core/src/main/java/accord/coordinate/TxnExecute.java index 690099e..9c41be9 100644 --- a/accord-core/src/main/java/accord/coordinate/TxnExecute.java +++ b/accord-core/src/main/java/accord/coordinate/TxnExecute.java @@ -44,6 +44,7 @@ import org.agrona.collections.IntHashSet; import static accord.coordinate.ReadCoordinator.Action.Approve; import static accord.coordinate.ReadCoordinator.Action.ApprovePartial; +import static accord.utils.Invariants.illegalState; public class TxnExecute extends ReadCoordinator<ReadReply> implements Execute { @@ -119,7 +120,7 @@ public class TxnExecute extends ReadCoordinator<ReadReply> implements Execute // also try sending a read command to another replica, in case they're ready to serve a response return Action.TryAlternative; case Invalid: - callback.accept(null, new IllegalStateException("Submitted a read command to a replica that did not own the range")); + callback.accept(null, illegalState("Submitted a read command to a replica that did not own the range")); return Action.Aborted; } } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java index 5563945..e9cf939 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java @@ -34,6 +34,7 @@ import accord.topology.Topologies; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*; import static accord.primitives.Routables.Slice.Minimal; +import static accord.utils.Invariants.illegalState; import static com.google.common.collect.Sets.newHashSetWithExpectedSize; public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker> @@ -218,7 +219,7 @@ public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker> protected void recordInFlightRead(Id node) { if (!inflight.add(node)) - throw new IllegalStateException(node + " already in flight"); + throw illegalState(node + " already in flight"); recordResponse(this, node, ReadShardTracker::recordInFlightRead, false); } @@ -226,7 +227,7 @@ public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker> private boolean receiveResponseIsSlow(Id node) { if (!inflight.remove(node)) - throw new IllegalStateException("Nothing in flight for " + node); + throw illegalState("Nothing in flight for " + node); return slow != null && slow.remove(node); } diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index 0965f46..89058bf 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -32,6 +32,9 @@ import accord.local.*; import accord.primitives.*; import accord.utils.Invariants; +import static accord.utils.Invariants.illegalState; +import static java.lang.String.format; + public abstract class AbstractSafeCommandStore<CommandType extends SafeCommand, TimestampsForKeyType extends SafeTimestampsForKey, CommandsForKeyType extends SafeCommandsForKey, @@ -108,7 +111,7 @@ public abstract class AbstractSafeCommandStore<CommandType extends SafeCommand, { CommandType command = getCommandInternal(txnId); if (command == null) - throw new IllegalStateException(String.format("%s was not specified in PreLoadContext", txnId)); + throw illegalState(format("%s was not specified in PreLoadContext", txnId)); if (command.isEmpty()) command.uninitialised(); return command; diff --git a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java index 48cde3b..675f62f 100644 --- a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java +++ b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java @@ -24,18 +24,22 @@ import accord.local.Listeners; import accord.local.SafeCommand; import accord.local.SaveStatus; import accord.primitives.TxnId; +import accord.utils.Invariants; import static accord.local.Listeners.Immutable.EMPTY; +import static accord.local.SaveStatus.Erased; +import static accord.local.SaveStatus.ErasedOrInvalidated; import static accord.local.Status.Durability.UniversalOrInvalidated; public class ErasedSafeCommand extends SafeCommand { final Command erased; - public ErasedSafeCommand(TxnId txnId) + public ErasedSafeCommand(TxnId txnId, SaveStatus saveStatus) { super(txnId); - this.erased = new Command.Truncated(txnId, SaveStatus.Erased, UniversalOrInvalidated, null, null, EMPTY, null, null); + Invariants.checkArgument(saveStatus == Erased || saveStatus == ErasedOrInvalidated); + this.erased = new Command.Truncated(txnId, saveStatus, UniversalOrInvalidated, null, null, EMPTY, null, null); } @Override diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 523212e..2ff29d2 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -81,6 +81,8 @@ import static accord.local.Status.Committed; import static accord.local.Status.Truncated; import static accord.local.Status.NotDefined; import static accord.primitives.Routables.Slice.Minimal; +import static accord.utils.Invariants.illegalState; +import static java.lang.String.format; public abstract class InMemoryCommandStore extends CommandStore { @@ -526,7 +528,7 @@ public abstract class InMemoryCommandStore extends CommandStore public SafeCommandStore beginOperation(PreLoadContext context) { if (current != null) - throw new IllegalStateException("Another operation is in progress or it's store was not cleared"); + throw illegalState("Another operation is in progress or it's store was not cleared"); current = createSafeStore(context, updateRangesForEpoch()); return current; } @@ -534,7 +536,7 @@ public abstract class InMemoryCommandStore extends CommandStore public void completeOperation(SafeCommandStore store) { if (store != current) - throw new IllegalStateException("This operation has already been cleared"); + throw illegalState("This operation has already been cleared"); try { current.complete(); @@ -638,7 +640,7 @@ public abstract class InMemoryCommandStore extends CommandStore return globalCommand.value(); if (entry.uninitialised) return uninitialised(entry); - throw new IllegalStateException("Could not find command for CFK for " + entry); + throw illegalState("Could not find command for CFK for " + entry); } @Override @@ -1019,7 +1021,8 @@ public abstract class InMemoryCommandStore extends CommandStore // TODO (desired, efficiency): this can be made more efficient by batching by epoch if (rangesForEpoch.coordinates(txnId).contains(key)) return; // already coordinates, no need to replicate - if (!rangesForEpoch.allBefore(txnId.epoch()).contains(key)) + // TODO (required): check this logic, esp. next line, matches C* + if (!rangesForEpoch.allAfter(txnId.epoch()).contains(key)) return; CommandsForKeys.registerNotWitnessed(this, key, txnId); @@ -1036,7 +1039,8 @@ public abstract class InMemoryCommandStore extends CommandStore Ranges ranges = deps.rangeDeps.ranges(txnId); if (rangesForEpoch.coordinates(txnId).intersects(ranges)) return; // already coordinates, no need to replicate - if (!rangesForEpoch.allBefore(txnId.epoch()).intersects(ranges)) + // TODO (required): check this logic, esp. next line, matches C* + if (!rangesForEpoch.allAfter(txnId.epoch()).intersects(ranges)) return; historicalRangeCommands.merge(txnId, ranges.slice(allRanges), Ranges::with); @@ -1319,7 +1323,7 @@ public abstract class InMemoryCommandStore extends CommandStore { boolean result = queue.add(runnable); if (!result) - throw new IllegalStateException("could not add item to queue"); + throw illegalState("could not add item to queue"); maybeRun(); } @@ -1399,9 +1403,9 @@ public abstract class InMemoryCommandStore extends CommandStore Thread current = Thread.currentThread(); Thread expected = thread; if (expected == null) - throw new IllegalStateException(String.format("Command store called from wrong thread; unexpected %s", current)); + throw illegalState(format("Command store called from wrong thread; unexpected %s", current)); if (expected != current) - throw new IllegalStateException(String.format("Command store called from the wrong thread. Expected %s, got %s", expected, current)); + throw illegalState(format("Command store called from the wrong thread. Expected %s, got %s", expected, current)); } @Override diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java index 9c4c2bd..267d569 100644 --- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java +++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java @@ -26,6 +26,8 @@ import accord.local.Listeners; import accord.local.SafeCommand; import accord.primitives.TxnId; +import static accord.utils.Invariants.illegalState; + public class InMemorySafeCommand extends SafeCommand implements SafeState<Command> { private static final Supplier<GlobalCommand> INVALIDATED = () -> null; @@ -92,7 +94,7 @@ public class InMemorySafeCommand extends SafeCommand implements SafeState<Comman private void touch() { if (invalidated()) - throw new IllegalStateException("Cannot access invalidated " + this); + throw illegalState("Cannot access invalidated " + this); if (lazy != null) { global = lazy.get(); diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java index 575252d..6daac4b 100644 --- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java +++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java @@ -73,6 +73,7 @@ import static accord.local.SaveStatus.LocalExecution.NotReady; import static accord.local.SaveStatus.LocalExecution.WaitingToApply; import static accord.local.Status.Durability.MajorityOrInvalidated; import static accord.local.Status.PreApplied; +import static accord.utils.Invariants.illegalState; // TODO (desired, consider): consider propagating invalidations in the same way as we do applied // TODO (expected): report long-lived recurring transactions / operations @@ -151,7 +152,7 @@ public class SimpleProgressLog implements ProgressLog.Factory case Done: return false; case Investigating: - throw new IllegalStateException("Unexpected progress: " + progress); + throw illegalState("Unexpected progress: " + progress); case Expected: Invariants.paranoid(!isFree()); progress = NoProgress; @@ -237,7 +238,7 @@ public class SimpleProgressLog implements ProgressLog.Factory case NotWitnessed: // can't make progress if we haven't witnessed it yet case Committed: // can't make progress if we aren't yet ReadyToExecute case Done: // shouldn't be trying to make progress, as we're done - throw new IllegalStateException("Unexpected status: " + status); + throw illegalState("Unexpected status: " + status); case ReadyToExecute: // TODO (expected): we should have already exited this loop if this conditions is true diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index f852f97..6423fe2 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -46,6 +46,7 @@ import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static accord.primitives.Txn.Kind.LocalOnly; import static accord.primitives.Txn.Kind.NoOp; +import static accord.utils.Invariants.illegalState; /** * Captures state associated with a command store's adoption of a collection of new ranges. @@ -272,7 +273,7 @@ class Bootstrap private synchronized void cancel(SafeToRead state) { if (state.startedAt != Integer.MAX_VALUE) - throw new IllegalStateException("Tried to cancel starting a fetch that had already started"); + throw illegalState("Tried to cancel starting a fetch that had already started"); state.startedAt = Integer.MIN_VALUE; // unlink from other overlaps, and remove ourselves from states collection diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index cb157b7..e19be7a 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -53,6 +53,7 @@ import static accord.local.Status.Durability.ShardUniversal; import static accord.local.Status.Durability.UniversalOrInvalidated; import static accord.local.Status.Invalidated; import static accord.local.Status.KnownExecuteAt.ExecuteAtKnown; +import static accord.utils.Invariants.illegalState; import static accord.utils.SortedArrays.forEachIntersection; import static accord.utils.Utils.ensureImmutable; import static accord.utils.Utils.ensureMutable; @@ -182,7 +183,7 @@ public abstract class Command implements CommonAttributes { if (actual != expected) { - throw new IllegalStateException(format("Cannot instantiate %s for status %s. %s expected", + throw illegalState(format("Cannot instantiate %s for status %s. %s expected", actual.getSimpleName(), status, expected.getSimpleName())); } return status; @@ -210,7 +211,7 @@ public abstract class Command implements CommonAttributes case Truncated: return validateCommandClass(status, Truncated.class, klass); default: - throw new IllegalStateException("Unhandled status " + status); + throw illegalState("Unhandled status " + status); } } @@ -746,6 +747,11 @@ public abstract class Command implements CommonAttributes return erased(command.txnId(), durability, command.route()); } + public static Truncated erasedOrInvalidated(TxnId txnId, Status.Durability durability, Route<?> route) + { + return validate(new Truncated(txnId, SaveStatus.ErasedOrInvalidated, durability, route, null, EMPTY, null, null)); + } + public static Truncated erased(TxnId txnId, Status.Durability durability, Route<?> route) { return validate(new Truncated(txnId, SaveStatus.Erased, durability, route, null, EMPTY, null, null)); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 737afe6..adeb217 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -28,9 +28,9 @@ import javax.annotation.Nullable; import accord.api.Agent; import accord.local.CommandStores.RangesForEpoch; +import accord.primitives.Keys; import accord.primitives.Range; import accord.primitives.Txn.Kind.Kinds; -import accord.utils.RelationMultiMap.SortedRelationList; import accord.utils.async.AsyncChain; import accord.api.ConfigurationService.EpochReady; @@ -72,6 +72,7 @@ import static accord.local.PreLoadContext.empty; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; +import static accord.utils.Invariants.illegalState; /** * Single threaded internal shard of accord transaction metadata @@ -293,7 +294,7 @@ public abstract class CommandStore implements AgentExecutor setRejectBefore(newRejectBefore); } - public final void markExclusiveSyncPointApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.rw() == ExclusiveSyncPoint); @@ -372,28 +373,28 @@ public abstract class CommandStore implements AgentExecutor { CommandStore cs = maybeCurrent(); if (cs == null) - throw new IllegalStateException("Attempted to access current CommandStore, but not running in a CommandStore"); + throw illegalState("Attempted to access current CommandStore, but not running in a CommandStore"); return cs; } protected static void register(CommandStore store) { if (!store.inStore()) - throw new IllegalStateException("Unable to register a CommandStore when not running in it; store " + store); + throw illegalState("Unable to register a CommandStore when not running in it; store " + store); CURRENT_STORE.set(store); } public static void checkInStore() { CommandStore store = maybeCurrent(); - if (store == null) throw new IllegalStateException("Expected to be running in a CommandStore but is not"); + if (store == null) throw illegalState("Expected to be running in a CommandStore but is not"); } public static void checkNotInStore() { CommandStore store = maybeCurrent(); if (store != null) - throw new IllegalStateException("Expected to not be running in a CommandStore, but running in " + store); + throw illegalState("Expected to not be running in a CommandStore, but running in " + store); } /** @@ -487,6 +488,7 @@ public abstract class CommandStore implements AgentExecutor setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); +// markUnsafeToRead(ranges); } // TODO (expected): we can immediately truncate dependencies locally once an exclusiveSyncPoint applies, we don't need to wait for the whole shard @@ -567,22 +569,58 @@ public abstract class CommandStore implements AgentExecutor // Note: we do not need to track the bootstraps we implicitly depend upon, because we will not serve any read requests until this has completed // and since we are a timestamp store, and we write only this will sort itself out naturally // TODO (required): make sure we have no races on HLC around SyncPoint else this resolution may not work (we need to know the micros equivalent timestamp of the snapshot) + class KeyState + { + Map<Integer, Keys> partiallyBootstrapping; + + /** + * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store + */ + boolean isFullyBootstrapping(WaitingOn.Update builder, Range range, int txnIdx) + { + if (builder.deps.keyDeps.foldEachKey(txnIdx, range, true, (r0, k, p) -> p && r0.contains(k))) + return true; + + if (partiallyBootstrapping == null) + partiallyBootstrapping = new HashMap<>(); + Keys prev = partiallyBootstrapping.get(txnIdx); + Keys remaining = prev; + if (remaining == null) remaining = builder.deps.keyDeps.participatingKeys(txnIdx); + else Invariants.checkState(!remaining.isEmpty()); + remaining = remaining.subtract(range); + if (prev == null) Invariants.checkState(!remaining.isEmpty()); + partiallyBootstrapping.put(txnIdx, remaining); + return remaining.isEmpty(); + } + } KeyDeps keyDeps = builder.deps.keyDeps; - redundantBefore().foldl(keyDeps.keys(), (e, b, d, p2, ki, kj) -> { - boolean isAppliedOrInvalidated = e.locallyAppliedOrInvalidatedBefore.compareTo(e.bootstrappedAt) >= 0; - int txnIdx = d.txnIds().find(isAppliedOrInvalidated ? e.locallyAppliedOrInvalidatedBefore : e.bootstrappedAt); - if (txnIdx < 0) txnIdx = -1 - txnIdx; - while (ki < kj) + redundantBefore().foldl(keyDeps.keys(), (e, s, d, b) -> { + // TODO (desired, efficiency): foldlInt so we can track the lower rangeidx bound and not revisit unnecessarily + // find the txnIdx below which we are known to be fully redundant locally due to having been applied or invalidated + int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); + if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; + int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); + if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; + + // remove intersecting transactions with known redundant txnId + // note that we must exclude all transactions that are pre-bootstrap, and perform the more complicated dance below, + // as these transactions may be only partially applied, and we may need to wait for them on another key. + if (appliedIdx > bootstrapIdx) + { + d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0, s0, txnIdx) -> { + b0.removeWaitingOn(txnIdx); + }); + } + + if (bootstrapIdx > 0) { - SortedRelationList<TxnId> txnIdsForKey = d.txnIdsForKeyIndex(ki++); - int ti = txnIdsForKey.findNext(0, txnIdx); - if (ti < 0) - ti = -1 - ti; - for (int i = 0 ; i < ti ; ++i) - b.removeWaitingOn(txnIdsForKey.getValueIndex(i), isAppliedOrInvalidated); + d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0, s0, r, txnIdx) -> { + if (b0.isWaitingOn(txnIdx) && s0.isFullyBootstrapping(b0, r, txnIdx)) + b0.removeWaitingOn(txnIdx); + }); } - return b; - }, builder, keyDeps, null, ignore -> false); + return s; + }, new KeyState(), keyDeps, builder, ignore -> false); /** * If we have to handle bootstrapping ranges for range transactions, these may only partially cover the @@ -591,21 +629,19 @@ public abstract class CommandStore implements AgentExecutor */ class RangeState { - final WaitingOn.Update builder; - int txnIdx; Range range; + int bootstrapIdx, appliedIdx; Map<Integer, Ranges> partiallyBootstrapping; - RangeState(WaitingOn.Update builder) - { - this.builder = builder; - } - /** * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store */ boolean isFullyBootstrapping(int rangeTxnIdx) { + // if all deps for the txnIdx are contained in the range, don't inflate any shared object state + if (builder.deps.rangeDeps.foldEachRange(rangeTxnIdx, range, true, (r1, r2, p) -> p && r1.contains(r2))) + return true; + if (partiallyBootstrapping == null) partiallyBootstrapping = new HashMap<>(); Ranges prev = partiallyBootstrapping.get(rangeTxnIdx); @@ -614,51 +650,45 @@ public abstract class CommandStore implements AgentExecutor else Invariants.checkState(!remaining.isEmpty()); remaining = remaining.subtract(Ranges.of(range)); if (prev == null) Invariants.checkState(!remaining.isEmpty()); - partiallyBootstrapping.put(txnIdx, remaining); + partiallyBootstrapping.put(rangeTxnIdx, remaining); return remaining.isEmpty(); } } RangeDeps rangeDeps = builder.deps.rangeDeps; // TODO (required, consider): slice to only those ranges we own, maybe don't even construct rangeDeps.covering() - redundantBefore().foldl(participants, (e, s, d, ps, pi, pj) -> { - // TODO (desired, efficiency): foldlInt so we can track the lower rangeidx bound and not revisit unnecessarily - WaitingOn.Update b = s.builder; + redundantBefore().foldl(participants, (e, s, d, b) -> { + int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); + if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; + s.bootstrapIdx = bootstrapIdx; + + int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); + if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; + s.appliedIdx = appliedIdx; + + // remove intersecting transactions with known redundant txnId + if (appliedIdx > bootstrapIdx) { - // find the txnIdx below which we are known to be fully redundant locally due to having been applied or invalidated - int tmp = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); - s.txnIdx = tmp < 0 ? -1 - tmp : tmp; + // TODO (desired): + // TODO (desired): move the bounds check into forEach, matching structure used for keys + d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { + if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx) + b0.removeWaitingOnRangeIdx(txnIdx); + }); } - // remove intersecting transactions with known redundant txnId - d.forEach(ps, e.range, pi, pj, b, s, (b0, s0, txnIdx) -> { - if (txnIdx < s0.txnIdx) - b0.removeWaitingOnRangeIdx(txnIdx); - }); - if (e.bootstrappedAt.compareTo(e.locallyAppliedOrInvalidatedBefore) > 0) + + if (bootstrapIdx > 0) { - // if we have any ranges where bootstrap is ahead of the latest known fully redundant txnId, - // we have to do a more complicated dance since this may imply only partial redundancy - // (we may still depend on the transaction for some other range) - { - int tmp = d.txnIds().find(e.bootstrappedAt); - s.txnIdx = tmp < 0 ? -1 - tmp : tmp; - } + // if we have any ranges where bootstrap is involved, we have to do a more complicated dance since + // this may imply only partial redundancy (we may still depend on the transaction for some other range) s.range = e.range; - d.forEach(ps, e.range, pi, pj, b, s, (b0, s0, txnIdx) -> { - if (txnIdx < s0.txnIdx && b0.isWaitingOnRangeIdx(txnIdx)) - { - if (b0.deps.rangeDeps.foldEachRange(txnIdx, s0.range, true, (r1, r2, p) -> p && r1.contains(r2))) - { - b0.removeWaitingOnRangeIdx(txnIdx); - } - else if (s0.isFullyBootstrapping(txnIdx)) - { - b0.removeWaitingOnRangeIdx(txnIdx); - } - } + // TODO (desired): move the bounds check into forEach, matching structure used for keys + d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { + if (txnIdx < s0.bootstrapIdx && b0.isWaitingOnRangeIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx)) + b0.removeWaitingOnRangeIdx(txnIdx); }); } return s; - }, new RangeState(builder), rangeDeps, participants, ignore -> false); + }, new RangeState(), rangeDeps, builder, ignore -> false); } public final boolean hasLocallyRedundantDependencies(TxnId minimumDependencyId, Timestamp executeAt, Participants<?> participantsOfWaitingTxn) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0dc8cea..11e635b 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -68,6 +68,7 @@ import static accord.local.PreLoadContext.empty; import static accord.primitives.EpochSupplier.constant; import static accord.primitives.Routables.Slice.Minimal; import static accord.utils.Invariants.checkArgument; +import static accord.utils.Invariants.illegalState; import static java.util.stream.Collectors.toList; /** @@ -250,7 +251,7 @@ public abstract class CommandStores public @Nonnull Ranges allAfter(long fromInclusive) { - return allInternal(floorIndex(fromInclusive), ranges.length); + return allInternal(Math.max(0, floorIndex(fromInclusive)), ranges.length); } public @Nonnull Ranges allUntil(long toInclusive) @@ -649,7 +650,7 @@ public abstract class CommandStores public CommandStore any() { ShardHolder[] shards = current.shards; - if (shards.length == 0) throw new IllegalStateException("Unable to get CommandStore; non defined"); + if (shards.length == 0) throw illegalState("Unable to get CommandStore; non defined"); return shards[supplier.random.nextInt(shards.length)].store; } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 0d8e14e..3692519 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -64,6 +64,7 @@ import static accord.local.Cleanup.ERASE; import static accord.local.Cleanup.TRUNCATE; import static accord.local.Cleanup.shouldCleanup; import static accord.local.Command.Truncated.erased; +import static accord.local.Command.Truncated.invalidated; import static accord.local.Command.Truncated.truncatedApply; import static accord.local.Command.Truncated.truncatedApplyWithOutcome; import static accord.local.Commands.EnsureAction.Add; @@ -505,7 +506,7 @@ public class Commands switch (updated.status()) { default: - throw new IllegalStateException("Unexpected status: " + updated.status()); + throw illegalState("Unexpected status: " + updated.status()); case NotDefined: case PreAccepted: case Accepted: @@ -667,13 +668,13 @@ public class Commands { Ranges ranges = safeStore.ranges().allAt(command.txnId().epoch()); ranges = command.route().slice(ranges, Minimal).participants().toRanges(); - safeStore.commandStore().markExclusiveSyncPointApplied(safeStore, command.txnId(), ranges); + safeStore.commandStore().markExclusiveSyncPointLocallyApplied(safeStore, command.txnId(), ranges); } safeStore.notifyListeners(safeCommand); return true; } default: - throw new IllegalStateException("Unexpected status: " + command.status()); + throw illegalState("Unexpected status: " + command.status()); } } @@ -772,7 +773,7 @@ public class Commands } else { - throw new IllegalStateException("We have a dependency to wait on, but have already finished waiting"); + throw illegalState("We have a dependency to wait on, but have already finished waiting"); } } @@ -849,8 +850,8 @@ public class Commands case TRUNCATE: Invariants.checkState(command.saveStatus().compareTo(TruncatedApply) < 0); - if (command.hasBeen(PreCommitted)) result = truncatedApply(command, Route.tryCastToFullRoute(maybeFullRoute)); - else result = command; // do nothing; we don't have enough information + if (!command.hasBeen(PreCommitted)) result = invalidated(command); + else result = truncatedApply(command, Route.tryCastToFullRoute(maybeFullRoute)); break; case ERASE: @@ -1319,7 +1320,7 @@ public class Commands { switch (action) { - default: throw new IllegalStateException("Unexpected action: " + action); + default: throw illegalState("Unexpected action: " + action); case Ignore: return true; diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 6fc883e..3f6acf2 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -95,6 +95,9 @@ import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import net.nicoulaj.compilecommand.annotations.Inline; +import static accord.utils.Invariants.illegalState; +import static java.lang.String.format; + public class Node implements ConfigurationService.Listener, NodeTimeService { private static final Logger logger = LoggerFactory.getLogger(Node.class); @@ -520,7 +523,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService { CommandStore current = CommandStore.maybeCurrent(); if (current != null && current != executor) - throw new IllegalStateException(String.format("Used wrong CommandStore %s; current is %s", executor, current)); + throw illegalState(format("Used wrong CommandStore %s; current is %s", executor, current)); } // send to a specific node @@ -661,7 +664,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService if (ranges.isEmpty()) // should not really happen, but pick some other replica to serve as home key ranges = topology().globalForEpoch(txnId.epoch()).ranges(); if (ranges.isEmpty()) - throw new IllegalStateException("Unable to select a HomeKey as the topology does not have any ranges for epoch " + txnId.epoch()); + throw illegalState("Unable to select a HomeKey as the topology does not have any ranges for epoch " + txnId.epoch()); Range range = ranges.get(random.nextInt(ranges.size())); return range.someIntersectingRoutingKey(null); } diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java b/accord-core/src/main/java/accord/local/RedundantStatus.java index 63eecd3..3afa8c1 100644 --- a/accord-core/src/main/java/accord/local/RedundantStatus.java +++ b/accord-core/src/main/java/accord/local/RedundantStatus.java @@ -65,7 +65,8 @@ public enum RedundantStatus /** * The relevant owned ranges are redundant, meaning any intersecting transaction is known to be either applied or invalidated - * via a sync point that has applied locally and on all healthy shards. + * via a sync point that has applied locally AND on all healthy shards. Note that this status CANNOT be taken if + * we are not ALSO LOCALLY_REDUNDANT, so we can use this to cleanup commands < PreCommitted. * * Note that this status overrides PRE_BOOTSTRAP_OR_STALE, since it implies the transaction has applied. */ diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 1616a42..015c75b 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -35,6 +35,7 @@ import accord.primitives.Deps; import accord.primitives.EpochSupplier; import accord.primitives.Participants; import accord.primitives.Ranges; +import accord.primitives.Route; import accord.primitives.Seekable; import accord.primitives.Seekables; import accord.primitives.Timestamp; @@ -44,6 +45,9 @@ import accord.primitives.Unseekables; import static accord.local.Cleanup.NO; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; +import static accord.local.SaveStatus.Erased; +import static accord.local.SaveStatus.ErasedOrInvalidated; +import static accord.primitives.Route.isFullRoute; /** * A CommandStore with exclusive access; a reference to this should not be retained outside of the scope of the method @@ -90,7 +94,7 @@ public abstract class SafeCommandStore if (command.saveStatus().isUninitialised()) { if (commandStore().durableBefore().isUniversal(txnId, unseekable)) - return new ErasedSafeCommand(txnId); + return new ErasedSafeCommand(txnId, ErasedOrInvalidated); } return maybeTruncate(safeCommand, command, txnId, null); } @@ -124,7 +128,7 @@ public abstract class SafeCommandStore if (command.saveStatus().isUninitialised()) { if (Cleanup.isSafeToCleanup(commandStore().durableBefore(), txnId, unseekables)) - return new ErasedSafeCommand(txnId); + return new ErasedSafeCommand(txnId, isFullRoute(unseekables) ? Erased : ErasedOrInvalidated); } return maybeTruncate(safeCommand, command, toEpoch, unseekables); } diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java index 1043c32..7851056 100644 --- a/accord-core/src/main/java/accord/local/SaveStatus.java +++ b/accord-core/src/main/java/accord/local/SaveStatus.java @@ -78,7 +78,9 @@ public enum SaveStatus TruncatedApplyWithDeps (Status.Truncated, Full, DefinitionErased, ExecuteAtKnown, DepsKnown, Outcome.Apply, CleaningUp), TruncatedApplyWithOutcome (Status.Truncated, Full, DefinitionErased, ExecuteAtKnown, DepsErased, Outcome.Apply, CleaningUp), TruncatedApply (Status.Truncated, Full, DefinitionErased, ExecuteAtKnown, DepsErased, Outcome.WasApply, CleaningUp), - // Expunged means the command is either entirely pre-bootstrap or stale and we don't have enough information to move it through the normal redundant->truncation path (i.e. it might be truncated, it might be applied) + // ErasedOrInvalidated means the command is redundant for the shard and data being queried, but no FullRoute is known, so it is not known to be globally Erased + ErasedOrInvalidated (Status.Truncated, Maybe, DefinitionUnknown,ExecuteAtUnknown, DepsUnknown, Unknown, CleaningUp), + // NOTE: Erased should ONLY be adopted on a replica that knows EVERY shard has successfully applied the transaction at all healthy replicas. Erased (Status.Truncated, Maybe, DefinitionErased, ExecuteAtErased, DepsErased, Outcome.Erased, CleaningUp), Invalidated (Status.Invalidated, CleaningUp), ; @@ -256,6 +258,13 @@ public enum SaveStatus switch (status) { default: throw new AssertionError("Unexpected status: " + status); + case ErasedOrInvalidated: + if (known.outcome.isInvalidated()) + return Invalidated; + + if (!known.outcome.isOrWasApply() || known.executeAt == ExecuteAtKnown) + return ErasedOrInvalidated; + case Erased: if (!known.outcome.isOrWasApply() || known.executeAt != ExecuteAtKnown) return Erased; diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java index 6816d4e..d08de28 100644 --- a/accord-core/src/main/java/accord/local/SerializerSupport.java +++ b/accord-core/src/main/java/accord/local/SerializerSupport.java @@ -50,6 +50,7 @@ import static accord.messages.MessageType.PROPAGATE_COMMIT_MSG; import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; import static accord.primitives.PartialTxn.merge; import static accord.utils.Invariants.checkState; +import static accord.utils.Invariants.illegalState; @VisibleForImplementation public class SerializerSupport @@ -203,7 +204,7 @@ public class SerializerSupport } else { - throw new IllegalStateException(errorMessage); + throw illegalState(errorMessage); } /* @@ -253,7 +254,7 @@ public class SerializerSupport switch (status) { default: - throw new IllegalStateException("Unhandled SaveStatus: " + status); + throw illegalState("Unhandled SaveStatus: " + status); case TruncatedApplyWithOutcome: case TruncatedApplyWithDeps: Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES); @@ -278,6 +279,8 @@ public class SerializerSupport } case TruncatedApply: return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result); + case ErasedOrInvalidated: + return Command.Truncated.erasedOrInvalidated(attrs.txnId(), attrs.durability(), attrs.route()); case Erased: return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.route()); case Invalidated: diff --git a/accord-core/src/main/java/accord/local/Status.java b/accord-core/src/main/java/accord/local/Status.java index 1f67b4b..f69817e 100644 --- a/accord-core/src/main/java/accord/local/Status.java +++ b/accord-core/src/main/java/accord/local/Status.java @@ -275,9 +275,6 @@ public enum Status switch (outcome) { default: throw new AssertionError(); - case Erased: - return Status.Truncated; - case Invalidated: return Status.Invalidated; @@ -286,6 +283,7 @@ public enum Status if (executeAt.isDecidedAndKnownToExecute() && definition.isKnown() && deps.hasDecidedDeps()) return PreApplied; + case Erased: case Unknown: if (executeAt.isDecidedAndKnownToExecute() && definition.isKnown() && deps.hasDecidedDeps()) return Committed; diff --git a/accord-core/src/main/java/accord/messages/AbstractExecute.java b/accord-core/src/main/java/accord/messages/AbstractExecute.java index 6784f88..e9c4f68 100644 --- a/accord-core/src/main/java/accord/messages/AbstractExecute.java +++ b/accord-core/src/main/java/accord/messages/AbstractExecute.java @@ -43,6 +43,7 @@ import static accord.local.SaveStatus.LocalExecution.WaitingToExecute; import static accord.local.Status.Committed; import static accord.messages.ReadData.ReadNack.NotCommitted; import static accord.messages.ReadData.ReadNack.Redundant; +import static accord.utils.Invariants.illegalState; import static accord.utils.MapReduceConsume.forEach; public abstract class AbstractExecute extends ReadData implements Command.TransientListener, EpochSupplier @@ -250,7 +251,7 @@ public abstract class AbstractExecute extends ReadData implements Command.Transi // nothing to see here break; case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again"); + throw illegalState("ReadOk was sent, yet ack called again"); default: throw new AssertionError("Unknown state: " + state); } @@ -270,7 +271,7 @@ public abstract class AbstractExecute extends ReadData implements Command.Transi switch (state) { case RETURNED: - throw new IllegalStateException("ReadOk was sent, yet ack called again"); + throw illegalState("ReadOk was sent, yet ack called again"); case OBSOLETE: logger.debug("After the read completed for txn {}, the result was marked obsolete", txnId); if (fail != null) diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java index 687372d..0090f2e 100644 --- a/accord-core/src/main/java/accord/messages/Accept.java +++ b/accord-core/src/main/java/accord/messages/Accept.java @@ -111,7 +111,7 @@ public class Accept extends TxnRequest.WithUnsynced<Accept.AcceptReply> private PartialDeps calculatePartialDeps(SafeCommandStore safeStore) { - Ranges ranges = safeStore.ranges().allBetween(minUnsyncedEpoch, txnId); + Ranges ranges = safeStore.ranges().allBetween(minUnsyncedEpoch, executeAt); return PreAccept.calculatePartialDeps(safeStore, txnId, keys, EpochSupplier.constant(minUnsyncedEpoch), executeAt, ranges); } diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java index e590f7e..3b901db 100644 --- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java @@ -35,6 +35,8 @@ import accord.primitives.Seekables; import accord.primitives.TxnId; import accord.primitives.Writes; +import static accord.utils.Invariants.illegalState; + /* * Used by local and global inclusive sync points to effect the sync point at each node * Combines commit, execute (with nothing really to execute), and apply into one request/response @@ -86,9 +88,9 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied switch (applyReply) { default: - throw new IllegalStateException("Unexpected ApplyReply"); + throw illegalState("Unexpected ApplyReply"); case Insufficient: - throw new IllegalStateException("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result"); + throw illegalState("ApplyThenWaitUntilApplied is always sent with a maximal `Commit` so how can `Apply` have an `Insufficient` result"); case Redundant: case Applied: // In both cases it's fine to continue to process and return a response saying diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index 5341aea..a820dab 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -51,6 +51,7 @@ import static accord.local.Status.Phase; import static accord.local.Status.PreAccepted; import static accord.local.Status.PreCommitted; import static accord.messages.PreAccept.calculatePartialDeps; +import static accord.utils.Invariants.illegalState; public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> { @@ -97,7 +98,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> switch (Commands.recover(safeStore, safeCommand, txnId, partialTxn, route, progressKey, ballot)) { default: - throw new IllegalStateException("Unhandled Outcome"); + throw illegalState("Unhandled Outcome"); case Redundant: case Truncated: @@ -356,7 +357,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> { try (Deps.Builder builder = Deps.builder()) { - safeStore.mapReduce(keys, ranges, KeyHistory.ALL, startedBefore.rw().witnesses(), STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null, + safeStore.mapReduce(keys, ranges, KeyHistory.ALL, startedBefore.rw().witnessedBy(), STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null, (p1, keyOrRange, txnId, executeAt, status, deps, prev) -> builder.add(keyOrRange, txnId), null, (Deps.AbstractBuilder<Deps>)builder); return builder.build(); } @@ -386,7 +387,7 @@ public class BeginRecovery extends TxnRequest<BeginRecovery.RecoverReply> * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction * has not witnessed us we can safely invalidate it. */ - return safeStore.mapReduce(keys, ranges, KeyHistory.ALL, startedAfter.rw().witnesses(), EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null, + return safeStore.mapReduce(keys, ranges, KeyHistory.ALL, startedAfter.rw().witnessedBy(), EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null, (p1, keyOrRange, txnId, executeAt, status, deps, prev) -> true, null, false, i -> i); } } diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index 0b9b6c7..2d3483a 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -262,7 +262,12 @@ public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> implements builder.add(testTxnId, keyOrRange, status, testExecuteAt, deps.get()); return in; }, null, builder); - return builder.buildPartialDeps(safeStore, ranges); + + try (Deps.AbstractBuilder<PartialDeps> redundantBuilder = new PartialDeps.Builder(ranges)) + { + PartialDeps redundant = safeStore.commandStore().redundantBefore().collectDeps(keys, redundantBuilder, minEpoch, executeAt).build(); + return builder.buildPartialDeps(safeStore, ranges).with(redundant); + } } /** diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index e1a3679..8bc333d 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -52,6 +52,7 @@ import static accord.local.Status.Phase.Cleanup; import static accord.local.Status.PreApplied; import static accord.messages.CheckStatus.WithQuorum.HasQuorum; import static accord.primitives.Routables.Slice.Minimal; +import static accord.utils.Invariants.illegalState; public class Propagate implements EpochSupplier, LocalRequest<Status.Known> { @@ -74,7 +75,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> // this is a WHOLE NODE measure, so if commit epoch has more ranges we do not count as committed if we can only commit in coordination epoch public final Status.Known achieved; public final FoundKnownMap known; - public final boolean isTruncated; + public final boolean isShardTruncated; @Nullable public final PartialTxn partialTxn; @Nullable public final PartialDeps committedDeps; public final long toEpoch; @@ -100,7 +101,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> @Nullable RoutingKey progressKey, Status.Known achieved, FoundKnownMap known, - boolean isTruncated, + boolean isShardTruncated, @Nullable PartialTxn partialTxn, @Nullable PartialDeps committedDeps, long toEpoch, @@ -118,7 +119,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> this.progressKey = progressKey; this.achieved = achieved; this.known = known; - this.isTruncated = isTruncated; + this.isShardTruncated = isShardTruncated; this.partialTxn = partialTxn; this.committedDeps = committedDeps; this.toEpoch = toEpoch; @@ -144,7 +145,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> return; } - Invariants.checkState(sourceEpoch == txnId.epoch() || (full.executeAt != null && sourceEpoch == full.executeAt.epoch()) || full.maxKnowledgeSaveStatus == SaveStatus.Erased); + Invariants.checkState(sourceEpoch == txnId.epoch() || (full.executeAt != null && sourceEpoch == full.executeAt.epoch()) || full.maxKnowledgeSaveStatus == SaveStatus.Erased || full.maxKnowledgeSaveStatus == SaveStatus.ErasedOrInvalidated); full = full.finish(route, withQuorum); route = Invariants.nonNull(full.route); @@ -189,7 +190,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> } } - boolean isTruncated = withQuorum == HasQuorum && full.isTruncatedResponse(covering); + boolean isShardTruncated = withQuorum == HasQuorum && full.isTruncatedResponse(covering); PartialTxn partialTxn = full.partialTxn; if (achieved.definition.isKnown()) @@ -200,7 +201,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> committedDeps = full.committedDeps.slice(sliceRanges).reconstitutePartial(covering); Propagate propagate = - new Propagate(txnId, route, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved, full.map, isTruncated, partialTxn, committedDeps, toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback); + new Propagate(txnId, route, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved, full.map, isShardTruncated, partialTxn, committedDeps, toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback); node.localRequest(propagate); } @@ -214,14 +215,13 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> @Override public Seekables<?, ?> keys() { - // TODO (now): this is insufficient, as we may applyOrUpgradeTruncated - // since we don't calculateDeps, we might not need to load CFK to register with it - if (achieved.definition.isKnown()) + if (partialTxn != null) return partialTxn.keys(); - else if (achieved.deps.hasProposedOrDecidedDeps()) + + if (committedDeps != null) return committedDeps.keyDeps.keys(); - else - return Keys.EMPTY; + + return Keys.EMPTY; } @Override @@ -259,10 +259,9 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> Timestamp executeAtIfKnown = command.executeAtIfKnown(committedExecuteAt); Status.Known achieved = this.achieved; - if (isTruncated) + if (isShardTruncated) { // TODO (required): do not markShardStale for reads; in general optimise handling of case where we cannot recover a known no-op transaction - achieved = applyOrUpgradeTruncated(safeStore, safeCommand, command, executeAtIfKnown); if (achieved == null) return null; @@ -295,13 +294,13 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> switch (propagate) { - default: throw new IllegalStateException("Unexpected status: " + propagate); - case Truncated: throw new IllegalStateException("Status expected to be handled elsewhere: " + propagate); + default: throw illegalState("Unexpected status: " + propagate); + case Truncated: throw illegalState("Status expected to be handled elsewhere: " + propagate); case Accepted: case AcceptedInvalidate: // we never "propagate" accepted statuses as these are essentially votes, // and contribute nothing to our local state machine - throw new IllegalStateException("Invalid states to propagate: " + propagate); + throw illegalState("Invalid states to propagate: " + propagate); case Invalidated: Commands.commitInvalidate(safeStore, safeCommand, route); @@ -406,12 +405,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> return null; } - // if the command has been truncated globally, then we should expect to apply it - // if we cannot obtain enough information from a majority to do so then we have been left behind - Status.Known required = PreApplied.minKnown; - Status.Known requireExtra = required.subtract(command.known()); // the extra information we need to reach pre-applied - Ranges achieveRanges = known.knownFor(requireExtra, ranges); // the ranges for which we can successfully achieve this - + participants = participants.slice(ranges, Minimal); if (participants.isEmpty()) { // we only coordinate this transaction, so being unable to retrieve its state does not imply any staleness @@ -420,6 +414,12 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> return null; } + // if the command has been truncated globally, then we should expect to apply it + // if we cannot obtain enough information from a majority to do so then we have been left behind + Status.Known required = PreApplied.minKnown; + Status.Known requireExtra = required.subtract(command.known()); // the extra information we need to reach pre-applied + Ranges achieveRanges = known.knownFor(requireExtra, ranges); // the ranges for which we can already successfully achieve this + // any ranges we execute but cannot achieve the pre-applied status for have been left behind and are stale Ranges staleRanges = ranges.subtract(achieveRanges); Participants<?> staleParticipants = participants.slice(staleRanges, Minimal); @@ -470,6 +470,7 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> public MessageType type() { // TODO (now): this logic doesn't work now we permit upgrading; need to pick the maximum *possible* status we might propagate + // might be better to switch (achieved.propagatesStatus()) { case Applied: @@ -499,11 +500,11 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> { switch (outcome) { - default: throw new IllegalStateException("Unknown outcome: " + outcome); + default: throw illegalState("Unknown outcome: " + outcome); case Redundant: case Success: return; - case Insufficient: throw new IllegalStateException("Should have enough information"); + case Insufficient: throw illegalState("Should have enough information"); } } @@ -511,11 +512,11 @@ public class Propagate implements EpochSupplier, LocalRequest<Status.Known> { switch (outcome) { - default: throw new IllegalStateException("Unknown outcome: " + outcome); + default: throw illegalState("Unknown outcome: " + outcome); case Redundant: case Success: return; - case Insufficient: throw new IllegalStateException("Should have enough information"); + case Insufficient: throw illegalState("Should have enough information"); } } diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java index 905c910..83c1799 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java +++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java @@ -188,6 +188,20 @@ public abstract class AbstractKeys<K extends RoutableKey> implements Iterable<K> return SortedArrays.subtractWithMultipleMatches(keys, ranges.ranges, factory, (k, r) -> -r.compareTo(k), Range::compareTo); } + protected static <K extends RoutableKey> K[] subtract(Range range, K[] keys) + { + boolean isStartInclusive = range.startInclusive(); + int start = Arrays.binarySearch(keys, 0, keys.length, range.start(), RoutableKey::compareTo); + if (start < 0) start = -1 - start; + else if (!isStartInclusive) ++start; + int end = Arrays.binarySearch(keys, 0, keys.length, range.end(), RoutableKey::compareTo); + if (end < 0) end = -1 - end; + else if (!isStartInclusive) ++end; + if (start >= end) + return Arrays.copyOf(keys, 0); + return Arrays.copyOfRange(keys, start, end); + } + protected K[] intersect(AbstractKeys<K> that, ObjectBuffers<K> buffers) { return SortedArrays.linearIntersection(this.keys, that.keys, buffers); diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java b/accord-core/src/main/java/accord/primitives/KeyDeps.java index dae6729..d4b47e8 100644 --- a/accord-core/src/main/java/accord/primitives/KeyDeps.java +++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java @@ -22,8 +22,10 @@ import accord.api.Key; import accord.api.RoutingKey; import accord.utils.ArrayBuffers; import accord.utils.IndexedBiConsumer; +import accord.utils.IndexedTriConsumer; import accord.utils.SortedArrays.SortedArrayList; import accord.utils.SymmetricComparator; +import accord.utils.TriFunction; import java.util.*; import java.util.function.BiConsumer; @@ -32,14 +34,16 @@ import java.util.function.Function; import java.util.function.Predicate; import static accord.utils.ArrayBuffers.*; +import static accord.utils.Invariants.illegalState; import static accord.utils.RelationMultiMap.*; import static accord.utils.SortedArrays.Search.FAST; +// TODO (desired, consider): switch to RoutingKey? Would mean adopting execution dependencies less precisely, but saving ser/deser of large keys +// TODO (expected): consider which projection we should default to on de/serialise /** * A collection of dependencies for a transaction, organised by the key the dependency is adopted via. * An inverse map from TxnId to Key may also be constructed and stored in this collection. */ -// TODO (desired, consider): switch to RoutingKey? Would mean adopting execution dependencies less precisely, but saving ser/deser of large keys public class KeyDeps implements Iterable<Map.Entry<Key, TxnId>> { public static final KeyDeps NONE = new KeyDeps(Keys.EMPTY, NO_TXNIDS, NO_INTS); @@ -270,14 +274,19 @@ public class KeyDeps implements Iterable<Map.Entry<Key, TxnId>> public Keys participatingKeys(TxnId txnId) { - int txnIdIndex = Arrays.binarySearch(txnIds, txnId); - if (txnIdIndex < 0) + int txnIdx = Arrays.binarySearch(txnIds, txnId); + if (txnIdx < 0) return Keys.EMPTY; + return participatingKeys(txnIdx); + } + + public Keys participatingKeys(int txnIdx) + { int[] txnIdsToKeys = txnIdsToKeys(); - int start = txnIdIndex == 0 ? txnIds.length : txnIdsToKeys[txnIdIndex - 1]; - int end = txnIdsToKeys[txnIdIndex]; + int start = txnIdx == 0 ? txnIds.length : txnIdsToKeys[txnIdx - 1]; + int end = txnIdsToKeys[txnIdx]; if (start == end) return Keys.EMPTY; @@ -292,14 +301,14 @@ public class KeyDeps implements Iterable<Map.Entry<Key, TxnId>> { int txnIdIndex = Arrays.binarySearch(txnIds, txnId); if (txnIdIndex < 0) - throw new IllegalStateException("Cannot create a RouteFragment without any keys"); + throw illegalState("Cannot create a RouteFragment without any keys"); int[] txnIdsToKeys = txnIdsToKeys(); int start = txnIdIndex == 0 ? txnIds.length : txnIdsToKeys[txnIdIndex - 1]; int end = txnIdsToKeys[txnIdIndex]; if (start == end) - throw new IllegalStateException("Cannot create a RouteFragment without any keys"); + throw illegalState("Cannot create a RouteFragment without any keys"); RoutingKey[] result = new RoutingKey[end - start]; result[0] = keys.get(txnIdsToKeys[start]).toUnseekable(); @@ -410,29 +419,57 @@ public class KeyDeps implements Iterable<Map.Entry<Key, TxnId>> public <P1, P2> void forEach(Ranges ranges, int inclIdx, int exclIdx, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) { - int[] keysToTxnIds = keysToTxnIds(); for (int i = 0; i < ranges.size(); ++i) + forEach(ranges.get(i), inclIdx, exclIdx, p1, p2, forEach); + } + + public <P1, P2> void forEach(Range range, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) + { + forEach(range, 0, keys.size(), p1, p2, forEach); + } + + public <P1, P2> void forEach(Range range, int inclKeyIdx, int exclKeyIdx, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) + { + forEach(range, inclKeyIdx, exclKeyIdx, forEach, p1, p2, IndexedBiConsumer::accept); + } + + public <P1, P2, P3> void forEach(Range range, P1 p1, P2 p2, P3 p3, IndexedTriConsumer<P1, P2, P3> forEach) + { + forEach(range, 0, keys.size(), p1, p2, p3, forEach); + } + + public <P1, P2, P3> void forEach(Range range, int inclKeyIdx, int exclKeyIdx, P1 p1, P2 p2, P3 p3, IndexedTriConsumer<P1, P2, P3> forEach) + { + int[] keysToTxnIds = keysToTxnIds(); + int start = keys.indexOf(range.start()); + if (start < 0) start = -1 - start; + else if (!range.startInclusive()) ++start; + start = startOffset(start); + + int end = keys.indexOf(range.end()); + if (end < 0) end = -1 - end; + else if (range.endInclusive()) ++end; + end = startOffset(end); + + while (start < end) { - Range range = ranges.get(i); - int start = keys.indexOf(range.start()); - if (start < 0) start = -1 - start; - else if (!range.startInclusive()) ++start; - start = startOffset(start); - - int end = keys.indexOf(range.end()); - if (end < 0) end = -1 - end; - else if (range.endInclusive()) ++end; - end = startOffset(end); - - while (start < end) - { - int txnIdx = keysToTxnIds[start++]; - if (txnIdx >= inclIdx && txnIdx < exclIdx) - forEach.accept(p1, p2, txnIdx); - } + int txnIdx = keysToTxnIds[start++]; + if (txnIdx >= inclKeyIdx && txnIdx < exclKeyIdx) + forEach.accept(p1, p2, p3, txnIdx); } } + public <P1, V> V foldEachKey(int txnIdx, P1 p1, V accumulate, TriFunction<P1, Key, V, V> fold) + { + int[] txnIdsToKeys = txnIdsToKeys(); + + int start = txnIdx == 0 ? txnIds.length : txnIdsToKeys[txnIdx - 1]; + int end = txnIdsToKeys[txnIdx]; + for (int i = start; i < end ; ++i) + accumulate = fold.apply(p1, keys.get(txnIdsToKeys[i]), accumulate); + return accumulate; + } + public Keys keys() { return keys; diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java index 74db63a..cfbb812 100644 --- a/accord-core/src/main/java/accord/primitives/Keys.java +++ b/accord-core/src/main/java/accord/primitives/Keys.java @@ -104,6 +104,11 @@ public class Keys extends AbstractKeys<Key> implements Seekables<Key, Keys> return new Keys(trg); } + public Keys subtract(Range range) + { + return wrap(subtract(range, keys)); + } + public static Keys of(Key key) { return new Keys(new Key[] { key }); diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java b/accord-core/src/main/java/accord/primitives/PartialTxn.java index 262b3d6..06d470e 100644 --- a/accord-core/src/main/java/accord/primitives/PartialTxn.java +++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java @@ -26,6 +26,8 @@ import accord.api.Update; import javax.annotation.Nullable; +import static accord.utils.Invariants.illegalState; + public interface PartialTxn extends Txn { // TODO (expected): we no longer need this if everyone has a FullRoute @@ -125,7 +127,7 @@ public interface PartialTxn extends Txn public Txn reconstitute(FullRoute<?> route) { if (!covers(route) || query() == null) - throw new IllegalStateException("Incomplete PartialTxn: " + this + ", route: " + route); + throw illegalState("Incomplete PartialTxn: " + this + ", route: " + route); return new Txn.InMemory(kind(), keys(), read(), query(), update()); } @@ -134,7 +136,7 @@ public interface PartialTxn extends Txn public PartialTxn reconstitutePartial(Ranges covering) { if (!covers(covering)) - throw new IllegalStateException("Incomplete PartialTxn: " + this + ", covering: " + covering); + throw illegalState("Incomplete PartialTxn: " + this + ", covering: " + covering); if (this.covering.containsAll(covering)) return this; diff --git a/accord-core/src/main/java/accord/primitives/Range.java b/accord-core/src/main/java/accord/primitives/Range.java index 1a8f937..a656c9b 100644 --- a/accord-core/src/main/java/accord/primitives/Range.java +++ b/accord-core/src/main/java/accord/primitives/Range.java @@ -28,6 +28,7 @@ import java.util.Objects; import javax.annotation.Nullable; +import static accord.utils.Invariants.illegalState; import static accord.utils.SortedArrays.Search.CEIL; import static accord.utils.SortedArrays.Search.FAST; @@ -210,7 +211,7 @@ public abstract class Range implements Comparable<RoutableKey>, Unseekable, Seek if (start.compareTo(end) >= 0) throw new IllegalArgumentException(start + " >= " + end); if (startInclusive() == endInclusive()) - throw new IllegalStateException("Range must have one side inclusive, and the other exclusive. Range of different types should not be mixed."); + throw illegalState("Range must have one side inclusive, and the other exclusive. Range of different types should not be mixed."); this.start = start; this.end = end; } diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java b/accord-core/src/main/java/accord/primitives/RangeDeps.java index cbff7aa..72d9558 100644 --- a/accord-core/src/main/java/accord/primitives/RangeDeps.java +++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java @@ -88,6 +88,7 @@ public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>> private SearchableRangeList searchable; private Ranges covering; + // TODO (expected): merge by TxnId key, not by range, so that we can merge overlapping ranges for same TxnId public static <T1, T2> RangeDeps merge(List<T1> merge, Function<T1, T2> getter1, Function<T2, RangeDeps> getter2) { try (LinearMerger<Range, TxnId, RangeDeps> linearMerger = new LinearMerger<>(ADAPTER)) diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java index 10cbf9d..0a037d6 100644 --- a/accord-core/src/main/java/accord/topology/Topologies.java +++ b/accord-core/src/main/java/accord/topology/Topologies.java @@ -28,6 +28,8 @@ import accord.utils.Invariants; import java.util.*; +import static accord.utils.Invariants.illegalState; + // TODO (desired, efficiency/clarity): since Topologies are rarely needed, should optimise API for single topology case // (e.g. at least implementing Topologies by Topology) public interface Topologies extends TopologySorter @@ -438,7 +440,7 @@ public interface Topologies extends TopologySorter switch (topologies.size()) { case 0: - throw new IllegalStateException("Unable to build an empty Topologies"); + throw illegalState("Unable to build an empty Topologies"); case 1: return new Single(sorter, topologies.get(0)); default: diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 5a0c9e9..99336ec 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -52,6 +52,7 @@ import org.agrona.collections.IntArrayList; import javax.annotation.Nullable; +import static accord.utils.Invariants.illegalArgument; import static accord.utils.SortedArrays.Search.FLOOR; import static accord.utils.SortedArrays.exponentialSearch; @@ -231,7 +232,7 @@ public class Topology { int i = subsetOfRanges.indexOf(key); if (i < 0) - throw new IllegalArgumentException("Range not found for " + key); + throw illegalArgument("Range not found for " + key); return shards[supersetIndexes[i]]; } diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 8884781..2ef247f 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -49,6 +49,7 @@ import static accord.coordinate.tracking.RequestStatus.Success; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; import static accord.primitives.Routables.Slice.Minimal; import static accord.utils.Invariants.checkArgument; +import static accord.utils.Invariants.illegalState; import static accord.utils.Invariants.nonNull; /** @@ -637,7 +638,7 @@ public class TopologyManager { EpochState epochState = epochs.get(epoch); if (epochState == null) - throw new IllegalStateException("Unknown epoch " + epoch); + throw illegalState("Unknown epoch " + epoch); return epochState.local(); } diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java index 85eaa2f..a554559 100644 --- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java +++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java @@ -27,6 +27,8 @@ import java.lang.reflect.Array; import java.util.Arrays; import java.util.function.IntFunction; +import static accord.utils.Invariants.illegalState; + /** * A set of utility classes and interfaces for managing a collection of buffers for arrays of certain types. * @@ -569,7 +571,7 @@ public class ArrayBuffers public int lengthOfLast(T[] buffer) { if (length == -1) - throw new IllegalStateException("Attempted to get last length but no call to complete called"); + throw illegalState("Attempted to get last length but no call to complete called"); return length; } } diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java index 361c29a..43603fe 100644 --- a/accord-core/src/main/java/accord/utils/Invariants.java +++ b/accord-core/src/main/java/accord/utils/Invariants.java @@ -39,14 +39,14 @@ public class Invariants return DEBUG; } - public static void illegalState(String msg) + public static IllegalStateException illegalState(String msg) { throw new IllegalStateException(msg); } - private static void illegalState() + public static IllegalStateException illegalState() { - illegalState(null); + throw illegalState(null); } public static RuntimeException illegalArgument(String msg) diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java index 9c3184c..eeccfac 100644 --- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java @@ -29,6 +29,8 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; +import static accord.utils.Invariants.illegalState; + /** * Represents a map of ranges where precisely one value is bound to each point in the continuum of ranges, * and a simple function is sufficient to merge values inserted to overlapping ranges. @@ -419,7 +421,7 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V> if (leftIsInclusive == rightIsInclusive) return leftIsInclusive; else if (leftIsDecisive && rightIsDecisive) - throw new IllegalStateException("Mismatching bound inclusivity/exclusivity"); + throw illegalState("Mismatching bound inclusivity/exclusivity"); else if (leftIsDecisive) return leftIsInclusive; else diff --git a/accord-core/src/main/java/accord/utils/RelationMultiMap.java b/accord-core/src/main/java/accord/utils/RelationMultiMap.java index 394e539..e017a1b 100644 --- a/accord-core/src/main/java/accord/utils/RelationMultiMap.java +++ b/accord-core/src/main/java/accord/utils/RelationMultiMap.java @@ -27,8 +27,10 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static accord.utils.ArrayBuffers.*; +import static accord.utils.Invariants.illegalState; import static accord.utils.SortedArrays.remap; import static accord.utils.SortedArrays.remapToSuperset; +import static java.lang.String.format; /** * Instead of creating a parent object and having generic key/value methods, we introduce a bunch of static helper methods. @@ -49,6 +51,10 @@ import static accord.utils.SortedArrays.remapToSuperset; * ... * } * } + * + * // TODO (expected, performance): it would be trivial to special-case transactions that intersect on all keys (e.g. by setting the top integer bit, and otherwise propagating the prior limit). + * This would also neatly optimise cases where we only have a single key. + * // TODO (desired, performance): it would also be simple to bitmask our integers, compressing to e.g. byte or short boundaries as permitted. We could do this for all values, including our offsets header. */ public class RelationMultiMap { @@ -153,11 +159,13 @@ public class RelationMultiMap { // TODO (low priority, efficiency): this allocates a significant amount of memory: would be preferable to be able to sort using a pre-defined scratch buffer Arrays.sort(keysToValues, keyOffset, totalCount); + int removed = 0; for (int i = keyOffset + 1 ; i < totalCount ; ++i) { - if (keysToValues[i - 1].equals(keysToValues[i])) - throw new IllegalArgumentException("TxnId for " + keys[keyCount - 1] + " are not unique: " + Arrays.asList(keysToValues).subList(keyOffset, totalCount)); + if (keysToValues[i - 1].equals(keysToValues[i])) ++removed; + else if (removed > 0) keysToValues[i - removed] = keysToValues[i]; } + totalCount -= removed; } keyLimits[keyCount - 1] = totalCount; @@ -812,7 +820,7 @@ public class RelationMultiMap .append(", right[").append(rightKeyIndex).append("] = ").append(rightKeys[rightKeyIndex]).append("\n"); sb.append("leftKeys = ").append(Arrays.stream(leftKeys, 0, leftKeyLength).map(Object::toString).collect(Collectors.joining())).append('\n'); sb.append("rightKeys = ").append(Arrays.stream(rightKeys, 0, rightKeyLength).map(Object::toString).collect(Collectors.joining())).append('\n'); - throw new IllegalStateException(sb.toString()); + throw illegalState(sb.toString()); } static int[] copy(int[] src, int to, int length, IntBuffers bufferManager) @@ -1052,7 +1060,7 @@ public class RelationMultiMap { K key = keys[i]; V value = values[keysToValues[i]]; - throw new IllegalStateException(String.format("Duplicate value (%s) found for key %s", value, key)); + throw illegalState(format("Duplicate value (%s) found for key %s", value, key)); } i++; } diff --git a/accord-core/src/main/java/accord/utils/SortedArrays.java b/accord-core/src/main/java/accord/utils/SortedArrays.java index 255bc89..8617d52 100644 --- a/accord-core/src/main/java/accord/utils/SortedArrays.java +++ b/accord-core/src/main/java/accord/utils/SortedArrays.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import static accord.utils.ArrayBuffers.uncached; import static accord.utils.Invariants.checkArgument; +import static accord.utils.Invariants.illegalState; import static accord.utils.SortedArrays.Search.FAST; // TODO (low priority, efficiency): improvements: @@ -1137,14 +1138,14 @@ public class SortedArrays if (j < 0) { if (i > 0 && src[i] == src[i-1]) - throw new IllegalStateException("Unexpected value in source: " + src[i] + " at index " + i + " duplicates index " + (i - 1)); - throw new IllegalStateException("Unexpected value in source: " + src[i] + " at index " + i + " does not exist in target array"); + throw illegalState("Unexpected value in source: " + src[i] + " at index " + i + " duplicates index " + (i - 1)); + throw illegalState("Unexpected value in source: " + src[i] + " at index " + i + " does not exist in target array"); } } result[i++] = j++; } if (i != srcLength) - throw new IllegalStateException("Unexpected value in source: " + src[i] + " at index " + i + " does not exist in target array"); + throw illegalState("Unexpected value in source: " + src[i] + " at index " + i + " does not exist in target array"); return result; } diff --git a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java index 2c9dd4b..0a10292 100644 --- a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java +++ b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java @@ -26,6 +26,8 @@ import accord.api.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static accord.utils.Invariants.illegalState; + public class ThreadPoolScheduler implements Scheduler { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolScheduler.class); @@ -91,7 +93,7 @@ public class ThreadPoolScheduler implements Scheduler try { if (!exec.awaitTermination(1L, TimeUnit.MINUTES)) - throw new IllegalStateException("did not terminate"); + throw illegalState("did not terminate"); } catch (InterruptedException e) { diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index 73a0fd8..fcc2956 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory; import accord.api.VisibleForImplementation; import accord.utils.Invariants; +import static accord.utils.Invariants.illegalState; + public abstract class AsyncChains<V> implements AsyncChain<V> { private static final Logger logger = LoggerFactory.getLogger(AsyncChains.class); @@ -718,7 +720,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> } catch (TimeoutException e) { - throw new IllegalStateException("Should not throw timeout exception e"); + throw illegalState("Should not throw timeout exception e"); } } @@ -801,7 +803,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> } catch (TimeoutException e) { - throw new IllegalStateException("Should not throw timeout exception e"); + throw illegalState("Should not throw timeout exception e"); } } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java index 944d517..451f291 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java @@ -20,6 +20,8 @@ package accord.utils.async; import java.util.function.BiConsumer; +import static accord.utils.Invariants.illegalState; + /** * Handle for async computations that supports multiple listeners and registering * listeners after the computation has started @@ -58,7 +60,7 @@ public interface AsyncResult<V> extends AsyncChain<V> default void setSuccess(V value) { if (!trySuccess(value)) - throw new IllegalStateException("Result has already been set on " + this); + throw illegalState("Result has already been set on " + this); } boolean tryFailure(Throwable throwable); @@ -67,7 +69,7 @@ public interface AsyncResult<V> extends AsyncChain<V> { if (!tryFailure(throwable)) { - IllegalStateException e = new IllegalStateException("Result has already been set on " + this); + IllegalStateException e = illegalState("Result has already been set on " + this); e.addSuppressed(throwable); throw e; } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java index 77b7b4b..0fee26a 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java @@ -28,6 +28,8 @@ import java.util.function.Function; import accord.api.VisibleForImplementation; import accord.utils.Invariants; +import static accord.utils.Invariants.illegalState; + public class AsyncResults { public static final AsyncResult<Void> SUCCESS_VOID = success(null); @@ -93,7 +95,7 @@ public class AsyncResults } if (failures != null) { - IllegalStateException f = new IllegalStateException("Callbacks threw"); + IllegalStateException f = illegalState("Callbacks threw"); failures.forEach(f::addSuppressed); throw f; } @@ -145,7 +147,7 @@ public class AsyncResults { if (!trySetResult(result, failure)) { - IllegalStateException f = new IllegalStateException("Result has already been set on " + this); + IllegalStateException f = illegalState("Result has already been set on " + this); if (failure != null) f.addSuppressed(failure); throw f; @@ -224,7 +226,7 @@ public class AsyncResults { Result<V> result = getResult(); if (result.failure == null) - throw new IllegalStateException("Result succeeded"); + throw illegalState("Result succeeded"); return result.failure; } diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java index 7c77b97..f91af27 100644 --- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java +++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java @@ -26,6 +26,8 @@ import accord.utils.RandomSource; import java.time.Duration; import java.util.concurrent.TimeUnit; +import static accord.utils.Invariants.illegalState; + public class FrequentLargeRange implements LongGen { private final LongGen small, large; @@ -140,9 +142,9 @@ public class FrequentLargeRange implements LongGen public FrequentLargeRange build() { if (small == null) - throw new IllegalStateException("Small range undefined"); + throw illegalState("Small range undefined"); if (large == null) - throw new IllegalStateException("Large range undefined"); + throw illegalState("Large range undefined"); if (ratio == null) ratio(1, 11); if (maxRuns == null) diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 35800b5..57ae4e1 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -89,7 +89,6 @@ public class ListAgent implements Agent @Override public void onUncaughtException(Throwable t) { - // TODO (required, testing): ensure reported to runner onFailure.accept(t); } diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index cd58a8a..ec1ace1 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -25,6 +25,7 @@ import accord.api.Result; import accord.api.RoutingKey; import accord.coordinate.CheckShards; import accord.coordinate.CoordinationFailed; +import accord.coordinate.Exhausted; import accord.coordinate.Invalidated; import accord.coordinate.Truncated; import accord.coordinate.Timeout; @@ -54,6 +55,7 @@ import static accord.local.Status.Phase.Cleanup; import static accord.local.Status.PreApplied; import static accord.local.Status.PreCommitted; +import static accord.utils.Invariants.illegalState; public class ListRequest implements Request { @@ -193,7 +195,8 @@ public class ListRequest implements Request node.reply(client, replyContext, ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null); return; } - if (f instanceof Timeout || f instanceof SimulatedFault) checkOnResult(finalHomeKey, txnId, attempt + 1, f); + // some arbitrarily large limit to attempts + if (attempt < 1000 && (f instanceof Timeout || f instanceof SimulatedFault || f instanceof Exhausted)) checkOnResult(finalHomeKey, txnId, attempt + 1, f); else { node.reply(client, replyContext, ListResult.failure(client, ((Packet)replyContext).requestId, txnId), null); @@ -248,7 +251,7 @@ public class ListRequest implements Request public void process(Node node, Id client, ReplyContext replyContext) { if (id != null) - throw new IllegalStateException("Called process multiple times"); + throw illegalState("Called process multiple times"); txn = gen.apply(node); id = node.nextTxnId(txn.kind(), txn.keys().domain()); listener.onClientAction(MessageListener.ClientAction.SUBMIT, node.id(), id, txn); diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java index 5e8cb63..614a1f6 100644 --- a/accord-core/src/test/java/accord/impl/list/ListResult.java +++ b/accord-core/src/test/java/accord/impl/list/ListResult.java @@ -30,6 +30,8 @@ import accord.messages.Reply; import accord.primitives.Seekables; import accord.primitives.TxnId; +import static accord.utils.Invariants.illegalState; + public class ListResult implements Result, Reply { public enum Status diff --git a/accord-core/src/test/java/accord/local/CheckedCommands.java b/accord-core/src/test/java/accord/local/CheckedCommands.java index 9633be8..fe4acf7 100644 --- a/accord-core/src/test/java/accord/local/CheckedCommands.java +++ b/accord-core/src/test/java/accord/local/CheckedCommands.java @@ -33,32 +33,34 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Writes; +import static accord.utils.Invariants.illegalState; + public class CheckedCommands { public static void preaccept(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey) { SafeCommand safeCommand = safeStore.get(txnId, txnId, route); Commands.AcceptOutcome result = Commands.preaccept(safeStore, safeCommand, txnId, txnId.epoch(), partialTxn, route, progressKey); - if (result != Commands.AcceptOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); + if (result != Commands.AcceptOutcome.Success) throw illegalState("Command mutation rejected: " + result); } public static void accept(SafeCommandStore safeStore, TxnId txnId, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps) { Commands.AcceptOutcome result = Commands.accept(safeStore, txnId, ballot, route, keys, progressKey, executeAt, partialDeps); - if (result != Commands.AcceptOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); + if (result != Commands.AcceptOutcome.Success) throw illegalState("Command mutation rejected: " + result); } public static void commit(SafeCommandStore safeStore, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps) { SafeCommand safeCommand = safeStore.get(txnId, txnId, route); Commands.CommitOutcome result = Commands.commit(safeStore, safeCommand, txnId, route, progressKey, partialTxn, executeAt, partialDeps); - if (result != Commands.CommitOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + result); + if (result != Commands.CommitOutcome.Success) throw illegalState("Command mutation rejected: " + result); } public static void apply(SafeCommandStore safeStore, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result result) { SafeCommand safeCommand = safeStore.get(txnId, txnId, route); Commands.ApplyOutcome outcome = Commands.apply(safeStore, safeCommand, txnId, route, progressKey, executeAt, partialDeps, partialTxn, writes, result); - if (outcome != Commands.ApplyOutcome.Success) throw new IllegalStateException("Command mutation rejected: " + outcome); + if (outcome != Commands.ApplyOutcome.Success) throw illegalState("Command mutation rejected: " + outcome); } } diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java index 97b44a8..c700e28 100644 --- a/accord-core/src/test/java/accord/messages/ReadDataTest.java +++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java @@ -74,6 +74,7 @@ import org.mockito.stubbing.Answer; import static accord.Utils.createNode; import static accord.Utils.id; +import static accord.utils.Invariants.illegalState; import static accord.utils.Utils.listOf; import static accord.utils.async.AsyncChains.getUninterruptibly; import static org.mockito.ArgumentMatchers.any; @@ -109,7 +110,7 @@ class ReadDataTest @Override public AsyncChain<Data> answer(InvocationOnMock ignore) throws Throwable { - if (called) throw new IllegalStateException("Multiple calls"); + if (called) throw illegalState("Multiple calls"); return readResult; } }); diff --git a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java index 938ad25..973bf77 100644 --- a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java +++ b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java @@ -38,6 +38,7 @@ import org.assertj.core.api.AbstractThrowableAssert; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import static accord.utils.Invariants.illegalState; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class AsyncChainsTest @@ -155,7 +156,7 @@ public class AsyncChainsTest .map(i -> i + 1) .begin((success, failure) -> { if (failure == null) - throw new IllegalStateException("Should see failure"); + throw illegalState("Should see failure"); }); AsyncChains.ofRunnable(ignore -> { @@ -165,7 +166,7 @@ public class AsyncChainsTest .beginAsResult() .addCallback((success, failure) -> { if (failure == null) - throw new IllegalStateException("Expected to fail"); + throw illegalState("Expected to fail"); }); AsyncChains.<Integer>ofCallable(fn -> fn.run(), () -> { @@ -173,7 +174,7 @@ public class AsyncChainsTest }).map(i -> i + 1).map(i -> i + 1) .begin((success, failure) -> { if (failure == null) - throw new IllegalStateException("Should see failure"); + throw illegalState("Should see failure"); }); AsyncChains.ofCallable(fn -> fn.run(), () -> 42 @@ -183,7 +184,7 @@ public class AsyncChainsTest }) .begin((success, failure) -> { if (failure == null) - throw new IllegalStateException("Should see failure"); + throw illegalState("Should see failure"); }); } diff --git a/accord-core/src/test/java/accord/verify/ElleVerifier.java b/accord-core/src/test/java/accord/verify/ElleVerifier.java index a336ae5..2e0d02f 100644 --- a/accord-core/src/test/java/accord/verify/ElleVerifier.java +++ b/accord-core/src/test/java/accord/verify/ElleVerifier.java @@ -50,6 +50,9 @@ public class ElleVerifier implements Verifier { public static boolean allowed() { + if (!Boolean.parseBoolean(System.getProperty("accord.verify.elle", "true"))) + return false; + // Elle only works on JDK 11 int jdkVersion = Integer.parseInt(StandardSystemProperty.JAVA_VERSION.value().split("\\.")[0]); return !(jdkVersion == 1 /* 1.8 */ || jdkVersion == 8); diff --git a/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java b/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java index 109a974..8ab2109 100644 --- a/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java +++ b/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import java.util.function.Consumer; import java.util.stream.IntStream; +import static accord.utils.Invariants.illegalState; + /** * Nomenclature: * register: the values associated with a given key @@ -445,7 +447,7 @@ public class SerializabilityVerifier public void witnessRead(int key, int[] sequence) { if (bufReads[key] != null) - throw new IllegalStateException("Can buffer only one read observation for each key"); + throw illegalState("Can buffer only one read observation for each key"); bufReads[key] = sequence; // if we have a write, then for causality sequence is implicitly longer by one to include the write bufNewPeerSteps[key] = bufWrites[key] >= 0 ? sequence.length + 1 : sequence.length; @@ -457,7 +459,7 @@ public class SerializabilityVerifier public void witnessWrite(int key, int id) { if (bufWrites[key] >= 0) - throw new IllegalStateException("Can buffer only one write observation for each key"); + throw illegalState("Can buffer only one write observation for each key"); bufWrites[key] = id; if (bufReads[key] != null) bufNewPeerSteps[key] = bufReads[key].length; diff --git a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java index 738486e..64d97ce 100644 --- a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java +++ b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java @@ -33,6 +33,7 @@ import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static accord.utils.Invariants.illegalState; import static java.util.stream.Collectors.joining; /** @@ -812,7 +813,7 @@ public class StrictSerializabilityVerifier implements Verifier public void witnessRead(int key, int[] sequence) { if (bufReads[key] != null) - throw new IllegalStateException("Can buffer only one read observation for each key"); + throw illegalState("Can buffer only one read observation for each key"); bufReads[key] = sequence; // if we have a write, then for causality sequence is implicitly longer by one to include the write bufNewPeerSteps[key] = bufWrites[key] >= 0 ? sequence.length + 1 : sequence.length; @@ -824,7 +825,7 @@ public class StrictSerializabilityVerifier implements Verifier public void witnessWrite(int key, int id) { if (bufWrites[key] >= 0) - throw new IllegalStateException("Can buffer only one write observation for each key"); + throw illegalState("Can buffer only one write observation for each key"); bufWrites[key] = id; if (bufReads[key] != null) bufNewPeerSteps[key] = bufReads[key].length + 1; diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Body.java b/accord-maelstrom/src/main/java/accord/maelstrom/Body.java index 4705e84..76b3100 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Body.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Body.java @@ -32,6 +32,7 @@ import accord.local.Node.Id; import accord.primitives.Txn; import accord.maelstrom.Packet.Type; +import static accord.utils.Invariants.illegalState; import static accord.utils.Utils.toArray; public class Body @@ -155,7 +156,7 @@ public class Body in.endArray(); break; default: - throw new IllegalStateException("Unexpected field " + field); + throw illegalState("Unexpected field " + field); } } in.endObject(); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java index a745304..d76047e 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java @@ -35,6 +35,8 @@ import com.google.gson.stream.JsonWriter; import accord.local.Node.Id; import accord.api.Key; +import static accord.utils.Invariants.illegalState; + public class Json { public static final Gson GSON; @@ -268,7 +270,7 @@ public class Json String kind = in.nextName(); switch (kind) { - default: throw new IllegalStateException("Invalid kind: " + kind); + default: throw illegalState("Invalid kind: " + kind); case "r": in.beginArray(); while (in.hasNext()) diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java index 2cb9fd2..f18d829 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java @@ -32,6 +32,8 @@ import com.google.gson.stream.JsonWriter; import accord.maelstrom.Packet.Type; import accord.messages.Reply; +import static accord.utils.Invariants.illegalState; + public class MaelstromReply extends Body implements Reply { final MaelstromResult result; @@ -98,7 +100,7 @@ public class MaelstromReply extends Body implements Reply Key key = MaelstromKey.readKey(in); switch (op) { - default: throw new IllegalStateException("Invalid op: " + op); + default: throw illegalState("Invalid op: " + op); case "r": { Value value = Value.read(in); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java index 5dfcf3e..307218e 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java @@ -36,6 +36,8 @@ import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonToken; import com.google.gson.stream.JsonWriter; +import static accord.utils.Invariants.illegalState; + public class MaelstromRequest extends Body implements Request { final Txn txn; @@ -126,7 +128,7 @@ public class MaelstromRequest extends Body implements Request Key key = MaelstromKey.readKey(in); switch (op) { - default: throw new IllegalStateException("Invalid op: " + op); + default: throw illegalState("Invalid op: " + op); case "r": in.nextNull(); buildReadKeys.add(key); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java index 2ec35c8..bb44cb5 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java @@ -32,6 +32,8 @@ import accord.api.Key; import accord.api.Result; import accord.primitives.Keys; +import static accord.utils.Invariants.illegalState; + public class MaelstromResult implements Result { final Node.Id client; @@ -116,7 +118,7 @@ public class MaelstromResult implements Result String kind = in.nextName(); switch (kind) { - default: throw new IllegalStateException("Invalid kind: " + kind); + default: throw illegalState("Invalid kind: " + kind); case "r": in.beginArray(); while (in.hasNext()) diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java b/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java index 728430a..f5a8e13 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java @@ -31,6 +31,8 @@ import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; import accord.local.Node.Id; +import static accord.utils.Invariants.illegalState; + public class Packet implements ReplyContext { @@ -161,7 +163,7 @@ public class Packet implements ReplyContext break; case "id": in.nextLong(); break; default: - throw new IllegalStateException("Unexpected field " + field); + throw illegalState("Unexpected field " + field); } } in.endObject(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org