This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit aa08b04e8235d9c0b7e2bbe3bbe8773184bbbe46
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 25 21:22:33 2025 +0100

    Follow-up to CASSANDRA-20726:
     - Fix shouldCleanup handling of erase/expunge
     - Fix CommandChange handling of minUniqueHlc being cleared
     - Don't clear minUniqueHlc when fast applying; instead simply validate 
!isWaiting
    
    patch by Benedict; reviewed by Benedict for CASSANDRA-20726
---
 .../src/main/java/accord/impl/CommandChange.java   | 101 ++++++++++++++++-----
 .../java/accord/impl/InMemoryCommandStore.java     |  19 ++++
 .../src/main/java/accord/local/Command.java        |   3 +
 .../src/main/java/accord/local/Commands.java       |   2 +
 .../src/main/java/accord/messages/ReadData.java    |   7 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |   7 +-
 6 files changed, 112 insertions(+), 27 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 291e19db..1d6d339f 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -62,8 +62,10 @@ import static accord.impl.CommandChange.Field.RESULT;
 import static accord.impl.CommandChange.Field.SAVE_STATUS;
 import static accord.impl.CommandChange.Field.WAITING_ON;
 import static accord.impl.CommandChange.Field.WRITES;
+import static accord.local.Cleanup.ERASE;
 import static accord.local.Cleanup.EXPUNGE;
 import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.VESTIGIAL;
 import static accord.local.Command.Accepted.accepted;
 import static accord.local.Command.Committed.committed;
 import static accord.local.Command.Executed.executed;
@@ -147,6 +149,7 @@ public class CommandChange
                 mask |= setIsNullAndChanged(DURABILITY, mask);
             eraseKnownFieldsMask[i] = mask;
         }
+        eraseKnownFieldsMask[VESTIGIAL.ordinal()] = 
eraseKnownFieldsMask[ERASE.ordinal()];
     }
 
     private static <T> boolean forceFieldChangedToNullFlag(SaveStatus 
saveStatus, Predicate<T> predicate, T erased)
@@ -309,6 +312,19 @@ public class CommandChange
             if (!hasUpdate)
                 return NO;
 
+            if (cleanup != null)
+            {
+                switch (cleanup)
+                {
+                    case EXPUNGE:
+                        return EXPUNGE;
+                    case ERASE:
+                        if (EXPUNGE == Cleanup.shouldCleanup(input, txnId, 
null, SaveStatus.Erased, NotDurable, null, redundantBefore, durableBefore))
+                            return EXPUNGE;
+                        return ERASE;
+                }
+            }
+
             Durability durability = this.durability;
             if (durability == null) durability = NotDurable;
             StoreParticipants participants = this.participants;
@@ -316,8 +332,13 @@ public class CommandChange
             //  would be better to break this dependency, or otherwise encode 
it better.
             //  In particular it would be nice to avoid doing this twice for 
each command on load, as we also do this in SafeCommandStore.
             //  Perhaps we can special-case loading, and simply update the 
participants here so we can avoid doing it again on access
-            if (input == Input.FULL && participants != null)
-                participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt : 
null);
+            if (input == Input.FULL)
+            {
+                if (saveStatus == null)
+                    return EXPUNGE;
+                if (participants != null)
+                    participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null);
+            }
             Cleanup cleanup = Cleanup.shouldCleanup(input, txnId, executeAt, 
saveStatus, durability, participants, redundantBefore, durableBefore);
             if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
                 cleanup = this.cleanup;
@@ -461,6 +482,7 @@ public class CommandChange
 
             hasUpdate = true;
             cleanup = addCleanup;
