This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit c6b2cbaac852c5c2453ff9b289db0866ccfa0345
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Sep 25 18:23:10 2024 +0100

    wip: journal replay finishing touches
---
 .../src/main/java/accord/api/DataStore.java        |  3 +-
 .../java/accord/impl/InMemoryCommandStore.java     | 48 +++++++++---
 .../src/main/java/accord/local/CommandStore.java   | 85 ++++++++++------------
 .../accord/impl/list/ListFetchCoordinator.java     |  5 +-
 .../src/test/java/accord/impl/list/ListStore.java  | 12 ++-
 .../src/test/java/accord/impl/mock/MockStore.java  |  8 ++
 .../main/java/accord/maelstrom/MaelstromStore.java |  9 +++
 7 files changed, 109 insertions(+), 61 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/DataStore.java 
b/accord-core/src/main/java/accord/api/DataStore.java
index a93ff073..45d62101 100644
--- a/accord-core/src/main/java/accord/api/DataStore.java
+++ b/accord-core/src/main/java/accord/api/DataStore.java
@@ -25,6 +25,7 @@ import accord.local.SafeCommandStore;
 import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
@@ -112,6 +113,6 @@ public interface DataStore
     }
 
     FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, 
SyncPoint syncPoint, FetchRanges callback);
-    default AsyncResult<Void> snapshot(Ranges ranges) { return 
AsyncResults.success(null); };
+    AsyncResult<Void> snapshot(Ranges ranges, TxnId before);
     default void restoreFromSnapshot() {};
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 1b474779..2793b6e9 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -56,10 +56,12 @@ import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.Commands;
+import accord.local.DurableBefore;
 import accord.local.KeyHistory;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
+import accord.local.RedundantBefore;
 import accord.local.RedundantStatus;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
@@ -84,6 +86,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn.Kind.Kinds;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
+import accord.utils.ReducingRangeMap;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
@@ -428,15 +431,6 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         return new InMemorySafeStore(this, ranges, context, commands, 
timestampsForKey, commandsForKeys);
     }
 
-    private void loadCommandsForKey(RoutableKey key,
-                                    KeyHistory keyHistory,
-                                    Map<RoutableKey, 
InMemorySafeTimestampsForKey> timestampsForKey,
-                                    Map<RoutableKey, 
InMemorySafeCommandsForKey> commandsForKey)
-    {
-        commandsForKey.put(key, commandsForKey((Key) 
key).createSafeReference());
-        timestampsForKey.put(key, timestampsForKey((Key) 
key).createSafeReference());
-    }
-
     protected void validateRead(Command current) {}
 
     protected final InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges)
@@ -1466,4 +1460,40 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             historicalRangeCommands.merge(txnId, ranges.slice(allRanges), 
Ranges::with);
         });
     }
+
+    @Override
+    public void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch)
+    {
+        super.unsafeSetRangesForEpoch(newRangesForEpoch);
+    }
+
+    @Override
+    public void unsafeSetDurableBefore(DurableBefore newDurableBefore)
+    {
+        super.unsafeSetDurableBefore(newDurableBefore);
+    }
+
+    @Override
+    public void unsafeSetRedundantBefore(RedundantBefore newRedundantBefore)
+    {
+        super.unsafeSetRedundantBefore(newRedundantBefore);
+    }
+
+    @Override
+    public void unsafeSetRejectBefore(ReducingRangeMap<Timestamp> 
newRejectBefore)
+    {
+        super.unsafeSetRejectBefore(newRejectBefore);
+    }
+
+    @Override
+    public void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> 
newSafeToRead)
+    {
+        super.unsafeSetSafeToRead(newSafeToRead);
+    }
+
+    @Override
+    public void unsafeSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
+    {
+        super.unsafeSetBootstrapBeganAt(newBootstrapBeganAt);
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 298d0c04..966c4418 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -33,6 +33,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +43,6 @@ import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.api.VisibleForImplementationTesting;
 import accord.coordinate.CollectCalculatedDeps;
 import accord.local.Command.WaitingOn;
 import accord.local.CommandStores.RangesForEpoch;
@@ -217,9 +217,9 @@ public abstract class CommandStore implements AgentExecutor
 
         update = epochUpdateHolder.getAndSet(null);
         if (!update.addGlobalRanges.isEmpty())
-            setDurableBefore(DurableBefore.merge(durableBefore, 
DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)));
+            upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, 
TxnId.NONE, TxnId.NONE));
         if (update.addRedundantBefore.size() > 0)
-            setRedundantBefore(RedundantBefore.merge(redundantBefore, 
update.addRedundantBefore));
+            upsertRedundantBefore(update.addRedundantBefore);
         if (update.newRangesForEpoch != null)
             rangesForEpoch = update.newRangesForEpoch;
         return rangesForEpoch;
@@ -230,6 +230,11 @@ public abstract class CommandStore implements AgentExecutor
         return rangesForEpoch;
     }
 
