This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3782e2a More follow-up to CASSANDRA-19967 and CASSANDRA-19869
f3782e2a is described below

commit f3782e2a98004843cc3384a6983478c1128a1d6a
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Oct 2 12:42:11 2024 +0100

    More follow-up to CASSANDRA-19967 and CASSANDRA-19869
---
 .../java/accord/impl/InMemoryCommandStore.java     |   4 +-
 .../main/java/accord/impl/TimestampsForKeys.java   |   9 +-
 .../src/main/java/accord/local/Cleanup.java        |  11 +-
 .../src/main/java/accord/local/Command.java        |   2 +-
 .../src/main/java/accord/local/CommandStore.java   | 175 +--------------------
 .../src/main/java/accord/local/CommandStores.java  |   5 -
 .../src/main/java/accord/local/Commands.java       |  20 +--
 .../main/java/accord/local/RedundantBefore.java    | 146 +++++++++++++++++
 .../main/java/accord/local/SafeCommandStore.java   |  34 +++-
 .../main/java/accord/local/StoreParticipants.java  |   2 +-
 .../java/accord/local/cfk/SafeCommandsForKey.java  |   2 +-
 .../src/main/java/accord/messages/PreAccept.java   |   2 +-
 .../src/main/java/accord/messages/Propagate.java   |   8 +-
 .../java/accord/messages/QueryDurableBefore.java   |   2 +-
 .../src/main/java/accord/messages/ReadData.java    |   2 +-
 .../java/accord/messages/SetGloballyDurable.java   |   2 +-
 .../src/test/java/accord/impl/basic/Journal.java   |   3 +-
 .../src/test/java/accord/impl/list/ListRead.java   |   2 +-
 18 files changed, 222 insertions(+), 209 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 915ecec7..deee1947 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -397,7 +397,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 boolean done = command.hasBeen(Truncated);
                 if (!done)
                 {
-                    if (redundantBefore().status(txnId, command.route()) == 
RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
+                    if (unsafeGetRedundantBefore().status(txnId, 
command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
                         return;
 
                     Route<?> route = command.route().slice(allRanges);
@@ -759,7 +759,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 return;
 
             Ranges slice = ranges(txnId, updated.executeAtOrTxnId());
-            slice = commandStore.redundantBefore().removeShardRedundant(txnId, 
updated.executeAtOrTxnId(), slice);
+            slice = 
commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId, 
updated.executeAtOrTxnId(), slice);
             commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new 
RangeCommand(commandStore.commands.get(txnId)))
                          
.update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice,
 Minimal));
         }
diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java 
b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
index bd0e0ef7..baea052c 100644
--- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
+++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
@@ -21,6 +21,7 @@ package accord.impl;
 import accord.api.RoutingKey;
 import accord.api.VisibleForImplementation;
 import accord.local.CommandStore;
+import accord.local.SafeCommandStore;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -38,7 +39,7 @@ public class TimestampsForKeys
 
     private TimestampsForKeys() {}
 