+            flags |= setChanged(CLEANUP);
             if (!cleanup.appliesTo(saveStatus))
                 return false;
             return forceSetNulls(clearFields, 
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
@@ -534,6 +556,7 @@ public class CommandChange
             if (this.waitingOn != null)
                 waitingOn = this.waitingOn.provide(txnId, partialDeps, 
executesAtLeast, minUniqueHlc);
 
+            Invariants.require(saveStatus != null);
             switch (saveStatus.status)
             {
                 case NotDefined:
@@ -585,32 +608,55 @@ public class CommandChange
         {
             return "Builder {" +
                    "txnId=" + txnId
-                   + (isChanged(PARTICIPANTS, flags)      ? ", participants=" 
+ participants : "")
-                   + (isChanged(SAVE_STATUS, flags)       ? ", saveStatus=" + 
saveStatus : "")
-                   + (isChanged(DURABILITY, flags)        ? ", durability=" + 
durability : "")
-                   + (isChanged(EXECUTE_AT, flags)        ? ", executeAt=" + 
executeAt : "")
-                   + (isChanged(PROMISED, flags)          ? ", promised=" + 
promised : "")
-                   + (isChanged(ACCEPTED, flags)          ? ", 
acceptedOrCommitted=" + acceptedOrCommitted : "")
-                   + (isChanged(PARTIAL_TXN, flags)       ? ", partialTxn=" + 
safeToString(partialTxn) : "")
-                   + (isChanged(PARTIAL_DEPS, flags)      ? ", partialDeps=" + 
partialDeps : "")
-                   + (isChanged(WAITING_ON, flags)        ? ", waitingOn=" + 
waitingOn : "")
-                   + (isChanged(MIN_UNIQUE_HLC, flags)    ? ", minUniqueHlc=" 
+ minUniqueHlc : "")
-                   + (isChanged(EXECUTES_AT_LEAST, flags) ? ", 
executesAtLeast=" + executesAtLeast : "")
-                   + (isChanged(WRITES, flags)            ? ", writes=" + 
writes : "")
-                   + (isChanged(RESULT, flags)            ? ", result=" + 
result : "")
-                   + (isChanged(CLEANUP, flags)           ? ", cleanup=" + 
cleanup : "") +
-                   '}';
-        }
-
-        private static String safeToString(Object obj)
+                   + safeToString(PARTICIPANTS, flags, participants)
+                   + safeToString(SAVE_STATUS, flags, saveStatus)
+                   + safeToString(DURABILITY, flags, durability)
+                   + safeToString(EXECUTE_AT, flags, executeAt)
+                   + safeToString(PROMISED, flags, promised)
+                   + safeToString(ACCEPTED, flags, acceptedOrCommitted)
+                   + safeToString(PARTIAL_TXN, flags, partialTxn)
+                   + safeToString(PARTIAL_DEPS, flags, partialDeps)
+                   + safeToString(WAITING_ON, flags, waitingOn)
+                   + safeToString(MIN_UNIQUE_HLC, flags, minUniqueHlc)
+                   + safeToString(EXECUTES_AT_LEAST, flags, executesAtLeast)
+                   + safeToString(WRITES, flags, writes)
+                   + safeToString(RESULT, flags, result)
+                   + safeToString(CLEANUP, flags, cleanup)
+                   + '}';
+        }
+
+        private static Object safeToString(Field field, int flags, Object obj)
+        {
+            if (!isChanged(field, flags))
+                return "";
+
+            return field.name().toLowerCase() + '=' + 
safeToString(isNull(field, flags), obj);
+        }
+
+        private static Object safeToString(boolean isNull, Object obj)
         {
+            if (isNull)
+            {
+                if (obj == null)
+                    return "null";
+
+                try
+                {
+                    return "null<" + obj + '>';
+                }
+                catch (Throwable t)
+                {
+                    return "null<err>";
+                }
+            }
+
             try
             {
                 return obj.toString();
             }
             catch (Throwable t)
             {
-                return "<error evaluating>";
+                return "<err>";
             }
         }
     }
@@ -694,7 +740,7 @@ public class CommandChange
             if (before.waitingOn() != after.waitingOn())
             {
                 flags |= setChanged(WAITING_ON);
-                flags |= addIdentityFlags(getMinUniqueHlc(before), 
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
+                flags |= addIdentityFlags(0, getMinUniqueHlc(before), 
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
             }
             flags |= addEqualityFlags(before.executesAtLeast(), 
after.executesAtLeast(), EXECUTES_AT_LEAST);
             flags |= addIdentityFlags(before.writes(), after.writes(), WRITES);
@@ -743,6 +789,13 @@ public class CommandChange
         return setChanged(field);
     }
 
+    private static int addIdentityFlags(long treatAsNull, long l, long r, 
Field field)
+    {
+        if (l == r) return 0;
+        if (r == treatAsNull) return setIsNullAndChanged(field);
+        return setChanged(field);
+    }
+
     public static boolean anyFieldChanged(int flags)
     {
         return (flags >>> 16) != 0;
@@ -813,9 +866,9 @@ public class CommandChange
     }
 
     @VisibleForTesting
-    public static boolean isNull(Field field, int oldFlags)
+    public static boolean isNull(Field field, int flags)
     {
-        return (oldFlags & (1 << field.ordinal())) != 0;
+        return (flags & (1 << field.ordinal())) != 0;
     }
 
     public static int unsetFieldIsNull(Field field, int oldFlags)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 950a7e3f..31c7dfbf 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -90,6 +90,7 @@ import org.agrona.collections.ObjectHashSet;
 
 import static accord.local.Cleanup.Input.FULL;
 import static accord.local.KeyHistory.ASYNC;
+import static accord.local.KeyHistory.NONE;
 import static accord.local.KeyHistory.SYNC;
 import static accord.local.RedundantStatus.Coverage.ALL;
 import static accord.local.StoreParticipants.Filter.LOAD;
@@ -360,6 +361,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                     break;
                 case Range:
                     // load range cfks here
+                    break;
             }
         }
         return createSafeStore(context, ranges, commands, commandsForKey);
@@ -631,6 +633,23 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return command;
         }
 
