This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 0aa1d4c8153a09ca3cff79a06bc0f1caa94ae9a5 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Oct 2 12:42:11 2024 +0100 More follow-up to CASSANDRA-19967 and CASSANDRA-19869 --- .../java/accord/impl/InMemoryCommandStore.java | 4 +- .../main/java/accord/impl/TimestampsForKeys.java | 9 +- .../src/main/java/accord/local/Cleanup.java | 11 +- .../src/main/java/accord/local/Command.java | 2 +- .../src/main/java/accord/local/CommandStore.java | 176 +-------------------- .../src/main/java/accord/local/CommandStores.java | 5 - .../src/main/java/accord/local/Commands.java | 20 +-- accord-core/src/main/java/accord/local/Node.java | 19 ++- .../main/java/accord/local/RedundantBefore.java | 146 +++++++++++++++++ .../main/java/accord/local/SafeCommandStore.java | 34 +++- .../main/java/accord/local/StoreParticipants.java | 2 +- .../main/java/accord/local/cfk/CommandsForKey.java | 5 - .../java/accord/local/cfk/SafeCommandsForKey.java | 2 +- .../src/main/java/accord/messages/PreAccept.java | 2 +- .../src/main/java/accord/messages/Propagate.java | 8 +- .../java/accord/messages/QueryDurableBefore.java | 2 +- .../src/main/java/accord/messages/ReadData.java | 2 +- .../java/accord/messages/SetGloballyDurable.java | 2 +- .../src/main/java/accord/primitives/Timestamp.java | 5 + .../src/test/java/accord/impl/basic/Journal.java | 3 +- .../src/test/java/accord/impl/list/ListRead.java | 2 +- .../test/java/accord/messages/PreAcceptTest.java | 4 +- .../test/java/accord/primitives/KeyDepsTest.java | 2 +- 23 files changed, 245 insertions(+), 222 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 915ecec7..deee1947 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -397,7 +397,7 @@ public abstract class InMemoryCommandStore extends CommandStore boolean done = command.hasBeen(Truncated); if (!done) { - if (redundantBefore().status(txnId, command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) + if (unsafeGetRedundantBefore().status(txnId, command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) return; Route<?> route = command.route().slice(allRanges); @@ -759,7 +759,7 @@ public abstract class InMemoryCommandStore extends CommandStore return; Ranges slice = ranges(txnId, updated.executeAtOrTxnId()); - slice = commandStore.redundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice); + slice = commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice); commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new RangeCommand(commandStore.commands.get(txnId))) .update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice, Minimal)); } diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java index bd0e0ef7..baea052c 100644 --- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java +++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java @@ -21,6 +21,7 @@ package accord.impl; import accord.api.RoutingKey; import accord.api.VisibleForImplementation; import accord.local.CommandStore; +import accord.local.SafeCommandStore; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -38,7 +39,7 @@ public class TimestampsForKeys private TimestampsForKeys() {} - public static TimestampsForKey updateLastExecutionTimestamps(CommandStore commandStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn) + public static TimestampsForKey updateLastExecutionTimestamps(SafeCommandStore safeStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn) { TimestampsForKey current = tfk.current(); @@ -46,7 +47,7 @@ public class TimestampsForKeys if (executeAt.compareTo(lastWrite) < 0) { - if (commandStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY) + if (safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY) return current; throw illegalState("%s is less than the most recent write timestamp %s", executeAt, lastWrite); } @@ -59,7 +60,7 @@ public class TimestampsForKeys if (cmp < 0) { - if (!commandStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable())) + if (!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable())) return current; throw illegalState("%s is less than the most recent executed timestamp %s", executeAt, lastExecuted); } @@ -83,6 +84,6 @@ public class TimestampsForKeys @VisibleForImplementation public static <D> TimestampsForKey updateLastExecutionTimestamps(AbstractSafeCommandStore<?,?,?> safeStore, RoutingKey key, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn) { - return updateLastExecutionTimestamps(safeStore.commandStore(), safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn); + return updateLastExecutionTimestamps(safeStore, safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn); } } diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index 78dc341c..b07c8a09 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -76,18 +76,19 @@ public enum Cleanup public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command command) { - return shouldCleanup(safeStore.commandStore(), command, command.participants()); + return shouldCleanup(safeStore, command, command.participants()); } public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command command, @Nonnull StoreParticipants participants) { - return shouldCleanup(safeStore.commandStore(), command, participants); + return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), participants, + safeStore.redundantBefore(), safeStore.durableBefore()); } - public static Cleanup shouldCleanup(CommandStore commandStore, Command command, @Nonnull StoreParticipants participants) + public static Cleanup shouldCleanup(Command command, RedundantBefore redundantBefore, DurableBefore durableBefore) { - return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), participants, - commandStore.redundantBefore(), commandStore.durableBefore()); + return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), command.participants(), + redundantBefore, durableBefore); } public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index c3e2c851..8fad6318 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -1368,7 +1368,7 @@ public abstract class Command implements CommonAttributes long maxEpoch = prevEpoch; long epoch = rangesForEpoch.epochs[i]; Ranges ranges = rangesForEpoch.ranges[i]; - ranges = safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges); + ranges = safeStore.redundantBefore().removePreBootstrap(txnId, ranges); if (!ranges.isEmpty()) { Unseekables<?> executionParticipants = participants.route.slice(ranges, Minimal); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 44d3ca56..d8a4583e 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -22,18 +22,12 @@ import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.DataStore; import accord.coordinate.CollectCalculatedDeps; -import accord.local.Command.WaitingOn; import javax.annotation.Nullable; import accord.api.Agent; import accord.local.CommandStores.RangesForEpoch; -import accord.primitives.KeyDeps; -import accord.primitives.Participants; -import accord.primitives.Range; import accord.primitives.Routables; -import accord.primitives.RoutingKeys; -import accord.primitives.Status; import accord.primitives.Unseekables; import accord.utils.async.AsyncChain; @@ -44,7 +38,6 @@ import accord.utils.async.AsyncResult; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -62,12 +55,10 @@ import org.slf4j.LoggerFactory; import accord.primitives.Deps; import accord.primitives.FullRoute; -import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.async.AsyncResults; -import org.agrona.collections.Int2ObjectHashMap; import static accord.api.ConfigurationService.EpochReady.DONE; import static accord.local.KeyHistory.COMMANDS; @@ -637,180 +628,29 @@ public abstract class CommandStore implements AgentExecutor }; } - public final Ranges safeToReadAt(Timestamp at) + public final boolean isRejectedIfNotPreAccepted(TxnId txnId, Unseekables<?> participants) { - return safeToRead.lowerEntry(at).getValue(); - } + if (rejectBefore == null) + return false; - // TODO (desired): Commands.durability() can use this to upgrade to Majority without further info - public final Status.Durability globalDurability(TxnId txnId) - { - return durableBefore.min(txnId); + return rejectBefore.rejects(txnId, participants); } - public final RedundantBefore redundantBefore() + public final RedundantBefore unsafeGetRedundantBefore() { return redundantBefore; } - public DurableBefore durableBefore() + public DurableBefore unsafeGetDurableBefore() { return durableBefore; } @VisibleForTesting - public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return bootstrapBeganAt; } + public final NavigableMap<TxnId, Ranges> unsafeGetBootstrapBeganAt() { return bootstrapBeganAt; } @VisibleForTesting - public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; } - - public final boolean isRejectedIfNotPreAccepted(TxnId txnId, Unseekables<?> participants) - { - if (rejectBefore == null) - return false; - - return rejectBefore.rejects(txnId, participants); - } - - public final void removeRedundantDependencies(Unseekables<?> participants, WaitingOn.Update builder) - { - // Note: we do not need to track the bootstraps we implicitly depend upon, because we will not serve any read requests until this has completed - // and since we are a timestamp store, and we write only this will sort itself out naturally - // TODO (required): make sure we have no races on HLC around SyncPoint else this resolution may not work (we need to know the micros equivalent timestamp of the snapshot) - class KeyState - { - Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping; - - /** - * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store - */ - boolean isFullyBootstrapping(WaitingOn.Update builder, Range range, int txnIdx) - { - if (builder.directKeyDeps.foldEachKey(txnIdx, range, true, (r0, k, p) -> p && r0.contains(k))) - return true; - - if (partiallyBootstrapping == null) - partiallyBootstrapping = new Int2ObjectHashMap<>(); - RoutingKeys prev = partiallyBootstrapping.get(txnIdx); - RoutingKeys remaining = prev; - if (remaining == null) remaining = builder.directKeyDeps.participatingKeys(txnIdx); - else Invariants.checkState(!remaining.isEmpty()); - remaining = remaining.without(range); - if (prev == null) Invariants.checkState(!remaining.isEmpty()); - partiallyBootstrapping.put(txnIdx, remaining); - return remaining.isEmpty(); - } - } - - KeyDeps directKeyDeps = builder.directKeyDeps; - if (!directKeyDeps.isEmpty()) - { - redundantBefore().foldl(directKeyDeps.keys(), (e, s, d, b) -> { - // TODO (desired, efficiency): foldlInt so we can track the lower rangeidx bound and not revisit unnecessarily - // find the txnIdx below which we are known to be fully redundant locally due to having been applied or invalidated - int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); - if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; - int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); - if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; - - // remove intersecting transactions with known redundant txnId - // note that we must exclude all transactions that are pre-bootstrap, and perform the more complicated dance below, - // as these transactions may be only partially applied, and we may need to wait for them on another key. - if (appliedIdx > bootstrapIdx) - { - d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0, s0, txnIdx) -> { - b0.removeWaitingOnDirectKeyTxnId(txnIdx); - }); - } - - if (bootstrapIdx > 0) - { - d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0, s0, r, txnIdx) -> { - if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) && s0.isFullyBootstrapping(b0, r, txnIdx)) - b0.removeWaitingOnDirectKeyTxnId(txnIdx); - }); - } - return s; - }, new KeyState(), directKeyDeps, builder, ignore -> false); - } - - /** - * If we have to handle bootstrapping ranges for range transactions, these may only partially cover the - * transaction, in which case we should not remove the transaction as a dependency. But if it is fully - * covered by bootstrapping ranges then we *must* remove it as a dependency. - */ - class RangeState - { - Range range; - int bootstrapIdx, appliedIdx; - Map<Integer, Ranges> partiallyBootstrapping; - - /** - * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store - */ - boolean isFullyBootstrapping(int rangeTxnIdx) - { - // if all deps for the txnIdx are contained in the range, don't inflate any shared object state - if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range, true, (r1, r2, p) -> p && r1.contains(r2))) - return true; - - if (partiallyBootstrapping == null) - partiallyBootstrapping = new HashMap<>(); - Ranges prev = partiallyBootstrapping.get(rangeTxnIdx); - Ranges remaining = prev; - if (remaining == null) remaining = builder.directRangeDeps.ranges(rangeTxnIdx); - else Invariants.checkState(!remaining.isEmpty()); - remaining = remaining.without(Ranges.of(range)); - if (prev == null) Invariants.checkState(!remaining.isEmpty()); - partiallyBootstrapping.put(rangeTxnIdx, remaining); - return remaining.isEmpty(); - } - } - - RangeDeps rangeDeps = builder.directRangeDeps; - // TODO (required, consider): slice to only those ranges we own, maybe don't even construct rangeDeps.covering() - redundantBefore().foldl(participants, (e, s, d, b) -> { - int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); - if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; - s.bootstrapIdx = bootstrapIdx; - - int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); - if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; - s.appliedIdx = appliedIdx; - - // remove intersecting transactions with known redundant txnId - if (appliedIdx > bootstrapIdx) - { - // TODO (desired): - // TODO (desired): move the bounds check into forEach, matching structure used for keys - d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { - if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx) - b0.removeWaitingOnDirectRangeTxnId(txnIdx); - }); - } - - if (bootstrapIdx > 0) - { - // if we have any ranges where bootstrap is involved, we have to do a more complicated dance since - // this may imply only partial redundancy (we may still depend on the transaction for some other range) - s.range = e.range; - // TODO (desired): move the bounds check into forEach, matching structure used for keys - d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { - if (txnIdx < s0.bootstrapIdx && b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx)) - b0.removeWaitingOnDirectRangeTxnId(txnIdx); - }); - } - return s; - }, new RangeState(), rangeDeps, builder, ignore -> false); - } - - public final boolean hasLocallyRedundantDependencies(TxnId minimumDependencyId, Timestamp executeAt, Participants<?> participantsOfWaitingTxn) - { - // TODO (required): consider race conditions when bootstrapping into an active command store, that may have seen a higher txnId than this? - // might benefit from maintaining a per-CommandStore largest TxnId register to ensure we allocate a higher TxnId for our ExclSync, - // or from using whatever summary records we have for the range, once we maintain them - return redundantBefore.status(minimumDependencyId, participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE) >= 0; - } + public NavigableMap<Timestamp, Ranges> unsafeGetSafeToRead() { return safeToRead; } final void markUnsafeToRead(Ranges ranges) { diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 53f619d5..2e366661 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -208,11 +208,6 @@ public abstract class CommandStores return allAt(txnId); } - public @Nonnull Ranges unsafeToReadAt(Timestamp at) - { - return allAt(at).without(store.safeToReadAt(at)); - } - public @Nonnull Ranges allAt(Timestamp at) { return allAt(at.epoch()); diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index a85ad650..76c74a86 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -634,10 +634,10 @@ public class Commands protected static WaitingOn.Update updateWaitingOn(SafeCommandStore safeStore, CommonAttributes waiting, Timestamp executeAt, WaitingOn.Update update, Participants<?> participants) { - CommandStore commandStore = safeStore.commandStore(); + RedundantBefore redundantBefore = safeStore.redundantBefore(); TxnId minWaitingOnTxnId = update.minWaitingOnTxnId(); - if (minWaitingOnTxnId != null && commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), executeAt, participants)) - safeStore.commandStore().removeRedundantDependencies(participants, update); + if (minWaitingOnTxnId != null && redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), executeAt, participants)) + redundantBefore.removeRedundantDependencies(participants, update); update.forEachWaitingOnId(safeStore, update, waiting, executeAt, (store, upd, w, exec, i) -> { SafeCommand dep = store.ifLoadedAndInitialised(upd.txnId(i)); @@ -859,7 +859,7 @@ public class Commands { TxnId txnId = command.txnId(); participants = command.participants().supplement(participants); - RedundantStatus status = safeStore.commandStore().redundantBefore().status(txnId, participants.owns()); + RedundantStatus status = safeStore.redundantBefore().status(txnId, participants.owns()); switch (status) { default: throw new AssertionError("Unhandled RedundantStatus: " + status); @@ -937,7 +937,7 @@ public class Commands depSafe = safeStore.ifInitialised(loadDepId); if (depSafe == null) { - RedundantStatus redundantStatus = safeStore.commandStore().redundantBefore().status(waitingId, waiting.partialDeps().participants(loadDepId)); + RedundantStatus redundantStatus = safeStore.redundantBefore().status(waitingId, waiting.partialDeps().participants(loadDepId)); switch (redundantStatus) { default: throw new AssertionError("Unexpected redundant status: " + redundantStatus); @@ -1013,7 +1013,7 @@ public class Commands // TODO (desired): slightly costly to invert a large partialDeps collection Participants<?> participants = waiting.partialDeps().participants(dep.txnId()); participants = waiting.participants().dependencyExecutesAtLeast(safeStore, participants, waitingId, waiting.executeAt()); - RedundantStatus redundantStatus = safeStore.commandStore().redundantBefore().status(dep.txnId(), participants); + RedundantStatus redundantStatus = safeStore.redundantBefore().status(dep.txnId(), participants); switch (redundantStatus) { default: throw new AssertionError("Unknown redundant status: " + redundantStatus); @@ -1101,13 +1101,13 @@ public class Commands static Command removeRedundantDependencies(SafeCommandStore safeStore, SafeCommand safeCommand, @Nullable TxnId redundant) { - CommandStore commandStore = safeStore.commandStore(); Command.Committed current = safeCommand.current().asCommitted(); + RedundantBefore redundantBefore = safeStore.redundantBefore(); WaitingOn.Update update = new WaitingOn.Update(current.waitingOn); TxnId minWaitingOnTxnId = update.minWaitingOnTxnId(); - if (minWaitingOnTxnId != null && commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), current.executeAt(), current.participants().owns)) - safeStore.commandStore().removeRedundantDependencies(current.participants().owns, update); + if (minWaitingOnTxnId != null && redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), current.executeAt(), current.participants().owns)) + redundantBefore.removeRedundantDependencies(current.participants().owns, update); // if we are a range transaction, being redundant for this transaction does not imply we are redundant for all transactions if (redundant != null) @@ -1248,7 +1248,7 @@ public class Commands { // TODO (required, later): in the event we are depending on a stale key for an insert into a non-stale key, we cannot proceed and must mark the new key stale // I think today this is unsupported in practice, but must be addressed before we improve efficiency of result handling - Ranges staleRanges = permitStaleMissing.commandStore().redundantBefore().staleRanges(); + Ranges staleRanges = permitStaleMissing.redundantBefore().staleRanges(); required = required.without(staleRanges); return adding == null ? required.isEmpty() : covers.test(adding, required); } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 6ca13287..499435c9 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -403,12 +403,23 @@ public class Node implements ConfigurationService.Listener, NodeTimeService private static Timestamp nowAtLeast(Timestamp current, Timestamp proposed) { - if (current.epoch() >= proposed.epoch() && current.hlc() >= proposed.hlc()) + long currentEpoch = current.epoch(), proposedEpoch = proposed.epoch(); + long maxEpoch = Math.max(currentEpoch, proposedEpoch); + + long currentHlc = current.hlc(), proposedHlc = proposed.hlc(); + if (currentHlc == proposedHlc) + { + // we want to produce a zero Hlc + int currentFlags = current.flags(), proposedFlags = proposed.flags(); + if (proposedFlags > currentFlags) ++proposedHlc; + else if (proposedFlags == currentFlags && proposed.node.id > current.node.id) ++proposedHlc; + } + long maxHlc = Math.max(currentHlc, proposedHlc); + + if (maxEpoch == currentEpoch && maxHlc == currentHlc) return current; - return proposed.withEpochAtLeast(proposed.epoch()) - .withHlcAtLeast(current.hlc()) - .withNode(current.node); + return Timestamp.fromValues(maxEpoch, maxHlc, current.flags(), current.node); } @Override diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index a0d7ede8..41b84ae8 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -18,6 +18,8 @@ package accord.local; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -27,16 +29,20 @@ import accord.api.VisibleForImplementation; import accord.primitives.AbstractRanges; import accord.primitives.Deps; import accord.primitives.EpochSupplier; +import accord.primitives.KeyDeps; import accord.primitives.Participants; import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.Routables; +import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; import accord.utils.Invariants; import accord.utils.ReducingIntervalMap; import accord.utils.ReducingRangeMap; +import org.agrona.collections.Int2ObjectHashMap; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; import static accord.local.RedundantBefore.PreBootstrapOrStale.POST_BOOTSTRAP; @@ -675,4 +681,144 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> } } } + + public final void removeRedundantDependencies(Unseekables<?> participants, Command.WaitingOn.Update builder) + { + // Note: we do not need to track the bootstraps we implicitly depend upon, because we will not serve any read requests until this has completed + // and since we are a timestamp store, and we write only this will sort itself out naturally + // TODO (required): make sure we have no races on HLC around SyncPoint else this resolution may not work (we need to know the micros equivalent timestamp of the snapshot) + class KeyState + { + Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping; + + /** + * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store + */ + boolean isFullyBootstrapping(Command.WaitingOn.Update builder, Range range, int txnIdx) + { + if (builder.directKeyDeps.foldEachKey(txnIdx, range, true, (r0, k, p) -> p && r0.contains(k))) + return true; + + if (partiallyBootstrapping == null) + partiallyBootstrapping = new Int2ObjectHashMap<>(); + RoutingKeys prev = partiallyBootstrapping.get(txnIdx); + RoutingKeys remaining = prev; + if (remaining == null) remaining = builder.directKeyDeps.participatingKeys(txnIdx); + else Invariants.checkState(!remaining.isEmpty()); + remaining = remaining.without(range); + if (prev == null) Invariants.checkState(!remaining.isEmpty()); + partiallyBootstrapping.put(txnIdx, remaining); + return remaining.isEmpty(); + } + } + + KeyDeps directKeyDeps = builder.directKeyDeps; + if (!directKeyDeps.isEmpty()) + { + foldl(directKeyDeps.keys(), (e, s, d, b) -> { + // TODO (desired, efficiency): foldlInt so we can track the lower rangeidx bound and not revisit unnecessarily + // find the txnIdx below which we are known to be fully redundant locally due to having been applied or invalidated + int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); + if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; + int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); + if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; + + // remove intersecting transactions with known redundant txnId + // note that we must exclude all transactions that are pre-bootstrap, and perform the more complicated dance below, + // as these transactions may be only partially applied, and we may need to wait for them on another key. + if (appliedIdx > bootstrapIdx) + { + d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0, s0, txnIdx) -> { + b0.removeWaitingOnDirectKeyTxnId(txnIdx); + }); + } + + if (bootstrapIdx > 0) + { + d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0, s0, r, txnIdx) -> { + if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) && s0.isFullyBootstrapping(b0, r, txnIdx)) + b0.removeWaitingOnDirectKeyTxnId(txnIdx); + }); + } + return s; + }, new KeyState(), directKeyDeps, builder, ignore -> false); + } + + /** + * If we have to handle bootstrapping ranges for range transactions, these may only partially cover the + * transaction, in which case we should not remove the transaction as a dependency. But if it is fully + * covered by bootstrapping ranges then we *must* remove it as a dependency. + */ + class RangeState + { + Range range; + int bootstrapIdx, appliedIdx; + Map<Integer, Ranges> partiallyBootstrapping; + + /** + * Are the participating ranges for the txn fully covered by bootstrapping ranges for this command store + */ + boolean isFullyBootstrapping(int rangeTxnIdx) + { + // if all deps for the txnIdx are contained in the range, don't inflate any shared object state + if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range, true, (r1, r2, p) -> p && r1.contains(r2))) + return true; + + if (partiallyBootstrapping == null) + partiallyBootstrapping = new HashMap<>(); + Ranges prev = partiallyBootstrapping.get(rangeTxnIdx); + Ranges remaining = prev; + if (remaining == null) remaining = builder.directRangeDeps.ranges(rangeTxnIdx); + else Invariants.checkState(!remaining.isEmpty()); + remaining = remaining.without(Ranges.of(range)); + if (prev == null) Invariants.checkState(!remaining.isEmpty()); + partiallyBootstrapping.put(rangeTxnIdx, remaining); + return remaining.isEmpty(); + } + } + + RangeDeps rangeDeps = builder.directRangeDeps; + // TODO (required, consider): slice to only those ranges we own, maybe don't even construct rangeDeps.covering() + foldl(participants, (e, s, d, b) -> { + int bootstrapIdx = d.txnIds().find(e.bootstrappedAt); + if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx; + s.bootstrapIdx = bootstrapIdx; + + int appliedIdx = d.txnIds().find(e.locallyAppliedOrInvalidatedBefore); + if (appliedIdx < 0) appliedIdx = -1 - appliedIdx; + s.appliedIdx = appliedIdx; + + // remove intersecting transactions with known redundant txnId + if (appliedIdx > bootstrapIdx) + { + // TODO (desired): + // TODO (desired): move the bounds check into forEach, matching structure used for keys + d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { + if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx) + b0.removeWaitingOnDirectRangeTxnId(txnIdx); + }); + } + + if (bootstrapIdx > 0) + { + // if we have any ranges where bootstrap is involved, we have to do a more complicated dance since + // this may imply only partial redundancy (we may still depend on the transaction for some other range) + s.range = e.range; + // TODO (desired): move the bounds check into forEach, matching structure used for keys + d.forEach(e.range, b, s, (b0, s0, txnIdx) -> { + if (txnIdx < s0.bootstrapIdx && b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx)) + b0.removeWaitingOnDirectRangeTxnId(txnIdx); + }); + } + return s; + }, new RangeState(), rangeDeps, builder, ignore -> false); + } + + public final boolean hasLocallyRedundantDependencies(TxnId minimumDependencyId, Timestamp executeAt, Participants<?> participantsOfWaitingTxn) + { + // TODO (required): consider race conditions when bootstrapping into an active command store, that may have seen a higher txnId than this? + // might benefit from maintaining a per-CommandStore largest TxnId register to ensure we allocate a higher TxnId for our ExclSync, + // or from using whatever summary records we have for the range, once we maintain them + return status(minimumDependencyId, participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE) >= 0; + } } diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index dd25ad3e..bb2bf7e8 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -153,7 +153,7 @@ public abstract class SafeCommandStore protected SafeCommandsForKey maybeCleanup(SafeCommandsForKey safeCfk) { - RedundantBefore.Entry entry = commandStore().redundantBefore().get(safeCfk.key().toUnseekable()); + RedundantBefore.Entry entry = redundantBefore().get(safeCfk.key().toUnseekable()); if (entry != null) safeCfk.updateRedundantBefore(this, entry); return safeCfk; @@ -356,6 +356,26 @@ public abstract class SafeCommandStore public abstract NodeTimeService time(); public abstract CommandStores.RangesForEpoch ranges(); + protected NavigableMap<TxnId, Ranges> bootstrapBeganAt() + { + return commandStore().unsafeGetBootstrapBeganAt(); + } + + protected NavigableMap<Timestamp, Ranges> safeToReadAt() + { + return commandStore().unsafeGetSafeToRead(); + } + + public RedundantBefore redundantBefore() + { + return commandStore().unsafeGetRedundantBefore(); + } + + public DurableBefore durableBefore() + { + return commandStore().unsafeGetDurableBefore(); + } + public Ranges futureRanges(TxnId txnId) { return ranges().allBefore(txnId.epoch()); @@ -376,11 +396,21 @@ public abstract class SafeCommandStore return ranges().allBetween(txnId.epoch(), untilLocalEpoch); } + public final Ranges safeToReadAt(Timestamp at) + { + return safeToReadAt().lowerEntry(at).getValue(); + } + + public @Nonnull Ranges unsafeToReadAt(Timestamp at) + { + return ranges().allAt(at).without(safeToReadAt(at)); + } + // if we have to re-bootstrap (due to failed bootstrap or catching up on a range) then we may // have dangling redundant commands; these can safely be executed locally because we are a timestamp store final boolean isFullyPreBootstrapOrStale(Command command, Participants<?> forKeys) { - return commandStore().redundantBefore().preBootstrapOrStale(command.txnId(), forKeys) == FULLY; + return redundantBefore().preBootstrapOrStale(command.txnId(), forKeys) == FULLY; } public void registerListener(SafeCommand listeningTo, SaveStatus await, TxnId waiting) diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java index 03753722..0ba6f6a3 100644 --- a/accord-core/src/main/java/accord/local/StoreParticipants.java +++ b/accord-core/src/main/java/accord/local/StoreParticipants.java @@ -173,7 +173,7 @@ public class StoreParticipants ? safeStore.ranges().all() : safeStore.ranges().allAt(executeAt.epoch()); - return safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges); + return safeStore.redundantBefore().removePreBootstrap(txnId, ranges); } public Participants<?> executes(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt) diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index 55332acf..d36f7eb9 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -1714,11 +1714,6 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm return i; } - public TxnId findFirst() - { - return byId.length > 0 ? byId[0] : null; - } - @Override public boolean equals(Object o) { diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java index 8207dd6e..18e24219 100644 --- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java @@ -113,6 +113,6 @@ public abstract class SafeCommandsForKey implements SafeState<CommandsForKey> public void refresh(SafeCommandStore safeStore) { - updateRedundantBefore(safeStore, safeStore.commandStore().redundantBefore().get(key)); + updateRedundantBefore(safeStore, safeStore.redundantBefore().get(key)); } } diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index 1b901e8f..bcb01261 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -263,7 +263,7 @@ public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> }, executeAt.equals(txnId) ? null : txnId, builder); // TODO (required): make sure any sync point is in the past - Deps redundant = safeStore.commandStore().redundantBefore().collectDeps(participants.touches(), redundantBuilder, minEpoch, executeAt).build(); + Deps redundant = safeStore.redundantBefore().collectDeps(participants.touches(), redundantBuilder, minEpoch, executeAt).build(); Deps result = builder.build().with(redundant); Invariants.checkState(!result.contains(txnId)); return result; diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index f819dadc..a50f7deb 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -354,7 +354,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt if (participants.owns().isEmpty()) return known.knownForAny(); - RedundantStatus status = safeStore.commandStore().redundantBefore().status(txnId, participants.owns()); + RedundantStatus status = safeStore.redundantBefore().status(txnId, participants.owns()); // if our peers have truncated this command, then either: // 1) we have already applied it locally; 2) the command doesn't apply locally; 3) we are stale; or 4) the command is invalidated @@ -367,7 +367,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt } Ranges ranges = safeStore.ranges().allSince(txnId.epoch()); - ranges = safeStore.commandStore().redundantBefore().everExpectToExecute(txnId, ranges); + ranges = safeStore.redundantBefore().everExpectToExecute(txnId, ranges); if (!ranges.isEmpty()) { // even though command stores only lose ranges, we still adopt ranges as of some epoch, and re-bootstrap. @@ -384,14 +384,14 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt return null; Participants<?> executes = participants.executes(safeStore, txnId, executeAtIfKnown); - status = safeStore.commandStore().redundantBefore().status(txnId, executes); + status = safeStore.redundantBefore().status(txnId, executes); if (tryPurge(safeStore, safeCommand, status)) return null; // compute the ranges we expect to execute - i.e. those we own, and are not stale or pre-bootstrap // TODO (required): use StoreParticipants.executes Ranges ranges = safeStore.ranges().allAt(executeAtIfKnown.epoch()); - ranges = safeStore.commandStore().redundantBefore().expectToExecute(txnId, executeAtIfKnown, ranges); + ranges = safeStore.redundantBefore().expectToExecute(txnId, executeAtIfKnown, ranges); if (ranges.isEmpty() || (executes = executes.slice(ranges, Minimal)).isEmpty()) { // TODO (expected): we might prefer to adopt Redundant status, and permit ourselves to later accept the result of the execution and/or definition diff --git a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java index 057e9a38..5c9d694c 100644 --- a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java +++ b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java @@ -42,7 +42,7 @@ public class QueryDurableBefore extends AbstractEpochRequest<QueryDurableBefore. @Override public DurableBeforeReply apply(SafeCommandStore safeStore) { - return new DurableBeforeReply(safeStore.commandStore().durableBefore()); + return new DurableBeforeReply(safeStore.durableBefore()); } @Override diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 7dfa12d0..5d75f0af 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -285,7 +285,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea { Timestamp executeAt = command.executesAtLeast(); // TODO (required): for awaitsOnlyDeps commands, if we cannot infer an actual executeAtLeast we should confirm no situation where txnId is not an adequately conservative value for unavailable/unsafeToRead - return safeStore.ranges().unsafeToReadAt(executeAt); + return safeStore.unsafeToReadAt(executeAt); } void read(SafeCommandStore safeStore, Command command) diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java index 3c396ec7..56093563 100644 --- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java +++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java @@ -48,7 +48,7 @@ public class SetGloballyDurable extends AbstractEpochRequest<SimpleReply> @Override public SimpleReply apply(SafeCommandStore safeStore) { - DurableBefore cur = safeStore.commandStore().durableBefore(); + DurableBefore cur = safeStore.durableBefore(); DurableBefore upd = DurableBefore.merge(durableBefore, cur); // This is done asynchronously safeStore.upsertDurableBefore(upd); diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index 1249d574..4211771b 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -162,6 +162,11 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier return minHlc <= hlc() ? this : new Timestamp(epoch(), minHlc, flags(), node); } + public Timestamp withHlcAtLeastAndNoFlags(long minHlc) + { + return minHlc <= hlc() ? this : new Timestamp(epoch(), minHlc, 0, node); + } + public Timestamp withEpoch(long epoch) { return epoch == epoch() ? this : new Timestamp(epoch, hlc(), flags(), node); diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java index 1a53472b..6d0b04da 100644 --- a/accord-core/src/test/java/accord/impl/basic/Journal.java +++ b/accord-core/src/test/java/accord/impl/basic/Journal.java @@ -89,8 +89,7 @@ public class Journal Command command = reconstruct(diffs, Reconstruct.Last).get(0); if (command.status() == Truncated || command.status() == Invalidated) continue; // Already truncated - StoreParticipants participants = Invariants.nonNull(command.participants()); - Cleanup cleanup = Cleanup.shouldCleanup(store, command, participants); + Cleanup cleanup = Cleanup.shouldCleanup(command, store.unsafeGetRedundantBefore(), store.unsafeGetDurableBefore()); switch (cleanup) { case NO: diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java index eb765767..915442ad 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRead.java +++ b/accord-core/src/test/java/accord/impl/list/ListRead.java @@ -70,7 +70,7 @@ public class ListRead implements Read ListStore s = (ListStore)store; logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt, key); return executor.apply(safeStore.commandStore()).submit(() -> { - Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt); + Ranges unavailable = safeStore.unsafeToReadAt(executeAt); ListData result = new ListData(); switch (key.domain()) { diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index 11d99a57..a32c08fa 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -105,7 +105,7 @@ public class PreAcceptTest commandStore.execute(PreLoadContext.contextFor(txnId, txn.keys().toParticipants()), safeStore -> { CommandsForKey cfk = safeStore.get(key.toUnseekable()).current(); - TxnId commandId = cfk.findFirst(); + TxnId commandId = cfk.get(0).plainTxnId(); Command command = safeStore.ifInitialised(commandId).current(); Assertions.assertEquals(Status.PreAccepted, command.status()); }).begin(new RethrowAgent()); @@ -274,7 +274,7 @@ public class PreAcceptTest commandStore.execute(PreLoadContext.contextFor(txnId, txn.keys().toParticipants()), safeStore -> { CommandsForKey cfk = safeStore.get(key.toUnseekable()).current(); - TxnId commandId = cfk.findFirst(); + TxnId commandId = cfk.get(0).plainTxnId(); Command command = safeStore.ifInitialised(commandId).current(); Assertions.assertEquals(Status.PreAccepted, command.status()); }).begin(new RethrowAgent()); diff --git a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java index 374d7b73..e3eee0e4 100644 --- a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java +++ b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java @@ -164,7 +164,7 @@ public class KeyDepsTest expectedTxnId.remove(txnId); Assertions.assertEquals(expectedTxnId, without.txnIds()); - Assertions.assertEquals(Keys.EMPTY, without.participatingKeys(txnId)); + Assertions.assertEquals(RoutingKeys.EMPTY, without.participatingKeys(txnId)); // all other keys are fine? for (TxnId other : deps.test.txnIds()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org