This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit a271897790aa3816c3dea2125b1e374b091bc090
Author: David Capwell <[email protected]>
AuthorDate: Thu Oct 17 13:58:25 2024 -0700

    Key transaction recovery should witness range transactions
    
    patch by Benedict; reviewed by David for CASSANDRA-20105
---
 .../src/main/java/accord/impl/CommandsSummary.java |  7 +++--
 .../java/accord/impl/InMemoryCommandStore.java     |  4 +--
 .../src/main/java/accord/local/KeyHistory.java     | 31 +++++++++++++++++++--
 .../main/java/accord/local/cfk/CommandsForKey.java |  7 +++--
 .../java/accord/utils/CheckpointIntervalArray.java | 31 +++++++++++++++++++++
 .../utils/CheckpointIntervalArrayBuilder.java      |  3 ++
 .../java/accord/utils/SearchableRangeList.java     | 32 ----------------------
 .../accord/utils/SearchableRangeListBuilder.java   | 18 ++++++++++++
 8 files changed, 93 insertions(+), 40 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/CommandsSummary.java b/accord-core/src/main/java/accord/impl/CommandsSummary.java
index 7793c6e5..acb8bf97 100644
--- a/accord-core/src/main/java/accord/impl/CommandsSummary.java
+++ b/accord-core/src/main/java/accord/impl/CommandsSummary.java
@@ -22,20 +22,23 @@ import accord.local.SafeCommandStore.CommandFunction;
 import accord.local.SafeCommandStore.TestDep;
 import accord.local.SafeCommandStore.TestStartedAt;
 import accord.local.SafeCommandStore.TestStatus;
+import accord.primitives.Routables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn.Kind.Kinds;
 import accord.primitives.TxnId;
 
 public interface CommandsSummary
 {
-    <P1, T> T mapReduceFull(TxnId testTxnId,
+    <P1, T> T mapReduceFull(Routables<?> keysOrRanges,
+                            TxnId testTxnId,
                             Kinds testKind,
                             TestStartedAt testStartedAt,
                             TestDep testDep,
                             TestStatus testStatus,
                             CommandFunction<P1, T, T> map, P1 p1, T initialValue);
 
-    <P1, T> T mapReduceActive(Timestamp startedBefore,
+    <P1, T> T mapReduceActive(Routables<?> keysOrRanges,
+                              Timestamp startedBefore,
                               Kinds testKind,
                               CommandFunction<P1, T, T> map, P1 p1, T initialValue);
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 69e547c5..eb827e5b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -862,7 +862,7 @@ public abstract class InMemoryCommandStore extends CommandStore
         public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
         {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> {
-                return commands.mapReduceActive(startedBefore, testKind, map, p1, prev);
+                return commands.mapReduceActive(keysOrRanges, startedBefore, testKind, map, p1, prev);
             }, accumulate);
 
             return mapReduceRangesInternal(keysOrRanges, startedBefore, null, testKind, STARTED_BEFORE, ANY_DEPS, ANY_STATUS, map, p1, accumulate);
@@ -873,7 +873,7 @@ public abstract class InMemoryCommandStore extends CommandStore
         public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
         {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> {
-                return commands.mapReduceFull(testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev);
+                return commands.mapReduceFull(keysOrRanges, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev);
             }, accumulate);
 
             return mapReduceRangesInternal(keysOrRanges, testTxnId, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, accumulate);
diff --git a/accord-core/src/main/java/accord/local/KeyHistory.java b/accord-core/src/main/java/accord/local/KeyHistory.java
index a3b46dc5..1e104802 100644
--- a/accord-core/src/main/java/accord/local/KeyHistory.java
+++ b/accord-core/src/main/java/accord/local/KeyHistory.java
@@ -24,6 +24,7 @@ package accord.local;
  */
 public enum KeyHistory
 {
+    NONE,
     // TODO (required): deprecate
     TIMESTAMPS,
 
@@ -48,7 +49,33 @@ public enum KeyHistory
     /**
      * Load recovery information for all keys into memory before processing the command.
      */
-    RECOVER,
+    RECOVER;
+
+    public boolean satisfiesIfPresent(KeyHistory that)
+    {
+        return satisfies(that, ASYNC);
+    }
+
+    public boolean satisfies(KeyHistory that)
+    {
+        return satisfies(that, SYNC);
+    }
+
+    private boolean satisfies(KeyHistory that, KeyHistory ifSyncRequireAtLeast)
+    {
+        switch (that)
+        {
+            default: throw new AssertionError("Unhandled KeyHistory: " + that);
+            case NONE:
+                return true;
+            case RECOVER:
+            case TIMESTAMPS:
+                return this == that;
+            case ASYNC:
+            case INCR:
+            case SYNC:
+                return this.compareTo(ifSyncRequireAtLeast) >= 0;
+        }
+    }
 
-    NONE
 }
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 428d2ca0..fa4162ee 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -40,6 +40,7 @@ import accord.local.SafeCommandStore.CommandFunction;
 import accord.local.SafeCommandStore.TestDep;
 import accord.local.SafeCommandStore.TestStartedAt;
 import accord.local.SafeCommandStore.TestStatus;
+import accord.primitives.Routables;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.local.cfk.PostProcess.NotifyUnmanagedResult;
@@ -1052,7 +1053,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
      * commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId.
      * <p>
      */
-    public <P1, T> T mapReduceFull(TxnId testTxnId,
+    public <P1, T> T mapReduceFull(Routables<?> keysOrRanges,
+                                   TxnId testTxnId,
                                    Kinds testKind,
                                    TestStartedAt testStartedAt,
                                    TestDep testDep,
@@ -1141,7 +1143,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
         return initialValue;
     }
 
-    public <P1, T> T mapReduceActive(Timestamp startedBefore,
+    public <P1, T> T mapReduceActive(Routables<?> keysOrRanges,
+                                     Timestamp startedBefore,
                                      Kinds testKind,
                                      CommandFunction<P1, T, T> map, P1 p1, T initialValue)
     {
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
index d5a60b71..9661ebfc 100644
--- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
@@ -24,6 +24,7 @@ import accord.utils.CheckpointIntervalArrayBuilder.Accessor;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
 import static accord.utils.SortedArrays.Search.CEIL;
+import static accord.utils.SortedArrays.Search.FLOOR;
 
 public class CheckpointIntervalArray<Ranges, Range, Key>
 {
@@ -216,4 +217,34 @@ public class CheckpointIntervalArray<Ranges, Range, Key>
         forEachRange.accept(p1, p2, p3, p4, start, end);
         return end;
     }
+
+    @Inline
+    public <P1, P2, P3, P4> int forEachKey(Key key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+    {
+        if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges))
+            return minIndex;
+
+        int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), FLOOR);
+        if (end < 0) end = -1 - end;
+        if (end <= minIndex) return minIndex;
+
+        int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), CEIL);
+        int start = floor;
+        if (floor < 0)
+        {
+            // if there's no precise match on start, step backwards;
+            // if this range does not overlap us, step forwards again for start
+            // but retain the floor index for performing scan and checkpoint searches from
+            // as this contains all ranges that might overlap us (whereas those that end
+            // after us but before the next range's start would be missed by the next range index)
+            start = floor = -2 - floor;
+            if (start < 0)
+                start = floor = 0;
+            else if (accessor.compareEndTo(accessor.get(ranges, start), key) < 0)
+                ++start;
+        }
+
+        int bound = accessor.endInclusive(ranges) ? -1 : 0;
+        return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+    }
 }
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
index 95e1d86e..c50c0eb4 100644
--- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
@@ -76,6 +76,9 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey>
         RoutingKey end(Ranges ranges, int index);
         RoutingKey end(Range range);
         Comparator<RoutingKey> keyComparator();