-    public static TimestampsForKey updateLastExecutionTimestamps(CommandStore 
commandStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt, 
boolean isForWriteTxn)
+    public static TimestampsForKey 
updateLastExecutionTimestamps(SafeCommandStore safeStore, SafeTimestampsForKey 
tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
     {
         TimestampsForKey current = tfk.current();
 
@@ -46,7 +47,7 @@ public class TimestampsForKeys
 
         if (executeAt.compareTo(lastWrite) < 0)
         {
-            if 
(commandStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, 
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
+            if 
(safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, 
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
                 return current;
             throw illegalState("%s is less than the most recent write 
timestamp %s", executeAt, lastWrite);
         }
@@ -59,7 +60,7 @@ public class TimestampsForKeys
 
         if (cmp < 0)
         {
-            if 
(!commandStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
+            if 
(!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
                 return current;
             throw illegalState("%s is less than the most recent executed 
timestamp %s", executeAt, lastExecuted);
         }
@@ -83,6 +84,6 @@ public class TimestampsForKeys
     @VisibleForImplementation
     public static <D> TimestampsForKey 
updateLastExecutionTimestamps(AbstractSafeCommandStore<?,?,?> safeStore, 
RoutingKey key, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
     {
-        return updateLastExecutionTimestamps(safeStore.commandStore(), 
safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
+        return updateLastExecutionTimestamps(safeStore, 
safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
     }
 }
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index 78dc341c..b07c8a09 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -76,18 +76,19 @@ public enum Cleanup
 
     public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command 
command)
     {
-        return shouldCleanup(safeStore.commandStore(), command, 
command.participants());
+        return shouldCleanup(safeStore, command, command.participants());
     }
 
     public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command 
command, @Nonnull StoreParticipants participants)
     {
-        return shouldCleanup(safeStore.commandStore(), command, participants);
+        return shouldCleanup(command.txnId(), command.saveStatus(), 
command.durability(), participants,
+                             safeStore.redundantBefore(), 
safeStore.durableBefore());
     }
 
-    public static Cleanup shouldCleanup(CommandStore commandStore, Command 
command, @Nonnull StoreParticipants participants)
+    public static Cleanup shouldCleanup(Command command, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
     {
-        return shouldCleanup(command.txnId(), command.saveStatus(), 
command.durability(), participants,
-                               commandStore.redundantBefore(), 
commandStore.durableBefore());
+        return shouldCleanup(command.txnId(), command.saveStatus(), 
command.durability(), command.participants(),
+                             redundantBefore, durableBefore);
     }
 
     public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, 
Durability durability, StoreParticipants participants, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index c3e2c851..8fad6318 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -1368,7 +1368,7 @@ public abstract class Command implements CommonAttributes
                         long maxEpoch = prevEpoch;
                         long epoch = rangesForEpoch.epochs[i];
                         Ranges ranges = rangesForEpoch.ranges[i];
-                        ranges = 
safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges);
+                        ranges = 
safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
                         if (!ranges.isEmpty())
                         {
                             Unseekables<?> executionParticipants = 
participants.route.slice(ranges, Minimal);
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 44d3ca56..d2125ddb 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -22,18 +22,13 @@ import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.DataStore;
 import accord.coordinate.CollectCalculatedDeps;
-import accord.local.Command.WaitingOn;
 
 import javax.annotation.Nullable;
 import accord.api.Agent;
 
 import accord.local.CommandStores.RangesForEpoch;
-import accord.primitives.KeyDeps;
 import accord.primitives.Participants;
-import accord.primitives.Range;
 import accord.primitives.Routables;
-import accord.primitives.RoutingKeys;
-import accord.primitives.Status;
 import accord.primitives.Unseekables;
 import accord.utils.async.AsyncChain;
 
@@ -44,7 +39,6 @@ import accord.utils.async.AsyncResult;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
@@ -62,12 +56,10 @@ import org.slf4j.LoggerFactory;
 
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
-import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncResults;
-import org.agrona.collections.Int2ObjectHashMap;
 
 import static accord.api.ConfigurationService.EpochReady.DONE;
 import static accord.local.KeyHistory.COMMANDS;
@@ -637,180 +629,29 @@ public abstract class CommandStore implements 
AgentExecutor
         };
     }
 
-    public final Ranges safeToReadAt(Timestamp at)
+    public final boolean isRejectedIfNotPreAccepted(TxnId txnId, 
Unseekables<?> participants)
     {
-        return safeToRead.lowerEntry(at).getValue();
-    }
+        if (rejectBefore == null)
+            return false;
 
-    // TODO (desired): Commands.durability() can use this to upgrade to 
Majority without further info
-    public final Status.Durability globalDurability(TxnId txnId)
-    {
-        return durableBefore.min(txnId);
+        return rejectBefore.rejects(txnId, participants);
     }
 
-    public final RedundantBefore redundantBefore()
+    public final RedundantBefore unsafeGetRedundantBefore()
     {
         return redundantBefore;
     }
 
-    public DurableBefore durableBefore()
+    public DurableBefore unsafeGetDurableBefore()
     {
         return durableBefore;
     }
 
     @VisibleForTesting
-    public final NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
bootstrapBeganAt; }
+    public final NavigableMap<TxnId, Ranges> unsafeGetBootstrapBeganAt() { 
return bootstrapBeganAt; }
 
     @VisibleForTesting
-    public NavigableMap<Timestamp, Ranges> safeToRead() { return safeToRead; }
-
-    public final boolean isRejectedIfNotPreAccepted(TxnId txnId, 
Unseekables<?> participants)
-    {
-        if (rejectBefore == null)
-            return false;
-
-        return rejectBefore.rejects(txnId, participants);
-    }
-
-    public final void removeRedundantDependencies(Unseekables<?> participants, 
WaitingOn.Update builder)
-    {
-        // Note: we do not need to track the bootstraps we implicitly depend 
upon, because we will not serve any read requests until this has completed
-        //  and since we are a timestamp store, and we write only this will 
sort itself out naturally
-        // TODO (required): make sure we have no races on HLC around SyncPoint 
else this resolution may not work (we need to know the micros equivalent 
timestamp of the snapshot)
-        class KeyState
-        {
-            Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping;
-
-            /**
-             * Are the participating ranges for the txn fully covered by 
bootstrapping ranges for this command store
-             */
-            boolean isFullyBootstrapping(WaitingOn.Update builder, Range 
range, int txnIdx)
-            {
-                if (builder.directKeyDeps.foldEachKey(txnIdx, range, true, 
(r0, k, p) -> p && r0.contains(k)))
-                    return true;
-
-                if (partiallyBootstrapping == null)
-                    partiallyBootstrapping = new Int2ObjectHashMap<>();
-                RoutingKeys prev = partiallyBootstrapping.get(txnIdx);
-                RoutingKeys remaining = prev;
-                if (remaining == null) remaining = 
builder.directKeyDeps.participatingKeys(txnIdx);
-                else Invariants.checkState(!remaining.isEmpty());
-                remaining = remaining.without(range);
-                if (prev == null) Invariants.checkState(!remaining.isEmpty());
-                partiallyBootstrapping.put(txnIdx, remaining);
-                return remaining.isEmpty();
-            }
-        }
-
-        KeyDeps directKeyDeps = builder.directKeyDeps;
-        if (!directKeyDeps.isEmpty())
-        {
-            redundantBefore().foldl(directKeyDeps.keys(), (e, s, d, b) -> {
-                // TODO (desired, efficiency): foldlInt so we can track the 
lower rangeidx bound and not revisit unnecessarily
-                // find the txnIdx below which we are known to be fully 
redundant locally due to having been applied or invalidated
-                int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
-                if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
-                int appliedIdx = 
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
-                if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
-
-                // remove intersecting transactions with known redundant txnId
-                // note that we must exclude all transactions that are 
pre-bootstrap, and perform the more complicated dance below,
-                // as these transactions may be only partially applied, and we 
may need to wait for them on another key.
-                if (appliedIdx > bootstrapIdx)
-                {
-                    d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0, 
s0, txnIdx) -> {
-                        b0.removeWaitingOnDirectKeyTxnId(txnIdx);
-                    });
-                }
-
-                if (bootstrapIdx > 0)
-                {
-                    d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0, 
s0, r, txnIdx) -> {
-                        if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) && 
s0.isFullyBootstrapping(b0, r, txnIdx))
-                            b0.removeWaitingOnDirectKeyTxnId(txnIdx);
-                    });
-                }
-                return s;
-            }, new KeyState(), directKeyDeps, builder, ignore -> false);
-        }
-
-        /**
-         * If we have to handle bootstrapping ranges for range transactions, 
these may only partially cover the
-         * transaction, in which case we should not remove the transaction as 
a dependency. But if it is fully
-         * covered by bootstrapping ranges then we *must* remove it as a 
dependency.
-         */
-        class RangeState
-        {
-            Range range;
-            int bootstrapIdx, appliedIdx;
-            Map<Integer, Ranges> partiallyBootstrapping;
-
-            /**
-             * Are the participating ranges for the txn fully covered by 
bootstrapping ranges for this command store
-             */
-            boolean isFullyBootstrapping(int rangeTxnIdx)
-            {
-                // if all deps for the txnIdx are contained in the range, 
don't inflate any shared object state
-                if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range, 
true, (r1, r2, p) -> p && r1.contains(r2)))
-                    return true;
-
-                if (partiallyBootstrapping == null)
-                    partiallyBootstrapping = new HashMap<>();
-                Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
-                Ranges remaining = prev;
-                if (remaining == null) remaining = 
builder.directRangeDeps.ranges(rangeTxnIdx);
-                else Invariants.checkState(!remaining.isEmpty());
-                remaining = remaining.without(Ranges.of(range));
-                if (prev == null) Invariants.checkState(!remaining.isEmpty());
-                partiallyBootstrapping.put(rangeTxnIdx, remaining);
-                return remaining.isEmpty();
-            }
-        }
-
-        RangeDeps rangeDeps = builder.directRangeDeps;
-        // TODO (required, consider): slice to only those ranges we own, maybe 
don't even construct rangeDeps.covering()
-        redundantBefore().foldl(participants, (e, s, d, b) -> {
-            int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
-            if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
-            s.bootstrapIdx = bootstrapIdx;
-
-            int appliedIdx = 
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
-            if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
-            s.appliedIdx = appliedIdx;
-
-            // remove intersecting transactions with known redundant txnId
-            if (appliedIdx > bootstrapIdx)
-            {
-                // TODO (desired):
-                // TODO (desired): move the bounds check into forEach, 
matching structure used for keys
-                d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
-                    if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx)
-                        b0.removeWaitingOnDirectRangeTxnId(txnIdx);
-                });
-            }
-
-            if (bootstrapIdx > 0)
-            {
-                // if we have any ranges where bootstrap is involved, we have 
to do a more complicated dance since
-                // this may imply only partial redundancy (we may still depend 
on the transaction for some other range)
-                s.range = e.range;
-                // TODO (desired): move the bounds check into forEach, 
matching structure used for keys
-                d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
-                    if (txnIdx < s0.bootstrapIdx && 
b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx))
-                        b0.removeWaitingOnDirectRangeTxnId(txnIdx);
-                });
-            }
-            return s;
-        }, new RangeState(), rangeDeps, builder, ignore -> false);
-    }
-
-    public final boolean hasLocallyRedundantDependencies(TxnId 
minimumDependencyId, Timestamp executeAt, Participants<?> 
participantsOfWaitingTxn)
-    {
-        // TODO (required): consider race conditions when bootstrapping into 
an active command store, that may have seen a higher txnId than this?
-        //   might benefit from maintaining a per-CommandStore largest TxnId 
register to ensure we allocate a higher TxnId for our ExclSync,
-        //   or from using whatever summary records we have for the range, 
once we maintain them
-        return redundantBefore.status(minimumDependencyId, 
participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE)
 >= 0;
-    }
+    public NavigableMap<Timestamp, Ranges> unsafeGetSafeToRead() { return 
safeToRead; }
 
     final void markUnsafeToRead(Ranges ranges)
     {
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 53f619d5..2e366661 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -208,11 +208,6 @@ public abstract class CommandStores
             return allAt(txnId);
         }
 