+        @Override
+        protected InMemorySafeCommandsForKey ifLoadedInternal(RoutingKey key)
+        {
+            if (context.keyHistory() != NONE && context.keys().domain() == 
Range && context.keys().contains(key))
+            {
+                GlobalCommandsForKey globalCfk = 
commandStore().commandsForKey.get(key);
+                if (globalCfk == null)
+                    return null;
+
+                InMemorySafeCommandsForKey safeCfk = 
globalCfk.createSafeReference();
+                commandsForKey.put(key, safeCfk);
+                return safeCfk;
+            }
+
+            return super.ifLoadedInternal(key);
+        }
+
         @Override
         protected InMemorySafeCommandsForKey getInternal(RoutingKey key)
         {
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index 24a29ea3..7169d60b 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -1344,6 +1344,9 @@ public abstract class Command implements ICommand
             }
         }
 
+        /**
+         * Note: we don't guarantee to maintain this once an actual uniqueHlc 
is known.
+         */
         public long minUniqueHlc()
         {
             return 0;
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index f29f376c..7cbb938b 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -527,6 +527,8 @@ public class Commands
         PartialDeps partialDeps = prepareDeps(validated, participants, 
command, deps);
         participants = prepareParticipants(validated, participants, command);
 
+        // TODO (required): validate safe to fast apply against local state if 
running burn test
+        // note: we may overwrite minUniqueHlc here
         WaitingOn waitingOn = newSaveStatus != SaveStatus.PreApplied
                               ? WaitingOn.none(txnId.domain(), partialDeps)
                               : command.hasBeen(Stable)
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index 657238ea..50beaf96 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -32,6 +32,7 @@ import accord.api.Timeouts;
 import accord.api.Timeouts.RegisteredTimeout;
 import accord.coordinate.ExecuteFlag.ExecuteFlags;
 import accord.local.Command;
+import accord.local.Command.Committed;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.Node;
@@ -480,7 +481,11 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
         Ranges unavailable = unavailable(safeStore, command);
         if (txnId.is(Write))
         {
-            long uniqueHlc = command.asCommitted().waitingOn().minUniqueHlc();
+            Committed committed = command.asCommitted();
+            long uniqueHlc = committed.waitingOn().minUniqueHlc();
+            if (committed.executeAt().hasDistinctHlcAndUniqueHlc())
+                uniqueHlc = committed.executeAt().uniqueHlc();
+
             if (uniqueHlc > 0)
             {
                 synchronized (this)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 33cf5816..618f121d 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -148,8 +148,11 @@ public class InMemoryJournal implements Journal
             return null;
 
         Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, 
durableBefore);
-        if (cleanup == EXPUNGE)
-            return null;
+        switch (cleanup)
+        {
+            case ERASE: return Command.Truncated.erased(txnId);
+            case EXPUNGE: return null;
+        }
         return builder.construct(redundantBefore);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to