+        int compareEndTo(Range range, RoutingKey key);
+        int compareStartTo(Range range, RoutingKey key);
+        boolean endInclusive(Ranges ranges);
         int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op);
     }
 
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
index 5e677e82..c0741810 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
@@ -22,11 +22,9 @@ import accord.primitives.Range;
 import accord.primitives.RoutableKey;
 import accord.utils.CheckpointIntervalArrayBuilder.Links;
 import accord.utils.CheckpointIntervalArrayBuilder.Strategy;
-import net.nicoulaj.compilecommand.annotations.Inline;
 
 import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS;
 import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE;
-import static accord.utils.SortedArrays.Search.*;
 
 /**
  * Based on CINTIA, the Checkpoint INTerval Array
@@ -85,36 +83,6 @@ public class SearchableRangeList extends CheckpointIntervalArray<Range[], Range,
         super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches);
     }
 
-    @Inline
-    public <P1, P2, P3, P4> int forEachKey(RoutableKey key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
-    {
-        if (ranges.length == 0 || minIndex == ranges.length)
-            return minIndex;
-
-        int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), FLOOR);
-        if (end < 0) end = -1 - end;
-        if (end <= minIndex) return minIndex;
-
-        int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), CEIL);
-        int start = floor;
-        if (floor < 0)
-        {
-            // if there's no precise match on start, step backwards;
-            // if this range does not overlap us, step forwards again for start
-            // but retain the floor index for performing scan and checkpoint searches from
-            // as this contains all ranges that might overlap us (whereas those that end
-            // after us but before the next range's start would be missed by the next range index)
-            start = floor = -2 - floor;
-            if (start < 0)
-                start = floor = 0;
-            else if (ranges[start].compareEndTo(key) < 0)
-                ++start;
-        }
-
-        int bound = ranges[0].endInclusive() ? -1 : 0;
-        return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
-    }
-
     public static SearchableRangeList build(Range[] ranges)
     {
         if (ranges.length == 0)
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
index 546ee16a..73d6dc77 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
@@ -74,6 +74,24 @@ public class SearchableRangeListBuilder extends CheckpointIntervalArrayBuilder<R
             return Comparator.naturalOrder();
         }
 
+        @Override
+        public int compareEndTo(Range range, RoutableKey key)
+        {
+            return range.compareEndTo(key);
+        }
+
+        @Override
+        public int compareStartTo(Range range, RoutableKey key)
+        {
+            return range.compareStartTo(key);
+        }
+
+        @Override
+        public boolean endInclusive(Range[] ranges)
+        {
+            return ranges[0].endInclusive();
+        }
+
         @Override
         public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search op)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to