This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit c6b2cbaac852c5c2453ff9b289db0866ccfa0345 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Sep 25 18:23:10 2024 +0100 wip: journal replay finishing touches --- .../src/main/java/accord/api/DataStore.java | 3 +- .../java/accord/impl/InMemoryCommandStore.java | 48 +++++++++--- .../src/main/java/accord/local/CommandStore.java | 85 ++++++++++------------ .../accord/impl/list/ListFetchCoordinator.java | 5 +- .../src/test/java/accord/impl/list/ListStore.java | 12 ++- .../src/test/java/accord/impl/mock/MockStore.java | 8 ++ .../main/java/accord/maelstrom/MaelstromStore.java | 9 +++ 7 files changed, 109 insertions(+), 61 deletions(-) diff --git a/accord-core/src/main/java/accord/api/DataStore.java b/accord-core/src/main/java/accord/api/DataStore.java index a93ff073..45d62101 100644 --- a/accord-core/src/main/java/accord/api/DataStore.java +++ b/accord-core/src/main/java/accord/api/DataStore.java @@ -25,6 +25,7 @@ import accord.local.SafeCommandStore; import accord.primitives.Ranges; import accord.primitives.SyncPoint; import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; @@ -112,6 +113,6 @@ public interface DataStore } FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback); - default AsyncResult<Void> snapshot(Ranges ranges) { return AsyncResults.success(null); }; + AsyncResult<Void> snapshot(Ranges ranges, TxnId before); default void restoreFromSnapshot() {}; } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 1b474779..2793b6e9 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -56,10 +56,12 @@ import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores.RangesForEpoch; import accord.local.Commands; +import accord.local.DurableBefore; import accord.local.KeyHistory; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.PreLoadContext; +import accord.local.RedundantBefore; import accord.local.RedundantStatus; import accord.local.SafeCommand; import accord.local.SafeCommandStore; @@ -84,6 +86,7 @@ import accord.primitives.Timestamp; import accord.primitives.Txn.Kind.Kinds; import accord.primitives.TxnId; import accord.utils.Invariants; +import accord.utils.ReducingRangeMap; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; @@ -428,15 +431,6 @@ public abstract class InMemoryCommandStore extends CommandStore return new InMemorySafeStore(this, ranges, context, commands, timestampsForKey, commandsForKeys); } - private void loadCommandsForKey(RoutableKey key, - KeyHistory keyHistory, - Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, - Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey) - { - commandsForKey.put(key, commandsForKey((Key) key).createSafeReference()); - timestampsForKey.put(key, timestampsForKey((Key) key).createSafeReference()); - } - protected void validateRead(Command current) {} protected final InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges) @@ -1466,4 +1460,40 @@ public abstract class InMemoryCommandStore extends CommandStore historicalRangeCommands.merge(txnId, ranges.slice(allRanges), Ranges::with); }); } + + @Override + public void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch) + { + super.unsafeSetRangesForEpoch(newRangesForEpoch); + } + + @Override + public void unsafeSetDurableBefore(DurableBefore newDurableBefore) + { + super.unsafeSetDurableBefore(newDurableBefore); + } + + @Override + public void unsafeSetRedundantBefore(RedundantBefore newRedundantBefore) + { + super.unsafeSetRedundantBefore(newRedundantBefore); + } + + @Override + public void unsafeSetRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore) + { + super.unsafeSetRejectBefore(newRejectBefore); + } + + @Override + public void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) + { + super.unsafeSetSafeToRead(newSafeToRead); + } + + @Override + public void unsafeSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) + { + super.unsafeSetBootstrapBeganAt(newBootstrapBeganAt); + } } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 298d0c04..966c4418 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -33,6 +33,7 @@ import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,6 @@ import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.api.VisibleForImplementationTesting; import accord.coordinate.CollectCalculatedDeps; import accord.local.Command.WaitingOn; import accord.local.CommandStores.RangesForEpoch; @@ -217,9 +217,9 @@ public abstract class CommandStore implements AgentExecutor update = epochUpdateHolder.getAndSet(null); if (!update.addGlobalRanges.isEmpty()) - setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE))); + upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)); if (update.addRedundantBefore.size() > 0) - setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore)); + upsertRedundantBefore(update.addRedundantBefore); if (update.newRangesForEpoch != null) rangesForEpoch = update.newRangesForEpoch; return rangesForEpoch; @@ -230,6 +230,11 @@ public abstract class CommandStore implements AgentExecutor return rangesForEpoch; } + protected void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch) + { + rangesForEpoch = newRangesForEpoch; + } + public abstract boolean inStore(); public void maybeExecuteImmediately(Runnable task) @@ -246,39 +251,42 @@ public abstract class CommandStore implements AgentExecutor protected abstract void registerHistoricalTransactions(Deps deps, SafeCommandStore safeStore); // implementations are expected to override this for persistence - protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore) + public void upsertDurableBefore(DurableBefore addDurableBefore) { - this.rejectBefore = newRejectBefore; + durableBefore = DurableBefore.merge(durableBefore, addDurableBefore); } - protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) + protected void unsafeSetRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore) { - this.bootstrapBeganAt = newBootstrapBeganAt; + this.rejectBefore = newRejectBefore; } - public DurableBefore durableBefore() + protected void upsertRedundantBefore(RedundantBefore addRedundantBefore) { - return durableBefore; + redundantBefore = RedundantBefore.merge(redundantBefore, addRedundantBefore); } - public final void upsertDurableBefore(DurableBefore addDurableBefore) + protected void unsafeSetDurableBefore(DurableBefore newDurableBefore) { - durableBefore = DurableBefore.merge(durableBefore, addDurableBefore); + durableBefore = newDurableBefore; } - protected final void setDurableBefore(DurableBefore newDurableBefore) + protected void unsafeSetRedundantBefore(RedundantBefore newRedundantBefore) { - durableBefore = newDurableBefore; + redundantBefore = newRedundantBefore; } - protected void upsertRedundantBefore(RedundantBefore addRedundantBefore) + /** + * This method may be invoked on a non-CommandStore thread + */ + protected synchronized void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) { - redundantBefore = RedundantBefore.merge(redundantBefore, addRedundantBefore); + this.safeToRead = newSafeToRead; } - protected void setRedundantBefore(RedundantBefore newRedundantBefore) + protected void unsafeSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) { - redundantBefore = newRedundantBefore; + this.bootstrapBeganAt = newBootstrapBeganAt; } /** @@ -300,21 +308,13 @@ public abstract class CommandStore implements AgentExecutor setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt)); } - /** - * This method may be invoked on a non-CommandStore thread - */ - protected final synchronized void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) - { - this.safeToRead = newSafeToRead; - } - public final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint); ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? rejectBefore : new ReducingRangeMap<>(); newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, Timestamp::max); - setRejectBefore(newRejectBefore); + unsafeSetRejectBefore(newRejectBefore); } public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) @@ -322,7 +322,7 @@ public abstract class CommandStore implements AgentExecutor // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint); RedundantBefore newRedundantBefore = RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, TxnId.NONE, TxnId.NONE, TxnId.NONE)); - setRedundantBefore(newRedundantBefore); + unsafeSetRedundantBefore(newRedundantBefore); updatedRedundantBefore(safeStore, txnId, ranges); } @@ -515,7 +515,7 @@ public abstract class CommandStore implements AgentExecutor final void markBootstrapping(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) { - setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); + unsafeSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, globalSyncId); upsertRedundantBefore(addRedundantBefore); upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE)); @@ -532,7 +532,7 @@ public abstract class CommandStore implements AgentExecutor upsertDurableBefore(addDurableBefore); updatedRedundantBefore(safeStore, globalSyncId, slicedRanges); safeStore = safeStore; // make unusable in lambda - safeStore.dataStore().snapshot(slicedRanges).begin((success, fail) -> { + safeStore.dataStore().snapshot(slicedRanges, globalSyncId).begin((success, fail) -> { if (fail != null) { logger.error("Unsuccessful dataStore snapshot; unable to update GC markers", fail); @@ -569,7 +569,7 @@ public abstract class CommandStore implements AgentExecutor agent.onStale(staleSince, ranges); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast); - setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); + upsertRedundantBefore(addRedundantBefore); // find which ranges need to bootstrap, subtracting those already in progress that cover the id markUnsafeToRead(ranges); @@ -614,10 +614,15 @@ public abstract class CommandStore implements AgentExecutor return redundantBefore; } - @VisibleForImplementationTesting + public DurableBefore durableBefore() + { + return durableBefore; + } + + @VisibleForTesting public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return bootstrapBeganAt; } - @VisibleForImplementationTesting + @VisibleForTesting public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; } public final boolean isRejectedIfNotPreAccepted(TxnId txnId, Unseekables<?> participants) @@ -771,25 +776,13 @@ public abstract class CommandStore implements AgentExecutor final synchronized void markUnsafeToRead(Ranges ranges) { if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges))) - setSafeToRead(purgeHistory(safeToRead, ranges)); + unsafeSetSafeToRead(purgeHistory(safeToRead, ranges)); } final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp at, Ranges ranges) { Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges); - setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); - } - - protected static class BootstrapSyncPoint - { - final TxnId syncTxnId; - final Ranges ranges; - - protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges) - { - this.syncTxnId = syncTxnId; - this.ranges = ranges; - } + unsafeSetSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); } protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt) diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index 575dda1b..a4687514 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -66,10 +66,7 @@ public class ListFetchCoordinator extends AbstractFetchCoordinator persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore -> { listData.forEach((key, value) -> listStore.data.merge(key, value, Timestamped::merge)); }).addCallback((ignore, fail) -> { - if (fail == null) { - success(from, received); - listStore.snapshot(); - } + if (fail == null) success(from, received); else fail(from, received, fail); }).beginAsResult()); } diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 81416540..2473d6dc 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -41,8 +41,13 @@ import accord.coordinate.Timeout; import accord.coordinate.TopologyMismatch; import accord.coordinate.tracking.AllTracker; import accord.coordinate.tracking.RequestStatus; +import accord.impl.InMemoryCommandStore; import accord.impl.basic.SimulatedFault; +import accord.local.CommandStore; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.DurableBefore; import accord.local.Node; +import accord.local.RedundantBefore; import accord.local.SafeCommandStore; import accord.messages.Callback; import accord.messages.ReadData; @@ -59,6 +64,7 @@ import accord.topology.Topologies; import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.RandomSource; +import accord.utils.ReducingRangeMap; import accord.utils.Timestamped; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; @@ -194,7 +200,7 @@ public class ListStore implements DataStore private final Deque<PendingSnapshot> pendingSnapshots = new ArrayDeque<>(); private long pendingDelay = 0; - public AsyncResult<Void> snapshot() + public AsyncResult<Void> snapshot(Ranges ranges, TxnId before) { Snapshot snapshot = new Snapshot(data, addedAts, removedAts, purgedAts, fetchCompletes, pendingRemoves); AsyncResult.Settable<Void> result = new AsyncResults.SettableResult<>(); @@ -227,12 +233,16 @@ public class ListStore implements DataStore public void restoreFromSnapshot() { + if (snapshot == null) + return; + data.putAll(snapshot.data); addedAts.addAll(snapshot.addedAts); removedAts.addAll(snapshot.removedAts); purgedAts.addAll(snapshot.purgedAts); fetchCompletes.addAll(snapshot.fetchCompletes); pendingRemoves.addAll(snapshot.pendingRemoves); + InMemoryCommandStore commandStore = (InMemoryCommandStore) CommandStore.current(); } public void clear() diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java index e3613fb2..803c7efc 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockStore.java +++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java @@ -33,9 +33,11 @@ import accord.primitives.Seekable; import accord.primitives.Seekables; import accord.primitives.SyncPoint; import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; public class MockStore implements DataStore @@ -149,4 +151,10 @@ public class MockStore implements DataStore callback.fetched(ranges); return new ImmediateFetchFuture(ranges); } + + @Override + public AsyncResult<Void> snapshot(Ranges ranges, TxnId before) + { + return AsyncResults.success(null); + } } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java index 013db36d..777700cc 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java @@ -27,7 +27,10 @@ import accord.local.Node; import accord.local.SafeCommandStore; import accord.primitives.Ranges; import accord.primitives.SyncPoint; +import accord.primitives.TxnId; import accord.utils.Timestamped; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import accord.utils.async.AsyncResults.SettableResult; public class MaelstromStore implements DataStore @@ -57,4 +60,10 @@ public class MaelstromStore implements DataStore { return new ImmediateFetchResult(ranges); } + + @Override + public AsyncResult<Void> snapshot(Ranges ranges, TxnId before) + { + return AsyncResults.success(null); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org