This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit aa1d855c9e8863a0c4e72f255e26a5b161b6767d
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Tue Sep 24 22:44:30 2024 +0200

    Fixes after rebase and an attempt to reconstruct redundant before
---
 .../java/accord/impl/InMemoryCommandStore.java     | 58 +++++++++++++++++-----
 .../src/main/java/accord/local/Bootstrap.java      | 32 ++++++------
 .../src/main/java/accord/local/Cleanup.java        |  2 +-
 .../src/main/java/accord/local/CommandStore.java   | 43 ++++++----------
 .../src/main/java/accord/local/Commands.java       |  4 +-
 .../main/java/accord/local/SafeCommandStore.java   |  2 +-
 .../src/test/java/accord/burn/BurnTest.java        |  3 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  1 -
 8 files changed, 83 insertions(+), 62 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index c74e9f27..65bc1618 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -42,6 +42,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,16 +51,19 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
+import accord.api.Scheduler;
 import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.Commands;
+import accord.local.DurableBefore;
 import accord.local.KeyHistory;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
+import accord.local.RedundantBefore;
 import accord.local.RedundantStatus;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
@@ -127,16 +131,36 @@ public abstract class InMemoryCommandStore extends CommandStore
 
     // To simulate the delay in simulatedAsyncPersist
     private final Scheduler scheduler;
-    private static <T> FieldPersister<T> 
simulatedAsyncPersistFactory(Scheduler scheduler)
-    {
-        return (commandStore, toPersist) -> simulatedAsyncPersist(scheduler, 
commandStore, toPersist);
-    }
 
-    private static <T> AsyncResult<?> simulatedAsyncPersist(Scheduler 
scheduler, CommandStore store, T toPersist)
+    private static final class SimulatedFieldPersister<T> implements 
FieldPersister<T>
     {
-        AsyncResult.Settable<?> result = AsyncResults.settable();
-        scheduler.once(() -> result.trySuccess(null), 100, 
TimeUnit.MICROSECONDS);
-        return result;
+        private T lastValue;
+        private final Scheduler scheduler;
+        private final Node node;
+        private final int id;
+        public SimulatedFieldPersister(Scheduler scheduler, T defaultValue, 
Node node, int id)
+        {
+            this.scheduler = scheduler;
+            this.lastValue = defaultValue;
+            this.node = node;
+            this.id = id;
+        }
+
+        public AsyncResult<?> persist(CommandStore store, T toPersist)
+        {
+            System.out.println("Persisting for " + node.id() + "-store-" + id);
+            AsyncResult.Settable<?> result = AsyncResults.settable();
+            scheduler.once(() -> {
+                lastValue = toPersist;
+                result.trySuccess(null);
+            }, 100, TimeUnit.MICROSECONDS);
+            return result;
+        }
+
+        public T restore()
+        {
+            return lastValue;
+        }
     }
 
     public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Scheduler scheduler)
@@ -148,10 +172,10 @@ public abstract class InMemoryCommandStore extends CommandStore
               progressLogFactory,
               listenersFactory,
               epochUpdateHolder,
-              simulatedAsyncPersistFactory(scheduler),
-              simulatedAsyncPersistFactory(scheduler),
-              simulatedAsyncPersistFactory(scheduler),
-              simulatedAsyncPersistFactory(scheduler));
+              new SimulatedFieldPersister<>(scheduler, DurableBefore.EMPTY, 
(Node) time, id),
+              new SimulatedFieldPersister<>(scheduler, RedundantBefore.EMPTY, 
(Node) time, id),
+              new SimulatedFieldPersister<>(scheduler, 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id),
+              new SimulatedFieldPersister<>(scheduler, 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY), (Node) time, id));
         this.scheduler = scheduler;
     }
 
@@ -1358,6 +1382,16 @@ public abstract class InMemoryCommandStore extends CommandStore
         commandsForKey.clear();
         rangeCommands.clear();
         historicalRangeCommands.clear();
