This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit cc6c12cb914a03826bfc65635cdd17317eade84e Author: Ariel Weisberg <aweisb...@apple.com> AuthorDate: Thu Sep 19 16:09:36 2024 -0400 Persists metadata syncrhonously --- .../java/accord/impl/InMemoryCommandStore.java | 148 ++++++---- .../java/accord/impl/InMemoryCommandStores.java | 24 +- .../src/main/java/accord/local/Bootstrap.java | 12 +- .../src/main/java/accord/local/CommandStore.java | 307 +++++++++++++++++---- .../src/main/java/accord/local/CommandStores.java | 22 +- .../src/main/java/accord/local/Commands.java | 40 ++- accord-core/src/main/java/accord/local/Node.java | 2 +- .../java/accord/messages/SetGloballyDurable.java | 5 +- .../main/java/accord/messages/SetShardDurable.java | 2 +- .../src/main/java/accord/utils/TriConsumer.java | 24 ++ .../main/java/accord/utils/async/AsyncChain.java | 11 + .../test/java/accord/impl/RemoteListenersTest.java | 15 +- .../accord/impl/basic/DelayedCommandStores.java | 16 +- .../java/accord/local/BootstrapLocalTxnTest.java | 20 +- .../java/accord/local/cfk/CommandsForKeyTest.java | 14 +- 15 files changed, 494 insertions(+), 168 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 872a6f25..c74e9f27 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -86,6 +86,8 @@ import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH; @@ -123,9 +125,34 @@ public abstract class InMemoryCommandStore extends CommandStore private InMemorySafeStore current; - public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + // To simulate the delay in simulatedAsyncPersist + private final Scheduler scheduler; + private static <T> FieldPersister<T> simulatedAsyncPersistFactory(Scheduler scheduler) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + return (commandStore, toPersist) -> simulatedAsyncPersist(scheduler, commandStore, toPersist); + } + + private static <T> AsyncResult<?> simulatedAsyncPersist(Scheduler scheduler, CommandStore store, T toPersist) + { + AsyncResult.Settable<?> result = AsyncResults.settable(); + scheduler.once(() -> result.trySuccess(null), 100, TimeUnit.MICROSECONDS); + return result; + } + + public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) + { + super(id, + time, + agent, + store, + progressLogFactory, + listenersFactory, + epochUpdateHolder, + simulatedAsyncPersistFactory(scheduler), + simulatedAsyncPersistFactory(scheduler), + simulatedAsyncPersistFactory(scheduler), + simulatedAsyncPersistFactory(scheduler)); + this.scheduler = scheduler; } protected boolean canExposeUnloaded() @@ -367,57 +394,59 @@ public abstract class InMemoryCommandStore extends CommandStore } @Override - public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) + public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) { - super.markShardDurable(safeStore, syncId, ranges); - markShardDurable(syncId, ranges); - } - - private void markShardDurable(TxnId syncId, Ranges ranges) - { - if (!rangeCommands.containsKey(syncId)) - historicalRangeCommands.merge(syncId, ranges, Ranges::with); - - // TODO (now): apply on retrieval - historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges)); - rangeCommands.entrySet().removeIf(tx -> { - if (tx.getKey().compareTo(syncId) >= 0) - return false; - Ranges newRanges = tx.getValue().ranges.without(ranges); - if (!newRanges.isEmpty()) - { - tx.getValue().ranges = newRanges; - return false; - } - else - { - maxRedundant = Timestamp.nonNullOrMax(maxRedundant, tx.getValue().command.value().executeAt()); - return true; - } - }); - - // verify we're clearing the progress log - ((Node)time).scheduler().once(() -> { - DefaultProgressLog progressLog = (DefaultProgressLog) this.progressLog; - commands.headMap(syncId, false).forEach((id, cmd) -> { - Command command = cmd.value(); - if (!command.hasBeen(PreCommitted)) return; - if (!command.txnId().kind().isGloballyVisible()) return; - - Ranges allRanges = unsafeRangesForEpoch().allBetween(id.epoch(), command.executeAtOrTxnId().epoch()); - boolean done = command.hasBeen(Truncated); - if (!done) - { - if (redundantBefore().status(cmd.txnId, command.executeAtOrTxnId(), command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) - return; - - Route<?> route = cmd.value().route().slice(allRanges); - done = !route.isEmpty() && ranges.containsAll(route); - } - - if (done) Invariants.checkState(progressLog.get(id) == null); - }); - }, 5L, TimeUnit.SECONDS); + // We know it completes immediately for InMemoryCommandStore + AsyncChain<Void> markShardDurableChain = super.markShardDurable(safeStore, syncId, ranges); + markShardDurableChain = markShardDurableChain.map(ignored -> + { + if (!rangeCommands.containsKey(syncId)) + historicalRangeCommands.merge(syncId, ranges, Ranges::with); + + // TODO (now): apply on retrieval + historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges)); + rangeCommands.entrySet().removeIf(tx -> { + if (tx.getKey().compareTo(syncId) >= 0) + return false; + Ranges newRanges = tx.getValue().ranges.without(ranges); + if (!newRanges.isEmpty()) + { + tx.getValue().ranges = newRanges; + return false; + } + else + { + maxRedundant = Timestamp.nonNullOrMax(maxRedundant, tx.getValue().command.value().executeAt()); + return true; + } + }); + + // verify we're clearing the progress log + ((Node)time).scheduler().once(() -> { + DefaultProgressLog progressLog = (DefaultProgressLog) this.progressLog; + commands.headMap(syncId, false).forEach((id, cmd) -> { + Command command = cmd.value(); + if (!command.hasBeen(PreCommitted)) return; + if (!command.txnId().kind().isGloballyVisible()) return; + + Ranges allRanges = unsafeRangesForEpoch().allBetween(id.epoch(), command.executeAtOrTxnId().epoch()); + boolean done = command.hasBeen(Truncated); + if (!done) + { + if (redundantBefore().status(cmd.txnId, command.executeAtOrTxnId(), command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) + return; + + Route<?> route = cmd.value().route().slice(allRanges); + done = !route.isEmpty() && ranges.containsAll(route); + } + + if (done) Invariants.checkState(progressLog.get(id) == null); + }); + }, 5L, TimeUnit.SECONDS); + return null; + }, this); + + return markShardDurableChain; } protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, @@ -485,7 +514,8 @@ public abstract class InMemoryCommandStore extends CommandStore { if (current != null) throw illegalState("Another operation is in progress or it's store was not cleared"); - current = createSafeStore(context, updateRangesForEpoch()); + RangesForEpoch rangesForEpoch = updateRangesForEpoch(); + current = createSafeStore(context, rangesForEpoch); return current; } @@ -1027,9 +1057,9 @@ public abstract class InMemoryCommandStore extends CommandStore Runnable active = null; final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); - public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); } private synchronized void maybeRun() @@ -1119,9 +1149,9 @@ public abstract class InMemoryCommandStore extends CommandStore private Thread thread; // when run in the executor this will be non-null, null implies not running in this store private final ExecutorService executor; - public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); this.executor = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']'); @@ -1203,9 +1233,9 @@ public abstract class InMemoryCommandStore extends CommandStore } } - public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); } @Override diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java index fdb94d8a..9c7f56e0 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java @@ -18,41 +18,45 @@ package accord.impl; -import accord.api.LocalListeners; -import accord.local.*; import accord.api.Agent; import accord.api.DataStore; +import accord.api.LocalListeners; import accord.api.ProgressLog; +import accord.api.Scheduler; +import accord.local.CommandStore; +import accord.local.CommandStores; +import accord.local.NodeTimeService; +import accord.local.ShardDistributor; import accord.utils.RandomSource; public class InMemoryCommandStores { public static class Synchronized extends CommandStores { - public Synchronized(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public Synchronized(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new, scheduler); } } public static class SingleThread extends CommandStores { - public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new, scheduler); } - public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) + public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, shardFactory); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, shardFactory, scheduler); } } public static class Debug extends InMemoryCommandStores.SingleThread { - public Debug(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public Debug(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new, scheduler); } } } diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index cae6da0e..fe2fa9ed 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -142,12 +142,12 @@ class Bootstrap // we fix here the ranges we use for the synthetic command, even though we may end up only finishing a subset // of these ranges as part of this attempt Ranges commitRanges = valid; - store.markBootstrapping(safeStore0, globalSyncId, valid); - CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges) - // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed - .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> { - if (valid.isEmpty()) // we've lost ownership of the range - return AsyncResults.success(Ranges.EMPTY); + store.markBootstrapping(safeStore0.commandStore(), globalSyncId, valid).flatMap(ignore -> + CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges) + // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed + .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> { + if (valid.isEmpty()) // we've lost ownership of the range + return AsyncResults.success(Ranges.EMPTY); Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, valid); safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, safeStore1); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index a6326a8a..ab91b32c 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -52,20 +52,47 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import accord.api.Agent; +import accord.api.ConfigurationService.EpochReady; +import accord.api.DataStore; +import accord.api.LocalListeners; +import accord.api.ProgressLog; +import accord.api.Scheduler; +import accord.api.VisibleForImplementationTesting; +import accord.coordinate.CollectCalculatedDeps; +import accord.local.Command.WaitingOn; +import accord.local.CommandStores.RangesForEpoch; import accord.primitives.FullRoute; +import accord.primitives.KeyDeps; +import accord.primitives.Keys; import accord.primitives.Participants; +import accord.primitives.Range; import accord.primitives.RangeDeps; import accord.primitives.Ranges; +import accord.primitives.Routables; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; +import accord.utils.DeterministicIdentitySet; +import accord.utils.Invariants; +import accord.utils.ReducingRangeMap; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.agrona.collections.Int2ObjectHashMap; @@ -76,13 +103,17 @@ import static accord.local.PreLoadContext.empty; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; +import static accord.utils.Invariants.checkState; import static accord.utils.Invariants.illegalState; +import static com.google.common.base.Preconditions.checkNotNull; /** * Single threaded internal shard of accord transaction metadata */ public abstract class CommandStore implements AgentExecutor { + public static final Logger logger = LoggerFactory.getLogger(CommandStore.class); + static class EpochUpdate { final RangesForEpoch newRangesForEpoch; @@ -138,7 +169,8 @@ public abstract class CommandStore implements AgentExecutor DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, - EpochUpdateHolder rangesForEpoch); + EpochUpdateHolder rangesForEpoch, + Scheduler scheduler); } private static final ThreadLocal<CommandStore> CURRENT_STORE = new ThreadLocal<>(); @@ -151,9 +183,9 @@ public abstract class CommandStore implements AgentExecutor protected final LocalListeners listeners; protected final EpochUpdateHolder epochUpdateHolder; - // TODO (expected): schedule regular pruning of these collections - // bootstrapBeganAt and shardDurableAt are both canonical data sets mostly used for debugging / constructing - private NavigableMap<TxnId, Ranges> bootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once inserted, rolled-over until invalidated, and the floor entry contains additions) + // Used in markShardStale to make sure the staleness includes in progresss bootstraps + private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once inserted, rolled-over until invalidated, and the floor entry contains additions) + private RedundantBefore redundantBefore = RedundantBefore.EMPTY; // TODO (expected): store this only once per node private DurableBefore durableBefore = DurableBefore.EMPTY; @@ -180,7 +212,22 @@ public abstract class CommandStore implements AgentExecutor private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private ReducingRangeMap<Timestamp> rejectBefore; - protected CommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + private final PersistentField<DurableBefore, DurableBefore> durableBeforePersistentField; + private final PersistentField<RedundantBefore, RedundantBefore> redundantBeforePersistentField; + private final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, Ranges>> bootstrapBeganAtPersistentField; + private final PersistentField<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> safeToReadPersistentField; + + protected CommandStore(int id, + NodeTimeService time, + Agent agent, + DataStore store, + ProgressLog.Factory progressLogFactory, + LocalListeners.Factory listenersFactory, + EpochUpdateHolder epochUpdateHolder, + FieldPersister<DurableBefore> persistDurableBefore, + FieldPersister<RedundantBefore> persistRedundantBefore, + FieldPersister<NavigableMap<TxnId, Ranges>> persistBootstrapBeganAt, + FieldPersister<NavigableMap<Timestamp, Ranges>> persistSafeToReadAt) { this.id = id; this.time = time; @@ -189,6 +236,11 @@ public abstract class CommandStore implements AgentExecutor this.progressLog = progressLogFactory.create(this); this.listeners = listenersFactory.create(this); this.epochUpdateHolder = epochUpdateHolder; + this.durableBeforePersistentField = new PersistentField<>(this::durableBefore, DurableBefore::merge, persistDurableBefore, durableBefore -> setDurableBefore(durableBefore)); + this.redundantBeforePersistentField = new PersistentField<>(this::redundantBefore, RedundantBefore::merge, persistRedundantBefore, redundantBefore -> setRedundantBefore(redundantBefore)); + this.bootstrapBeganAtPersistentField = new PersistentField<>(this::bootstrapBeganAt, CommandStore::bootstrap, persistBootstrapBeganAt, bootstrapBeganAt -> setBootstrapBeganAt(bootstrapBeganAt)); + this.safeToReadPersistentField = new PersistentField<>(this::safeToRead, null, persistSafeToReadAt, safeToRead -> setSafeToRead(safeToRead)); + } public final int id() @@ -209,12 +261,18 @@ public abstract class CommandStore implements AgentExecutor return rangesForEpoch; update = epochUpdateHolder.getAndSet(null); + if (!update.addGlobalRanges.isEmpty()) + // Intentionally don't care if this persists since it will be replayed from topology at startup setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE))); + if (update.addRedundantBefore.size() > 0) + // Intentionally don't care if this persists since it will be replayed from topology at startup setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore)); + if (update.newRangesForEpoch != null) rangesForEpoch = update.newRangesForEpoch; + return rangesForEpoch; } @@ -244,14 +302,13 @@ public abstract class CommandStore implements AgentExecutor this.rejectBefore = newRejectBefore; } - /** - * To be overridden by implementations, to ensure the new state is persisted - * - * TODO (required): consider handling asynchronicity of persistence - * (could leave to impls to call this parent method once persisted) - * TODO (desired): compact Ranges, merging overlaps - */ - protected void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) + protected AsyncResult<?> mergeAndUpdateBootstrapBeganAt(BootstrapSyncPoint globalSyncPoint) + { + return bootstrapBeganAtPersistentField.mergeAndUpdate(globalSyncPoint, null, null, false); + } + + // This will not work correctly if called outside PersistentField without remerging + protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) { this.bootstrapBeganAt = newBootstrapBeganAt; } @@ -261,20 +318,26 @@ public abstract class CommandStore implements AgentExecutor return durableBefore; } - /** - * To be overridden by implementations, to ensure the new state is persisted. - */ - public void setDurableBefore(DurableBefore durableBefore) + public final AsyncResult<?> mergeAndUpdateDurableBefore(DurableBefore newDurableBefore) { - this.durableBefore = durableBefore; + return durableBeforePersistentField.mergeAndUpdate(newDurableBefore, null, null, true); } - /** - * To be overridden by implementations, to ensure the new state is persisted. - */ - protected void setRedundantBefore(RedundantBefore newRedundantBefore) + // For implementations to use after persistence + protected final void setDurableBefore(DurableBefore newDurableBefore) + { + durableBefore = newDurableBefore; + } + + protected final AsyncResult<?> mergeAndUpdateRedundantBefore(RedundantBefore newRedundantBefore, Timestamp gcBefore, Ranges updatedRanges) + { + return redundantBeforePersistentField.mergeAndUpdate(newRedundantBefore, gcBefore, updatedRanges, true); + } + + // For implementations to use after persistence + protected final void setRedundantBefore(RedundantBefore newRedundantBefore) { - this.redundantBefore = newRedundantBefore; + redundantBefore = newRedundantBefore; } /** @@ -296,10 +359,17 @@ public abstract class CommandStore implements AgentExecutor setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt)); } + protected AsyncResult<?> mergeAndUpdateSafeToRead(Function<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> computeNewValue) + { + // The input values are bound into the merge function to satisfy the fact that there are two different sets of inputs types to the merge function + // depending on whether it is purgeHistory or purgeAndInsert + return safeToReadPersistentField.mergeAndUpdate(computeNewValue); + } + /** * This method may be invoked on a non-CommandStore thread */ - protected synchronized void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) + protected final synchronized void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) { this.safeToRead = newSafeToRead; } @@ -313,13 +383,15 @@ public abstract class CommandStore implements AgentExecutor setRejectBefore(newRejectBefore); } - public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + public final AsyncChain<?> markExclusiveSyncPointLocallyApplied(CommandStore commandStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint); RedundantBefore newRedundantBefore = RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, TxnId.NONE, TxnId.NONE)); - setRedundantBefore(newRedundantBefore); - updatedRedundantBefore(safeStore, txnId, ranges); + AsyncResult<?> setRedundantBeforeChain = mergeAndUpdateRedundantBefore(newRedundantBefore, txnId, ranges); + return setRedundantBeforeChain.flatMap( + ignored -> commandStore.execute(contextFor(txnId), + safeStore -> updatedRedundantBefore(safeStore, txnId, ranges))); } /** @@ -509,27 +581,35 @@ public abstract class CommandStore implements AgentExecutor bootstraps.remove(bootstrap); } - final void markBootstrapping(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) + final AsyncChain<?> markBootstrapping(CommandStore commandStore, TxnId globalSyncId, Ranges ranges) { store.snapshot(); - setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); + AsyncResult<?> setBootstrapBeganAtResult = mergeAndUpdateBootstrapBeganAt(new BootstrapSyncPoint(globalSyncId, ranges)); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId); - setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); + // TODO (review): What is the correct txnId to provide here to restrict what memtables are flushed? + AsyncResult<?> setRedundantBeforeResult = mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), globalSyncId, ranges); DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); - setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); - updatedRedundantBefore(safeStore, globalSyncId, ranges); + AsyncResult<?> setDurableBeforeResult = mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); + AsyncChain<?> combinedChain = AsyncChains.allOf(ImmutableList.of(setBootstrapBeganAtResult, setRedundantBeforeResult, setDurableBeforeResult)); + return combinedChain.flatMap( + ignored -> commandStore.execute(PreLoadContext.contextFor(globalSyncId), + safeStore -> updatedRedundantBefore(safeStore, globalSyncId, ranges))); } // TODO (expected): we can immediately truncate dependencies locally once an exclusiveSyncPoint applies, we don't need to wait for the whole shard - public void markShardDurable(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) + public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore0, TxnId globalSyncId, Ranges ranges) { store.snapshot(); - ranges = ranges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal); + ranges = ranges.slice(safeStore0.ranges().allUntil(globalSyncId.epoch()), Minimal); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE); - setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); + AsyncResult<?> setRedundantBeforeChain = mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), globalSyncId, ranges); DurableBefore addDurableBefore = DurableBefore.create(ranges, globalSyncId, globalSyncId); - setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); - updatedRedundantBefore(safeStore, globalSyncId, ranges); + AsyncResult<?> setDurableBeforeChain = mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); + Ranges slicedRanges = ranges; + AsyncChain<?> combinedChain = AsyncChains.allOf(ImmutableList.of(setRedundantBeforeChain, setDurableBeforeChain)); + return combinedChain.flatMap( + ignored -> safeStore0.commandStore().execute(PreLoadContext.contextFor(globalSyncId), + safeStore1 -> updatedRedundantBefore(safeStore1, globalSyncId, slicedRanges))); } protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) @@ -556,19 +636,34 @@ public abstract class CommandStore implements AgentExecutor agent.onStale(staleSince, ranges); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast); - setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); + // TODO (review): Is it ok for this to be asynchronous here? + // TODO (review): Is stale since the right txnid here? + mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), staleSince, ranges).addCallback(agent); // find which ranges need to bootstrap, subtracting those already in progress that cover the id markUnsafeToRead(ranges); } // MUST be invoked before CommandStore reference leaks to anyone + // The integration may have already loaded persisted values for these fields before this is called + // so it must be a merge for each field with the initialization values. These starting values don't need to be + // persisted since we can synthesize them at startup every time + // TODO (review): This needs careful thought about not persisting and that purgeAndInsert is doing the right thing + // with safeToRead Supplier<EpochReady> initialise(long epoch, Ranges ranges) { + // Merge in a base for any ranges that needs to be covered DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); - setBootstrapBeganAt(ImmutableSortedMap.of(TxnId.NONE, ranges)); - setSafeToRead(ImmutableSortedMap.of(Timestamp.NONE, ranges)); + // TODO (review): Convoluted check to not overwrite existing bootstraps with TxnId.NONE + // If loading from disk didn't finish before this then we might initialize the range at TxnId.NONE? + // Does CommandStores.topology ensure that doesn't happen? Is it fine if it does because it will get superseded? + Ranges newBootstrapRanges = ranges; + for (Ranges existing : bootstrapBeganAt.values()) + newBootstrapRanges = newBootstrapRanges.without(existing); + if (!newBootstrapRanges.isEmpty()) + bootstrapBeganAt = bootstrap(new BootstrapSyncPoint(TxnId.NONE, newBootstrapRanges), bootstrapBeganAt); + safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges); return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE); } @@ -589,7 +684,7 @@ public abstract class CommandStore implements AgentExecutor } @VisibleForImplementationTesting - public NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return bootstrapBeganAt; } + public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return bootstrapBeganAt; } @VisibleForImplementationTesting public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; } @@ -624,9 +719,9 @@ public abstract class CommandStore implements AgentExecutor Keys prev = partiallyBootstrapping.get(txnIdx); Keys remaining = prev; if (remaining == null) remaining = builder.directKeyDeps.participatingKeys(txnIdx); - else Invariants.checkState(!remaining.isEmpty()); + else checkState(!remaining.isEmpty()); remaining = remaining.without(range); - if (prev == null) Invariants.checkState(!remaining.isEmpty()); + if (prev == null) checkState(!remaining.isEmpty()); partiallyBootstrapping.put(txnIdx, remaining); return remaining.isEmpty(); } @@ -689,9 +784,9 @@ public abstract class CommandStore implements AgentExecutor Ranges prev = partiallyBootstrapping.get(rangeTxnIdx); Ranges remaining = prev; if (remaining == null) remaining = builder.directRangeDeps.ranges(rangeTxnIdx); - else Invariants.checkState(!remaining.isEmpty()); + else checkState(!remaining.isEmpty()); remaining = remaining.without(Ranges.of(range)); - if (prev == null) Invariants.checkState(!remaining.isEmpty()); + if (prev == null) checkState(!remaining.isEmpty()); partiallyBootstrapping.put(rangeTxnIdx, remaining); return remaining.isEmpty(); } @@ -745,18 +840,35 @@ public abstract class CommandStore implements AgentExecutor final synchronized void markUnsafeToRead(Ranges ranges) { if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges))) - setSafeToRead(purgeHistory(safeToRead, ranges)); + mergeAndUpdateSafeToRead(safeToRead -> purgeHistory(safeToRead, ranges)); } final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp at, Ranges ranges) { Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges); - setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); + mergeAndUpdateSafeToRead(safeToRead -> purgeAndInsert(safeToRead, at, validatedSafeToRead)); } - private static <T extends Timestamp> ImmutableSortedMap<T, Ranges> bootstrap(T at, Ranges ranges, NavigableMap<T, Ranges> bootstrappedAt) + protected static class BootstrapSyncPoint { - Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0); + TxnId syncTxnId; + Ranges ranges; + + protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges) + { + this.syncTxnId = syncTxnId; + this.ranges = ranges; + } + } + + protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(BootstrapSyncPoint syncPoint, NavigableMap<TxnId, Ranges> bootstrappedAt) + { + TxnId at = syncPoint.syncTxnId; + Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || syncPoint.syncTxnId == TxnId.NONE); + if (syncPoint.syncTxnId == TxnId.NONE) + for (Ranges ranges : bootstrappedAt.values()) + checkState(!syncPoint.ranges.intersects(ranges)); + Ranges ranges = syncPoint.ranges; Invariants.checkArgument(!ranges.isEmpty()); // if we're bootstrapping these ranges, then any period we previously owned the ranges for is effectively invalidated return purgeAndInsert(bootstrappedAt, at, ranges); @@ -793,4 +905,101 @@ public abstract class CommandStore implements AgentExecutor return in; return new SimpleImmutableEntry<>(in.getKey(), without); } + + protected interface FieldPersister<T> + { + default AsyncResult<?> persist(CommandStore store, Timestamp timestamp, Ranges ranges, T toPersist) + { + return persist(store, toPersist); + } + + AsyncResult<?> persist(CommandStore store, T toPersist); + } + + // A helper class for implementing fields that needs to be asynchronously persisted and concurrent updates + // need to be merged and ordered + protected class PersistentField<I, T> + { + @Nonnull + private final Supplier<T> currentValue; + // The update can be bound into the merge function in which case it will be null + // Useful when the merge/update function takes multiple types of input arguments + @Nullable + private final BiFunction<I, T, T> merge; + @Nonnull + private final FieldPersister<T> persister; + @Nonnull + private final Consumer<T> set; + + private T pendingValue; + private AsyncResult<?> pendingResult; + + public PersistentField(@Nonnull Supplier<T> currentValue, @Nonnull BiFunction<I, T, T> merge, @Nonnull FieldPersister<T> persist, @Nullable Consumer<T> set) + { + checkNotNull(currentValue, "currentValue cannot be null"); + checkNotNull(persist, "persist cannot be null"); + checkNotNull(set, "set cannot be null"); + this.currentValue = currentValue; + this.merge = merge; + this.persister = persist; + this.set = set; + } + + public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean remergeAfterPersistence) + { + checkNotNull(merge, "merge cannot be null"); + checkNotNull(inputValue, "inputValue cannot be null"); + return mergeAndUpdate(inputValue, merge, gcBefore, updatedRanges, remergeAfterPersistence); + } + + public AsyncResult<?> mergeAndUpdate(@Nonnull Function<T, T> update) + { + checkNotNull(update, "merge cannot be null"); + return mergeAndUpdate(null, (ignored, existingValue) -> update.apply(existingValue), null, null, false); + } + + private AsyncResult<?> mergeAndUpdate(@Nullable I inputValue, @Nonnull BiFunction<I, T, T> merge, @Nullable Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean remergeAfterPersistence) + { + checkNotNull(merge, "merge cannot be null"); + AsyncResult.Settable<Void> result = AsyncResults.settable(); + AsyncResult<?> oldPendingResult = pendingResult; + T startingValue = currentValue.get(); + T newValue = pendingValue != null ? merge.apply(inputValue, pendingValue) : merge.apply(inputValue, startingValue); + this.pendingResult = result; + this.pendingValue= newValue; + + AsyncResult<?> pendingWrite = persister.persist(CommandStore.this, gcBefore, updatedRanges, newValue); + + final T newValueFinal = newValue; + BiConsumer<Object, Throwable> callback = (ignored, failure) -> { + if (PersistentField.this.pendingResult == result) + { + PersistentField.this.pendingResult = null; + PersistentField.this.pendingValue = null; + } + if (failure != null) + result.tryFailure(failure); + else + { + // DurableBefore and RedundantBefore can have initial values set non-persistently in updateRangesForEpoch so remerge them here + // updateRangesForEpoch really doesn't integrate well with is callers if it is asynchronous updating these values + // so this complexity is better than the alternative + if (remergeAfterPersistence && currentValue.get() != startingValue) + // I and T will have to be the same for remerge to work + set.accept(merge.apply((I)newValueFinal, currentValue.get())); + else + set.accept(newValueFinal); + result.trySuccess(null); + } + }; + + // Order completion after previous updates, this is probably stricter than necessary but easy to implement + if (oldPendingResult != null) + oldPendingResult.addCallback(() -> pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent)); + else + pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent); + + return result; + } + } } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 55a923a3..4c8efabc 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -29,8 +29,11 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.ConfigurationService.EpochReady; @@ -39,6 +42,7 @@ import accord.api.Key; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; +import accord.api.Scheduler; import accord.local.CommandStore.EpochUpdateHolder; import accord.primitives.EpochSupplier; import accord.primitives.Participants; @@ -56,14 +60,9 @@ import accord.utils.MapReduceConsume; import accord.utils.RandomSource; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; -import javax.annotation.Nonnull; - import org.agrona.collections.Hashing; import org.agrona.collections.Int2ObjectHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static accord.api.ConfigurationService.EpochReady.done; import static accord.local.PreLoadContext.empty; import static accord.primitives.Routables.Slice.Minimal; @@ -87,7 +86,8 @@ public abstract class CommandStores RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, - LocalListeners.Factory listenersFactory); + LocalListeners.Factory listenersFactory, + Scheduler scheduler); } private static class StoreSupplier @@ -99,8 +99,9 @@ public abstract class CommandStores private final LocalListeners.Factory listenersFactory; private final CommandStore.Factory shardFactory; private final RandomSource random; + private final Scheduler scheduler; - StoreSupplier(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) + StoreSupplier(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) { this.time = time; this.agent = agent; @@ -109,11 +110,12 @@ public abstract class CommandStores this.progressLogFactory = progressLogFactory; this.listenersFactory = listenersFactory; this.shardFactory = shardFactory; + this.scheduler = scheduler; } CommandStore create(int id, EpochUpdateHolder rangesForEpoch) { - return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch); + return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch, scheduler); } } @@ -367,9 +369,9 @@ public abstract class CommandStores } public CommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, - ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) + ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) { - this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory), shardDistributor); + this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory, scheduler), shardDistributor); } public Topology local() diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 1ae4c582..956edf77 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -623,7 +623,7 @@ public class Commands if (command.status() != Stable && command.status() != PreApplied) { if (alwaysNotifyListeners) - safeStore.notifyListeners(safeCommand, command); + safeStore0.notifyListeners(safeCommand0, command); return false; } @@ -631,10 +631,10 @@ public class Commands if (waitingOn.isWaiting()) { if (alwaysNotifyListeners) - safeStore.notifyListeners(safeCommand, command); + safeStore0.notifyListeners(safeCommand0, command); if (notifyWaitingOn && waitingOn.isWaitingOnCommand()) - new NotifyWaitingOn(safeCommand).accept(safeStore); + new NotifyWaitingOn(safeCommand0).accept(safeStore0); return false; } @@ -647,23 +647,23 @@ public class Commands // TODO (required): we can have dangling transactions in some cases when proposing in a future epoch but // later deciding on an earlier epoch. We should probably turn this into an erased vestigial command, // but we should tighten up our semantics there in general. - safeCommand.readyToExecute(safeStore); + safeCommand0.readyToExecute(safeStore0); logger.trace("{}: set to ReadyToExecute", command.txnId()); - safeStore.notifyListeners(safeCommand, command); + safeStore0.notifyListeners(safeCommand0, command); return true; case PreApplied: - Ranges executeRanges = executeRanges(safeStore, command.executeAt()); + Ranges executeRanges = executeRanges(safeStore0, command.executeAt()); Command.Executed executed = command.asExecuted(); boolean intersects = executed.writes().keys.intersects(executeRanges); if (intersects) { // TODO (now): we should set applying within apply to avoid applying multiple times - safeCommand.applying(safeStore); - safeStore.notifyListeners(safeCommand, command); + safeCommand0.applying(safeStore0); + safeStore0.notifyListeners(safeCommand0, command); logger.trace("{}: applying", command.txnId()); - apply(safeStore, executed); + apply(safeStore0, executed); return true; } else @@ -671,8 +671,26 @@ public class Commands // TODO (desirable, performance): This could be performed immediately upon Committed // but: if we later support transitive dependency elision this could be dangerous logger.trace("{}: applying no-op", command.txnId()); - safeCommand.applied(safeStore); - safeStore.notifyListeners(safeCommand, command); + if (command.txnId().kind() == ExclusiveSyncPoint) + { + Ranges ranges = safeStore0.ranges().allAt(command.txnId().epoch()); + ranges = command.route().slice(ranges, Minimal).participants().toRanges(); + CommandStore commandStore = safeStore0.commandStore(); + safeCommand0.applying(safeStore0); + safeStore0.notifyListeners(safeCommand0, command); + commandStore.markExclusiveSyncPointLocallyApplied(commandStore, command.txnId(), ranges).flatMap(ignored -> + commandStore.execute(PreLoadContext.contextFor(command.txnId()), safeStore1 -> { + SafeCommand safeCommand1 = safeStore1.get(command.txnId()); + safeCommand1.applied(safeStore1); + safeStore1.notifyListeners(safeCommand1, command); + }) + ).begin(commandStore.agent); + } + else + { + safeCommand0.applied(safeStore0); + safeStore0.notifyListeners(safeCommand0, command); + } return true; } default: diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index f008edf1..22e300f7 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -192,7 +192,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id)); this.agent = agent; this.random = random; - this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); + this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this), scheduler); // TODO review these leak a reference to an object that hasn't finished construction, possibly to other threads configService.registerListener(this); } diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java index 3fa39321..61a08bca 100644 --- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java +++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java @@ -18,8 +18,8 @@ package accord.messages; -import accord.local.PreLoadContext; import accord.local.DurableBefore; +import accord.local.PreLoadContext; import accord.local.SafeCommandStore; import accord.primitives.TxnId; @@ -51,7 +51,8 @@ public class SetGloballyDurable extends AbstractEpochRequest<SimpleReply> { DurableBefore cur = safeStore.commandStore().durableBefore(); DurableBefore upd = DurableBefore.merge(durableBefore, cur); - safeStore.commandStore().setDurableBefore(upd); + // This is done asynchronously + safeStore.commandStore().mergeAndUpdateDurableBefore(upd).begin(node.agent()); return Ok; } diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java index 09c7b236..c0c39069 100644 --- a/accord-core/src/main/java/accord/messages/SetShardDurable.java +++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java @@ -46,7 +46,7 @@ public class SetShardDurable extends AbstractEpochRequest<SimpleReply> @Override public SimpleReply apply(SafeCommandStore safeStore) { - safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges); + safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges).begin(node.agent()); return Ok; } diff --git a/accord-core/src/main/java/accord/utils/TriConsumer.java b/accord-core/src/main/java/accord/utils/TriConsumer.java new file mode 100644 index 00000000..72d1ec03 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/TriConsumer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +public interface TriConsumer<P1, P2, P3> +{ + void consume(P1 p1, P2 p2, P3 p3); +} diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java index 66f97f16..a642321b 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java @@ -21,6 +21,7 @@ package accord.utils.async; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -48,6 +49,16 @@ public interface AsyncChain<V> return AsyncChains.flatMap(this, mapper, executor); } + default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper, BooleanSupplier inExecutor, Executor executor) + { + return flatMap(input -> { + if (inExecutor.getAsBoolean()) + return AsyncChains.success(mapper.apply(input)); + else + return AsyncChains.ofCallable(executor, () -> mapper.apply(input)); + }); + } + /** * When the chain has failed, this allows the chain to attempt to recover if possible. The provided function may return a {@code null} to represent * that recovery was not possible and that the original exception should propgate. diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index 4329114f..60f59895 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -31,7 +31,6 @@ import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.Supplier; import java.util.stream.Stream; - import javax.annotation.Nullable; import org.junit.jupiter.api.Assertions; @@ -65,6 +64,7 @@ import accord.utils.AccordGens; import accord.utils.RandomSource; import accord.utils.RandomTestRunner; import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResults; import org.agrona.collections.IntHashSet; import org.agrona.collections.ObjectHashSet; @@ -384,7 +384,18 @@ public class RemoteListenersTest protected TestCommandStore(int id) { - super(id, null, null, null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultLocalListeners.DefaultNotifySink.INSTANCE), new EpochUpdateHolder()); + super(id, + null, + null, + null, + ignore -> new ProgressLog.NoOpProgressLog(), + ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), + DefaultLocalListeners.DefaultNotifySink.INSTANCE), + new EpochUpdateHolder(), + (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, + (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, + (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, + (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID); this.storeId = id; } diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 0de6debc..b583b70b 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -40,6 +40,8 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; +import accord.api.Result; +import accord.api.Scheduler; import accord.impl.InMemoryCommandStore; import accord.impl.InMemoryCommandStores; import accord.impl.InMemorySafeCommand; @@ -108,15 +110,15 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return false; } - private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck, Journal journal) + private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal)); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal), scheduler); } public static CommandStores.Factory factory(PendingQueue pending, BooleanSupplier isLoadedCheck, Journal journal) { - return (time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory) -> - new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal); + return (time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, scheduler) -> + new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal, scheduler); } @Override @@ -167,9 +169,9 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private final BooleanSupplier isLoadedCheck; private final Journal journal; - public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) + public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); this.executor = executor; this.isLoadedCheck = isLoadedCheck; this.journal = journal; @@ -200,7 +202,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) { - return (id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal); + return (id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, scheduler) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal, scheduler); } @Override diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java index 4d598d9e..cd639f78 100644 --- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java +++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java @@ -24,6 +24,7 @@ import java.util.function.Consumer; import org.junit.jupiter.api.Test; +import accord.api.Agent; import accord.impl.PrefixedIntHashKey; import accord.impl.basic.Cluster; import accord.impl.basic.DelayedCommandStores.DelayedCommandStore; @@ -43,6 +44,8 @@ import accord.utils.AccordGens; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.assertj.core.api.Assertions; import static accord.local.PreLoadContext.contextFor; @@ -67,7 +70,7 @@ class BootstrapLocalTxnTest for (int storeId : on.commandStores().ids()) { DelayedCommandStore store = (DelayedCommandStore) on.commandStores().forId(storeId); - // this is a bit redudent but here to make the test easier to maintain. Pre/Post execute we validate each command to make sure everything is fine + // this is a bit redundent but here to make the test easier to maintain. Pre/Post execute we validate each command to make sure everything is fine // but that logic could be changed and this test has a dependency on validation the command... so to make this dependency explicit // the test will call the validation logic within the test even though it will be called again in the background... Consumer<Command> validate = store::validateRead; @@ -80,23 +83,24 @@ class BootstrapLocalTxnTest SyncPoint<Ranges> syncPoint = new SyncPoint<>(globalSyncId, Deps.NONE, ranges, route); Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2, r) -> rs2.nextInt(1, 4)).next(rs); Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid)); + Agent agent = store.agent; store.execute(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe -> Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, valid)) .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> validate.accept(safe.get(localSyncId, route.homeKey()).current()))) .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> Commands.markBootstrapComplete(safe, localSyncId, ranges))) .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> validate.accept(safe.get(localSyncId, route.homeKey()).current()))) // cleanup txn - .flatMap(ignore -> store.submit(PreLoadContext.empty(), safe -> { + .withExecutor(store).flatMap(ignore -> { Cleanup target = cleanupGen.next(rs); if (target == Cleanup.NO) - return Cleanup.NO; - safe.commandStore().setRedundantBefore(RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, TxnId.NONE)); + return AsyncResults.success(Cleanup.NO); + AsyncResult<?> result = store.mergeAndUpdateRedundantBefore(RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, TxnId.NONE), nextGlobalSyncId, ranges); switch (target) { case ERASE: - safe.commandStore().setDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, nextGlobalSyncId)); + result = result.flatMap(ignored -> store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, nextGlobalSyncId)).beginAsResult()).beginAsResult(); break; case TRUNCATE: - safe.commandStore().setDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, globalSyncId)); + result = result.flatMap(ignored -> store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, globalSyncId)).beginAsResult()).beginAsResult(); break; case TRUNCATE_WITH_OUTCOME: case INVALIDATE: @@ -105,8 +109,8 @@ class BootstrapLocalTxnTest default: throw new UnsupportedOperationException(target.name()); } - return target; - })) + return result.map(ignored -> target); + }) // validateRead is called implicitly _on command completion_ .flatMap(target -> store.execute(contextFor(localSyncId), safe -> { SafeCommand cmd = safe.get(localSyncId, route.homeKey()); diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 470d23c3..608b2add 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -47,10 +47,10 @@ import accord.api.Read; import accord.api.Result; import accord.api.RoutingKey; import accord.api.Update; -import accord.impl.IntKey; import accord.impl.DefaultLocalListeners; import accord.impl.DefaultLocalListeners.DefaultNotifySink; import accord.impl.DefaultRemoteListeners; +import accord.impl.IntKey; import accord.local.Command; import accord.local.Command.AbstractCommand; import accord.local.CommandStore; @@ -903,7 +903,17 @@ public class CommandsForKeyTest protected TestCommandStore(int pruneInterval, int pruneHlcDelta) { - super(0, null, null, null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), new EpochUpdateHolder()); + super(0, + null, + null, + null, + ignore -> new ProgressLog.NoOpProgressLog(), + ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), + new EpochUpdateHolder(), + (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, + (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, + (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, + (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID); this.pruneInterval = pruneInterval; this.pruneHlcDelta = pruneHlcDelta; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org