This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit aa08b04e8235d9c0b7e2bbe3bbe8773184bbbe46
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 25 21:22:33 2025 +0100

    Follow-up to CASSANDRA-20726:
     - Fix shouldCleanup handling of erase/expunge
     - Fix CommandChange handling of minUniqueHlc being cleared
     - Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting

    patch by Benedict; reviewed by Benedict for CASSANDRA-20726
---
 .../src/main/java/accord/impl/CommandChange.java   | 101 ++++++++++++++++-----
 .../java/accord/impl/InMemoryCommandStore.java     |  19 ++++
 .../src/main/java/accord/local/Command.java        |   3 +
 .../src/main/java/accord/local/Commands.java       |   2 +
 .../src/main/java/accord/messages/ReadData.java    |   7 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |   7 +-
 6 files changed, 112 insertions(+), 27 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java
index 291e19db..1d6d339f 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -62,8 +62,10 @@ import static accord.impl.CommandChange.Field.RESULT;
 import static accord.impl.CommandChange.Field.SAVE_STATUS;
 import static accord.impl.CommandChange.Field.WAITING_ON;
 import static accord.impl.CommandChange.Field.WRITES;
+import static accord.local.Cleanup.ERASE;
 import static accord.local.Cleanup.EXPUNGE;
 import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.VESTIGIAL;
 import static accord.local.Command.Accepted.accepted;
 import static accord.local.Command.Committed.committed;
 import static accord.local.Command.Executed.executed;
@@ -147,6 +149,7 @@ public class CommandChange
             mask |= setIsNullAndChanged(DURABILITY, mask);
             eraseKnownFieldsMask[i] = mask;
         }