+
+        durableBeforePersistentField.clearAndRestore();
+        redundantBeforePersistentField.clearAndRestore();
+        bootstrapBeganAtPersistentField.clearAndRestore();
+        safeToReadPersistentField.clearAndRestore();
+    }
+
+    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
+    {
+        super.setRedundantBefore(newRedundantBefore);
     }
 
     public interface Loader
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java
index fe2fa9ed..b1ed9846 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -143,22 +143,22 @@ class Bootstrap
             // of these ranges as part of this attempt
             Ranges commitRanges = valid;
             store.markBootstrapping(safeStore0.commandStore(), globalSyncId, 
valid).flatMap(ignore ->
-                CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
-                   // ATM all known implementations store ranges in-memory, 
but this will not be true soon, so this will need to be addressed
-                   .flatMap(syncPoint -> node.withEpoch(epoch, () -> 
store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), 
KeyHistory.COMMANDS), safeStore1 -> {
-                       if (valid.isEmpty()) // we've lost ownership of the 
range
-                           return AsyncResults.success(Ranges.EMPTY);
-
-                   
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, 
valid);
-                   
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, 
safeStore1);
-                   return fetch = safeStore1.dataStore().fetch(node, 
safeStore1, valid, syncPoint, this);
-               })))
-               .flatMap(i -> i)
-               .flatMap(ranges -> store.execute(contextFor(localSyncId), 
safeStore -> {
-                   if (!ranges.isEmpty())
-                       Commands.markBootstrapComplete(safeStore, localSyncId, 
ranges);
-               }))
-               .begin(this);
+            CoordinateSyncPoint.exclusive(node, globalSyncId, commitRanges)
+                               // ATM all known implementations store ranges 
in-memory, but this will not be true soon, so this will need to be addressed
+                               .flatMap(syncPoint -> node.withEpoch(epoch, () 
-> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys(), 
KeyHistory.COMMANDS), safeStore1 -> {
+                                   if (valid.isEmpty()) // we've lost 
ownership of the range
+                                       return 
AsyncResults.success(Ranges.EMPTY);
+
+                                   
Commands.createBootstrapCompleteMarkerTransaction(safeStore1, localSyncId, 
valid);
+                                   
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor, 
safeStore1);
+                                   return fetch = 
safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this);
+                               })))
+                               .flatMap(i -> i)
+                               .flatMap(ranges -> 
store.execute(contextFor(localSyncId), safeStore -> {
+                                   if (!ranges.isEmpty())
+                                       
Commands.markBootstrapComplete(safeStore, localSyncId, ranges);
+                               })))
+                 .begin(this);
         }
 
         // we no longer want to fetch these ranges (perhaps we no longer own 
them)
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java
index eea389b5..c86192bd 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -135,7 +135,7 @@ public enum Cleanup
                 //      - we can impose additional validations here IF we 
receive an epoch upper bound
                 //      - we should be more robust to the presence/absence of 
executeAt
                 //      - be cognisant of future epochs that participated only 
for PreAccept/Accept, but where txn was not committed to execute in the epoch 
(this is why we provide null toEpoch here)
-                illegalState("Command " + txnId + " that is being loaded is 
not owned by this shard on route " + route);
+                illegalState(String.format("Command %s that is being loaded is 
not owned by this shard on route %s. Redundant before: %s", txnId, route, 
redundantBefore));
             }
         }
         switch (redundant)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index ab91b32c..b57569ac 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -18,30 +18,6 @@
 
 package accord.local;
 
-import accord.api.LocalListeners;
-import accord.api.ProgressLog;
-import accord.api.DataStore;
-import accord.api.VisibleForImplementationTesting;
-import accord.coordinate.CollectCalculatedDeps;
-import accord.local.Command.WaitingOn;
-
-import javax.annotation.Nullable;
-import accord.api.Agent;
-
-import accord.local.CommandStores.RangesForEpoch;
-import accord.primitives.Deps;
-import accord.primitives.KeyDeps;
-import accord.primitives.Keys;
-import accord.primitives.Range;
-import accord.primitives.Routables;
-import accord.utils.async.AsyncChain;
-
-import accord.api.ConfigurationService.EpochReady;
-import accord.utils.DeterministicIdentitySet;
-import accord.utils.Invariants;
-import accord.utils.ReducingRangeMap;
-import accord.utils.async.AsyncResult;
-
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Collections;
 import java.util.HashMap;
@@ -75,6 +51,7 @@ import accord.api.VisibleForImplementationTesting;
 import accord.coordinate.CollectCalculatedDeps;
 import accord.local.Command.WaitingOn;
 import accord.local.CommandStores.RangesForEpoch;