-        public @Nonnull Ranges unsafeToReadAt(Timestamp at)
-        {
-            return allAt(at).without(store.safeToReadAt(at));
-        }
-
         public @Nonnull Ranges allAt(Timestamp at)
         {
             return allAt(at.epoch());
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index a85ad650..76c74a86 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -634,10 +634,10 @@ public class Commands
 
     protected static WaitingOn.Update updateWaitingOn(SafeCommandStore 
safeStore, CommonAttributes waiting, Timestamp executeAt, WaitingOn.Update 
update, Participants<?> participants)
     {
-        CommandStore commandStore = safeStore.commandStore();
+        RedundantBefore redundantBefore = safeStore.redundantBefore();
         TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
-        if (minWaitingOnTxnId != null && 
commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), 
executeAt, participants))
-            safeStore.commandStore().removeRedundantDependencies(participants, 
update);
+        if (minWaitingOnTxnId != null && 
redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), 
executeAt, participants))
+            redundantBefore.removeRedundantDependencies(participants, update);
 
         update.forEachWaitingOnId(safeStore, update, waiting, executeAt, 
(store, upd, w, exec, i) -> {
             SafeCommand dep = store.ifLoadedAndInitialised(upd.txnId(i));
@@ -859,7 +859,7 @@ public class Commands
     {
         TxnId txnId = command.txnId();
         participants = command.participants().supplement(participants);
-        RedundantStatus status = 
safeStore.commandStore().redundantBefore().status(txnId, participants.owns());
+        RedundantStatus status = safeStore.redundantBefore().status(txnId, 
participants.owns());
         switch (status)
         {
             default: throw new AssertionError("Unhandled RedundantStatus: " + 
status);
@@ -937,7 +937,7 @@ public class Commands
                     depSafe = safeStore.ifInitialised(loadDepId);
                     if (depSafe == null)
                     {
-                        RedundantStatus redundantStatus = 
safeStore.commandStore().redundantBefore().status(waitingId, 
waiting.partialDeps().participants(loadDepId));
+                        RedundantStatus redundantStatus = 
safeStore.redundantBefore().status(waitingId, 
waiting.partialDeps().participants(loadDepId));
                         switch (redundantStatus)
                         {
                             default: throw new AssertionError("Unexpected 
redundant status: " + redundantStatus);
@@ -1013,7 +1013,7 @@ public class Commands
                         // TODO (desired): slightly costly to invert a large 
partialDeps collection
                         Participants<?> participants = 
waiting.partialDeps().participants(dep.txnId());
                         participants = 
waiting.participants().dependencyExecutesAtLeast(safeStore, participants, 
waitingId, waiting.executeAt());
-                        RedundantStatus redundantStatus = 
safeStore.commandStore().redundantBefore().status(dep.txnId(), participants);
+                        RedundantStatus redundantStatus = 
safeStore.redundantBefore().status(dep.txnId(), participants);
                         switch (redundantStatus)
                         {
                             default: throw new AssertionError("Unknown 
redundant status: " + redundantStatus);
@@ -1101,13 +1101,13 @@ public class Commands
 
     static Command removeRedundantDependencies(SafeCommandStore safeStore, 
SafeCommand safeCommand, @Nullable TxnId redundant)
     {
-        CommandStore commandStore = safeStore.commandStore();
         Command.Committed current = safeCommand.current().asCommitted();
 
+        RedundantBefore redundantBefore = safeStore.redundantBefore();
         WaitingOn.Update update = new WaitingOn.Update(current.waitingOn);
         TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
-        if (minWaitingOnTxnId != null && 
commandStore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), 
current.executeAt(), current.participants().owns))
-            
safeStore.commandStore().removeRedundantDependencies(current.participants().owns,
 update);
+        if (minWaitingOnTxnId != null && 
redundantBefore.hasLocallyRedundantDependencies(update.minWaitingOnTxnId(), 
current.executeAt(), current.participants().owns))
+            
redundantBefore.removeRedundantDependencies(current.participants().owns, 
update);
 
         // if we are a range transaction, being redundant for this transaction 
does not imply we are redundant for all transactions
         if (redundant != null)
@@ -1248,7 +1248,7 @@ public class Commands
         {
             // TODO (required, later): in the event we are depending on a 
stale key for an insert into a non-stale key, we cannot proceed and must mark 
the new key stale
             //  I think today this is unsupported in practice, but must be 
addressed before we improve efficiency of result handling
-            Ranges staleRanges = 
permitStaleMissing.commandStore().redundantBefore().staleRanges();
+            Ranges staleRanges = 
permitStaleMissing.redundantBefore().staleRanges();
             required = required.without(staleRanges);
             return adding == null ? required.isEmpty() : covers.test(adding, 
required);
         }
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index a0d7ede8..41b84ae8 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -18,6 +18,8 @@
 
 package accord.local;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -27,16 +29,20 @@ import accord.api.VisibleForImplementation;
 import accord.primitives.AbstractRanges;
 import accord.primitives.Deps;
 import accord.primitives.EpochSupplier;
+import accord.primitives.KeyDeps;
 import accord.primitives.Participants;
 import accord.primitives.Range;
+import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.Routables;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import accord.utils.ReducingIntervalMap;
 import accord.utils.ReducingRangeMap;
+import org.agrona.collections.Int2ObjectHashMap;
 
 import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY;
 import static accord.local.RedundantBefore.PreBootstrapOrStale.POST_BOOTSTRAP;
@@ -675,4 +681,144 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Entry>
             }
         }
     }
+
+    public final void removeRedundantDependencies(Unseekables<?> participants, 
Command.WaitingOn.Update builder)
+    {
+        // Note: we do not need to track the bootstraps we implicitly depend 
upon, because we will not serve any read requests until this has completed
+        //  and since we are a timestamp store, and we write only this will 
sort itself out naturally
+        // TODO (required): make sure we have no races on HLC around SyncPoint 
else this resolution may not work (we need to know the micros equivalent 
timestamp of the snapshot)
+        class KeyState
+        {
+            Int2ObjectHashMap<RoutingKeys> partiallyBootstrapping;
+
+            /**
+             * Are the participating ranges for the txn fully covered by 
bootstrapping ranges for this command store
+             */
+            boolean isFullyBootstrapping(Command.WaitingOn.Update builder, 
Range range, int txnIdx)
+            {
+                if (builder.directKeyDeps.foldEachKey(txnIdx, range, true, 
(r0, k, p) -> p && r0.contains(k)))
+                    return true;
+
+                if (partiallyBootstrapping == null)
+                    partiallyBootstrapping = new Int2ObjectHashMap<>();
+                RoutingKeys prev = partiallyBootstrapping.get(txnIdx);
+                RoutingKeys remaining = prev;
+                if (remaining == null) remaining = 
builder.directKeyDeps.participatingKeys(txnIdx);
+                else Invariants.checkState(!remaining.isEmpty());
+                remaining = remaining.without(range);
+                if (prev == null) Invariants.checkState(!remaining.isEmpty());
+                partiallyBootstrapping.put(txnIdx, remaining);
+                return remaining.isEmpty();
+            }
+        }
+
+        KeyDeps directKeyDeps = builder.directKeyDeps;
+        if (!directKeyDeps.isEmpty())
+        {
+            foldl(directKeyDeps.keys(), (e, s, d, b) -> {
+                // TODO (desired, efficiency): foldlInt so we can track the 
lower rangeidx bound and not revisit unnecessarily
+                // find the txnIdx below which we are known to be fully 
redundant locally due to having been applied or invalidated
+                int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
+                if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
+                int appliedIdx = 
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
+                if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
+
+                // remove intersecting transactions with known redundant txnId
+                // note that we must exclude all transactions that are 
pre-bootstrap, and perform the more complicated dance below,
+                // as these transactions may be only partially applied, and we 
may need to wait for them on another key.
+                if (appliedIdx > bootstrapIdx)
+                {
+                    d.forEach(e.range, bootstrapIdx, appliedIdx, b, s, (b0, 
s0, txnIdx) -> {
+                        b0.removeWaitingOnDirectKeyTxnId(txnIdx);
+                    });
+                }
+
+                if (bootstrapIdx > 0)
+                {
+                    d.forEach(e.range, 0, bootstrapIdx, b, s, e.range, (b0, 
s0, r, txnIdx) -> {
+                        if (b0.isWaitingOnDirectKeyTxnIdx(txnIdx) && 
s0.isFullyBootstrapping(b0, r, txnIdx))
+                            b0.removeWaitingOnDirectKeyTxnId(txnIdx);
+                    });
+                }
+                return s;
+            }, new KeyState(), directKeyDeps, builder, ignore -> false);
+        }
+
+        /**
+         * If we have to handle bootstrapping ranges for range transactions, 
these may only partially cover the
+         * transaction, in which case we should not remove the transaction as 
a dependency. But if it is fully
+         * covered by bootstrapping ranges then we *must* remove it as a 
dependency.
+         */
+        class RangeState
+        {
+            Range range;
+            int bootstrapIdx, appliedIdx;
+            Map<Integer, Ranges> partiallyBootstrapping;
+
+            /**
+             * Are the participating ranges for the txn fully covered by 
bootstrapping ranges for this command store
+             */
+            boolean isFullyBootstrapping(int rangeTxnIdx)
+            {
+                // if all deps for the txnIdx are contained in the range, 
don't inflate any shared object state
+                if (builder.directRangeDeps.foldEachRange(rangeTxnIdx, range, 
true, (r1, r2, p) -> p && r1.contains(r2)))
+                    return true;
+
+                if (partiallyBootstrapping == null)
+                    partiallyBootstrapping = new HashMap<>();
+                Ranges prev = partiallyBootstrapping.get(rangeTxnIdx);
+                Ranges remaining = prev;
+                if (remaining == null) remaining = 
builder.directRangeDeps.ranges(rangeTxnIdx);
+                else Invariants.checkState(!remaining.isEmpty());
+                remaining = remaining.without(Ranges.of(range));
+                if (prev == null) Invariants.checkState(!remaining.isEmpty());
+                partiallyBootstrapping.put(rangeTxnIdx, remaining);
+                return remaining.isEmpty();
+            }
+        }
+
+        RangeDeps rangeDeps = builder.directRangeDeps;
+        // TODO (required, consider): slice to only those ranges we own, maybe 
don't even construct rangeDeps.covering()
+        foldl(participants, (e, s, d, b) -> {
+            int bootstrapIdx = d.txnIds().find(e.bootstrappedAt);
+            if (bootstrapIdx < 0) bootstrapIdx = -1 - bootstrapIdx;
+            s.bootstrapIdx = bootstrapIdx;
+
+            int appliedIdx = 
d.txnIds().find(e.locallyAppliedOrInvalidatedBefore);
+            if (appliedIdx < 0) appliedIdx = -1 - appliedIdx;
+            s.appliedIdx = appliedIdx;
+
+            // remove intersecting transactions with known redundant txnId
+            if (appliedIdx > bootstrapIdx)
+            {
+                // TODO (desired):
+                // TODO (desired): move the bounds check into forEach, 
matching structure used for keys
+                d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
+                    if (txnIdx >= s0.bootstrapIdx && txnIdx < s0.appliedIdx)
+                        b0.removeWaitingOnDirectRangeTxnId(txnIdx);
+                });
+            }
+
+            if (bootstrapIdx > 0)
+            {
+                // if we have any ranges where bootstrap is involved, we have 
to do a more complicated dance since
+                // this may imply only partial redundancy (we may still depend 
on the transaction for some other range)
+                s.range = e.range;
+                // TODO (desired): move the bounds check into forEach, 
matching structure used for keys
+                d.forEach(e.range, b, s, (b0, s0, txnIdx) -> {
+                    if (txnIdx < s0.bootstrapIdx && 
b0.isWaitingOnDirectRangeTxnIdx(txnIdx) && s0.isFullyBootstrapping(txnIdx))
+                        b0.removeWaitingOnDirectRangeTxnId(txnIdx);
+                });
+            }
+            return s;
+        }, new RangeState(), rangeDeps, builder, ignore -> false);
+    }
+
+    public final boolean hasLocallyRedundantDependencies(TxnId 
minimumDependencyId, Timestamp executeAt, Participants<?> 
participantsOfWaitingTxn)
+    {
+        // TODO (required): consider race conditions when bootstrapping into 
an active command store, that may have seen a higher txnId than this?
+        //   might benefit from maintaining a per-CommandStore largest TxnId 
register to ensure we allocate a higher TxnId for our ExclSync,
+        //   or from using whatever summary records we have for the range, 
once we maintain them
+        return status(minimumDependencyId, 
participantsOfWaitingTxn).compareTo(RedundantStatus.PARTIALLY_PRE_BOOTSTRAP_OR_STALE)
 >= 0;
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index dd25ad3e..bb2bf7e8 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -153,7 +153,7 @@ public abstract class SafeCommandStore
 
     protected SafeCommandsForKey maybeCleanup(SafeCommandsForKey safeCfk)
     {
-        RedundantBefore.Entry entry = 
commandStore().redundantBefore().get(safeCfk.key().toUnseekable());
+        RedundantBefore.Entry entry = 
redundantBefore().get(safeCfk.key().toUnseekable());
         if (entry != null)
             safeCfk.updateRedundantBefore(this, entry);
         return safeCfk;
@@ -356,6 +356,26 @@ public abstract class SafeCommandStore
     public abstract NodeTimeService time();
     public abstract CommandStores.RangesForEpoch ranges();
 
+    protected NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+    {
+        return commandStore().unsafeGetBootstrapBeganAt();
+    }
+
+    protected NavigableMap<Timestamp, Ranges> safeToReadAt()
+    {
+        return commandStore().unsafeGetSafeToRead();
+    }
+
+    public RedundantBefore redundantBefore()
+    {
+        return commandStore().unsafeGetRedundantBefore();
+    }
+
+    public DurableBefore durableBefore()
+    {
+        return commandStore().unsafeGetDurableBefore();
+    }
+
     public Ranges futureRanges(TxnId txnId)
     {
         return ranges().allBefore(txnId.epoch());
@@ -376,11 +396,21 @@ public abstract class SafeCommandStore
         return ranges().allBetween(txnId.epoch(), untilLocalEpoch);
     }
 
+    public final Ranges safeToReadAt(Timestamp at)
+    {
+        return safeToReadAt().lowerEntry(at).getValue();
+    }
+
+    public @Nonnull Ranges unsafeToReadAt(Timestamp at)
+    {
+        return ranges().allAt(at).without(safeToReadAt(at));
+    }
+
     // if we have to re-bootstrap (due to failed bootstrap or catching up on a 
range) then we may
     // have dangling redundant commands; these can safely be executed locally 
because we are a timestamp store
     final boolean isFullyPreBootstrapOrStale(Command command, Participants<?> 
forKeys)
     {
-        return 
commandStore().redundantBefore().preBootstrapOrStale(command.txnId(), forKeys) 
== FULLY;
+        return redundantBefore().preBootstrapOrStale(command.txnId(), forKeys) 
== FULLY;
     }
 
     public void registerListener(SafeCommand listeningTo, SaveStatus await, 
TxnId waiting)
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java 
b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 03753722..0ba6f6a3 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -173,7 +173,7 @@ public class StoreParticipants
                         ? safeStore.ranges().all()
                         : safeStore.ranges().allAt(executeAt.epoch());
 
-        return 
safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges);
+        return safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
     }
 
     public Participants<?> executes(SafeCommandStore safeStore, TxnId txnId, 
Timestamp executeAt)
diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
index 8207dd6e..18e24219 100644
--- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
@@ -113,6 +113,6 @@ public abstract class SafeCommandsForKey implements 
SafeState<CommandsForKey>
 
     public void refresh(SafeCommandStore safeStore)
     {
-        updateRedundantBefore(safeStore, 
safeStore.commandStore().redundantBefore().get(key));
+        updateRedundantBefore(safeStore, safeStore.redundantBefore().get(key));
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java 
b/accord-core/src/main/java/accord/messages/PreAccept.java
index 1b901e8f..bcb01261 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -263,7 +263,7 @@ public class PreAccept extends 
WithUnsynced<PreAccept.PreAcceptReply>
                                       }, executeAt.equals(txnId) ? null : 
txnId, builder);
 
             // TODO (required): make sure any sync point is in the past
