This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch ex-sp-pl in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 65caf7fc31bc1559457cff63ec2b3f6089930f26 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sun Sep 29 15:53:16 2024 +0100 fix replay --- accord-core/src/main/java/accord/api/Write.java | 2 ++ .../java/accord/impl/InMemoryCommandStore.java | 23 ++++++++++++++++++++-- .../src/main/java/accord/local/Cleanup.java | 1 + .../src/main/java/accord/local/Commands.java | 17 +--------------- .../src/main/java/accord/primitives/Writes.java | 11 +++++++++++ .../src/test/java/accord/impl/list/ListWrite.java | 12 +++++++++++ 6 files changed, 48 insertions(+), 18 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java index 7f3fad7d..e6052fdb 100644 --- a/accord-core/src/main/java/accord/api/Write.java +++ b/accord-core/src/main/java/accord/api/Write.java @@ -33,4 +33,6 @@ import accord.utils.async.AsyncChain; public interface Write { AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn); + // TODO (expected): this is used only for testing today; hide it somewhere outside of the public API + default void applyUnsafe(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn) { throw new UnsupportedOperationException(); } } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 89fe806f..11eaf5f9 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -1399,15 +1399,34 @@ public abstract class InMemoryCommandStore extends CommandStore SafeCommand safeCommand = safeStore.unsafeGet(txnId); Command local = safeCommand.current(); if (local.is(Stable) || local.is(PreApplied)) + { Commands.maybeExecute(safeStore, safeCommand, local, true, true); - else if (local.saveStatus().compareTo(Applying) >= 0 && !local.is(Invalidated) && !local.is(Truncated)) - Commands.applyWrites(safeStore, context, local).begin(agent); + } + else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated)) + { + unsafeApplyWrites(safeStore, context, safeCommand, local); + } return null; }); } }; } + public static void unsafeApplyWrites(SafeCommandStore safeStore, PreLoadContext context, SafeCommand safeCommand, Command command) + { + CommandStore unsafeStore = safeStore.commandStore(); + Command.Executed executed = command.asExecuted(); + Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt()); + if (!executes.isEmpty()) + { + command.writes().applyUnsafe(safeStore, Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn()); + safeCommand.applied(safeStore); + safeStore.notifyListeners(safeCommand, command); + } + } + + + @VisibleForTesting public void load(Deps loading) { diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index 63ef7664..78dc341c 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -95,6 +95,7 @@ public enum Cleanup return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status); } + // TODO (required): simulate compaction of log records in burn test @VisibleForImplementation public static Cleanup shouldCleanupPartial(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) { diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 637d791f..a9a2805c 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -484,7 +484,7 @@ public class Commands * for transactions below a SyncPoint where we adopted the range, and that will be obtained from peers, * and therefore we do not want to execute locally */ - private static Ranges applyRanges(SafeCommandStore safeStore, Timestamp executeAt) + public static Ranges applyRanges(SafeCommandStore safeStore, Timestamp executeAt) { return safeStore.ranges().allAt(executeAt.epoch()); } @@ -517,21 +517,6 @@ public class Commands })); } - public static AsyncChain<Void> applyWrites(SafeCommandStore safeStore, PreLoadContext context, Command command) - { - CommandStore unsafeStore = safeStore.commandStore(); - Command.Executed executed = command.asExecuted(); - Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt()); - if (!executes.isEmpty()) - return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn()) - .flatMap(unused -> unsafeStore.submit(context, ss -> { - postApply(ss, command.txnId()); - return null; - })); - else - return AsyncChains.success(null); - } - private static void apply(SafeCommandStore safeStore, Command.Executed command, Participants<?> executes) { CommandStore unsafeStore = safeStore.commandStore(); diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java index 05bb56ad..27a531a5 100644 --- a/accord-core/src/main/java/accord/primitives/Writes.java +++ b/accord-core/src/main/java/accord/primitives/Writes.java @@ -85,6 +85,17 @@ public class Writes return AsyncChains.reduce(futures, (l, r) -> null); } + public void applyUnsafe(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn) + { + if (write == null || ranges.isEmpty()) + return; + + Routables.foldl(keys, ranges, (key, obj, index) -> { + write.applyUnsafe(key, safeStore, txnId, executeAt, safeStore.dataStore(), txn); + return obj; + }, null); + } + @Override public String toString() { diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java index 764d0ce0..aced2e04 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java @@ -67,6 +67,18 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write }); } + public void applyUnsafe(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn) + { + ListStore s = (ListStore) store; + if (!containsKey(key)) + return; + + TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true); + logger.trace("unsafe applying WRITE on {} at {} key:{}", s.node, executeAt, key); + int[] data = get(key); + s.data.merge((Key)key, new Timestamped<>(executeAt, data, Arrays::toString), ListStore::merge); + } + @Override public boolean equals(Object o) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org