This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit a271897790aa3816c3dea2125b1e374b091bc090 Author: David Capwell <[email protected]> AuthorDate: Thu Oct 17 13:58:25 2024 -0700 Key transaction recovery should witness range transactions patch by Benedict; reviewed by David for CASSANDRA-20105 --- .../src/main/java/accord/impl/CommandsSummary.java | 7 +++-- .../java/accord/impl/InMemoryCommandStore.java | 4 +-- .../src/main/java/accord/local/KeyHistory.java | 31 +++++++++++++++++++-- .../main/java/accord/local/cfk/CommandsForKey.java | 7 +++-- .../java/accord/utils/CheckpointIntervalArray.java | 31 +++++++++++++++++++++ .../utils/CheckpointIntervalArrayBuilder.java | 3 ++ .../java/accord/utils/SearchableRangeList.java | 32 ---------------------- .../accord/utils/SearchableRangeListBuilder.java | 18 ++++++++++++ 8 files changed, 93 insertions(+), 40 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/CommandsSummary.java b/accord-core/src/main/java/accord/impl/CommandsSummary.java index 7793c6e5..acb8bf97 100644 --- a/accord-core/src/main/java/accord/impl/CommandsSummary.java +++ b/accord-core/src/main/java/accord/impl/CommandsSummary.java @@ -22,20 +22,23 @@ import accord.local.SafeCommandStore.CommandFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestStartedAt; import accord.local.SafeCommandStore.TestStatus; +import accord.primitives.Routables; import accord.primitives.Timestamp; import accord.primitives.Txn.Kind.Kinds; import accord.primitives.TxnId; public interface CommandsSummary { - <P1, T> T mapReduceFull(TxnId testTxnId, + <P1, T> T mapReduceFull(Routables<?> keysOrRanges, + TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T initialValue); - <P1, T> T mapReduceActive(Timestamp startedBefore, + <P1, T> T mapReduceActive(Routables<?> keysOrRanges, + Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T initialValue); } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 69e547c5..eb827e5b 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -862,7 +862,7 @@ public abstract class InMemoryCommandStore extends CommandStore public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { - return commands.mapReduceActive(startedBefore, testKind, map, p1, prev); + return commands.mapReduceActive(keysOrRanges, startedBefore, testKind, map, p1, prev); }, accumulate); return mapReduceRangesInternal(keysOrRanges, startedBefore, null, testKind, STARTED_BEFORE, ANY_DEPS, ANY_STATUS, map, p1, accumulate); @@ -873,7 +873,7 @@ public abstract class InMemoryCommandStore extends CommandStore public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { - return commands.mapReduceFull(testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev); + return commands.mapReduceFull(keysOrRanges, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev); }, accumulate); return mapReduceRangesInternal(keysOrRanges, testTxnId, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, accumulate); diff --git a/accord-core/src/main/java/accord/local/KeyHistory.java b/accord-core/src/main/java/accord/local/KeyHistory.java index a3b46dc5..1e104802 100644 --- a/accord-core/src/main/java/accord/local/KeyHistory.java +++ b/accord-core/src/main/java/accord/local/KeyHistory.java @@ -24,6 +24,7 @@ package accord.local; */ public enum KeyHistory { + NONE, // TODO (required): deprecate TIMESTAMPS, @@ -48,7 +49,33 @@ public enum KeyHistory /** * Load recovery information for all keys into memory before processing the command. */ - RECOVER, + RECOVER; + + public boolean satisfiesIfPresent(KeyHistory that) + { + return satisfies(that, ASYNC); + } + + public boolean satisfies(KeyHistory that) + { + return satisfies(that, SYNC); + } + + private boolean satisfies(KeyHistory that, KeyHistory ifSyncRequireAtLeast) + { + switch (that) + { + default: throw new AssertionError("Unhandled KeyHistory: " + that); + case NONE: + return true; + case RECOVER: + case TIMESTAMPS: + return this == that; + case ASYNC: + case INCR: + case SYNC: + return this.compareTo(ifSyncRequireAtLeast) >= 0; + } + } - NONE } diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index 428d2ca0..fa4162ee 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -40,6 +40,7 @@ import accord.local.SafeCommandStore.CommandFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestStartedAt; import accord.local.SafeCommandStore.TestStatus; +import accord.primitives.Routables; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.local.cfk.PostProcess.NotifyUnmanagedResult; @@ -1052,7 +1053,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm * commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId. * <p> */ - public <P1, T> T mapReduceFull(TxnId testTxnId, + public <P1, T> T mapReduceFull(Routables<?> keysOrRanges, + TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, @@ -1141,7 +1143,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm return initialValue; } - public <P1, T> T mapReduceActive(Timestamp startedBefore, + public <P1, T> T mapReduceActive(Routables<?> keysOrRanges, + Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T initialValue) { diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java index d5a60b71..9661ebfc 100644 --- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java @@ -24,6 +24,7 @@ import accord.utils.CheckpointIntervalArrayBuilder.Accessor; import net.nicoulaj.compilecommand.annotations.Inline; import static accord.utils.SortedArrays.Search.CEIL; +import static accord.utils.SortedArrays.Search.FLOOR; public class CheckpointIntervalArray<Ranges, Range, Key> { @@ -216,4 +217,34 @@ public class CheckpointIntervalArray<Ranges, Range, Key> forEachRange.accept(p1, p2, p3, p4, start, end); return end; } + + @Inline + public <P1, P2, P3, P4> int forEachKey(Key key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) + { + if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges)) + return minIndex; + + int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), FLOOR); + if (end < 0) end = -1 - end; + if (end <= minIndex) return minIndex; + + int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), CEIL); + int start = floor; + if (floor < 0) + { + // if there's no precise match on start, step backwards; + // if this range does not overlap us, step forwards again for start + // but retain the floor index for performing scan and checkpoint searches from + // as this contains all ranges that might overlap us (whereas those that end + // after us but before the next range's start would be missed by the next range index) + start = floor = -2 - floor; + if (start < 0) + start = floor = 0; + else if (accessor.compareEndTo(accessor.get(ranges, start), key) < 0) + ++start; + } + + int bound = accessor.endInclusive(ranges) ? -1 : 0; + return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); + } } diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java index 95e1d86e..c50c0eb4 100644 --- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java @@ -76,6 +76,9 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> RoutingKey end(Ranges ranges, int index); RoutingKey end(Range range); Comparator<RoutingKey> keyComparator(); + int compareEndTo(Range range, RoutingKey key); + int compareStartTo(Range range, RoutingKey key); + boolean endInclusive(Ranges ranges); int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op); } diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java b/accord-core/src/main/java/accord/utils/SearchableRangeList.java index 5e677e82..c0741810 100644 --- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java +++ b/accord-core/src/main/java/accord/utils/SearchableRangeList.java @@ -22,11 +22,9 @@ import accord.primitives.Range; import accord.primitives.RoutableKey; import accord.utils.CheckpointIntervalArrayBuilder.Links; import accord.utils.CheckpointIntervalArrayBuilder.Strategy; -import net.nicoulaj.compilecommand.annotations.Inline; import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS; import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE; -import static accord.utils.SortedArrays.Search.*; /** * Based on CINTIA, the Checkpoint INTerval Array @@ -85,36 +83,6 @@ public class SearchableRangeList extends CheckpointIntervalArray<Range[], Range, super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches); } - @Inline - public <P1, P2, P3, P4> int forEachKey(RoutableKey key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) - { - if (ranges.length == 0 || minIndex == ranges.length) - return minIndex; - - int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), FLOOR); - if (end < 0) end = -1 - end; - if (end <= minIndex) return minIndex; - - int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), CEIL); - int start = floor; - if (floor < 0) - { - // if there's no precise match on start, step backwards; - // if this range does not overlap us, step forwards again for start - // but retain the floor index for performing scan and checkpoint searches from - // as this contains all ranges that might overlap us (whereas those that end - // after us but before the next range's start would be missed by the next range index) - start = floor = -2 - floor; - if (start < 0) - start = floor = 0; - else if (ranges[start].compareEndTo(key) < 0) - ++start; - } - - int bound = ranges[0].endInclusive() ? -1 : 0; - return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); - } - public static SearchableRangeList build(Range[] ranges) { if (ranges.length == 0) diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java index 546ee16a..73d6dc77 100644 --- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java +++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java @@ -74,6 +74,24 @@ public class SearchableRangeListBuilder extends CheckpointIntervalArrayBuilder<R return Comparator.naturalOrder(); } + @Override + public int compareEndTo(Range range, RoutableKey key) + { + return range.compareEndTo(key); + } + + @Override + public int compareStartTo(Range range, RoutableKey key) + { + return range.compareStartTo(key); + } + + @Override + public boolean endInclusive(Range[] ranges) + { + return ranges[0].endInclusive(); + } + @Override public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search op) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
