This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d6157fc Fix: - Replay should notify listeners - TopologyManager can report the actually updated epochs for use by callers (TODO use this to filter redundant notifications) - No-op read validation
0d6157fc is described below

commit 0d6157fc33dd16f1768030e66205e72fc1d7e9ac
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Jul 10 10:39:37 2025 +0100

    Fix:
     - Replay should notify listeners
     - TopologyManager can report the actually updated epochs for use by callers (TODO use this to filter redundant notifications)
     - No-op read validation
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20754
---
 accord-core/src/main/java/accord/api/Data.java     |  6 ++++
 .../src/main/java/accord/impl/AbstractLoader.java  | 36 ++++++++++++++--------
 .../main/java/accord/topology/TopologyManager.java | 19 +++++++-----
 3 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Data.java b/accord-core/src/main/java/accord/api/Data.java
index c1b680e3..7322185f 100644
--- a/accord-core/src/main/java/accord/api/Data.java
+++ b/accord-core/src/main/java/accord/api/Data.java
@@ -42,6 +42,12 @@ public interface Data
         {
             return this;
         }
+
+        @Override
+        public boolean validateReply(TxnId txnId, Timestamp executeAt, boolean futureReadPossible)
+        {
+            return true;
+        }
     };
 
     /**
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index 3e0828d7..152f67c8 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -27,8 +27,13 @@ import accord.local.SafeCommandStore;
 import accord.primitives.Participants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 
+import static accord.primitives.SaveStatus.Applying;
 import static accord.primitives.SaveStatus.PreApplied;
+import static accord.primitives.SaveStatus.TruncatedApply;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.Status.Applied;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
 import static accord.primitives.Txn.Kind.Write;
@@ -39,27 +44,32 @@ public abstract class AbstractLoader implements Journal.Loader
     {
         SafeCommand safeCommand = safeStore.unsafeGet(txnId);
         Command command = safeCommand.current();
-        if (command.is(Stable) || command.saveStatus() == PreApplied)
+        if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && command.saveStatus().compareTo(PreApplied) <= 0)
         {
             if (Commands.maybeExecute(safeStore, safeCommand, command, true, true))
                 return;
         }
-        else if (command.txnId().is(Write) && command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated))
+        else if (command.saveStatus().compareTo(Applying) >= 0 && command.saveStatus().compareTo(TruncatedApplyWithOutcome) <= 0)
         {
-            CommandStore unsafeStore = safeStore.commandStore();
-            Command.Executed executed = command.asExecuted();
-            Participants<?> executes = executed.participants().stillExecutes();
-            if (!executes.isEmpty())
+            if (command.txnId().is(Write))
             {
-                command.writes()
-                       .apply(safeStore, executes, command.partialTxn())
-                       .invoke(() -> unsafeStore.build(txnId, ss -> {
-                           Commands.postApply(ss, txnId, -1, true);
-                       }))
-                       .begin(safeStore.agent());
-                return;
+                CommandStore unsafeStore = safeStore.commandStore();
+                Command.Executed executed = command.asExecuted();
+                Participants<?> executes = executed.participants().stillExecutes();
+                if (!executes.isEmpty())
+                {
+                    command.writes()
+                           .apply(safeStore, executes, command.partialTxn())
+                           .invoke(() -> unsafeStore.build(txnId, ss -> {
+                               Commands.postApply(ss, txnId, -1, true);
+                           }))
+                           .begin(safeStore.agent());
+                    return;
+                }
             }
+            else Invariants.expect(command.hasBeen(Applied));
         }
         safeCommand.update(safeStore, command, true);
+        safeStore.notifyListeners(safeCommand, null);
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 521c8a4b..7708c9cd 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -419,7 +419,7 @@ public class TopologyManager
          * Mark the epoch as "closed" for the provided ranges; this means that no new transactions
          * that intersect with this range may be proposed in the epoch (they will be rejected).
          */
-        public void epochClosed(Ranges ranges, long epoch)
+        public Ranges epochClosed(Ranges ranges, long epoch)
         {
             Invariants.requireArgument(epoch > 0);
             int i;
@@ -437,18 +437,20 @@ public class TopologyManager
             if (i == -1)
             {
                 Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
-                return; // notification came for an already truncated epoch
+                return Ranges.EMPTY; // notification came for an already truncated epoch
             }
 
+            Ranges report = ranges = epochs[i++].recordClosed(ranges);
             while (!ranges.isEmpty() && i < epochs.length)
                 ranges = epochs[i++].recordClosed(ranges);
+            return report;
         }
 
         /**
          * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be
          * proposed for this epoch have now been executed globally.
          */
-        public void epochRetired(Ranges ranges, long epoch)
+        public Ranges epochRetired(Ranges ranges, long epoch)
         {
             Invariants.requireArgument(epoch > 0);
             int retiredIdx;
@@ -462,11 +464,14 @@ public class TopologyManager
             {
                 retiredIdx = indexOf(epoch);
                 if (retiredIdx < 0)
-                    return;
+                    return Ranges.EMPTY;
             }
 
-            for (int i = retiredIdx; !ranges.isEmpty() && i < epochs.length; i++)
-                ranges = epochs[i].recordRetired(ranges);
+            int i = retiredIdx;
+            Ranges report = ranges = epochs[i++].recordRetired(ranges);
+            while (!ranges.isEmpty() && i < epochs.length)
+                ranges = epochs[i++].recordRetired(ranges);
+            return report;
         }
 
         private Notifications pending(long epoch)
@@ -1572,7 +1577,7 @@ public class TopologyManager
         }
     }
 
-    public interface Collectors<C, K, T>
+    interface Collectors<C, K, T>
     {
         C allocate(int size);
         C update(C collector, EpochState epoch, K select, boolean permitMissing);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to