This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0d6157fc Fix: - Replay should notify listeners - TopologyManager can
report the actually updated epochs for use by callers (TODO use this to filter
redundant notifications) - No-op read validation
0d6157fc is described below
commit 0d6157fc33dd16f1768030e66205e72fc1d7e9ac
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Jul 10 10:39:37 2025 +0100
Fix:
- Replay should notify listeners
- TopologyManager can report the actually updated epochs for use by
callers (TODO use this to filter redundant notifications)
- No-op read validation
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20754
---
accord-core/src/main/java/accord/api/Data.java | 6 ++++
.../src/main/java/accord/impl/AbstractLoader.java | 36 ++++++++++++++--------
.../main/java/accord/topology/TopologyManager.java | 19 +++++++-----
3 files changed, 41 insertions(+), 20 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Data.java
b/accord-core/src/main/java/accord/api/Data.java
index c1b680e3..7322185f 100644
--- a/accord-core/src/main/java/accord/api/Data.java
+++ b/accord-core/src/main/java/accord/api/Data.java
@@ -42,6 +42,12 @@ public interface Data
{
return this;
}
+
+ @Override
+ public boolean validateReply(TxnId txnId, Timestamp executeAt, boolean
futureReadPossible)
+ {
+ return true;
+ }
};
/**
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index 3e0828d7..152f67c8 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -27,8 +27,13 @@ import accord.local.SafeCommandStore;
import accord.primitives.Participants;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.PreApplied;
+import static accord.primitives.SaveStatus.TruncatedApply;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.Status.Applied;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;
@@ -39,27 +44,32 @@ public abstract class AbstractLoader implements
Journal.Loader
{
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
Command command = safeCommand.current();
- if (command.is(Stable) || command.saveStatus() == PreApplied)
+ if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 &&
command.saveStatus().compareTo(PreApplied) <= 0)
{
if (Commands.maybeExecute(safeStore, safeCommand, command, true,
true))
return;
}
- else if (command.txnId().is(Write) &&
command.saveStatus().compareTo(SaveStatus.Stable) >= 0 &&
!command.hasBeen(Truncated))
+ else if (command.saveStatus().compareTo(Applying) >= 0 &&
command.saveStatus().compareTo(TruncatedApplyWithOutcome) <= 0)
{
- CommandStore unsafeStore = safeStore.commandStore();
- Command.Executed executed = command.asExecuted();
- Participants<?> executes = executed.participants().stillExecutes();
- if (!executes.isEmpty())
+ if (command.txnId().is(Write))
{
- command.writes()
- .apply(safeStore, executes, command.partialTxn())
- .invoke(() -> unsafeStore.build(txnId, ss -> {
- Commands.postApply(ss, txnId, -1, true);
- }))
- .begin(safeStore.agent());
- return;
+ CommandStore unsafeStore = safeStore.commandStore();
+ Command.Executed executed = command.asExecuted();
+ Participants<?> executes =
executed.participants().stillExecutes();
+ if (!executes.isEmpty())
+ {
+ command.writes()
+ .apply(safeStore, executes, command.partialTxn())
+ .invoke(() -> unsafeStore.build(txnId, ss -> {
+ Commands.postApply(ss, txnId, -1, true);
+ }))
+ .begin(safeStore.agent());
+ return;
+ }
}
+ else Invariants.expect(command.hasBeen(Applied));
}
safeCommand.update(safeStore, command, true);
+ safeStore.notifyListeners(safeCommand, null);
}
}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 521c8a4b..7708c9cd 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -419,7 +419,7 @@ public class TopologyManager
* Mark the epoch as "closed" for the provided ranges; this means that
no new transactions
* that intersect with this range may be proposed in the epoch (they
will be rejected).
*/
- public void epochClosed(Ranges ranges, long epoch)
+ public Ranges epochClosed(Ranges ranges, long epoch)
{
Invariants.requireArgument(epoch > 0);
int i;
@@ -437,18 +437,20 @@ public class TopologyManager
if (i == -1)
{
Invariants.require(epoch < minEpoch(), "Could not find epoch
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
- return; // notification came for an already truncated epoch
+ return Ranges.EMPTY; // notification came for an already
truncated epoch
}
+ Ranges report = ranges = epochs[i++].recordClosed(ranges);
while (!ranges.isEmpty() && i < epochs.length)
ranges = epochs[i++].recordClosed(ranges);
+ return report;
}
/**
* Mark the epoch as "retired" for the provided ranges; this means
that all transactions that can be
* proposed for this epoch have now been executed globally.
*/
- public void epochRetired(Ranges ranges, long epoch)
+ public Ranges epochRetired(Ranges ranges, long epoch)
{
Invariants.requireArgument(epoch > 0);
int retiredIdx;
@@ -462,11 +464,14 @@ public class TopologyManager
{
retiredIdx = indexOf(epoch);
if (retiredIdx < 0)
- return;
+ return Ranges.EMPTY;
}
- for (int i = retiredIdx; !ranges.isEmpty() && i < epochs.length;
i++)
- ranges = epochs[i].recordRetired(ranges);
+ int i = retiredIdx;
+ Ranges report = ranges = epochs[i++].recordRetired(ranges);
+ while (!ranges.isEmpty() && i < epochs.length)
+ ranges = epochs[i++].recordRetired(ranges);
+ return report;
}
private Notifications pending(long epoch)
@@ -1572,7 +1577,7 @@ public class TopologyManager
}
}
- public interface Collectors<C, K, T>
+ interface Collectors<C, K, T>
{
C allocate(int size);
C update(C collector, EpochState epoch, K select, boolean
permitMissing);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]