This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 1f6d998835c64e7a4f6ac8bc23c5e052760ac493 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Sep 26 15:59:36 2024 +0200 Make it easier to log changes in rangesForEpoch, durableBefore, redundantBefore, safeToRead, and rangesForEpoch --- .../java/accord/impl/InMemoryCommandStore.java | 13 ++- .../src/main/java/accord/local/CommandStore.java | 94 ++++++++++++---------- .../main/java/accord/local/SafeCommandStore.java | 31 +++++++ .../java/accord/messages/SetGloballyDurable.java | 2 +- 4 files changed, 94 insertions(+), 46 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 6ba93447..e66b25a9 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -479,8 +479,8 @@ public abstract class InMemoryCommandStore extends CommandStore { if (current != null) throw illegalState("Another operation is in progress or it's store was not cleared"); - RangesForEpoch rangesForEpoch = updateRangesForEpoch(); current = createSafeStore(context, rangesForEpoch); + updateRangesForEpoch(current); return current; } @@ -663,7 +663,7 @@ public abstract class InMemoryCommandStore extends CommandStore protected final Map<TxnId, InMemorySafeCommand> commands; private final Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey; private final Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey; - private final RangesForEpoch ranges; + private RangesForEpoch ranges; public InMemorySafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, @@ -677,7 +677,7 @@ public abstract class InMemoryCommandStore extends CommandStore this.commands = commands; this.commandsForKey = commandsForKey; this.timestampsForKey = timestampsForKey; - this.ranges = Invariants.nonNull(ranges); + this.ranges = ranges; } @Override @@ -797,6 +797,13 @@ public abstract class InMemoryCommandStore extends CommandStore return ranges; } + @Override + public void setRangesForEpoch(RangesForEpoch rangesForEpoch) + { + super.setRangesForEpoch(rangesForEpoch); + ranges = rangesForEpoch; + } + @Override public NodeTimeService time() { diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 966c4418..2ebe1c44 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -33,7 +33,6 @@ import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,20 +208,19 @@ public abstract class CommandStore implements AgentExecutor return agent; } - public RangesForEpoch updateRangesForEpoch() + public void updateRangesForEpoch(SafeCommandStore safeStore) { EpochUpdate update = epochUpdateHolder.get(); if (update == null) - return rangesForEpoch; + return; update = epochUpdateHolder.getAndSet(null); if (!update.addGlobalRanges.isEmpty()) - upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)); + safeStore.upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)); if (update.addRedundantBefore.size() > 0) - upsertRedundantBefore(update.addRedundantBefore); + safeStore.upsertRedundantBefore(update.addRedundantBefore); if (update.newRangesForEpoch != null) - rangesForEpoch = update.newRangesForEpoch; - return rangesForEpoch; + safeStore.setRangesForEpoch(update.newRangesForEpoch); } public RangesForEpoch unsafeRangesForEpoch() @@ -250,8 +248,7 @@ public abstract class CommandStore implements AgentExecutor protected abstract void registerHistoricalTransactions(Deps deps, SafeCommandStore safeStore); - // implementations are expected to override this for persistence - public void upsertDurableBefore(DurableBefore addDurableBefore) + protected void upsertDurableBefore(DurableBefore addDurableBefore) { durableBefore = DurableBefore.merge(durableBefore, addDurableBefore); } @@ -261,6 +258,7 @@ public abstract class CommandStore implements AgentExecutor this.rejectBefore = newRejectBefore; } + // Should be called _only_ via safe command store protected void upsertRedundantBefore(RedundantBefore addRedundantBefore) { redundantBefore = RedundantBefore.merge(redundantBefore, addRedundantBefore); @@ -276,6 +274,13 @@ public abstract class CommandStore implements AgentExecutor redundantBefore = newRedundantBefore; } + protected void upsertRejectBefore(TxnId txnId, Ranges ranges) + { + ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? rejectBefore : new ReducingRangeMap<>(); + newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, Timestamp::max); + unsafeSetRejectBefore(newRejectBefore); + } + /** * This method may be invoked on a non-CommandStore thread */ @@ -312,9 +317,7 @@ public abstract class CommandStore implements AgentExecutor { // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint); - ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? rejectBefore : new ReducingRangeMap<>(); - newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, Timestamp::max); - unsafeSetRejectBefore(newRejectBefore); + safeStore.upsertRejectBefore(txnId, ranges); } public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) @@ -515,10 +518,11 @@ public abstract class CommandStore implements AgentExecutor final void markBootstrapping(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) { - unsafeSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); + safeStore.upsertSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, globalSyncId); - upsertRedundantBefore(addRedundantBefore); - upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE)); + safeStore.upsertRedundantBefore(addRedundantBefore); + safeStore.upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE)); + // TODO: can we use `upsert` for notifications? updatedRedundantBefore(safeStore, globalSyncId, ranges); } @@ -527,9 +531,9 @@ public abstract class CommandStore implements AgentExecutor { final Ranges slicedRanges = durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal); RedundantBefore addShardRedundant = RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE); - upsertRedundantBefore(addShardRedundant); + safeStore.upsertRedundantBefore(addShardRedundant); DurableBefore addDurableBefore = DurableBefore.create(slicedRanges, globalSyncId, globalSyncId); - upsertDurableBefore(addDurableBefore); + safeStore.upsertDurableBefore(addDurableBefore); updatedRedundantBefore(safeStore, globalSyncId, slicedRanges); safeStore = safeStore; // make unusable in lambda safeStore.dataStore().snapshot(slicedRanges, globalSyncId).begin((success, fail) -> { @@ -541,7 +545,7 @@ public abstract class CommandStore implements AgentExecutor execute(PreLoadContext.empty(), safeStore0 -> { RedundantBefore addGc = RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId, TxnId.NONE); - upsertRedundantBefore(addGc); + safeStore0.upsertRedundantBefore(addGc); }); }); } @@ -569,9 +573,10 @@ public abstract class CommandStore implements AgentExecutor agent.onStale(staleSince, ranges); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast); - upsertRedundantBefore(addRedundantBefore); + safeStore.upsertRedundantBefore(addRedundantBefore); // find which ranges need to bootstrap, subtracting those already in progress that cover the id + // safeStore.upsertUnsafeToRead(ranges); markUnsafeToRead(ranges); } @@ -583,19 +588,24 @@ public abstract class CommandStore implements AgentExecutor // with safeToRead Supplier<EpochReady> initialise(long epoch, Ranges ranges) { - // Merge in a base for any ranges that needs to be covered - DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); - upsertDurableBefore(addDurableBefore); - // TODO (review): Convoluted check to not overwrite existing bootstraps with TxnId.NONE - // If loading from disk didn't finish before this then we might initialize the range at TxnId.NONE? - // Does CommandStores.topology ensure that doesn't happen? Is it fine if it does because it will get superseded? - Ranges newBootstrapRanges = ranges; - for (Ranges existing : bootstrapBeganAt.values()) - newBootstrapRanges = newBootstrapRanges.without(existing); - if (!newBootstrapRanges.isEmpty()) - bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, bootstrapBeganAt); - safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges); - return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE); + return () -> { + AsyncResult<Void> done = execute(empty(), (safeStore) -> { + // Merge in a base for any ranges that needs to be covered + DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); + safeStore.upsertDurableBefore(addDurableBefore); + // TODO (review): Convoluted check to not overwrite existing bootstraps with TxnId.NONE + // If loading from disk didn't finish before this then we might initialize the range at TxnId.NONE? + // Does CommandStores.topology ensure that doesn't happen? Is it fine if it does because it will get superseded? + Ranges newBootstrapRanges = ranges; + for (Ranges existing : bootstrapBeganAt.values()) + newBootstrapRanges = newBootstrapRanges.without(existing); + if (!newBootstrapRanges.isEmpty()) + bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, bootstrapBeganAt); + safeStore.upsertSafeToRead(purgeAndInsert(safeToRead, TxnId.NONE, ranges)); + }).beginAsResult(); + + return new EpochReady(epoch, DONE, DONE, DONE, DONE); + }; } public final Ranges safeToReadAt(Timestamp at) @@ -619,12 +629,6 @@ public abstract class CommandStore implements AgentExecutor return durableBefore; } - @VisibleForTesting - public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return bootstrapBeganAt; } - - @VisibleForTesting - public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; } - public final boolean isRejectedIfNotPreAccepted(TxnId txnId, Unseekables<?> participants) { if (rejectBefore == null) @@ -773,16 +777,22 @@ public abstract class CommandStore implements AgentExecutor return redundantBefore.status(minimumDependencyId, executeAt, participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE) >= 0; } - final synchronized void markUnsafeToRead(Ranges ranges) + final void markUnsafeToRead(Ranges ranges) { if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges))) - unsafeSetSafeToRead(purgeHistory(safeToRead, ranges)); + { + execute(empty(), safeStore -> { + safeStore.upsertSafeToRead(purgeHistory(safeToRead, ranges)); + }); + } } final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp at, Ranges ranges) { - Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges); - unsafeSetSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); + execute(empty(), safeStore -> { + Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges); + safeStore.upsertSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); + }); } protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt) diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index c2a02d18..82ba1c06 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -18,6 +18,7 @@ package accord.local; +import java.util.NavigableMap; import javax.annotation.Nullable; import accord.api.Agent; @@ -248,6 +249,36 @@ public abstract class SafeCommandStore commandStore().updateMaxConflicts(prev, updated); } + public void upsertRedundantBefore(RedundantBefore addRedundantBefore) + { + commandStore().upsertRedundantBefore(addRedundantBefore); + } + + public void upsertSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) + { + commandStore().unsafeSetBootstrapBeganAt(newBootstrapBeganAt); + } + + public void upsertDurableBefore(DurableBefore addDurableBefore) + { + commandStore().upsertDurableBefore(addDurableBefore); + } + + public void upsertSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) + { + commandStore().unsafeSetSafeToRead(newSafeToRead); + } + + public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + { + commandStore().unsafeSetRangesForEpoch(rangesForEpoch); + } + + public void upsertRejectBefore(TxnId txnId, Ranges ranges) + { + commandStore().upsertRejectBefore(txnId, ranges); + } + public void updateCommandsForKey(Command prev, Command next) { if (!CommandsForKey.needsUpdate(prev, next)) diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java index 6a295d88..7d912435 100644 --- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java +++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java @@ -52,7 +52,7 @@ public class SetGloballyDurable extends AbstractEpochRequest<SimpleReply> DurableBefore cur = safeStore.commandStore().durableBefore(); DurableBefore upd = DurableBefore.merge(durableBefore, cur); // This is done asynchronously - safeStore.commandStore().upsertDurableBefore(upd); + safeStore.upsertDurableBefore(upd); return Ok; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org