This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit cf62c3071c7a0fbdb50598f4b98e8f296b3dbc46 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Sep 25 17:20:12 2024 +0100 wip --- .../src/main/java/accord/api/DataStore.java | 3 +- .../java/accord/impl/InMemoryCommandStore.java | 179 +++++--------- .../java/accord/impl/InMemoryCommandStores.java | 17 +- .../java/accord/impl/progresslog/HomeState.java | 2 +- .../java/accord/impl/progresslog/WaitingState.java | 4 +- .../src/main/java/accord/local/Bootstrap.java | 34 +-- .../src/main/java/accord/local/Cleanup.java | 3 +- .../src/main/java/accord/local/Command.java | 3 +- .../src/main/java/accord/local/CommandStore.java | 265 +++++---------------- .../src/main/java/accord/local/CommandStores.java | 14 +- .../src/main/java/accord/local/Commands.java | 51 ++-- accord-core/src/main/java/accord/local/Node.java | 2 +- .../main/java/accord/local/RedundantBefore.java | 41 ++-- .../main/java/accord/local/SafeCommandStore.java | 2 +- .../main/java/accord/local/cfk/CommandsForKey.java | 4 +- .../src/main/java/accord/local/cfk/Updating.java | 6 +- .../java/accord/messages/SetGloballyDurable.java | 2 +- .../main/java/accord/messages/SetShardDurable.java | 2 +- .../test/java/accord/impl/RemoteListenersTest.java | 7 +- .../src/test/java/accord/impl/basic/Cluster.java | 2 +- .../accord/impl/basic/DelayedCommandStores.java | 14 +- .../src/test/java/accord/impl/list/ListStore.java | 107 +++++++-- .../java/accord/local/BootstrapLocalTxnTest.java | 131 ---------- .../java/accord/local/cfk/CommandsForKeyTest.java | 6 +- 24 files changed, 288 insertions(+), 613 deletions(-) diff --git a/accord-core/src/main/java/accord/api/DataStore.java b/accord-core/src/main/java/accord/api/DataStore.java index 259b39cd..a93ff073 100644 --- a/accord-core/src/main/java/accord/api/DataStore.java +++ b/accord-core/src/main/java/accord/api/DataStore.java @@ -26,6 +26,7 @@ import accord.primitives.Ranges; import accord.primitives.SyncPoint; import accord.primitives.Timestamp; import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; /** * A marker interface for a shard instance's storage, that is passed to @@ -111,6 +112,6 @@ public interface DataStore } FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback); - default void snapshot() {}; + default AsyncResult<Void> snapshot(Ranges ranges) { return AsyncResults.success(null); }; default void restoreFromSnapshot() {}; } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 65bc1618..1b474779 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -42,7 +42,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,19 +50,16 @@ import accord.api.DataStore; import accord.api.Key; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.api.Scheduler; import accord.impl.progresslog.DefaultProgressLog; import accord.local.Cleanup; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores.RangesForEpoch; import accord.local.Commands; -import accord.local.DurableBefore; import accord.local.KeyHistory; import accord.local.Node; import accord.local.NodeTimeService; import accord.local.PreLoadContext; -import accord.local.RedundantBefore; import accord.local.RedundantStatus; import accord.local.SafeCommand; import accord.local.SafeCommandStore; @@ -90,8 +86,6 @@ import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; -import accord.utils.async.AsyncResult; -import accord.utils.async.AsyncResults; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH; @@ -129,54 +123,9 @@ public abstract class InMemoryCommandStore extends CommandStore private InMemorySafeStore current; - // To simulate the delay in simulatedAsyncPersist - private final Scheduler scheduler; - - private static final class SimulatedFieldPersister<T> implements FieldPersister<T> - { - private T lastValue; - private final Scheduler scheduler; - private final Node node; - private final int id; - public SimulatedFieldPersister(Scheduler scheduler, T defaultValue, Node node, int id) - { - this.scheduler = scheduler; - this.lastValue = defaultValue; - this.node = node; - this.id = id; - } - - public AsyncResult<?> persist(CommandStore store, T toPersist) - { - System.out.println("Persisting for " + node.id() + "-store-" + id); - AsyncResult.Settable<?> result = AsyncResults.settable(); - scheduler.once(() -> { - lastValue = toPersist; - result.trySuccess(null); - }, 100, TimeUnit.MICROSECONDS); - return result; - } - - public T restore() - { - return lastValue; - } - } - - public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) + public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { - super(id, - time, - agent, - store, - progressLogFactory, - listenersFactory, - epochUpdateHolder, - new SimulatedFieldPersister<>(scheduler, DurableBefore.EMPTY, (Node) time, id), - new SimulatedFieldPersister<>(scheduler, RedundantBefore.EMPTY, (Node) time, id), - new SimulatedFieldPersister<>(scheduler, ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id), - new SimulatedFieldPersister<>(scheduler, ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id)); - this.scheduler = scheduler; + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); } protected boolean canExposeUnloaded() @@ -418,59 +367,57 @@ public abstract class InMemoryCommandStore extends CommandStore } @Override - public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) + public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) { - // We know it completes immediately for InMemoryCommandStore - AsyncChain<Void> markShardDurableChain = super.markShardDurable(safeStore, syncId, ranges); - markShardDurableChain = markShardDurableChain.map(ignored -> - { - if (!rangeCommands.containsKey(syncId)) - historicalRangeCommands.merge(syncId, ranges, Ranges::with); - - // TODO (now): apply on retrieval - historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges)); - rangeCommands.entrySet().removeIf(tx -> { - if (tx.getKey().compareTo(syncId) >= 0) - return false; - Ranges newRanges = tx.getValue().ranges.without(ranges); - if (!newRanges.isEmpty()) - { - tx.getValue().ranges = newRanges; - return false; - } - else - { - maxRedundant = Timestamp.nonNullOrMax(maxRedundant, tx.getValue().command.value().executeAt()); - return true; - } - }); - - // verify we're clearing the progress log - ((Node)time).scheduler().once(() -> { - DefaultProgressLog progressLog = (DefaultProgressLog) this.progressLog; - commands.headMap(syncId, false).forEach((id, cmd) -> { - Command command = cmd.value(); - if (!command.hasBeen(PreCommitted)) return; - if (!command.txnId().kind().isGloballyVisible()) return; - - Ranges allRanges = unsafeRangesForEpoch().allBetween(id.epoch(), command.executeAtOrTxnId().epoch()); - boolean done = command.hasBeen(Truncated); - if (!done) - { - if (redundantBefore().status(cmd.txnId, command.executeAtOrTxnId(), command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) - return; - - Route<?> route = cmd.value().route().slice(allRanges); - done = !route.isEmpty() && ranges.containsAll(route); - } - - if (done) Invariants.checkState(progressLog.get(id) == null); - }); - }, 5L, TimeUnit.SECONDS); - return null; - }, this); - - return markShardDurableChain; + super.markShardDurable(safeStore, syncId, ranges); + markShardDurable(syncId, ranges); + } + + private void markShardDurable(TxnId syncId, Ranges ranges) + { + if (!rangeCommands.containsKey(syncId)) + historicalRangeCommands.merge(syncId, ranges, Ranges::with); + + // TODO (now): apply on retrieval + historicalRangeCommands.entrySet().removeIf(next -> next.getKey().compareTo(syncId) < 0 && next.getValue().intersects(ranges)); + rangeCommands.entrySet().removeIf(tx -> { + if (tx.getKey().compareTo(syncId) >= 0) + return false; + Ranges newRanges = tx.getValue().ranges.without(ranges); + if (!newRanges.isEmpty()) + { + tx.getValue().ranges = newRanges; + return false; + } + else + { + maxRedundant = Timestamp.nonNullOrMax(maxRedundant, tx.getValue().command.value().executeAt()); + return true; + } + }); + + // verify we're clearing the progress log + ((Node)time).scheduler().once(() -> { + DefaultProgressLog progressLog = (DefaultProgressLog) this.progressLog; + commands.headMap(syncId, false).forEach((id, cmd) -> { + Command command = cmd.value(); + if (!command.hasBeen(PreCommitted)) return; + if (!command.txnId().kind().isGloballyVisible()) return; + + Ranges allRanges = unsafeRangesForEpoch().allBetween(id.epoch(), command.executeAtOrTxnId().epoch()); + boolean done = command.hasBeen(Truncated); + if (!done) + { + if (redundantBefore().status(cmd.txnId, command.executeAtOrTxnId(), command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE) + return; + + Route<?> route = cmd.value().route().slice(allRanges); + done = !route.isEmpty() && ranges.containsAll(route); + } + + if (done) Invariants.checkState(progressLog.get(id) == null); + }); + }, 5L, TimeUnit.SECONDS); } protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, @@ -1081,9 +1028,9 @@ public abstract class InMemoryCommandStore extends CommandStore Runnable active = null; final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); - public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) + public Synchronized(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); } private synchronized void maybeRun() @@ -1173,9 +1120,9 @@ public abstract class InMemoryCommandStore extends CommandStore private Thread thread; // when run in the executor this will be non-null, null implies not running in this store private final ExecutorService executor; - public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) + public SingleThread(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); this.executor = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']'); @@ -1257,9 +1204,9 @@ public abstract class InMemoryCommandStore extends CommandStore } } - public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler) + public Debug(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); } @Override @@ -1382,16 +1329,6 @@ public abstract class InMemoryCommandStore extends CommandStore commandsForKey.clear(); rangeCommands.clear(); historicalRangeCommands.clear(); - - durableBeforePersistentField.clearAndRestore(); - redundantBeforePersistentField.clearAndRestore(); - bootstrapBeganAtPersistentField.clearAndRestore(); - safeToReadPersistentField.clearAndRestore(); - } - - protected void setRedundantBefore(RedundantBefore newRedundantBefore) - { - super.setRedundantBefore(newRedundantBefore); } public interface Loader diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java index 9c7f56e0..1596bb19 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java @@ -22,7 +22,6 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.api.Scheduler; import accord.local.CommandStore; import accord.local.CommandStores; import accord.local.NodeTimeService; @@ -33,30 +32,30 @@ public class InMemoryCommandStores { public static class Synchronized extends CommandStores { - public Synchronized(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) + public Synchronized(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new, scheduler); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new); } } public static class SingleThread extends CommandStores { - public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) + public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new, scheduler); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new); } - public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) + public SingleThread(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, shardFactory, scheduler); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, shardFactory); } } public static class Debug extends InMemoryCommandStores.SingleThread { - public Debug(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, Scheduler scheduler) + public Debug(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new, scheduler); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new); } } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index cf68e186..d9268b61 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -131,7 +131,7 @@ abstract class HomeState extends WaitingState { Invariants.checkState(!isHomeDoneOrUninitialised()); Command command = safeCommand.current(); - Invariants.checkState(!safeStore.isTruncated(command), () -> String.format("Command %s is truncated", command)); + Invariants.checkState(!safeStore.isTruncated(command), "Command %s is truncated", command); // TODO (expected): when invalidated, safer to maintain HomeState until known to be globally invalidated // TODO (now): validate that we clear HomeState when we receive a Durable reply, to replace the token check logic diff --git a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java index 58980feb..d60f2a28 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java @@ -304,9 +304,7 @@ abstract class WaitingState extends BaseTxnState Command command = safeCommand.current(); Invariants.checkState(!owner.hasActive(Waiting, txnId)); Invariants.checkState(command.saveStatus().compareTo(blockedUntil.minSaveStatus) < 0, - () -> String.format("Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", - blockedUntil.minSaveStatus, - command)); + "Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", blockedUntil.minSaveStatus, command); set(safeStore, owner, blockedUntil, Querying); TxnId txnId = safeCommand.txnId(); diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index b1ed9846..d77df6f5 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -41,7 +41,6 @@ import accord.utils.Invariants; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; -import static accord.local.PreLoadContext.contextFor; import static accord.local.PreLoadContext.empty; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; @@ -96,7 +95,7 @@ class Bootstrap } // an attempt to fetch some portion of the range we are bootstrapping - class Attempt implements FetchRanges, BiConsumer<Void, Throwable> + class Attempt implements FetchRanges, BiConsumer<Object, Throwable> { final List<SafeToRead> states = new ArrayList<>(); Runnable cancel; @@ -117,7 +116,7 @@ class Bootstrap this.valid = ranges; } - void start(SafeCommandStore safeStore0) + void start(SafeCommandStore safeStore) { if (valid.isEmpty()) { @@ -142,23 +141,16 @@ class Bootstrap // we fix here the ranges we use for the synthetic command, even though we may end up only finishing a subset // of these ranges as part of this attempt Ranges commitRanges = valid; - store.markBootstrapping(safeStore0.commandStore(), globalSyncId, valid).flatMap(ignore -> - CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges) - // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed - .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safeStore1 -> { - if (valid.isEmpty()) // we've lost ownership of the range - return AsyncResults.success(Ranges.EMPTY); - - Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, valid); - safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, safeStore1); - return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); - }))) - .flatMap(i -> i) - .flatMap(ranges -> store.execute(contextFor(localSyncId), safeStore -> { - if (!ranges.isEmpty()) - Commands.markBootstrapComplete(safeStore, localSyncId, ranges); - }))) - .begin(this); + safeStore = safeStore; + // we submit a separate execution so that we know markBootstrapping is durable before we initiate the fetch + safeStore.commandStore().submit(empty(), safeStore0 -> { + store.markBootstrapping(safeStore0, globalSyncId, commitRanges); + return CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges); + }).flatMap(i -> i).flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(empty(), safeStore1 -> { + if (valid.isEmpty()) // we've lost ownership of the range + return AsyncResults.success(Ranges.EMPTY); + return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); + }))).flatMap(i -> i).begin(this); } // we no longer want to fetch these ranges (perhaps we no longer own them) @@ -379,7 +371,7 @@ class Bootstrap } @Override - public void accept(Void success, Throwable failure) + public void accept(Object success, Throwable failure) { if (completed) return; diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index c86192bd..578912a6 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -98,7 +98,6 @@ public enum Cleanup commandStore.redundantBefore(), commandStore.durableBefore(), enforceInvariants); } - public static Cleanup shouldCleanup(TxnId txnId, Status status, Durability durability, EpochSupplier toEpoch, Route<?> route, RedundantBefore redundantBefore, DurableBefore durableBefore) { return shouldCleanup(txnId, status, durability, toEpoch, route, redundantBefore, durableBefore, true); @@ -135,7 +134,7 @@ public enum Cleanup // - we can impose additional validations here IF we receive an epoch upper bound // - we should be more robust to the presence/absence of executeAt // - be cognisant of future epochs that participated only for PreAccept/Accept, but where txn was not committed to execute in the epoch (this is why we provide null toEpoch here) - illegalState(String.format("Command %s that is being loaded is not owned by this shard on route %s. Redundant before: %s", txnId, route, redundantBefore)); + illegalState("Command %s that is being loaded is not owned by this shard on route %s. Redundant before: %s", txnId, route, redundantBefore); } } switch (redundant) diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index f1876624..ddc4b5a8 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -340,8 +340,7 @@ public abstract class Command implements CommonAttributes Invariants.checkState(result != null, "Result is null"); break; case Invalidated: - Invariants.checkState(validate.durability().isMaybeInvalidated(), - () -> String.format("%s is not invalidated", validate.durability())); + Invariants.checkState(validate.durability().isMaybeInvalidated(), "%s is not invalidated", validate.durability()); case Unknown: Invariants.checkState(validate.durability() != Local); case Erased: diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index b57569ac..298d0c04 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -28,15 +28,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import javax.annotation.Nonnull; import javax.annotation.Nullable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +42,6 @@ import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.api.Scheduler; import accord.api.VisibleForImplementationTesting; import accord.coordinate.CollectCalculatedDeps; import accord.local.Command.WaitingOn; @@ -68,7 +63,6 @@ import accord.utils.DeterministicIdentitySet; import accord.utils.Invariants; import accord.utils.ReducingRangeMap; import accord.utils.async.AsyncChain; -import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.agrona.collections.Int2ObjectHashMap; @@ -82,7 +76,6 @@ import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static accord.utils.Invariants.checkState; import static accord.utils.Invariants.illegalState; -import static com.google.common.base.Preconditions.checkNotNull; /** * Single threaded internal shard of accord transaction metadata @@ -119,13 +112,13 @@ public abstract class CommandStore implements AgentExecutor // TODO (desired): can better encapsulate by accepting only the newRangesForEpoch and deriving the add/remove ranges public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges addRanges) { - RedundantBefore addRedundantBefore = RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.minForEpoch(epoch)); + RedundantBefore addRedundantBefore = RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.minForEpoch(epoch)); update(newRangesForEpoch, addRedundantBefore); } public void remove(long epoch, RangesForEpoch newRangesForEpoch, Ranges removeRanges) { - RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, TxnId.NONE, TxnId.NONE); + RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE); update(newRangesForEpoch, addRedundantBefore); } @@ -146,8 +139,7 @@ public abstract class CommandStore implements AgentExecutor DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, - EpochUpdateHolder rangesForEpoch, - Scheduler scheduler); + EpochUpdateHolder rangesForEpoch); } private static final ThreadLocal<CommandStore> CURRENT_STORE = new ThreadLocal<>(); @@ -189,22 +181,13 @@ public abstract class CommandStore implements AgentExecutor private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private ReducingRangeMap<Timestamp> rejectBefore; - protected final PersistentField<DurableBefore, DurableBefore> durableBeforePersistentField; - protected final PersistentField<RedundantBefore, RedundantBefore> redundantBeforePersistentField; - protected final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, Ranges>> bootstrapBeganAtPersistentField; - protected final PersistentField<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> safeToReadPersistentField; - protected CommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, - EpochUpdateHolder epochUpdateHolder, - FieldPersister<DurableBefore> persistDurableBefore, - FieldPersister<RedundantBefore> persistRedundantBefore, - FieldPersister<NavigableMap<TxnId, Ranges>> persistBootstrapBeganAt, - FieldPersister<NavigableMap<Timestamp, Ranges>> persistSafeToReadAt) + EpochUpdateHolder epochUpdateHolder) { this.id = id; this.time = time; @@ -213,11 +196,6 @@ public abstract class CommandStore implements AgentExecutor this.progressLog = progressLogFactory.create(this); this.listeners = listenersFactory.create(this); this.epochUpdateHolder = epochUpdateHolder; - this.durableBeforePersistentField = new PersistentField<>(this::durableBefore, DurableBefore::merge, persistDurableBefore, durableBefore -> setDurableBefore(durableBefore)); - this.redundantBeforePersistentField = new PersistentField<>(this::redundantBefore, RedundantBefore::merge, persistRedundantBefore, redundantBefore -> setRedundantBefore(redundantBefore)); - this.bootstrapBeganAtPersistentField = new PersistentField<>(this::bootstrapBeganAt, CommandStore::bootstrap, persistBootstrapBeganAt, bootstrapBeganAt -> setBootstrapBeganAt(bootstrapBeganAt)); - this.safeToReadPersistentField = new PersistentField<>(this::safeToRead, null, persistSafeToReadAt, safeToRead -> setSafeToRead(safeToRead)); - } public final int id() @@ -238,18 +216,12 @@ public abstract class CommandStore implements AgentExecutor return rangesForEpoch; update = epochUpdateHolder.getAndSet(null); - if (!update.addGlobalRanges.isEmpty()) - // Intentionally don't care if this persists since it will be replayed from topology at startup setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE))); - if (update.addRedundantBefore.size() > 0) - // Intentionally don't care if this persists since it will be replayed from topology at startup setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore)); - if (update.newRangesForEpoch != null) rangesForEpoch = update.newRangesForEpoch; - return rangesForEpoch; } @@ -279,12 +251,6 @@ public abstract class CommandStore implements AgentExecutor this.rejectBefore = newRejectBefore; } - protected AsyncResult<?> mergeAndUpdateBootstrapBeganAt(BootstrapSyncPoint globalSyncPoint) - { - return bootstrapBeganAtPersistentField.mergeAndUpdate(globalSyncPoint, null, null, false); - } - - // This will not work correctly if called outside PersistentField without remerging protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) { this.bootstrapBeganAt = newBootstrapBeganAt; @@ -295,23 +261,21 @@ public abstract class CommandStore implements AgentExecutor return durableBefore; } - public final AsyncResult<?> mergeAndUpdateDurableBefore(DurableBefore newDurableBefore) + public final void upsertDurableBefore(DurableBefore addDurableBefore) { - return durableBeforePersistentField.mergeAndUpdate(newDurableBefore, null, null, true); + durableBefore = DurableBefore.merge(durableBefore, addDurableBefore); } - // For implementations to use after persistence protected final void setDurableBefore(DurableBefore newDurableBefore) { durableBefore = newDurableBefore; } - protected final AsyncResult<?> mergeAndUpdateRedundantBefore(RedundantBefore newRedundantBefore, Timestamp gcBefore, Ranges updatedRanges) + protected void upsertRedundantBefore(RedundantBefore addRedundantBefore) { - return redundantBeforePersistentField.mergeAndUpdate(newRedundantBefore, gcBefore, updatedRanges, true); + redundantBefore = RedundantBefore.merge(redundantBefore, addRedundantBefore); } - // For implementations to use after persistence protected void setRedundantBefore(RedundantBefore newRedundantBefore) { redundantBefore = newRedundantBefore; @@ -336,13 +300,6 @@ public abstract class CommandStore implements AgentExecutor setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt)); } - protected AsyncResult<?> mergeAndUpdateSafeToRead(Function<NavigableMap<Timestamp, Ranges>, NavigableMap<Timestamp, Ranges>> computeNewValue) - { - // The input values are bound into the merge function to satisfy the fact that there are two different sets of inputs types to the merge function - // depending on whether it is purgeHistory or purgeAndInsert - return safeToReadPersistentField.mergeAndUpdate(computeNewValue); - } - /** * This method may be invoked on a non-CommandStore thread */ @@ -360,15 +317,13 @@ public abstract class CommandStore implements AgentExecutor setRejectBefore(newRejectBefore); } - public final AsyncChain<?> markExclusiveSyncPointLocallyApplied(CommandStore commandStore, TxnId txnId, Ranges ranges) + public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint); - RedundantBefore newRedundantBefore = RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, TxnId.NONE, TxnId.NONE)); - AsyncResult<?> setRedundantBeforeChain = mergeAndUpdateRedundantBefore(newRedundantBefore, txnId, ranges); - return setRedundantBeforeChain.flatMap( - ignored -> commandStore.execute(contextFor(txnId), - safeStore -> updatedRedundantBefore(safeStore, txnId, ranges))); + RedundantBefore newRedundantBefore = RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, TxnId.NONE, TxnId.NONE, TxnId.NONE)); + setRedundantBefore(newRedundantBefore); + updatedRedundantBefore(safeStore, txnId, ranges); } /** @@ -558,35 +513,37 @@ public abstract class CommandStore implements AgentExecutor bootstraps.remove(bootstrap); } - final AsyncChain<?> markBootstrapping(CommandStore commandStore, TxnId globalSyncId, Ranges ranges) + final void markBootstrapping(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) { - store.snapshot(); - AsyncResult<?> setBootstrapBeganAtResult = mergeAndUpdateBootstrapBeganAt(new BootstrapSyncPoint(globalSyncId, ranges)); - RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId); - // TODO (review): What is the correct txnId to provide here to restrict what memtables are flushed? - AsyncResult<?> setRedundantBeforeResult = mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), globalSyncId, ranges); - DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); - AsyncResult<?> setDurableBeforeResult = mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); - AsyncChain<?> combinedChain = AsyncChains.allOf(ImmutableList.of(setBootstrapBeganAtResult, setRedundantBeforeResult, setDurableBeforeResult)); - return combinedChain.flatMap( - ignored -> commandStore.execute(PreLoadContext.contextFor(globalSyncId), - safeStore -> updatedRedundantBefore(safeStore, globalSyncId, ranges))); + setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); + RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, globalSyncId); + upsertRedundantBefore(addRedundantBefore); + upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE)); + updatedRedundantBefore(safeStore, globalSyncId, ranges); } // TODO (expected): we can immediately truncate dependencies locally once an exclusiveSyncPoint applies, we don't need to wait for the whole shard - public AsyncChain<Void> markShardDurable(SafeCommandStore safeStore0, TxnId globalSyncId, Ranges ranges) - { - store.snapshot(); - ranges = ranges.slice(safeStore0.ranges().allUntil(globalSyncId.epoch()), Minimal); - RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE); - AsyncResult<?> setRedundantBeforeChain = mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), globalSyncId, ranges); - DurableBefore addDurableBefore = DurableBefore.create(ranges, globalSyncId, globalSyncId); - AsyncResult<?> setDurableBeforeChain = mergeAndUpdateDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); - Ranges slicedRanges = ranges; - AsyncChain<?> combinedChain = AsyncChains.allOf(ImmutableList.of(setRedundantBeforeChain, setDurableBeforeChain)); - return combinedChain.flatMap( - ignored -> safeStore0.commandStore().execute(PreLoadContext.contextFor(globalSyncId), - safeStore1 -> updatedRedundantBefore(safeStore1, globalSyncId, slicedRanges))); + public void markShardDurable(SafeCommandStore safeStore, TxnId globalSyncId, Ranges durableRanges) + { + final Ranges slicedRanges = durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal); + RedundantBefore addShardRedundant = RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE); + upsertRedundantBefore(addShardRedundant); + DurableBefore addDurableBefore = DurableBefore.create(slicedRanges, globalSyncId, globalSyncId); + upsertDurableBefore(addDurableBefore); + updatedRedundantBefore(safeStore, globalSyncId, slicedRanges); + safeStore = safeStore; // make unusable in lambda + safeStore.dataStore().snapshot(slicedRanges).begin((success, fail) -> { + if (fail != null) + { + logger.error("Unsuccessful dataStore snapshot; unable to update GC markers", fail); + return; + } + + execute(PreLoadContext.empty(), safeStore0 -> { + RedundantBefore addGc = RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId, TxnId.NONE); + upsertRedundantBefore(addGc); + }); + }); } protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId syncId, Ranges ranges) @@ -598,7 +555,6 @@ public abstract class CommandStore implements AgentExecutor // also: we no longer expect epochs that are losing a range to be marked stale, make sure logic reflects this public void markShardStale(SafeCommandStore safeStore, Timestamp staleSince, Ranges ranges, boolean isSincePrecise) { - store.snapshot(); Timestamp staleUntilAtLeast = staleSince; if (isSincePrecise) { @@ -612,10 +568,8 @@ public abstract class CommandStore implements AgentExecutor } agent.onStale(staleSince, ranges); - RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast); - // TODO (review): Is it ok for this to be asynchronous here? - // TODO (review): Is stale since the right txnid here? - mergeAndUpdateRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore), staleSince, ranges).addCallback(agent); + RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast); + setRedundantBefore(RedundantBefore.merge(redundantBefore, addRedundantBefore)); // find which ranges need to bootstrap, subtracting those already in progress that cover the id markUnsafeToRead(ranges); @@ -631,7 +585,7 @@ public abstract class CommandStore implements AgentExecutor { // Merge in a base for any ranges that needs to be covered DurableBefore addDurableBefore = DurableBefore.create(ranges, TxnId.NONE, TxnId.NONE); - setDurableBefore(DurableBefore.merge(durableBefore, addDurableBefore)); + upsertDurableBefore(addDurableBefore); // TODO (review): Convoluted check to not overwrite existing bootstraps with TxnId.NONE // If loading from disk didn't finish before this then we might initialize the range at TxnId.NONE? // Does CommandStores.topology ensure that doesn't happen? Is it fine if it does because it will get superseded? @@ -639,7 +593,7 @@ public abstract class CommandStore implements AgentExecutor for (Ranges existing : bootstrapBeganAt.values()) newBootstrapRanges = newBootstrapRanges.without(existing); if (!newBootstrapRanges.isEmpty()) - bootstrapBeganAt = bootstrap(new BootstrapSyncPoint(TxnId.NONE, newBootstrapRanges), bootstrapBeganAt); + bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, bootstrapBeganAt); safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges); return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE); } @@ -696,9 +650,9 @@ public abstract class CommandStore implements AgentExecutor Keys prev = partiallyBootstrapping.get(txnIdx); Keys remaining = prev; if (remaining == null) remaining = builder.directKeyDeps.participatingKeys(txnIdx); - else checkState(!remaining.isEmpty()); + else Invariants.checkState(!remaining.isEmpty()); remaining = remaining.without(range); - if (prev == null) checkState(!remaining.isEmpty()); + if (prev == null) Invariants.checkState(!remaining.isEmpty()); partiallyBootstrapping.put(txnIdx, remaining); return remaining.isEmpty(); } @@ -761,9 +715,9 @@ public abstract class CommandStore implements AgentExecutor Ranges prev = partiallyBootstrapping.get(rangeTxnIdx); Ranges remaining = prev; if (remaining == null) remaining = builder.directRangeDeps.ranges(rangeTxnIdx); - else checkState(!remaining.isEmpty()); + else Invariants.checkState(!remaining.isEmpty()); remaining = remaining.without(Ranges.of(range)); - if (prev == null) checkState(!remaining.isEmpty()); + if (prev == null) Invariants.checkState(!remaining.isEmpty()); partiallyBootstrapping.put(rangeTxnIdx, remaining); return remaining.isEmpty(); } @@ -817,19 +771,19 @@ public abstract class CommandStore implements AgentExecutor final synchronized void markUnsafeToRead(Ranges ranges) { if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges))) - mergeAndUpdateSafeToRead(safeToRead -> purgeHistory(safeToRead, ranges)); + setSafeToRead(purgeHistory(safeToRead, ranges)); } final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp at, Ranges ranges) { Ranges validatedSafeToRead = redundantBefore.validateSafeToRead(forBootstrapAt, ranges); - mergeAndUpdateSafeToRead(safeToRead -> purgeAndInsert(safeToRead, at, validatedSafeToRead)); + setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead)); } protected static class BootstrapSyncPoint { - TxnId syncTxnId; - Ranges ranges; + final TxnId syncTxnId; + final Ranges ranges; protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges) { @@ -838,14 +792,12 @@ public abstract class CommandStore implements AgentExecutor } } - protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(BootstrapSyncPoint syncPoint, NavigableMap<TxnId, Ranges> bootstrappedAt) + protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt) { - TxnId at = syncPoint.syncTxnId; - Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || syncPoint.syncTxnId == TxnId.NONE); - if (syncPoint.syncTxnId == TxnId.NONE) - for (Ranges ranges : bootstrappedAt.values()) - checkState(!syncPoint.ranges.intersects(ranges)); - Ranges ranges = syncPoint.ranges; + Invariants.checkArgument(bootstrappedAt.lastKey().compareTo(at) < 0 || at == TxnId.NONE); + if (at == TxnId.NONE) + for (Ranges rs : bootstrappedAt.values()) + checkState(!ranges.intersects(rs)); Invariants.checkArgument(!ranges.isEmpty()); // if we're bootstrapping these ranges, then any period we previously owned the ranges for is effectively invalidated return purgeAndInsert(bootstrappedAt, at, ranges); @@ -882,109 +834,4 @@ public abstract class CommandStore implements AgentExecutor return in; return new SimpleImmutableEntry<>(in.getKey(), without); } - - protected interface FieldPersister<T> - { - default AsyncResult<?> persist(CommandStore store, Timestamp timestamp, Ranges ranges, T toPersist) - { - return persist(store, toPersist); - } - - AsyncResult<?> persist(CommandStore store, T toPersist); - - default T restore() { return null; }; - } - - // A helper class for implementing fields that needs to be asynchronously persisted and concurrent updates - // need to be merged and ordered - protected class PersistentField<I, T> - { - @Nonnull - private final Supplier<T> currentValue; - // The update can be bound into the merge function in which case it will be null - // Useful when the merge/update function takes multiple types of input arguments - @Nullable - private final BiFunction<I, T, T> merge; - @Nonnull - private final FieldPersister<T> persister; - @Nonnull - private final Consumer<T> set; - - private T pendingValue; - private AsyncResult<?> pendingResult; - - public PersistentField(@Nonnull Supplier<T> currentValue, @Nonnull BiFunction<I, T, T> merge, @Nonnull FieldPersister<T> persist, @Nullable Consumer<T> set) - { - checkNotNull(currentValue, "currentValue cannot be null"); - checkNotNull(persist, "persist cannot be null"); - checkNotNull(set, "set cannot be null"); - this.currentValue = currentValue; - this.merge = merge; - this.persister = persist; - this.set = set; - } - - public void clearAndRestore() - { - pendingValue = null; - set.accept(persister.restore()); - } - - public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean remergeAfterPersistence) - { - checkNotNull(merge, "merge cannot be null"); - checkNotNull(inputValue, "inputValue cannot be null"); - return mergeAndUpdate(inputValue, merge, gcBefore, updatedRanges, remergeAfterPersistence); - } - - public AsyncResult<?> mergeAndUpdate(@Nonnull Function<T, T> update) - { - checkNotNull(update, "merge cannot be null"); - return mergeAndUpdate(null, (ignored, existingValue) -> update.apply(existingValue), null, null, false); - } - - private AsyncResult<?> mergeAndUpdate(@Nullable I inputValue, @Nonnull BiFunction<I, T, T> merge, @Nullable Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean remergeAfterPersistence) - { - checkNotNull(merge, "merge cannot be null"); - AsyncResult.Settable<Void> result = AsyncResults.settable(); - AsyncResult<?> oldPendingResult = pendingResult; - T startingValue = currentValue.get(); - T newValue = pendingValue != null ? merge.apply(inputValue, pendingValue) : merge.apply(inputValue, startingValue); - this.pendingResult = result; - this.pendingValue= newValue; - - AsyncResult<?> pendingWrite = persister.persist(CommandStore.this, gcBefore, updatedRanges, newValue); - - final T newValueFinal = newValue; - BiConsumer<Object, Throwable> callback = (ignored, failure) -> { - if (PersistentField.this.pendingResult == result) - { - PersistentField.this.pendingResult = null; - PersistentField.this.pendingValue = null; - } - if (failure != null) - result.tryFailure(failure); - else - { - // DurableBefore and RedundantBefore can have initial values set non-persistently in updateRangesForEpoch so remerge them here - // updateRangesForEpoch really doesn't integrate well with is callers if it is asynchronous updating these values - // so this complexity is better than the alternative - if (remergeAfterPersistence && currentValue.get() != startingValue) - // I and T will have to be the same for remerge to work - set.accept(merge.apply((I)newValueFinal, currentValue.get())); - else - set.accept(newValueFinal); - result.trySuccess(null); - } - }; - - // Order completion after previous updates, this is probably stricter than necessary but easy to implement - if (oldPendingResult != null) - oldPendingResult.addCallback(() -> pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent)); - else - pendingWrite.withExecutor(CommandStore.this).addCallback(callback).begin(agent); - - return result; - } - } } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 4c8efabc..a6b7b030 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -42,7 +42,6 @@ import accord.api.Key; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; -import accord.api.Scheduler; import accord.local.CommandStore.EpochUpdateHolder; import accord.primitives.EpochSupplier; import accord.primitives.Participants; @@ -86,8 +85,7 @@ public abstract class CommandStores RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, - LocalListeners.Factory listenersFactory, - Scheduler scheduler); + LocalListeners.Factory listenersFactory); } private static class StoreSupplier @@ -99,9 +97,8 @@ public abstract class CommandStores private final LocalListeners.Factory listenersFactory; private final CommandStore.Factory shardFactory; private final RandomSource random; - private final Scheduler scheduler; - StoreSupplier(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) + StoreSupplier(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) { this.time = time; this.agent = agent; @@ -110,12 +107,11 @@ public abstract class CommandStores this.progressLogFactory = progressLogFactory; this.listenersFactory = listenersFactory; this.shardFactory = shardFactory; - this.scheduler = scheduler; } CommandStore create(int id, EpochUpdateHolder rangesForEpoch) { - return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch, scheduler); + return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch); } } @@ -369,9 +365,9 @@ public abstract class CommandStores } public CommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, - ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Scheduler scheduler) + ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) { - this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory, scheduler), shardDistributor); + this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory), shardDistributor); } public Topology local() diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 5d83e33d..e164ec11 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -551,9 +551,7 @@ public class Commands { Command.Executed command = safeStore.get(txnId).current().asExecuted(); if (command.hasBeen(Applied)) - { return AsyncChains.success(null); - } return apply(safeStore, context, txnId); } @@ -564,7 +562,8 @@ public class Commands // that was pre-bootstrap for some range (so redundant and we may have gone ahead of), but had to be executed locally // for another range CommandStore unsafeStore = safeStore.commandStore(); - // TODO (required): avoid allocating a timestamp here + // TODO (required, API): do we care about tracking the write persistence latency, when this is just a memtable write? + // the only reason it will be slow is because Memtable flushes are backed-up (which will be reported elsewhere) long t0 = safeStore.time().now(); return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn()) .flatMap(unused -> unsafeStore.submit(context, ss -> { @@ -617,7 +616,7 @@ public class Commands return maybeExecute(safeStore, safeCommand, safeCommand.current(), alwaysNotifyListeners, notifyWaitingOn); } - public static boolean maybeExecute(SafeCommandStore safeStore0, SafeCommand safeCommand0, Command command, boolean alwaysNotifyListeners, boolean notifyWaitingOn) + public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, boolean alwaysNotifyListeners, boolean notifyWaitingOn) { if (logger.isTraceEnabled()) logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", command.txnId(), command.status(), alwaysNotifyListeners); @@ -625,7 +624,7 @@ public class Commands if (command.status() != Stable && command.status() != PreApplied) { if (alwaysNotifyListeners) - safeStore0.notifyListeners(safeCommand0, command); + safeStore.notifyListeners(safeCommand, command); return false; } @@ -633,10 +632,10 @@ public class Commands if (waitingOn.isWaiting()) { if (alwaysNotifyListeners) - safeStore0.notifyListeners(safeCommand0, command); + safeStore.notifyListeners(safeCommand, command); if (notifyWaitingOn && waitingOn.isWaitingOnCommand()) - new NotifyWaitingOn(safeCommand0).accept(safeStore0); + new NotifyWaitingOn(safeCommand).accept(safeStore); return false; } @@ -649,23 +648,23 @@ public class Commands // TODO (required): we can have dangling transactions in some cases when proposing in a future epoch but // later deciding on an earlier epoch. We should probably turn this into an erased vestigial command, // but we should tighten up our semantics there in general. - safeCommand0.readyToExecute(safeStore0); + safeCommand.readyToExecute(safeStore); logger.trace("{}: set to ReadyToExecute", command.txnId()); - safeStore0.notifyListeners(safeCommand0, command); + safeStore.notifyListeners(safeCommand, command); return true; case PreApplied: - Ranges executeRanges = executeRanges(safeStore0, command.executeAt()); + Ranges executeRanges = executeRanges(safeStore, command.executeAt()); Command.Executed executed = command.asExecuted(); boolean intersects = executed.writes().keys.intersects(executeRanges); if (intersects) { // TODO (now): we should set applying within apply to avoid applying multiple times - safeCommand0.applying(safeStore0); - safeStore0.notifyListeners(safeCommand0, command); + safeCommand.applying(safeStore); + safeStore.notifyListeners(safeCommand, command); logger.trace("{}: applying", command.txnId()); - apply(safeStore0, executed); + apply(safeStore, executed); return true; } else @@ -673,26 +672,8 @@ public class Commands // TODO (desirable, performance): This could be performed immediately upon Committed // but: if we later support transitive dependency elision this could be dangerous logger.trace("{}: applying no-op", command.txnId()); - if (command.txnId().kind() == ExclusiveSyncPoint) - { - Ranges ranges = safeStore0.ranges().allAt(command.txnId().epoch()); - ranges = command.route().slice(ranges, Minimal).participants().toRanges(); - CommandStore commandStore = safeStore0.commandStore(); - safeCommand0.applying(safeStore0); - safeStore0.notifyListeners(safeCommand0, command); - commandStore.markExclusiveSyncPointLocallyApplied(commandStore, command.txnId(), ranges).flatMap(ignored -> - commandStore.execute(PreLoadContext.contextFor(command.txnId()), safeStore1 -> { - SafeCommand safeCommand1 = safeStore1.get(command.txnId()); - safeCommand1.applied(safeStore1); - safeStore1.notifyListeners(safeCommand1, command); - }) - ).begin(commandStore.agent); - } - else - { - safeCommand0.applied(safeStore0); - safeStore0.notifyListeners(safeCommand0, command); - } + safeCommand.applied(safeStore); + safeStore.notifyListeners(safeCommand, command); return true; } default: @@ -910,7 +891,7 @@ public class Commands break; case TRUNCATE_WITH_OUTCOME: - Invariants.checkArgument(!command.hasBeen(Truncated), command.toString()); + Invariants.checkArgument(!command.hasBeen(Truncated), "%s", command); Invariants.checkState(command.hasBeen(PreApplied)); result = truncatedApplyWithOutcome(command.asExecuted()); break; @@ -1206,7 +1187,7 @@ public class Commands public static Command updateRouteOrParticipants(SafeCommandStore safeStore, SafeCommand safeCommand, Unseekables<?> participants) { Command current = safeCommand.current(); - if (current.hasBeen(Invalidated) || current.hasBeen(Truncated)) + if (current.saveStatus().compareTo(Erased) >= 0) return current; CommonAttributes updated = current; diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 22e300f7..f008edf1 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -192,7 +192,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id)); this.agent = agent; this.random = random; - this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this), scheduler); + this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); // TODO review these leak a reference to an object that hasn't finished construction, possibly to other threads configService.registerListener(this); } diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 26942bf6..a39cdf98 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -95,6 +95,12 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> */ public final @Nonnull TxnId shardAppliedOrInvalidatedBefore; + /** + * Represents the maximum TxnId we know to have fully executed until across all healthy replicas for the range in question. + * Unless we are stale or pre-bootstrap, in which case no such guarantees can be made. + */ + public final @Nonnull TxnId gcBefore; + /** * bootstrappedAt defines the txnId bounds we expect to maintain data for locally. * @@ -116,12 +122,12 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> */ public final @Nullable Timestamp staleUntilAtLeast; - public Entry(Range range, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) + public Entry(Range range, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) { - this(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast); + this(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } - public Entry(Range range, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) + public Entry(Range range, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) { this.range = range; this.startEpoch = startEpoch; @@ -129,11 +135,14 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> this.locallyAppliedOrInvalidatedBefore = locallyAppliedOrInvalidatedBefore; this.locallyDecidedAndAppliedOrInvalidatedBefore = locallyDecidedAndAppliedOrInvalidatedBefore; this.shardAppliedOrInvalidatedBefore = shardAppliedOrInvalidatedBefore; + this.gcBefore = gcBefore; this.bootstrappedAt = bootstrappedAt; this.staleUntilAtLeast = staleUntilAtLeast; Invariants.checkArgument(locallyAppliedOrInvalidatedBefore.equals(TxnId.NONE) || locallyAppliedOrInvalidatedBefore.domain().isRange()); Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.equals(TxnId.NONE) || locallyDecidedAndAppliedOrInvalidatedBefore.domain().isRange()); Invariants.checkArgument(shardAppliedOrInvalidatedBefore.equals(TxnId.NONE) || shardAppliedOrInvalidatedBefore.domain().isRange()); + Invariants.checkArgument(gcBefore.equals(TxnId.NONE) || gcBefore.domain().isRange()); + Invariants.checkArgument(gcBefore.compareTo(shardAppliedOrInvalidatedBefore) <= 0); } public static Entry reduce(Entry a, Entry b) @@ -154,6 +163,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> int cl = cur.locallyAppliedOrInvalidatedBefore.compareTo(add.locallyAppliedOrInvalidatedBefore); int cd = cur.locallyDecidedAndAppliedOrInvalidatedBefore.compareTo(add.locallyDecidedAndAppliedOrInvalidatedBefore); int cs = cur.shardAppliedOrInvalidatedBefore.compareTo(add.shardAppliedOrInvalidatedBefore); + int cg = cur.gcBefore.compareTo(add.gcBefore); int cb = cur.bootstrappedAt.compareTo(add.bootstrappedAt); int csu = compareStaleUntilAtLeast(cur.staleUntilAtLeast, add.staleUntilAtLeast); @@ -165,6 +175,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> TxnId locallyAppliedOrInvalidatedBefore = cl >= 0 ? cur.locallyAppliedOrInvalidatedBefore : add.locallyAppliedOrInvalidatedBefore; TxnId locallyDecidedAndAppliedOrInvalidatedBefore = cd >= 0 ? cur.locallyDecidedAndAppliedOrInvalidatedBefore : add.locallyDecidedAndAppliedOrInvalidatedBefore; TxnId shardAppliedOrInvalidatedBefore = cs >= 0 ? cur.shardAppliedOrInvalidatedBefore : add.shardAppliedOrInvalidatedBefore; + TxnId gcBefore = cg >= 0 ? cur.gcBefore : add.gcBefore; TxnId bootstrappedAt = cb >= 0 ? cur.bootstrappedAt : add.bootstrappedAt; Timestamp staleUntilAtLeast = csu >= 0 ? cur.staleUntilAtLeast : add.staleUntilAtLeast; @@ -182,7 +193,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (staleUntilAtLeast != null && bootstrappedAt.compareTo(staleUntilAtLeast) >= 0) staleUntilAtLeast = null; - return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast); + return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } static @Nonnull RedundantStatus getAndMerge(Entry entry, @Nonnull RedundantStatus prev, TxnId txnId, EpochSupplier executeAt) @@ -305,7 +316,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> Entry withRange(Range range) { - return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast); + return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } public boolean equals(Object that) @@ -376,27 +387,27 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return Ranges.ofSortedAndDeoverlapped(staleRanges).mergeTouching(); } - public static RedundantBefore create(Ranges ranges, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt) + public static RedundantBefore create(Ranges ranges, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt) { - return create(ranges, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, null); + return create(ranges, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, null); } - public static RedundantBefore create(Ranges ranges, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) + public static RedundantBefore create(Ranges ranges, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) { - return create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast); + return create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } - public static RedundantBefore create(Ranges ranges, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt) + public static RedundantBefore create(Ranges ranges, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt) { - return create(ranges, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, null); + return create(ranges, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, null); } - public static RedundantBefore create(Ranges ranges, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) + public static RedundantBefore create(Ranges ranges, long startEpoch, long endEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) { if (ranges.isEmpty()) return new RedundantBefore(); - Entry entry = new Entry(null, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast); + Entry entry = new Entry(null, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); Builder builder = new Builder(ranges.get(0).endInclusive(), ranges.size() * 2); for (int i = 0 ; i < ranges.size() ; ++i) { @@ -509,7 +520,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (v.range.start().equals(start) && v.range.end().equals(end)) return v; - return new Entry(v.range.newRange(start, end), v.startEpoch, v.endEpoch, v.locallyAppliedOrInvalidatedBefore, v.shardAppliedOrInvalidatedBefore, v.bootstrappedAt, v.staleUntilAtLeast); + return new Entry(v.range.newRange(start, end), v.startEpoch, v.endEpoch, v.locallyAppliedOrInvalidatedBefore, v.shardAppliedOrInvalidatedBefore, v.gcBefore, v.bootstrappedAt, v.staleUntilAtLeast); } @Override @@ -528,7 +539,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return new Entry(a.range.newRange( a.range.start().compareTo(b.range.start()) <= 0 ? a.range.start() : b.range.start(), a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : b.range.end() - ), a.startEpoch, a.endEpoch, a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.bootstrappedAt, a.staleUntilAtLeast); + ), a.startEpoch, a.endEpoch, a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast); } @Override diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index bcabdb90..c2a02d18 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -230,7 +230,7 @@ public abstract class SafeCommandStore if (newSaveStatus == Applied && oldSaveStatus != Applied) { Ranges ranges = updated.route().slice(ranges().all(), Minimal).toRanges(); - commandStore().markExclusiveSyncPointLocallyApplied(this.commandStore(), txnId, ranges); + commandStore().markExclusiveSyncPointLocallyApplied(this, txnId, ranges); } } diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index ad52b511..7cc2bb84 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -1573,12 +1573,12 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm private void checkBehindCommitForLinearizabilityViolation(TxnInfo newInfo, TxnInfo maxAppliedWrite) { - if (!isPreBootstrap(newInfo) && CommandsForKey.reportLinearizabilityViolations()) + if (!isPreBootstrap(newInfo)) { for (int i = maxAppliedWriteByExecuteAt ; i >= 0 ; --i) { TxnInfo txn = committedByExecuteAt[i]; - if (newInfo == txn) + if (newInfo == txn && CommandsForKey.reportLinearizabilityViolations()) { // we haven't found anything pre-bootstrap that follows this command, so log a linearizability violation // TODO (expected): this should be a rate-limited logger; need to integrate with Cassandra diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java index 2b653064..fdf8eb9d 100644 --- a/accord-core/src/main/java/accord/local/cfk/Updating.java +++ b/accord-core/src/main/java/accord/local/cfk/Updating.java @@ -610,13 +610,13 @@ class Updating System.arraycopy(committedByExecuteAt, pos, result, pos + 1, committedByExecuteAt.length - pos); int maxAppliedWriteByExecuteAt = cfk.maxAppliedWriteByExecuteAt; - if (reportLinearizabilityViolations() && pos <= maxAppliedWriteByExecuteAt) + if (pos <= maxAppliedWriteByExecuteAt) { if (pos < maxAppliedWriteByExecuteAt && !wasPruned && cfk.isPostBootstrap(newInfo)) { for (int i = pos; i <= maxAppliedWriteByExecuteAt; ++i) { - if (committedByExecuteAt[pos].kind().witnesses(newInfo)) + if (committedByExecuteAt[pos].kind().witnesses(newInfo) && reportLinearizabilityViolations()) logger.error("Linearizability violation on key {}: {} is committed to execute (at {}) before {} that should witness it but has already applied (at {})", cfk.key, newInfo.plainTxnId(), newInfo.plainExecuteAt(), committedByExecuteAt[i].plainTxnId(), committedByExecuteAt[i].plainExecuteAt()); } } @@ -674,7 +674,7 @@ class Updating TxnInfo[] committedByExecuteAt = cfk.committedByExecuteAt; for (int i = cfk.maxAppliedWriteByExecuteAt + 1; i < appliedPos ; ++i) { - if (reportLinearizabilityViolations() && committedByExecuteAt[i].status != APPLIED && appliedKind.witnesses(committedByExecuteAt[i])) + if (committedByExecuteAt[i].status != APPLIED && appliedKind.witnesses(committedByExecuteAt[i]) && reportLinearizabilityViolations()) logger.error("Linearizability violation on key {}: {} is committed to execute (at {}) before {} that should witness it but has already applied (at {})", cfk.key, committedByExecuteAt[i].plainTxnId(), committedByExecuteAt[i].plainExecuteAt(), applied.plainTxnId(), applied.plainExecuteAt()); } } diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java index 61a08bca..6a295d88 100644 --- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java +++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java @@ -52,7 +52,7 @@ public class SetGloballyDurable extends AbstractEpochRequest<SimpleReply> DurableBefore cur = safeStore.commandStore().durableBefore(); DurableBefore upd = DurableBefore.merge(durableBefore, cur); // This is done asynchronously - safeStore.commandStore().mergeAndUpdateDurableBefore(upd).begin(node.agent()); + safeStore.commandStore().upsertDurableBefore(upd); return Ok; } diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java index c0c39069..09c7b236 100644 --- a/accord-core/src/main/java/accord/messages/SetShardDurable.java +++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java @@ -46,7 +46,7 @@ public class SetShardDurable extends AbstractEpochRequest<SimpleReply> @Override public SimpleReply apply(SafeCommandStore safeStore) { - safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges).begin(node.agent()); + safeStore.commandStore().markShardDurable(safeStore, exclusiveSyncPoint.syncId, (Ranges)exclusiveSyncPoint.keysOrRanges); return Ok; } diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index 60f59895..664f025a 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -64,7 +64,6 @@ import accord.utils.AccordGens; import accord.utils.RandomSource; import accord.utils.RandomTestRunner; import accord.utils.async.AsyncChain; -import accord.utils.async.AsyncResults; import org.agrona.collections.IntHashSet; import org.agrona.collections.ObjectHashSet; @@ -391,11 +390,7 @@ public class RemoteListenersTest ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultLocalListeners.DefaultNotifySink.INSTANCE), - new EpochUpdateHolder(), - (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, - (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, - (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID, - (ignored1, ignored2) -> AsyncResults.SUCCESS_VOID); + new EpochUpdateHolder()); this.storeId = id; } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index baa4ba6f..a938052e 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -468,7 +468,7 @@ public class Cluster implements Scheduler BurnTestConfigurationService configService = new BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, nodeMap::get, topologyUpdates); BooleanSupplier isLoadedCheck = Gens.supplier(Gens.bools().mixedDistribution().next(random), random); Node node = new Node(id, messageSink, configService, nowSupplier, NodeTimeService.elapsedWrapperFromNonMonotonicSource(TimeUnit.MILLISECONDS, nowSupplier), - () -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()), + () -> new ListStore(sinks, random, id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()), nodeExecutor.agent(), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultRequestTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(), diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index c1cb4fac..fd2256a3 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -109,15 +109,15 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return false; } - private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler) + private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, BooleanSupplier isLoadedCheck, Journal journal) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal), scheduler); + super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal)); } public static CommandStores.Factory factory(PendingQueue pending, BooleanSupplier isLoadedCheck, Journal journal) { - return (time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, scheduler) -> - new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal, scheduler); + return (time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory) -> + new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal); } @Override @@ -168,9 +168,9 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private final BooleanSupplier isLoadedCheck; private final Journal journal; - public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal, Scheduler scheduler) + public DelayedCommandStore(int id, NodeTimeService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, scheduler); + super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); this.executor = executor; this.isLoadedCheck = isLoadedCheck; this.journal = journal; @@ -201,7 +201,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck, Journal journal) { - return (id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, scheduler) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal, scheduler); + return (id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 27b40ec1..81416540 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -18,8 +18,10 @@ package accord.impl.list; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -30,6 +32,7 @@ import java.util.stream.Collectors; import accord.api.DataStore; import accord.api.Key; +import accord.api.Scheduler; import accord.coordinate.CoordinateSyncPoint; import accord.coordinate.ExecuteSyncPoint.SyncPointErased; import accord.coordinate.Invalidated; @@ -54,9 +57,12 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topologies; import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.RandomSource; import accord.utils.Timestamped; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.agrona.collections.Int2ObjectHashMap; import org.agrona.collections.LongArrayList; @@ -136,6 +142,9 @@ public class ListStore implements DataStore static final Timestamped<int[]> EMPTY = new Timestamped<>(Timestamp.NONE, new int[0], Arrays::toString); final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>(); + final Scheduler scheduler; + final RandomSource random; + private final List<ChangeAt> addedAts = new ArrayList<>(); private final List<ChangeAt> removedAts = new ArrayList<>(); private final List<PurgeAt> purgedAts = new ArrayList<>(); @@ -148,38 +157,82 @@ public class ListStore implements DataStore // when out of order epochs are detected, this holds the callbacks to try again private final List<Runnable> onRemovalDone = new ArrayList<>(); + private static final class Snapshot + { + private final NavigableMap<RoutableKey, Timestamped<int[]>> data; + private final List<ChangeAt> addedAts; + private final List<ChangeAt> removedAts; + private final List<PurgeAt> purgedAts; + private final List<FetchComplete> fetchCompletes; + private final LongArrayList pendingRemoves; + + private Snapshot(NavigableMap<RoutableKey, Timestamped<int[]>> data, List<ChangeAt> addedAts, List<ChangeAt> removedAts, List<PurgeAt> purgedAts, List<FetchComplete> fetchCompletes, LongArrayList pendingRemoves) + { + this.data = new TreeMap<>(data); + this.addedAts = new ArrayList<>(addedAts); + this.removedAts = new ArrayList<>(removedAts); + this.purgedAts = new ArrayList<>(purgedAts); + this.fetchCompletes = new ArrayList<>(fetchCompletes); + this.pendingRemoves = new LongArrayList(); + this.pendingRemoves.addAll(pendingRemoves); + } + } + + private static final class PendingSnapshot + { + final long delay; + final Runnable runnable; + + private PendingSnapshot(long delay, Runnable runnable) + { + this.delay = delay; + this.runnable = runnable; + } + } + + private Snapshot snapshot; + private final Deque<PendingSnapshot> pendingSnapshots = new ArrayDeque<>(); + private long pendingDelay = 0; - private final NavigableMap<RoutableKey, Timestamped<int[]>> snapshot = new TreeMap<>(); - private final List<ChangeAt> addedAtSnapshot = new ArrayList<>(); - private final List<ChangeAt> removedAtSnapshot = new ArrayList<>(); - private final List<PurgeAt> purgedAtSnapshot = new ArrayList<>(); - private final List<FetchComplete> fetchCompleteSnapshot = new ArrayList<>(); - private final LongArrayList pendingRemovesSnapshot = new LongArrayList(); + public AsyncResult<Void> snapshot() + { + Snapshot snapshot = new Snapshot(data, addedAts, removedAts, purgedAts, fetchCompletes, pendingRemoves); + AsyncResult.Settable<Void> result = new AsyncResults.SettableResult<>(); + long delay = Math.max(1, random.nextBiasedLong(100, 1000, 5000) - pendingDelay); + pendingDelay += delay; + pendingSnapshots.add(new PendingSnapshot(delay, () -> { + this.snapshot = snapshot; + result.setSuccess(null); + })); + + if (pendingSnapshots.size() == 1) + scheduleRunSnapshot(); + return result; + } - public void snapshot() + private void scheduleRunSnapshot() { - snapshot.clear(); - snapshot.putAll(data); - addedAtSnapshot.clear(); - addedAtSnapshot.addAll(addedAts); - removedAtSnapshot.clear(); - removedAtSnapshot.addAll(removedAts); - purgedAtSnapshot.clear(); - purgedAtSnapshot.addAll(purgedAts); - fetchCompleteSnapshot.clear(); - fetchCompleteSnapshot.addAll(fetchCompletes); - pendingRemovesSnapshot.clear(); - pendingRemovesSnapshot.addAll(pendingRemoves); + Invariants.checkState(!pendingSnapshots.isEmpty()); + scheduler.once(() -> { + if (pendingSnapshots.isEmpty()) + return; + + PendingSnapshot pendingSnapshot = pendingSnapshots.pollFirst(); + pendingSnapshot.runnable.run(); + pendingDelay -= pendingSnapshot.delay; + if (!pendingSnapshots.isEmpty()) + scheduleRunSnapshot(); + }, pendingSnapshots.peekFirst().delay, TimeUnit.MILLISECONDS); } public void restoreFromSnapshot() { - data.putAll(snapshot); - addedAts.addAll(addedAtSnapshot); - removedAts.addAll(removedAtSnapshot); - purgedAts.addAll(purgedAtSnapshot); - fetchCompletes.addAll(fetchCompleteSnapshot); - pendingRemoves.addAll(pendingRemovesSnapshot); + data.putAll(snapshot.data); + addedAts.addAll(snapshot.addedAts); + removedAts.addAll(snapshot.removedAts); + purgedAts.addAll(snapshot.purgedAts); + fetchCompletes.addAll(snapshot.fetchCompletes); + pendingRemoves.addAll(snapshot.pendingRemoves); } public void clear() @@ -195,8 +248,10 @@ public class ListStore implements DataStore // adding here to help trace burn test queries public final Node.Id node; - public ListStore(Node.Id node) + public ListStore(Scheduler scheduler, RandomSource random, Node.Id node) { + this.scheduler = scheduler; + this.random = random; this.node = node; } diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java deleted file mode 100644 index cd639f78..00000000 --- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.local; - -import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; - -import org.junit.jupiter.api.Test; - -import accord.api.Agent; -import accord.impl.PrefixedIntHashKey; -import accord.impl.basic.Cluster; -import accord.impl.basic.DelayedCommandStores.DelayedCommandStore; -import accord.messages.MessageType; -import accord.messages.ReplyContext; -import accord.messages.Request; -import accord.primitives.Deps; -import accord.primitives.FullRoute; -import accord.primitives.Ranges; -import accord.primitives.Routable; -import accord.primitives.SyncPoint; -import accord.primitives.Txn; -import accord.primitives.TxnId; -import accord.topology.Topology; -import accord.topology.TopologyUtils; -import accord.utils.AccordGens; -import accord.utils.Gen; -import accord.utils.Gens; -import accord.utils.Invariants; -import accord.utils.async.AsyncResult; -import accord.utils.async.AsyncResults; -import org.assertj.core.api.Assertions; - -import static accord.local.PreLoadContext.contextFor; -import static accord.utils.Property.qt; - -class BootstrapLocalTxnTest -{ - private static final Gen<Gen<Cleanup>> CLEANUP_DISTRIBUTION = Gens.mixedDistribution(Cleanup.NO, Cleanup.TRUNCATE, Cleanup.TRUNCATE_WITH_OUTCOME, Cleanup.ERASE); - - @Test - public void localOnlyTxnLifeCycle() - { - Ranges ranges = Ranges.ofSortedAndDeoverlapped(PrefixedIntHashKey.ranges(0, 1)); - List<Node.Id> nodes = Collections.singletonList(new Node.Id(42)); - Topology t = TopologyUtils.topology(1, nodes, ranges, 2); - qt().check(rs -> Cluster.run(rs::fork, nodes, t, nodeMap -> new Request() - { - @Override - public void process(Node on, Node.Id from, ReplyContext replyContext) - { - Gen<Cleanup> cleanupGen = CLEANUP_DISTRIBUTION.next(rs); - for (int storeId : on.commandStores().ids()) - { - DelayedCommandStore store = (DelayedCommandStore) on.commandStores().forId(storeId); - // this is a bit redundent but here to make the test easier to maintain. Pre/Post execute we validate each command to make sure everything is fine - // but that logic could be changed and this test has a dependency on validation the command... so to make this dependency explicit - // the test will call the validation logic within the test even though it will be called again in the background... - Consumer<Command> validate = store::validateRead; - TxnId globalSyncId = on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range); - TxnId localSyncId = globalSyncId.as(Txn.Kind.LocalOnly); - TxnId nextGlobalSyncId = on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range).withEpoch(globalSyncId.epoch() + 1); - Ranges ranges = AccordGens.rangesInsideRanges(store.updateRangesForEpoch().currentRanges(), (rs2, r) -> rs2.nextInt(1, 4)).next(rs); - - FullRoute<?> route = on.computeRoute(globalSyncId, ranges); - SyncPoint<Ranges> syncPoint = new SyncPoint<>(globalSyncId, Deps.NONE, ranges, route); - Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2, r) -> rs2.nextInt(1, 4)).next(rs); - Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid)); - Agent agent = store.agent; - store.execute(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe -> Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, valid)) - .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> validate.accept(safe.get(localSyncId, route.homeKey()).current()))) - .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> Commands.markBootstrapComplete(safe, localSyncId, ranges))) - .flatMap(ignore -> store.execute(contextFor(localSyncId), safe -> validate.accept(safe.get(localSyncId, route.homeKey()).current()))) - // cleanup txn - .withExecutor(store).flatMap(ignore -> { - Cleanup target = cleanupGen.next(rs); - if (target == Cleanup.NO) - return AsyncResults.success(Cleanup.NO); - AsyncResult<?> result = store.mergeAndUpdateRedundantBefore(RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, TxnId.NONE), nextGlobalSyncId, ranges); - switch (target) - { - case ERASE: - result = result.flatMap(ignored -> store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, nextGlobalSyncId)).beginAsResult()).beginAsResult(); - break; - case TRUNCATE: - result = result.flatMap(ignored -> store.mergeAndUpdateDurableBefore(DurableBefore.create(ranges, nextGlobalSyncId, globalSyncId)).beginAsResult()).beginAsResult(); - break; - case TRUNCATE_WITH_OUTCOME: - case INVALIDATE: - // no update to DurableBefore = TRUNCATE_WITH_OUTCOME - break; - default: - throw new UnsupportedOperationException(target.name()); - } - return result.map(ignored -> target); - }) - // validateRead is called implicitly _on command completion_ - .flatMap(target -> store.execute(contextFor(localSyncId), safe -> { - SafeCommand cmd = safe.get(localSyncId, route.homeKey()); - Command current = cmd.current(); - Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ? SaveStatus.Applied : target.appliesIfNot); - })) - .begin(on.agent()); - } - } - - @Override - public MessageType type() - { - return null; - } - })); - } -} \ No newline at end of file diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 608b2add..6b7c0b09 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -909,11 +909,7 @@ public class CommandsForKeyTest null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), - new EpochUpdateHolder(), - (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, - (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, - (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID, - (ignored0, ignored1) -> AsyncResults.SUCCESS_VOID); + new EpochUpdateHolder()); this.pruneInterval = pruneInterval; this.pruneHlcDelta = pruneHlcDelta; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org