+import accord.primitives.Deps;
 import accord.primitives.FullRoute;
 import accord.primitives.KeyDeps;
 import accord.primitives.Keys;
@@ -212,10 +189,10 @@ public abstract class CommandStore implements AgentExecutor
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
     @Nullable private ReducingRangeMap<Timestamp> rejectBefore;
 
-    private final PersistentField<DurableBefore, DurableBefore> 
durableBeforePersistentField;
-    private final PersistentField<RedundantBefore, RedundantBefore> 
redundantBeforePersistentField;
-    private final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, 
Ranges>> bootstrapBeganAtPersistentField;
-    private final PersistentField<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> safeToReadPersistentField;
+    protected final PersistentField<DurableBefore, DurableBefore> 
durableBeforePersistentField;
+    protected final PersistentField<RedundantBefore, RedundantBefore> 
redundantBeforePersistentField;
+    protected final PersistentField<BootstrapSyncPoint, NavigableMap<TxnId, 
Ranges>> bootstrapBeganAtPersistentField;
+    protected final PersistentField<NavigableMap<Timestamp, Ranges>, 
NavigableMap<Timestamp, Ranges>> safeToReadPersistentField;
 
     protected CommandStore(int id,
                            NodeTimeService time,
@@ -335,7 +312,7 @@ public abstract class CommandStore implements AgentExecutor
     }
 
     // For implementations to use after persistence
-    protected final void setRedundantBefore(RedundantBefore newRedundantBefore)
+    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
     {
         redundantBefore = newRedundantBefore;
     }
@@ -914,6 +891,8 @@ public abstract class CommandStore implements AgentExecutor
         }
 
         AsyncResult<?> persist(CommandStore store, T toPersist);
+
+        default T restore() { return  null; };
     }
 
     // A helper class for implementing fields that needs to be asynchronously 
persisted and concurrent updates
@@ -945,6 +924,12 @@ public abstract class CommandStore implements AgentExecutor
             this.set = set;
         }
 
+        public void clearAndRestore()
+        {
+            pendingValue = null;
+            set.accept(persister.restore());
+        }
+
         public AsyncResult<?> mergeAndUpdate(@Nonnull I inputValue, @Nullable 
Timestamp gcBefore, @Nullable Ranges updatedRanges, boolean 
remergeAfterPersistence)
         {
             checkNotNull(merge, "merge cannot be null");
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 956edf77..5d83e33d 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -75,7 +75,9 @@ import static accord.local.Status.PreApplied;
 import static accord.local.Status.PreCommitted;
 import static accord.local.Status.Stable;
 import static accord.local.Status.Truncated;
+import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Route.isFullRoute;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.utils.Invariants.illegalState;
 
 public class Commands
@@ -615,7 +617,7 @@ public class Commands
         return maybeExecute(safeStore, safeCommand, safeCommand.current(), 
alwaysNotifyListeners, notifyWaitingOn);
     }
 
-    public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand 
safeCommand, Command command, boolean alwaysNotifyListeners, boolean 
notifyWaitingOn)
+    public static boolean maybeExecute(SafeCommandStore safeStore0, 
SafeCommand safeCommand0, Command command, boolean alwaysNotifyListeners, 
boolean notifyWaitingOn)
     {
         if (logger.isTraceEnabled())
             logger.trace("{}: Maybe executing with status {}. Will notify 
listeners on noop: {}", command.txnId(), command.status(), 
alwaysNotifyListeners);
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index c2a02d18..bcabdb90 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -230,7 +230,7 @@ public abstract class SafeCommandStore
         if (newSaveStatus == Applied && oldSaveStatus != Applied)
         {
             Ranges ranges = updated.route().slice(ranges().all(), 
Minimal).toRanges();
-            commandStore().markExclusiveSyncPointLocallyApplied(this, txnId, 
ranges);
+            
commandStore().markExclusiveSyncPointLocallyApplied(this.commandStore(), txnId, 
ranges);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 6d6b1cfa..053d4e7c 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -575,7 +575,8 @@ public class BurnTest
     @Test
     public void testOne()
     {
-        run(System.nanoTime());
+//        run(System.nanoTime());
+        run(98634622518625l);
     }
 
     private static void run(long seed)
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index b583b70b..c1cb4fac 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -40,7 +40,6 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.api.Result;
 import accord.api.Scheduler;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to