+    protected void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch)
+    {
+        rangesForEpoch = newRangesForEpoch;
+    }
+
     public abstract boolean inStore();
 
     public void maybeExecuteImmediately(Runnable task)
@@ -246,39 +251,42 @@ public abstract class CommandStore implements 
AgentExecutor
     protected abstract void registerHistoricalTransactions(Deps deps, 
SafeCommandStore safeStore);
 
     // implementations are expected to override this for persistence
-    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    public void upsertDurableBefore(DurableBefore addDurableBefore)
     {
-        this.rejectBefore = newRejectBefore;
+        durableBefore = DurableBefore.merge(durableBefore, addDurableBefore);
     }
 
-    protected final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
+    protected void unsafeSetRejectBefore(ReducingRangeMap<Timestamp> 
newRejectBefore)
     {
-        this.bootstrapBeganAt = newBootstrapBeganAt;
+        this.rejectBefore = newRejectBefore;
     }
 
-    public DurableBefore durableBefore()
+    protected void upsertRedundantBefore(RedundantBefore addRedundantBefore)
     {
-        return durableBefore;
+        redundantBefore = RedundantBefore.merge(redundantBefore, 
addRedundantBefore);
     }
 
-    public final void upsertDurableBefore(DurableBefore addDurableBefore)
+    protected void unsafeSetDurableBefore(DurableBefore newDurableBefore)
     {
-        durableBefore = DurableBefore.merge(durableBefore, addDurableBefore);
+        durableBefore = newDurableBefore;
     }
 
-    protected final void setDurableBefore(DurableBefore newDurableBefore)
+    protected void unsafeSetRedundantBefore(RedundantBefore newRedundantBefore)
     {
-        durableBefore = newDurableBefore;
+        redundantBefore = newRedundantBefore;
     }
 
-    protected void upsertRedundantBefore(RedundantBefore addRedundantBefore)
+    /**
+     * This method may be invoked on a non-CommandStore thread
+     */
+    protected synchronized void unsafeSetSafeToRead(NavigableMap<Timestamp, 
Ranges> newSafeToRead)
     {
-        redundantBefore = RedundantBefore.merge(redundantBefore, 
addRedundantBefore);
+        this.safeToRead = newSafeToRead;
     }
 
-    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
+    protected void unsafeSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
     {
-        redundantBefore = newRedundantBefore;
+        this.bootstrapBeganAt = newBootstrapBeganAt;
     }
 
     /**
@@ -300,21 +308,13 @@ public abstract class CommandStore implements 
AgentExecutor
         setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt));
     }
 
-    /**
-     * This method may be invoked on a non-CommandStore thread
-     */
-    protected final synchronized void setSafeToRead(NavigableMap<Timestamp, 
Ranges> newSafeToRead)
-    {
-        this.safeToRead = newSafeToRead;
-    }
-
     public final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId 
txnId, Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint);
         ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? 
rejectBefore : new ReducingRangeMap<>();
         newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, 
Timestamp::max);
-        setRejectBefore(newRejectBefore);
+        unsafeSetRejectBefore(newRejectBefore);
     }
 
     public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
@@ -322,7 +322,7 @@ public abstract class CommandStore implements AgentExecutor
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint);
         RedundantBefore newRedundantBefore = 
RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, 
TxnId.NONE, TxnId.NONE, TxnId.NONE));
-        setRedundantBefore(newRedundantBefore);
+        unsafeSetRedundantBefore(newRedundantBefore);
         updatedRedundantBefore(safeStore, txnId, ranges);
     }
 
@@ -515,7 +515,7 @@ public abstract class CommandStore implements AgentExecutor
 
     final void markBootstrapping(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
     {
-        setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt));
+        unsafeSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, 
bootstrapBeganAt));
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, 
globalSyncId);
         upsertRedundantBefore(addRedundantBefore);
         upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, 
TxnId.NONE));
@@ -532,7 +532,7 @@ public abstract class CommandStore implements AgentExecutor
         upsertDurableBefore(addDurableBefore);
         updatedRedundantBefore(safeStore, globalSyncId, slicedRanges);
         safeStore = safeStore; // make unusable in lambda
-        safeStore.dataStore().snapshot(slicedRanges).begin((success, fail) -> {
+        safeStore.dataStore().snapshot(slicedRanges, 
globalSyncId).begin((success, fail) -> {
             if (fail != null)
             {
                 logger.error("Unsuccessful dataStore snapshot; unable to 
update GC markers", fail);
@@ -569,7 +569,7 @@ public abstract class CommandStore implements AgentExecutor
         agent.onStale(staleSince, ranges);
 
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast);
-        setRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
+        upsertRedundantBefore(addRedundantBefore);
         // find which ranges need to bootstrap, subtracting those already in 
progress that cover the id
 
         markUnsafeToRead(ranges);
@@ -614,10 +614,15 @@ public abstract class CommandStore implements 
AgentExecutor
         return redundantBefore;
     }
 
-    @VisibleForImplementationTesting
+    public DurableBefore durableBefore()
+    {
+        return durableBefore;
+    }
+
+    @VisibleForTesting
     public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
bootstrapBeganAt; }
 
-    @VisibleForImplementationTesting
+    @VisibleForTesting
     public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; }
 
     public final boolean isRejectedIfNotPreAccepted(TxnId txnId, 
Unseekables<?> participants)
@@ -771,25 +776,13 @@ public abstract class CommandStore implements 
AgentExecutor
     final synchronized void markUnsafeToRead(Ranges ranges)
     {
         if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges)))
