This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch ex-sp-pl
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 65caf7fc31bc1559457cff63ec2b3f6089930f26
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sun Sep 29 15:53:16 2024 +0100

    fix replay
---
 accord-core/src/main/java/accord/api/Write.java    |  2 ++
 .../java/accord/impl/InMemoryCommandStore.java     | 23 ++++++++++++++++++++--
 .../src/main/java/accord/local/Cleanup.java        |  1 +
 .../src/main/java/accord/local/Commands.java       | 17 +---------------
 .../src/main/java/accord/primitives/Writes.java    | 11 +++++++++++
 .../src/test/java/accord/impl/list/ListWrite.java  | 12 +++++++++++
 6 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index 7f3fad7d..e6052fdb 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -33,4 +33,6 @@ import accord.utils.async.AsyncChain;
 public interface Write
 {
     AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn);
+    // TODO (expected): this is used only for testing today; hide it somewhere outside of the public API. Returns nothing (no AsyncChain); throws UnsupportedOperationException unless overridden.
+    default void applyUnsafe(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn) { throw new UnsupportedOperationException(getClass().getName() + " does not implement applyUnsafe"); }
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 89fe806f..11eaf5f9 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -1399,15 +1399,34 @@ public abstract class InMemoryCommandStore extends CommandStore
                                      SafeCommand safeCommand = safeStore.unsafeGet(txnId);
                                      Command local = safeCommand.current();
                                      if (local.is(Stable) || local.is(PreApplied))
+                                     {
                                          Commands.maybeExecute(safeStore, safeCommand, local, true, true);
-                                     else if (local.saveStatus().compareTo(Applying) >= 0 && !local.is(Invalidated) && !local.is(Truncated))
-                                         Commands.applyWrites(safeStore, context, local).begin(agent);
+                                     }
+                                     else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
+                                     {
+                                         unsafeApplyWrites(safeStore, context, safeCommand, local);
+                                     }
                                      return null;
                                  });
             }
         };
     }
 
+    // Applies command's writes directly via Writes.applyUnsafe (no AsyncChain), then calls safeCommand.applied and notifies listeners; does nothing when executes is empty. context is currently unused.
+    public static void unsafeApplyWrites(SafeCommandStore safeStore, PreLoadContext context, SafeCommand safeCommand, Command command)
+    {
+        Command.Executed executed = command.asExecuted();
+        Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt());
+        if (!executes.isEmpty())
+        {
+            command.writes().applyUnsafe(safeStore, Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
+            safeCommand.applied(safeStore);
+            safeStore.notifyListeners(safeCommand, command);
+        }
+    }
+
+
+
     @VisibleForTesting
     public void load(Deps loading)
     {
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java
index 63ef7664..78dc341c 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -95,6 +95,7 @@ public enum Cleanup
         return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status);
     }
 
+    // TODO (required): simulate compaction of log records in burn test
     @VisibleForImplementation
     public static Cleanup shouldCleanupPartial(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 637d791f..a9a2805c 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -484,7 +484,7 @@ public class Commands
      * for transactions below a SyncPoint where we adopted the range, and that will be obtained from peers,
      * and therefore we do not want to execute locally
      */
-    private static Ranges applyRanges(SafeCommandStore safeStore, Timestamp executeAt)
+    public static Ranges applyRanges(SafeCommandStore safeStore, Timestamp executeAt)
     {
         return safeStore.ranges().allAt(executeAt.epoch());
     }
@@ -517,21 +517,6 @@ public class Commands
                }));
     }
 
-    public static AsyncChain<Void> applyWrites(SafeCommandStore safeStore, PreLoadContext context, Command command)
-    {
-        CommandStore unsafeStore = safeStore.commandStore();
-        Command.Executed executed = command.asExecuted();
-        Participants<?> executes = executed.participants().executes(safeStore, command.txnId(), command.executeAt());
-        if (!executes.isEmpty())
-            return command.writes().apply(safeStore, applyRanges(safeStore, command.executeAt()), command.partialTxn())
-                          .flatMap(unused -> unsafeStore.submit(context, ss -> {
-                              postApply(ss, command.txnId());
-                              return null;
-                          }));
-        else
-            return AsyncChains.success(null);
-    }
-
     private static void apply(SafeCommandStore safeStore, Command.Executed command, Participants<?> executes)
     {
         CommandStore unsafeStore = safeStore.commandStore();
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index 05bb56ad..27a531a5 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -85,6 +85,17 @@ public class Writes
         return AsyncChains.reduce(futures, (l, r) -> null);
     }
 
+    public void applyUnsafe(SafeCommandStore safeStore, Ranges ranges, PartialTxn txn)
+    {
+        if (write != null && !ranges.isEmpty()) // with no write or no ranges there is nothing to apply
+        {
+            Routables.foldl(keys, ranges, (seekable, acc, i) -> {
+                write.applyUnsafe(seekable, safeStore, txnId, executeAt, safeStore.dataStore(), txn);
+                return acc; // the accumulator is unused: it starts as null and is passed through unchanged
+            }, null);
+        }
+    }
+
     @Override
     public String toString()
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 764d0ce0..aced2e04 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -67,6 +67,18 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write
         });
     }
 
+    @Override
+    public void applyUnsafe(Seekable key, SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn)
+    {
+        ListStore s = (ListStore) store;
+        if (!containsKey(key))
+            return;
+        // this write holds a value for key: update its last-execution timestamps, then merge the value into the store directly (no AsyncChain)
+        TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true);
+        logger.trace("unsafe applying WRITE on {} at {} key:{}", s.node, executeAt, key);
+        s.data.merge((Key)key, new Timestamped<>(executeAt, get(key), Arrays::toString), ListStore::merge);
+    }
+
     @Override
     public boolean equals(Object o)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to