-            Deps redundant = 
safeStore.commandStore().redundantBefore().collectDeps(participants.touches(), 
redundantBuilder, minEpoch, executeAt).build();
+            Deps redundant = 
safeStore.redundantBefore().collectDeps(participants.touches(), 
redundantBuilder, minEpoch, executeAt).build();
             Deps result = builder.build().with(redundant);
             Invariants.checkState(!result.contains(txnId));
             return result;
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index f819dadc..a50f7deb 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -354,7 +354,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         if (participants.owns().isEmpty())
             return known.knownForAny();
 
-        RedundantStatus status = 
safeStore.commandStore().redundantBefore().status(txnId, participants.owns());
+        RedundantStatus status = safeStore.redundantBefore().status(txnId, 
participants.owns());
 
         // if our peers have truncated this command, then either:
         // 1) we have already applied it locally; 2) the command doesn't apply 
locally; 3) we are stale; or 4) the command is invalidated
@@ -367,7 +367,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
             }
 
             Ranges ranges = safeStore.ranges().allSince(txnId.epoch());
-            ranges = 
safeStore.commandStore().redundantBefore().everExpectToExecute(txnId, ranges);
+            ranges = safeStore.redundantBefore().everExpectToExecute(txnId, 
ranges);
             if (!ranges.isEmpty())
             {
                 // even though command stores only lose ranges, we still adopt 
ranges as of some epoch, and re-bootstrap.
@@ -384,14 +384,14 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
             return null;
 
         Participants<?> executes = participants.executes(safeStore, txnId, 
executeAtIfKnown);
-        status = safeStore.commandStore().redundantBefore().status(txnId, 
executes);
+        status = safeStore.redundantBefore().status(txnId, executes);
         if (tryPurge(safeStore, safeCommand, status))
             return null;
 
         // compute the ranges we expect to execute - i.e. those we own, and 
are not stale or pre-bootstrap
         // TODO (required): use StoreParticipants.executes
         Ranges ranges = safeStore.ranges().allAt(executeAtIfKnown.epoch());
-        ranges = 
safeStore.commandStore().redundantBefore().expectToExecute(txnId, 
executeAtIfKnown, ranges);
+        ranges = safeStore.redundantBefore().expectToExecute(txnId, 
executeAtIfKnown, ranges);
         if (ranges.isEmpty() || (executes = executes.slice(ranges, 
Minimal)).isEmpty())
         {
             // TODO (expected): we might prefer to adopt Redundant status, and 
permit ourselves to later accept the result of the execution and/or definition
diff --git a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java 
b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
index 057e9a38..5c9d694c 100644
--- a/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
+++ b/accord-core/src/main/java/accord/messages/QueryDurableBefore.java
@@ -42,7 +42,7 @@ public class QueryDurableBefore extends 
AbstractEpochRequest<QueryDurableBefore.
     @Override
     public DurableBeforeReply apply(SafeCommandStore safeStore)
     {
-        return new 
DurableBeforeReply(safeStore.commandStore().durableBefore());
+        return new DurableBeforeReply(safeStore.durableBefore());
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index 7dfa12d0..5d75f0af 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -285,7 +285,7 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
     {
         Timestamp executeAt = command.executesAtLeast();
         // TODO (required): for awaitsOnlyDeps commands, if we cannot infer an 
actual executeAtLeast we should confirm no situation where txnId is not an 
adequately conservative value for unavailable/unsafeToRead
-        return safeStore.ranges().unsafeToReadAt(executeAt);
+        return safeStore.unsafeToReadAt(executeAt);
     }
 
     void read(SafeCommandStore safeStore, Command command)
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java 
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index 3c396ec7..56093563 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -48,7 +48,7 @@ public class SetGloballyDurable extends 
AbstractEpochRequest<SimpleReply>
     @Override
     public SimpleReply apply(SafeCommandStore safeStore)
     {
-        DurableBefore cur = safeStore.commandStore().durableBefore();
+        DurableBefore cur = safeStore.durableBefore();
         DurableBefore upd = DurableBefore.merge(durableBefore, cur);
         // This is done asynchronously
         safeStore.upsertDurableBefore(upd);
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java 
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 1a53472b..6d0b04da 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -89,8 +89,7 @@ public class Journal
                 Command command = reconstruct(diffs, Reconstruct.Last).get(0);
                 if (command.status() == Truncated || command.status() == 
Invalidated)
                     continue; // Already truncated
-                StoreParticipants participants = 
Invariants.nonNull(command.participants());
-                Cleanup cleanup = Cleanup.shouldCleanup(store, command, 
participants);
+                Cleanup cleanup = Cleanup.shouldCleanup(command, 
store.unsafeGetRedundantBefore(), store.unsafeGetDurableBefore());
                 switch (cleanup)
                 {
                     case NO:
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java 
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index eb765767..915442ad 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -70,7 +70,7 @@ public class ListRead implements Read
         ListStore s = (ListStore)store;
         logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt, 
key);
         return executor.apply(safeStore.commandStore()).submit(() -> {
-            Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
+            Ranges unavailable = safeStore.unsafeToReadAt(executeAt);
             ListData result = new ListData();
             switch (key.domain())
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to