+        eraseKnownFieldsMask[VESTIGIAL.ordinal()] = eraseKnownFieldsMask[ERASE.ordinal()];
     }
 
     private static <T> boolean forceFieldChangedToNullFlag(SaveStatus saveStatus, Predicate<T> 
predicate, T erased) @@ -309,6 +312,19 @@ public class CommandChange if (!hasUpdate) return NO; + if (cleanup != null) + { + switch (cleanup) + { + case EXPUNGE: + return EXPUNGE; + case ERASE: + if (EXPUNGE == Cleanup.shouldCleanup(input, txnId, null, SaveStatus.Erased, NotDurable, null, redundantBefore, durableBefore)) + return EXPUNGE; + return ERASE; + } + } + Durability durability = this.durability; if (durability == null) durability = NotDurable; StoreParticipants participants = this.participants; @@ -316,8 +332,13 @@ public class CommandChange // would be better to break this dependency, or otherwise encode it better. // In particular it would be nice to avoid doing this twice for each command on load, as we also do this in SafeCommandStore. // Perhaps we can special-case loading, and simply update the participants here so we can avoid doing it again on access - if (input == Input.FULL && participants != null) - participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt : null); + if (input == Input.FULL) + { + if (saveStatus == null) + return EXPUNGE; + if (participants != null) + participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus.known.isExecuteAtKnown() ? 
executeAt : null); + } Cleanup cleanup = Cleanup.shouldCleanup(input, txnId, executeAt, saveStatus, durability, participants, redundantBefore, durableBefore); if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0) cleanup = this.cleanup; @@ -461,6 +482,7 @@ public class CommandChange hasUpdate = true; cleanup = addCleanup; + flags |= setChanged(CLEANUP); if (!cleanup.appliesTo(saveStatus)) return false; return forceSetNulls(clearFields, eraseKnownFieldsMask[cleanup.newStatus.ordinal()]); @@ -534,6 +556,7 @@ public class CommandChange if (this.waitingOn != null) waitingOn = this.waitingOn.provide(txnId, partialDeps, executesAtLeast, minUniqueHlc); + Invariants.require(saveStatus != null); switch (saveStatus.status) { case NotDefined: @@ -585,32 +608,55 @@ public class CommandChange { return "Builder {" + "txnId=" + txnId - + (isChanged(PARTICIPANTS, flags) ? ", participants=" + participants : "") - + (isChanged(SAVE_STATUS, flags) ? ", saveStatus=" + saveStatus : "") - + (isChanged(DURABILITY, flags) ? ", durability=" + durability : "") - + (isChanged(EXECUTE_AT, flags) ? ", executeAt=" + executeAt : "") - + (isChanged(PROMISED, flags) ? ", promised=" + promised : "") - + (isChanged(ACCEPTED, flags) ? ", acceptedOrCommitted=" + acceptedOrCommitted : "") - + (isChanged(PARTIAL_TXN, flags) ? ", partialTxn=" + safeToString(partialTxn) : "") - + (isChanged(PARTIAL_DEPS, flags) ? ", partialDeps=" + partialDeps : "") - + (isChanged(WAITING_ON, flags) ? ", waitingOn=" + waitingOn : "") - + (isChanged(MIN_UNIQUE_HLC, flags) ? ", minUniqueHlc=" + minUniqueHlc : "") - + (isChanged(EXECUTES_AT_LEAST, flags) ? ", executesAtLeast=" + executesAtLeast : "") - + (isChanged(WRITES, flags) ? ", writes=" + writes : "") - + (isChanged(RESULT, flags) ? ", result=" + result : "") - + (isChanged(CLEANUP, flags) ? 
", cleanup=" + cleanup : "") + - '}'; - } - - private static String safeToString(Object obj) + + safeToString(PARTICIPANTS, flags, participants) + + safeToString(SAVE_STATUS, flags, saveStatus) + + safeToString(DURABILITY, flags, durability) + + safeToString(EXECUTE_AT, flags, executeAt) + + safeToString(PROMISED, flags, promised) + + safeToString(ACCEPTED, flags, acceptedOrCommitted) + + safeToString(PARTIAL_TXN, flags, partialTxn) + + safeToString(PARTIAL_DEPS, flags, partialDeps) + + safeToString(WAITING_ON, flags, waitingOn) + + safeToString(MIN_UNIQUE_HLC, flags, minUniqueHlc) + + safeToString(EXECUTES_AT_LEAST, flags, executesAtLeast) + + safeToString(WRITES, flags, writes) + + safeToString(RESULT, flags, result) + + safeToString(CLEANUP, flags, cleanup) + + '}'; + } + + private static Object safeToString(Field field, int flags, Object obj) + { + if (!isChanged(field, flags)) + return ""; + + return field.name().toLowerCase() + '=' + safeToString(isNull(field, flags), obj); + } + + private static Object safeToString(boolean isNull, Object obj) { + if (isNull) + { + if (obj == null) + return "null"; + + try + { + return "null<" + obj + '>'; + } + catch (Throwable t) + { + return "null<err>"; + } + } + try { return obj.toString(); } catch (Throwable t) { - return "<error evaluating>"; + return "<err>"; } } } @@ -694,7 +740,7 @@ public class CommandChange if (before.waitingOn() != after.waitingOn()) { flags |= setChanged(WAITING_ON); - flags |= addIdentityFlags(getMinUniqueHlc(before), getMinUniqueHlc(after), MIN_UNIQUE_HLC); + flags |= addIdentityFlags(0, getMinUniqueHlc(before), getMinUniqueHlc(after), MIN_UNIQUE_HLC); } flags |= addEqualityFlags(before.executesAtLeast(), after.executesAtLeast(), EXECUTES_AT_LEAST); flags |= addIdentityFlags(before.writes(), after.writes(), WRITES); @@ -743,6 +789,13 @@ public class CommandChange return setChanged(field); } + private static int addIdentityFlags(long treatAsNull, long l, long r, Field field) + { + if (l == r) 
return 0; + if (r == treatAsNull) return setIsNullAndChanged(field); + return setChanged(field); + } + public static boolean anyFieldChanged(int flags) { return (flags >>> 16) != 0; @@ -813,9 +866,9 @@ public class CommandChange } @VisibleForTesting - public static boolean isNull(Field field, int oldFlags) + public static boolean isNull(Field field, int flags) { - return (oldFlags & (1 << field.ordinal())) != 0; + return (flags & (1 << field.ordinal())) != 0; } public static int unsetFieldIsNull(Field field, int oldFlags) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 950a7e3f..31c7dfbf 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -90,6 +90,7 @@ import org.agrona.collections.ObjectHashSet; import static accord.local.Cleanup.Input.FULL; import static accord.local.KeyHistory.ASYNC; +import static accord.local.KeyHistory.NONE; import static accord.local.KeyHistory.SYNC; import static accord.local.RedundantStatus.Coverage.ALL; import static accord.local.StoreParticipants.Filter.LOAD; @@ -360,6 +361,7 @@ public abstract class InMemoryCommandStore extends CommandStore break; case Range: // load range cfks here + break; } } return createSafeStore(context, ranges, commands, commandsForKey); @@ -631,6 +633,23 @@ public abstract class InMemoryCommandStore extends CommandStore return command; } + @Override + protected InMemorySafeCommandsForKey ifLoadedInternal(RoutingKey key) + { + if (context.keyHistory() != NONE && context.keys().domain() == Range && context.keys().contains(key)) + { + GlobalCommandsForKey globalCfk = commandStore().commandsForKey.get(key); + if (globalCfk == null) + return null; + + InMemorySafeCommandsForKey safeCfk = globalCfk.createSafeReference(); + commandsForKey.put(key, safeCfk); + return safeCfk; + } + + return super.ifLoadedInternal(key); + } 
+ @Override protected InMemorySafeCommandsForKey getInternal(RoutingKey key) { diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index 24a29ea3..7169d60b 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -1344,6 +1344,9 @@ public abstract class Command implements ICommand } } + /** + * Note: we don't guarantee to maintain this once an actual uniqueHlc is known. + */ public long minUniqueHlc() { return 0; diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index f29f376c..7cbb938b 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -527,6 +527,8 @@ public class Commands PartialDeps partialDeps = prepareDeps(validated, participants, command, deps); participants = prepareParticipants(validated, participants, command); + // TODO (required): validate safe to fast apply against local state if running burn test + // note: we may overwrite minUniqueHlc here WaitingOn waitingOn = newSaveStatus != SaveStatus.PreApplied ? 
WaitingOn.none(txnId.domain(), partialDeps) : command.hasBeen(Stable) diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 657238ea..50beaf96 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -32,6 +32,7 @@ import accord.api.Timeouts; import accord.api.Timeouts.RegisteredTimeout; import accord.coordinate.ExecuteFlag.ExecuteFlags; import accord.local.Command; +import accord.local.Command.Committed; import accord.local.CommandStore; import accord.local.CommandStores; import accord.local.Node; @@ -480,7 +481,11 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons Ranges unavailable = unavailable(safeStore, command); if (txnId.is(Write)) { - long uniqueHlc = command.asCommitted().waitingOn().minUniqueHlc(); + Committed committed = command.asCommitted(); + long uniqueHlc = committed.waitingOn().minUniqueHlc(); + if (committed.executeAt().hasDistinctHlcAndUniqueHlc()) + uniqueHlc = committed.executeAt().uniqueHlc(); + if (uniqueHlc > 0) { synchronized (this) diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 33cf5816..618f121d 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -148,8 +148,11 @@ public class InMemoryJournal implements Journal return null; Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, durableBefore); - if (cleanup == EXPUNGE) - return null; + switch (cleanup) + { + case ERASE: return Command.Truncated.erased(txnId); + case EXPUNGE: return null; + } return builder.construct(redundantBefore); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email 
protected]
