This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-19944-persistent-fields
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 1f6d998835c64e7a4f6ac8bc23c5e052760ac493
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Sep 26 15:59:36 2024 +0200

    Make it easier to log changes in rangesForEpoch, durableBefore, 
redundantBefore, safeToRead, and rangesForEpoch
---
 .../java/accord/impl/InMemoryCommandStore.java     | 13 ++-
 .../src/main/java/accord/local/CommandStore.java   | 94 ++++++++++++----------
 .../main/java/accord/local/SafeCommandStore.java   | 31 +++++++
 .../java/accord/messages/SetGloballyDurable.java   |  2 +-
 4 files changed, 94 insertions(+), 46 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 6ba93447..e66b25a9 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -479,8 +479,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     {
         if (current != null)
             throw illegalState("Another operation is in progress or it's store 
was not cleared");
-        RangesForEpoch rangesForEpoch = updateRangesForEpoch();
         current = createSafeStore(context, rangesForEpoch);
+        updateRangesForEpoch(current);
         return current;
     }
 
@@ -663,7 +663,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         protected final Map<TxnId, InMemorySafeCommand> commands;
         private final Map<RoutableKey, InMemorySafeTimestampsForKey> 
timestampsForKey;
         private final Map<RoutableKey, InMemorySafeCommandsForKey> 
commandsForKey;
-        private final RangesForEpoch ranges;
+        private RangesForEpoch ranges;
 
         public InMemorySafeStore(InMemoryCommandStore commandStore,
                                  RangesForEpoch ranges,
@@ -677,7 +677,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             this.commands = commands;
             this.commandsForKey = commandsForKey;
             this.timestampsForKey = timestampsForKey;
-            this.ranges = Invariants.nonNull(ranges);
+            this.ranges = ranges;
         }
 
         @Override
@@ -797,6 +797,13 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return ranges;
         }
 
+        @Override
+        public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
+        {
+            super.setRangesForEpoch(rangesForEpoch);
+            ranges = rangesForEpoch;
+        }
+
         @Override
         public NodeTimeService time()
         {
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 966c4418..2ebe1c44 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -33,7 +33,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -209,20 +208,19 @@ public abstract class CommandStore implements 
AgentExecutor
         return agent;
     }
 
-    public RangesForEpoch updateRangesForEpoch()
+    public void updateRangesForEpoch(SafeCommandStore safeStore)
     {
         EpochUpdate update = epochUpdateHolder.get();
         if (update == null)
-            return rangesForEpoch;
+            return;
 
         update = epochUpdateHolder.getAndSet(null);
         if (!update.addGlobalRanges.isEmpty())
-            upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, 
TxnId.NONE, TxnId.NONE));
+            
safeStore.upsertDurableBefore(DurableBefore.create(update.addGlobalRanges, 
TxnId.NONE, TxnId.NONE));
         if (update.addRedundantBefore.size() > 0)
-            upsertRedundantBefore(update.addRedundantBefore);
+            safeStore.upsertRedundantBefore(update.addRedundantBefore);
         if (update.newRangesForEpoch != null)
-            rangesForEpoch = update.newRangesForEpoch;
-        return rangesForEpoch;
+            safeStore.setRangesForEpoch(update.newRangesForEpoch);
     }
 
     public RangesForEpoch unsafeRangesForEpoch()
@@ -250,8 +248,7 @@ public abstract class CommandStore implements AgentExecutor
 
     protected abstract void registerHistoricalTransactions(Deps deps, 
SafeCommandStore safeStore);
 
-    // implementations are expected to override this for persistence
-    public void upsertDurableBefore(DurableBefore addDurableBefore)
+    protected void upsertDurableBefore(DurableBefore addDurableBefore)
     {
         durableBefore = DurableBefore.merge(durableBefore, addDurableBefore);
     }
@@ -261,6 +258,7 @@ public abstract class CommandStore implements AgentExecutor
         this.rejectBefore = newRejectBefore;
     }
 
+    // Should be called _only_ via safe command store
     protected void upsertRedundantBefore(RedundantBefore addRedundantBefore)
     {
         redundantBefore = RedundantBefore.merge(redundantBefore, 
addRedundantBefore);
@@ -276,6 +274,13 @@ public abstract class CommandStore implements AgentExecutor
         redundantBefore = newRedundantBefore;
     }
 
+    protected void upsertRejectBefore(TxnId txnId, Ranges ranges)
+    {
+        ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? 
rejectBefore : new ReducingRangeMap<>();
+        newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, 
Timestamp::max);
+        unsafeSetRejectBefore(newRejectBefore);
+    }
+
     /**
      * This method may be invoked on a non-CommandStore thread
      */
