This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit aa1d855c9e8863a0c4e72f255e26a5b161b6767d Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Tue Sep 24 22:44:30 2024 +0200 Fixes after rebase and an attempt to reconstruct redundant before --- .../java/accord/impl/InMemoryCommandStore.java | 58 +++++++++++++++++----- .../src/main/java/accord/local/Bootstrap.java | 32 ++++++------ .../src/main/java/accord/local/Cleanup.java | 2 +- .../src/main/java/accord/local/CommandStore.java | 43 ++++++---------- .../src/main/java/accord/local/Commands.java | 4 +- .../main/java/accord/local/SafeCommandStore.java | 2 +- .../src/test/java/accord/burn/BurnTest.java | 3 +- .../accord/impl/basic/DelayedCommandStores.java | 1 - 8 files changed, 83 insertions(+), 62 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index c74e9f27..65bc1618 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -42,6 +42,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,16 +51,19 @@ import accord.api.DataStore; import accord.api.Key; import accord.api.LocalListeners; import accord.api.ProgressLog; +import accord.api.Scheduler; import accord.impl.progresslog.DefaultProgressLog; import accord.local.Cleanup; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores.RangesForEpoch; import accord.local.Commands; +import accord.local.DurableBefore; import accord.local.KeyHistory; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.PreLoadContext; +import accord.local.RedundantBefore; import accord.local.RedundantStatus; import accord.local.SafeCommand; import accord.local.SafeCommandStore; @@ -127,16 +131,36 @@ public abstract class InMemoryCommandStore extends CommandStore // To simulate the delay in simulatedAsyncPersist private final Scheduler scheduler; - private static <T> FieldPersister<T> simulatedAsyncPersistFactory(Scheduler scheduler) - { - return (commandStore, toPersist) -> simulatedAsyncPersist(scheduler, commandStore, toPersist); - } - private static <T> AsyncResult<?> simulatedAsyncPersist(Scheduler scheduler, CommandStore store, T toPersist) + private static final class SimulatedFieldPersister<T> implements FieldPersister<T> { - AsyncResult.Settable<?> result = AsyncResults.settable(); - scheduler.once(() -> result.trySuccess(null), 100, TimeUnit.MICROSECONDS); - return result; + private T lastValue; + private final Scheduler scheduler; + private final Node node; + private final int id; + public SimulatedFieldPersister(Scheduler scheduler, T defaultValue, Node node, int id) + { + this.scheduler = scheduler; + this.lastValue = defaultValue; + this.node = node; + this.id = id; + } + + public AsyncResult<?> persist(CommandStore store, T toPersist) + { + System.out.println("Persisting for " + node.id() + "-store-" + id); + AsyncResult.Settable<?> result = AsyncResults.settable(); + scheduler.once(() -> { + lastValue = toPersist; + result.trySuccess(null); + }, 100, TimeUnit.MICROSECONDS); + return result; + } + + public T restore() + { + return lastValue; + } } public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) @@ -148,10 +172,10 @@ public abstract class InMemoryCommandStore extends CommandStore progressLogFactory, listenersFactory, epochUpdateHolder, - simulatedAsyncPersistFactory(scheduler), - simulatedAsyncPersistFactory(scheduler), - simulatedAsyncPersistFactory(scheduler), - simulatedAsyncPersistFactory(scheduler)); + new SimulatedFieldPersister<>(scheduler, DurableBefore.EMPTY, (Node) time, id), + new SimulatedFieldPersister<>(scheduler, RedundantBefore.EMPTY, (Node) time, id), + new SimulatedFieldPersister<>(scheduler, ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id), + new SimulatedFieldPersister<>(scheduler, ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id)); this.scheduler = scheduler; } @@ -1358,6 +1382,16 @@ public abstract class InMemoryCommandStore extends CommandStore commandsForKey.clear(); rangeCommands.clear(); historicalRangeCommands.clear(); + + durableBeforePersistentField.clearAndRestore(); + redundantBeforePersistentField.clearAndRestore(); + bootstrapBeganAtPersistentField.clearAndRestore(); + safeToReadPersistentField.clearAndRestore(); + } + + protected void setRedundantBefore(RedundantBefore newRedundantBefore) + { + super.setRedundantBefore(newRedundantBefore); } public interface Loader diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index fe2fa9ed..b1ed9846 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -143,22 +143,22 @@ class Bootstrap // of these ranges as part of this attempt Ranges commitRanges = valid; store.markBootstrapping(safeStore0.commandStore(), globalSyncId, valid).flatMap(ignore -> - CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges) - // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed - .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> { - if (valid.isEmpty()) // we've lost ownership of the range - return AsyncResults.success(Ranges.EMPTY); - - Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, valid); - safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, safeStore1); - return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); - }))) - .flatMap(i -> i) - .flatMap(ranges -> store.execute(contextFor(localSyncId), safeStore -> { - if (!ranges.isEmpty()) - Commands.markBootstrapComplete(safeStore, localSyncId, ranges); - })) - .begin(this); + CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges) + // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed + .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> { + if (valid.isEmpty()) // we've lost ownership of the range + return AsyncResults.success(Ranges.EMPTY); + + Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, valid); + safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, safeStore1); + return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); + }))) + .flatMap(i -> i) + .flatMap(ranges -> store.execute(contextFor(localSyncId), safeStore -> { + if (!ranges.isEmpty()) + Commands.markBootstrapComplete(safeStore, localSyncId, ranges); + }))) + .begin(this); } // we no longer want to fetch these ranges (perhaps we no longer own them) diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index eea389b5..c86192bd 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -135,7 +135,7 @@ public enum Cleanup // - we can impose additional validations here IF we receive an epoch upper bound // - we should be more robust to the presence/absence of executeAt // - be cognisant of future epochs that participated only for PreAccept/Accept, but where txn was not committed to execute in the epoch (this is why we provide null toEpoch here) - illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + route); + illegalState(String.format("Command %s that is being loaded is not owned by this shard on route %s. Redundant before: %s", txnId, route, redundantBefore)); } } switch (redundant) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index ab91b32c..b57569ac 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -18,30 +18,6 @@ package accord.local; -import accord.api.LocalListeners; -import accord.api.ProgressLog; -import accord.api.DataStore; -import accord.api.VisibleForImplementationTesting; -import accord.coordinate.CollectCalculatedDeps; -import accord.local.Command.WaitingOn; - -import javax.annotation.Nullable; -import accord.api.Agent; - -import accord.local.CommandStores.RangesForEpoch; -import accord.primitives.Deps; -import accord.primitives.KeyDeps; -import accord.primitives.Keys; -import accord.primitives.Range; -import accord.primitives.Routables; -import accord.utils.async.AsyncChain; - -import accord.api.ConfigurationService.EpochReady; -import accord.utils.DeterministicIdentitySet; -import accord.utils.Invariants; -import accord.utils.ReducingRangeMap; -import accord.utils.async.AsyncResult; - import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Collections; import java.util.HashMap; @@ -75,6 +51,7 @@ import accord.api.VisibleForImplementationTesting; import accord.coordinate.CollectCalculatedDeps; import accord.local.Command.WaitingOn; import accord.local.CommandStores.RangesForEpoch; +import accord.primitives.Deps; import accord.primitives.FullRoute; import accord.primitives.KeyDeps; import accord.primitives.Keys; @@ -212,10 +189,10 @@ public abstract class CommandStore implements AgentExecutor private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private ReducingRangeMap<Timestamp> rejectBefore; - private final PersistentField<DurableBefore, DurableBefore> durableBeforePersistentField; - private final PersistentField<RedundantBefore, RedundantBefore> redundantBeforePersistentField; - private final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, Ranges>> bootstrapBeganAtPersistentField; - private final PersistentField<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> safeToReadPersistentField; + protected final PersistentField<DurableBefore, DurableBefore> durableBeforePersistentField; + protected final PersistentField<RedundantBefore, RedundantBefore> redundantBeforePersistentField; + protected final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, Ranges>> bootstrapBeganAtPersistentField; + protected final PersistentField<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> safeToReadPersistentField; protected CommandStore(int id, NodeTimeService time, @@ -335,7 +312,7 @@ public abstract class CommandStore implements AgentExecutor } // For implementations to use after persistence - protected final void setRedundantBefore(RedundantBefore newRedundantBefore) + protected void setRedundantBefore(RedundantBefore newRedundantBefore) { redundantBefore = newRedundantBefore; } @@ -914,6 +891,8 @@ public abstract class CommandStore implements AgentExecutor } AsyncResult<?> persist(CommandStore store, T toPersist); + + default T restore() { return null; }; } // A helper class for implementing fields that needs to be asynchronously persisted and concurrent updates @@ -945,6 +924,12 @@ public abstract class CommandStore implements AgentExecutor this.set = set; } + public void clearAndRestore() + { + pendingValue = null; + set.accept(persister.restore()); + } + public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean remergeAfterPersistence) { checkNotNull(merge, "merge cannot be null"); diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 956edf77..5d83e33d 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -75,7 +75,9 @@ import static accord.local.Status.PreApplied; import static accord.local.Status.PreCommitted; import static accord.local.Status.Stable; import static accord.local.Status.Truncated; +import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Route.isFullRoute; +import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static accord.utils.Invariants.illegalState; public class Commands @@ -615,7 +617,7 @@ public class Commands return maybeExecute(safeStore, safeCommand, safeCommand.current(), alwaysNotifyListeners, notifyWaitingOn); } - public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, boolean alwaysNotifyListeners, boolean notifyWaitingOn) + public static boolean maybeExecute(SafeCommandStore safeStore0, SafeCommand safeCommand0, Command command, boolean alwaysNotifyListeners, boolean notifyWaitingOn) { if (logger.isTraceEnabled()) logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", command.txnId(), command.status(), alwaysNotifyListeners); diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index c2a02d18..bcabdb90 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -230,7 +230,7 @@ public abstract class SafeCommandStore if (newSaveStatus == Applied && oldSaveStatus != Applied) { Ranges ranges = updated.route().slice(ranges().all(), Minimal).toRanges(); - commandStore().markExclusiveSyncPointLocallyApplied(this, txnId, ranges); + commandStore().markExclusiveSyncPointLocallyApplied(this.commandStore(), txnId, ranges); } } diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 6d6b1cfa..053d4e7c 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -575,7 +575,8 @@ public class BurnTest @Test public void testOne() { - run(System.nanoTime()); +// run(System.nanoTime()); + run(98634622518625l); } private static void run(long seed) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index b583b70b..c1cb4fac 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -40,7 +40,6 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.api.Result; import accord.api.Scheduler; import accord.impl.InMemoryCommandStore; import accord.impl.InMemoryCommandStores; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org