-            setSafeToRead(purgeHistory(safeToRead, ranges));
+            unsafeSetSafeToRead(purgeHistory(safeToRead, ranges));
     }
 
     final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp 
at, Ranges ranges)
     {
         Ranges validatedSafeToRead = 
redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
-        setSafeToRead(purgeAndInsert(safeToRead, at, validatedSafeToRead));
-    }
-
-    protected static class BootstrapSyncPoint
-    {
-        final TxnId syncTxnId;
-        final Ranges ranges;
-
-        protected BootstrapSyncPoint(TxnId syncTxnId, Ranges ranges)
-        {
-            this.syncTxnId = syncTxnId;
-            this.ranges = ranges;
-        }
+        unsafeSetSafeToRead(purgeAndInsert(safeToRead, at, 
validatedSafeToRead));
     }
 
     protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, 
Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt)
diff --git 
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java 
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index 575dda1b..a4687514 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -66,10 +66,7 @@ public class ListFetchCoordinator extends 
AbstractFetchCoordinator
         persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore 
-> {
             listData.forEach((key, value) -> listStore.data.merge(key, value, 
Timestamped::merge));
         }).addCallback((ignore, fail) -> {
-            if (fail == null) {
-                success(from, received);
-                listStore.snapshot();
-            }
+            if (fail == null) success(from, received);
             else fail(from, received, fail);
         }).beginAsResult());
     }
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 81416540..2473d6dc 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -41,8 +41,13 @@ import accord.coordinate.Timeout;
 import accord.coordinate.TopologyMismatch;
 import accord.coordinate.tracking.AllTracker;
 import accord.coordinate.tracking.RequestStatus;
+import accord.impl.InMemoryCommandStore;
 import accord.impl.basic.SimulatedFault;
+import accord.local.CommandStore;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.DurableBefore;
 import accord.local.Node;
+import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
 import accord.messages.Callback;
 import accord.messages.ReadData;
@@ -59,6 +64,7 @@ import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
+import accord.utils.ReducingRangeMap;
 import accord.utils.Timestamped;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
@@ -194,7 +200,7 @@ public class ListStore implements DataStore
     private final Deque<PendingSnapshot> pendingSnapshots = new ArrayDeque<>();
     private long pendingDelay = 0;
 
-    public AsyncResult<Void> snapshot()
+    public AsyncResult<Void> snapshot(Ranges ranges, TxnId before)
     {
         Snapshot snapshot = new Snapshot(data, addedAts, removedAts, 
purgedAts, fetchCompletes, pendingRemoves);
         AsyncResult.Settable<Void> result = new 
AsyncResults.SettableResult<>();
@@ -227,12 +233,16 @@ public class ListStore implements DataStore
 
     public void restoreFromSnapshot()
     {
+        if (snapshot == null)
+            return;
+
         data.putAll(snapshot.data);
         addedAts.addAll(snapshot.addedAts);
         removedAts.addAll(snapshot.removedAts);
         purgedAts.addAll(snapshot.purgedAts);
         fetchCompletes.addAll(snapshot.fetchCompletes);
         pendingRemoves.addAll(snapshot.pendingRemoves);
+        InMemoryCommandStore commandStore = (InMemoryCommandStore) 
CommandStore.current();
     }
 
     public void clear()
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java 
b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index e3613fb2..803c7efc 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -33,9 +33,11 @@ import accord.primitives.Seekable;
 import accord.primitives.Seekables;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
 public class MockStore implements DataStore
@@ -149,4 +151,10 @@ public class MockStore implements DataStore
         callback.fetched(ranges);
         return new ImmediateFetchFuture(ranges);
     }
+
+    @Override
+    public AsyncResult<Void> snapshot(Ranges ranges, TxnId before)
+    {
+        return AsyncResults.success(null);
+    }
 }
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
index 013db36d..777700cc 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
@@ -27,7 +27,10 @@ import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
+import accord.primitives.TxnId;
 import accord.utils.Timestamped;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
 import accord.utils.async.AsyncResults.SettableResult;
 
 public class MaelstromStore implements DataStore
@@ -57,4 +60,10 @@ public class MaelstromStore implements DataStore
     {
         return new ImmediateFetchResult(ranges);
     }
+
+    @Override
+    public AsyncResult<Void> snapshot(Ranges ranges, TxnId before)
+    {
+        return AsyncResults.success(null);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to