@@ -312,9 +317,7 @@ public abstract class CommandStore implements AgentExecutor
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.kind() == ExclusiveSyncPoint);
-        ReducingRangeMap<Timestamp> newRejectBefore = rejectBefore != null ? 
rejectBefore : new ReducingRangeMap<>();
-        newRejectBefore = ReducingRangeMap.add(newRejectBefore, ranges, txnId, 
Timestamp::max);
-        unsafeSetRejectBefore(newRejectBefore);
+        safeStore.upsertRejectBefore(txnId, ranges);
     }
 
     public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
@@ -515,10 +518,11 @@ public abstract class CommandStore implements 
AgentExecutor
 
     final void markBootstrapping(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
     {
-        unsafeSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, 
bootstrapBeganAt));
+        safeStore.upsertSetBootstrapBeganAt(bootstrap(globalSyncId, ranges, 
bootstrapBeganAt));
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE, 
globalSyncId);
-        upsertRedundantBefore(addRedundantBefore);
-        upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, 
TxnId.NONE));
+        safeStore.upsertRedundantBefore(addRedundantBefore);
+        safeStore.upsertDurableBefore(DurableBefore.create(ranges, TxnId.NONE, 
TxnId.NONE));
+        // TODO: can we use `upsert` for notifications?
         updatedRedundantBefore(safeStore, globalSyncId, ranges);
     }
 
@@ -527,9 +531,9 @@ public abstract class CommandStore implements AgentExecutor
     {
         final Ranges slicedRanges = 
durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal);
         RedundantBefore addShardRedundant = 
RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, 
TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE);
-        upsertRedundantBefore(addShardRedundant);
+        safeStore.upsertRedundantBefore(addShardRedundant);
         DurableBefore addDurableBefore = DurableBefore.create(slicedRanges, 
globalSyncId, globalSyncId);
-        upsertDurableBefore(addDurableBefore);
+        safeStore.upsertDurableBefore(addDurableBefore);
         updatedRedundantBefore(safeStore, globalSyncId, slicedRanges);
         safeStore = safeStore; // make unusable in lambda
         safeStore.dataStore().snapshot(slicedRanges, 
globalSyncId).begin((success, fail) -> {
@@ -541,7 +545,7 @@ public abstract class CommandStore implements AgentExecutor
 
             execute(PreLoadContext.empty(), safeStore0 -> {
                 RedundantBefore addGc = RedundantBefore.create(slicedRanges, 
Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, globalSyncId, 
TxnId.NONE);
-                upsertRedundantBefore(addGc);
+                safeStore0.upsertRedundantBefore(addGc);
             });
         });
     }
@@ -569,9 +573,10 @@ public abstract class CommandStore implements AgentExecutor
         agent.onStale(staleSince, ranges);
 
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
TxnId.NONE, TxnId.NONE, TxnId.NONE, TxnId.NONE, staleUntilAtLeast);
-        upsertRedundantBefore(addRedundantBefore);
+        safeStore.upsertRedundantBefore(addRedundantBefore);
         // find which ranges need to bootstrap, subtracting those already in 
progress that cover the id
 
+        // safeStore.upsertUnsafeToRead(ranges);
         markUnsafeToRead(ranges);
     }
 
@@ -583,19 +588,24 @@ public abstract class CommandStore implements 
AgentExecutor
     // with safeToRead
     Supplier<EpochReady> initialise(long epoch, Ranges ranges)
     {
-        // Merge in a base for any ranges that needs to be covered
-        DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
-        upsertDurableBefore(addDurableBefore);
-        // TODO (review): Convoluted check to not overwrite existing 
bootstraps with TxnId.NONE
-        // If loading from disk didn't finish before this then we might 
initialize the range at TxnId.NONE?
-        // Does CommandStores.topology ensure that doesn't happen? Is it fine 
if it does because it will get superseded?
-        Ranges newBootstrapRanges = ranges;
-        for (Ranges existing : bootstrapBeganAt.values())
-            newBootstrapRanges = newBootstrapRanges.without(existing);
-        if (!newBootstrapRanges.isEmpty())
-            bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, 
bootstrapBeganAt);
-        safeToRead = purgeAndInsert(safeToRead, TxnId.NONE, ranges);
-        return () -> new EpochReady(epoch, DONE, DONE, DONE, DONE);
+        return () -> {
+            AsyncResult<Void> done = execute(empty(), (safeStore) -> {
+                // Merge in a base for any ranges that needs to be covered
+                DurableBefore addDurableBefore = DurableBefore.create(ranges, 
TxnId.NONE, TxnId.NONE);
+                safeStore.upsertDurableBefore(addDurableBefore);
+                // TODO (review): Convoluted check to not overwrite existing 
bootstraps with TxnId.NONE
+                // If loading from disk didn't finish before this then we 
might initialize the range at TxnId.NONE?
+                // Does CommandStores.topology ensure that doesn't happen? Is 
it fine if it does because it will get superseded?
+                Ranges newBootstrapRanges = ranges;
+                for (Ranges existing : bootstrapBeganAt.values())
+                    newBootstrapRanges = newBootstrapRanges.without(existing);
+                if (!newBootstrapRanges.isEmpty())
+                    bootstrapBeganAt = bootstrap(TxnId.NONE, 
newBootstrapRanges, bootstrapBeganAt);
+                safeStore.upsertSafeToRead(purgeAndInsert(safeToRead, 
TxnId.NONE, ranges));
+            }).beginAsResult();
+
+            return new EpochReady(epoch, DONE, DONE, DONE, DONE);
+        };
     }
 
     public final Ranges safeToReadAt(Timestamp at)
