This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit fe2fcc41aee0b26c9010aa077c1d8cd2f9e5bb37
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Nov 11 17:07:45 2024 +0000

    wip: key recovery should witness range transactions (and range transactions 
should witness only precisely intersecting txns)
---
 accord-core/src/main/java/accord/impl/CommandsSummary.java     |  7 +++++--
 .../src/main/java/accord/impl/InMemoryCommandStore.java        |  4 ++--
 accord-core/src/main/java/accord/local/RedundantBefore.java    | 10 ++++++++++
 accord-core/src/main/java/accord/local/cfk/CommandsForKey.java |  9 +++++++--
 4 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/CommandsSummary.java 
b/accord-core/src/main/java/accord/impl/CommandsSummary.java
index 7793c6e5..acb8bf97 100644
--- a/accord-core/src/main/java/accord/impl/CommandsSummary.java
+++ b/accord-core/src/main/java/accord/impl/CommandsSummary.java
@@ -22,20 +22,23 @@ import accord.local.SafeCommandStore.CommandFunction;
 import accord.local.SafeCommandStore.TestDep;
 import accord.local.SafeCommandStore.TestStartedAt;
 import accord.local.SafeCommandStore.TestStatus;
+import accord.primitives.Routables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn.Kind.Kinds;
 import accord.primitives.TxnId;
 
 public interface CommandsSummary
 {
-    <P1, T> T mapReduceFull(TxnId testTxnId,
+    <P1, T> T mapReduceFull(Routables<?> keysOrRanges,
+                            TxnId testTxnId,
                             Kinds testKind,
                             TestStartedAt testStartedAt,
                             TestDep testDep,
                             TestStatus testStatus,
                             CommandFunction<P1, T, T> map, P1 p1, T 
initialValue);
 
-    <P1, T> T mapReduceActive(Timestamp startedBefore,
+    <P1, T> T mapReduceActive(Routables<?> keysOrRanges,
+                              Timestamp startedBefore,
                               Kinds testKind,
                               CommandFunction<P1, T, T> map, P1 p1, T 
initialValue);
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 69e547c5..eb827e5b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -862,7 +862,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, 
Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, 
T accumulate)
         {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
-                return commands.mapReduceActive(startedBefore, testKind, map, 
p1, prev);
+                return commands.mapReduceActive(keysOrRanges, startedBefore, 
testKind, map, p1, prev);
             }, accumulate);
 
             return mapReduceRangesInternal(keysOrRanges, startedBefore, null, 
testKind, STARTED_BEFORE, ANY_DEPS, ANY_STATUS, map, p1, accumulate);
@@ -873,7 +873,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId 
testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, 
TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
         {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
-                return commands.mapReduceFull(testTxnId, testKind, 
testStartedAt, testDep, testStatus, map, p1, prev);
+                return commands.mapReduceFull(keysOrRanges, testTxnId, 
testKind, testStartedAt, testDep, testStatus, map, p1, prev);
             }, accumulate);
 
             return mapReduceRangesInternal(keysOrRanges, testTxnId, testTxnId, 
testKind, testStartedAt, testDep, testStatus, map, p1, accumulate);
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 4aef63a2..7ba76cd6 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -35,6 +35,7 @@ import accord.primitives.Participants;
 import accord.primitives.Range;
 import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
 import accord.primitives.Routables;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
@@ -627,6 +628,15 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
         return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::min, 
null, get, ignore -> false));
     }
 
+    public TxnId get(RoutableKey participant, Function<Entry, TxnId> get, 
TxnId ifNull)
+    {
+        Entry entry = get(participant);
+        if (entry == null)
+            return ifNull;
+        TxnId result = get.apply(entry);
+        return result == null ? ifNull : result;
+    }
+
     public TxnId max(Routables<?> participants, Function<Entry, TxnId> get)
     {
         return foldl(participants, Entry::max, TxnId.NONE, get, ignore -> 
false);
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 428d2ca0..60fc08e5 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -40,6 +40,7 @@ import accord.local.SafeCommandStore.CommandFunction;
 import accord.local.SafeCommandStore.TestDep;
 import accord.local.SafeCommandStore.TestStartedAt;
 import accord.local.SafeCommandStore.TestStatus;
+import accord.primitives.Routables;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.local.cfk.PostProcess.NotifyUnmanagedResult;
@@ -1052,13 +1053,15 @@ public class CommandsForKey extends 
CommandsForKeyUpdate implements CommandsSumm
      * commands that do not know any deps will be ignored, as will any with an 
executeAt prior to the txnId.
      * <p>
      */
-    public <P1, T> T mapReduceFull(TxnId testTxnId,
+    public <P1, T> T mapReduceFull(Routables<?> keysOrRanges,
+                                   TxnId testTxnId,
                                    Kinds testKind,
                                    TestStartedAt testStartedAt,
                                    TestDep testDep,
                                    TestStatus testStatus,
                                    CommandFunction<P1, T, T> map, P1 p1, T 
initialValue)
     {
+        Invariants.paranoid(keysOrRanges.contains(key));
         int start, end, loadingIndex = 0;
         // if this is null the TxnId is known in byId
         // otherwise, it must be non-null and represents the transactions (if 
any) that have requested it be loaded due to being pruned
@@ -1141,10 +1144,12 @@ public class CommandsForKey extends 
CommandsForKeyUpdate implements CommandsSumm
         return initialValue;
     }
 
-    public <P1, T> T mapReduceActive(Timestamp startedBefore,
+    public <P1, T> T mapReduceActive(Routables<?> keysOrRanges,
+                                     Timestamp startedBefore,
                                      Kinds testKind,
                                      CommandFunction<P1, T, T> map, P1 p1, T 
initialValue)
     {
+        Invariants.paranoid(keysOrRanges.contains(key));
         TxnId prunedBefore = prunedBefore();
         int end = insertPos(startedBefore);
         Timestamp maxCommittedWriteBefore;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to