This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit fe2fcc41aee0b26c9010aa077c1d8cd2f9e5bb37 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Mon Nov 11 17:07:45 2024 +0000 wip: key recovery should witness range transactions (and range transactions should witness only precisely intersecting txns)) --- accord-core/src/main/java/accord/impl/CommandsSummary.java | 7 +++++-- .../src/main/java/accord/impl/InMemoryCommandStore.java | 4 ++-- accord-core/src/main/java/accord/local/RedundantBefore.java | 10 ++++++++++ accord-core/src/main/java/accord/local/cfk/CommandsForKey.java | 9 +++++++-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/CommandsSummary.java b/accord-core/src/main/java/accord/impl/CommandsSummary.java index 7793c6e5..acb8bf97 100644 --- a/accord-core/src/main/java/accord/impl/CommandsSummary.java +++ b/accord-core/src/main/java/accord/impl/CommandsSummary.java @@ -22,20 +22,23 @@ import accord.local.SafeCommandStore.CommandFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestStartedAt; import accord.local.SafeCommandStore.TestStatus; +import accord.primitives.Routables; import accord.primitives.Timestamp; import accord.primitives.Txn.Kind.Kinds; import accord.primitives.TxnId; public interface CommandsSummary { - <P1, T> T mapReduceFull(TxnId testTxnId, + <P1, T> T mapReduceFull(Routables<?> keysOrRanges, + TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T initialValue); - <P1, T> T mapReduceActive(Timestamp startedBefore, + <P1, T> T mapReduceActive(Routables<?> keysOrRanges, + Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T initialValue); } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 69e547c5..eb827e5b 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -862,7 +862,7 @@ public abstract class InMemoryCommandStore extends CommandStore public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { - return commands.mapReduceActive(startedBefore, testKind, map, p1, prev); + return commands.mapReduceActive(keysOrRanges, startedBefore, testKind, map, p1, prev); }, accumulate); return mapReduceRangesInternal(keysOrRanges, startedBefore, null, testKind, STARTED_BEFORE, ANY_DEPS, ANY_STATUS, map, p1, accumulate); @@ -873,7 +873,7 @@ public abstract class InMemoryCommandStore extends CommandStore public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { - return commands.mapReduceFull(testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev); + return commands.mapReduceFull(keysOrRanges, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev); }, accumulate); return mapReduceRangesInternal(keysOrRanges, testTxnId, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, accumulate); diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 4aef63a2..7ba76cd6 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -35,6 +35,7 @@ import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.RangeDeps; import accord.primitives.Ranges; +import accord.primitives.RoutableKey; import accord.primitives.Routables; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; @@ -627,6 +628,15 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::min, null, get, ignore -> false)); } + public TxnId get(RoutableKey participant, Function<Entry, TxnId> get, TxnId ifNull) + { + Entry entry = get(participant); + if (entry == null) + return ifNull; + TxnId result = get.apply(entry); + return result == null ? ifNull : result; + } + public TxnId max(Routables<?> participants, Function<Entry, TxnId> get) { return foldl(participants, Entry::max, TxnId.NONE, get, ignore -> false); diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index 428d2ca0..60fc08e5 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -40,6 +40,7 @@ import accord.local.SafeCommandStore.CommandFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestStartedAt; import accord.local.SafeCommandStore.TestStatus; +import accord.primitives.Routables; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.local.cfk.PostProcess.NotifyUnmanagedResult; @@ -1052,13 +1053,15 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm * commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId. * <p> */ - public <P1, T> T mapReduceFull(TxnId testTxnId, + public <P1, T> T mapReduceFull(Routables<?> keysOrRanges, + TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T initialValue) { + Invariants.paranoid(keysOrRanges.contains(key)); int start, end, loadingIndex = 0; // if this is null the TxnId is known in byId // otherwise, it must be non-null and represents the transactions (if any) that have requested it be loaded due to being pruned @@ -1141,10 +1144,12 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm return initialValue; } - public <P1, T> T mapReduceActive(Timestamp startedBefore, + public <P1, T> T mapReduceActive(Routables<?> keysOrRanges, + Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T initialValue) { + Invariants.paranoid(keysOrRanges.contains(key)); TxnId prunedBefore = prunedBefore(); int end = insertPos(startedBefore); Timestamp maxCommittedWriteBefore; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