@@ -619,12 +629,6 @@ public abstract class CommandStore implements AgentExecutor
         return durableBefore;
     }
 
-    @VisibleForTesting
-    public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
bootstrapBeganAt; }
-
-    @VisibleForTesting
-    public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; }
-
     public final boolean isRejectedIfNotPreAccepted(TxnId txnId, 
Unseekables<?> participants)
     {
         if (rejectBefore == null)
@@ -773,16 +777,22 @@ public abstract class CommandStore implements 
AgentExecutor
         return redundantBefore.status(minimumDependencyId, executeAt, 
participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE)
 >= 0;
     }
 
-    final synchronized void markUnsafeToRead(Ranges ranges)
+    final void markUnsafeToRead(Ranges ranges)
     {
         if (safeToRead.values().stream().anyMatch(r -> r.intersects(ranges)))
-            unsafeSetSafeToRead(purgeHistory(safeToRead, ranges));
+        {
+            execute(empty(), safeStore -> {
+                safeStore.upsertSafeToRead(purgeHistory(safeToRead, ranges));
+            });
+        }
     }
 
     final synchronized void markSafeToRead(Timestamp forBootstrapAt, Timestamp 
at, Ranges ranges)
     {
-        Ranges validatedSafeToRead = 
redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
-        unsafeSetSafeToRead(purgeAndInsert(safeToRead, at, 
validatedSafeToRead));
+        execute(empty(), safeStore -> {
+            Ranges validatedSafeToRead = 
redundantBefore.validateSafeToRead(forBootstrapAt, ranges);
+            safeStore.upsertSafeToRead(purgeAndInsert(safeToRead, at, 
validatedSafeToRead));
+        });
     }
 
     protected static ImmutableSortedMap<TxnId, Ranges> bootstrap(TxnId at, 
Ranges ranges, NavigableMap<TxnId, Ranges> bootstrappedAt)
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index c2a02d18..82ba1c06 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -18,6 +18,7 @@
 
 package accord.local;
 
+import java.util.NavigableMap;
 import javax.annotation.Nullable;
 
 import accord.api.Agent;
@@ -248,6 +249,36 @@ public abstract class SafeCommandStore
         commandStore().updateMaxConflicts(prev, updated);
     }
 
+    public void upsertRedundantBefore(RedundantBefore addRedundantBefore)
+    {
+        commandStore().upsertRedundantBefore(addRedundantBefore);
+    }
+
+    public void upsertSetBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
+    {
+        commandStore().unsafeSetBootstrapBeganAt(newBootstrapBeganAt);
+    }
+
+    public void upsertDurableBefore(DurableBefore addDurableBefore)
+    {
+        commandStore().upsertDurableBefore(addDurableBefore);
+    }
+
+    public void upsertSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
+    {
+        commandStore().unsafeSetSafeToRead(newSafeToRead);
+    }
+
+    public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
+    {
+        commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
+    }
+
+    public void upsertRejectBefore(TxnId txnId, Ranges ranges)
+    {
+        commandStore().upsertRejectBefore(txnId, ranges);
+    }
+
     public void updateCommandsForKey(Command prev, Command next)
     {
         if (!CommandsForKey.needsUpdate(prev, next))
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java 
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index 6a295d88..7d912435 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -52,7 +52,7 @@ public class SetGloballyDurable extends 
AbstractEpochRequest<SimpleReply>
         DurableBefore cur = safeStore.commandStore().durableBefore();
         DurableBefore upd = DurableBefore.merge(durableBefore, cur);
         // This is done asynchronously
-        safeStore.commandStore().upsertDurableBefore(upd);
+        safeStore.upsertDurableBefore(upd);
         return Ok;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to