This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f9d248a Follow-up to CASSANDRA-20726: - Fix shouldCleanup handling
of erase/expunge - Fix CommandChange handling of minUniqueHlc being cleared -
Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting
6f9d248a is described below
commit 6f9d248a9b903d5de354a030a5d2e6e7b0e34b8b
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 25 21:22:33 2025 +0100
Follow-up to CASSANDRA-20726:
- Fix shouldCleanup handling of erase/expunge
- Fix CommandChange handling of minUniqueHlc being cleared
- Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting
patch by Benedict; reviewed by Benedict for CASSANDRA-20726
---
.../src/main/java/accord/impl/CommandChange.java | 102 ++++++++++++++++-----
.../java/accord/impl/InMemoryCommandStore.java | 20 ++++
.../src/main/java/accord/local/Command.java | 3 +
.../src/main/java/accord/local/Commands.java | 2 +
.../src/main/java/accord/messages/ReadData.java | 7 +-
.../java/accord/impl/basic/InMemoryJournal.java | 7 +-
6 files changed, 114 insertions(+), 27 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 291e19db..96aa34ba 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -62,8 +62,11 @@ import static accord.impl.CommandChange.Field.RESULT;
import static accord.impl.CommandChange.Field.SAVE_STATUS;
import static accord.impl.CommandChange.Field.WAITING_ON;
import static accord.impl.CommandChange.Field.WRITES;
+import static accord.local.Cleanup.ERASE;
import static accord.local.Cleanup.EXPUNGE;
+import static accord.local.Cleanup.INVALIDATE;
import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.VESTIGIAL;
import static accord.local.Command.Accepted.accepted;
import static accord.local.Command.Committed.committed;
import static accord.local.Command.Executed.executed;
@@ -147,6 +150,7 @@ public class CommandChange
mask |= setIsNullAndChanged(DURABILITY, mask);
eraseKnownFieldsMask[i] = mask;
}
+ eraseKnownFieldsMask[VESTIGIAL.ordinal()] =
eraseKnownFieldsMask[ERASE.ordinal()];
}
private static <T> boolean forceFieldChangedToNullFlag(SaveStatus
saveStatus, Predicate<T> predicate, T erased)
@@ -309,6 +313,19 @@ public class CommandChange
if (!hasUpdate)
return NO;
+ if (cleanup != null)
+ {
+ switch (cleanup)
+ {
+ case EXPUNGE:
+ return EXPUNGE;
+ case ERASE:
+ if (EXPUNGE == Cleanup.shouldCleanup(input, txnId,
null, SaveStatus.Erased, NotDurable, null, redundantBefore, durableBefore))
+ return EXPUNGE;
+ return ERASE;
+ }
+ }
+
Durability durability = this.durability;
if (durability == null) durability = NotDurable;
StoreParticipants participants = this.participants;
@@ -316,8 +333,13 @@ public class CommandChange
// would be better to break this dependency, or otherwise encode
it better.
// In particular it would be nice to avoid doing this twice for
each command on load, as we also do this in SafeCommandStore.
// Perhaps we can special-case loading, and simply update the
participants here so we can avoid doing it again on access
- if (input == Input.FULL && participants != null)
- participants = participants.filter(LOAD, redundantBefore,
txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt :
null);
+ if (input == Input.FULL)
+ {
+ if (saveStatus == null)
+ return EXPUNGE;
+ if (participants != null)
+ participants = participants.filter(LOAD, redundantBefore,
txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null);
+ }
Cleanup cleanup = Cleanup.shouldCleanup(input, txnId, executeAt,
saveStatus, durability, participants, redundantBefore, durableBefore);
if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
cleanup = this.cleanup;
@@ -461,6 +483,7 @@ public class CommandChange
hasUpdate = true;
cleanup = addCleanup;
+ flags |= setChanged(CLEANUP);
if (!cleanup.appliesTo(saveStatus))
return false;
return forceSetNulls(clearFields,
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
@@ -534,6 +557,7 @@ public class CommandChange
if (this.waitingOn != null)
waitingOn = this.waitingOn.provide(txnId, partialDeps,
executesAtLeast, minUniqueHlc);
+ Invariants.require(saveStatus != null);
switch (saveStatus.status)
{
case NotDefined:
@@ -585,32 +609,55 @@ public class CommandChange
{
return "Builder {" +
"txnId=" + txnId
- + (isChanged(PARTICIPANTS, flags) ? ", participants="
+ participants : "")
- + (isChanged(SAVE_STATUS, flags) ? ", saveStatus=" +
saveStatus : "")
- + (isChanged(DURABILITY, flags) ? ", durability=" +
durability : "")
- + (isChanged(EXECUTE_AT, flags) ? ", executeAt=" +
executeAt : "")
- + (isChanged(PROMISED, flags) ? ", promised=" +
promised : "")
- + (isChanged(ACCEPTED, flags) ? ",
acceptedOrCommitted=" + acceptedOrCommitted : "")
- + (isChanged(PARTIAL_TXN, flags) ? ", partialTxn=" +
safeToString(partialTxn) : "")
- + (isChanged(PARTIAL_DEPS, flags) ? ", partialDeps=" +
partialDeps : "")
- + (isChanged(WAITING_ON, flags) ? ", waitingOn=" +
waitingOn : "")
- + (isChanged(MIN_UNIQUE_HLC, flags) ? ", minUniqueHlc="
+ minUniqueHlc : "")
- + (isChanged(EXECUTES_AT_LEAST, flags) ? ",
executesAtLeast=" + executesAtLeast : "")
- + (isChanged(WRITES, flags) ? ", writes=" +
writes : "")
- + (isChanged(RESULT, flags) ? ", result=" +
result : "")
- + (isChanged(CLEANUP, flags) ? ", cleanup=" +
cleanup : "") +
- '}';
- }
-
- private static String safeToString(Object obj)
+ + safeToString(PARTICIPANTS, flags, participants)
+ + safeToString(SAVE_STATUS, flags, saveStatus)
+ + safeToString(DURABILITY, flags, durability)
+ + safeToString(EXECUTE_AT, flags, executeAt)
+ + safeToString(PROMISED, flags, promised)
+ + safeToString(ACCEPTED, flags, acceptedOrCommitted)
+ + safeToString(PARTIAL_TXN, flags, partialTxn)
+ + safeToString(PARTIAL_DEPS, flags, partialDeps)
+ + safeToString(WAITING_ON, flags, waitingOn)
+ + safeToString(MIN_UNIQUE_HLC, flags, minUniqueHlc)
+ + safeToString(EXECUTES_AT_LEAST, flags, executesAtLeast)
+ + safeToString(WRITES, flags, writes)
+ + safeToString(RESULT, flags, result)
+ + safeToString(CLEANUP, flags, cleanup)
+ + '}';
+ }
+
+ private static Object safeToString(Field field, int flags, Object obj)
+ {
+ if (!isChanged(field, flags))
+ return "";
+
+ return field.name().toLowerCase() + '=' +
safeToString(isNull(field, flags), obj);
+ }
+
+ private static Object safeToString(boolean isNull, Object obj)
{
+ if (isNull)
+ {
+ if (obj == null)
+ return "null";
+
+ try
+ {
+ return "null<" + obj + '>';
+ }
+ catch (Throwable t)
+ {
+ return "null<err>";
+ }
+ }
+
try
{
return obj.toString();
}
catch (Throwable t)
{
- return "<error evaluating>";
+ return "<err>";
}
}
}
@@ -694,7 +741,7 @@ public class CommandChange
if (before.waitingOn() != after.waitingOn())
{
flags |= setChanged(WAITING_ON);
- flags |= addIdentityFlags(getMinUniqueHlc(before),
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
+ flags |= addIdentityFlags(0, getMinUniqueHlc(before),
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
}
flags |= addEqualityFlags(before.executesAtLeast(),
after.executesAtLeast(), EXECUTES_AT_LEAST);
flags |= addIdentityFlags(before.writes(), after.writes(), WRITES);
@@ -743,6 +790,13 @@ public class CommandChange
return setChanged(field);
}
+ private static int addIdentityFlags(long treatAsNull, long l, long r,
Field field)
+ {
+ if (l == r) return 0;
+ if (r == treatAsNull) return setIsNullAndChanged(field);
+ return setChanged(field);
+ }
+
public static boolean anyFieldChanged(int flags)
{
return (flags >>> 16) != 0;
@@ -813,9 +867,9 @@ public class CommandChange
}
@VisibleForTesting
- public static boolean isNull(Field field, int oldFlags)
+ public static boolean isNull(Field field, int flags)
{
- return (oldFlags & (1 << field.ordinal())) != 0;
+ return (flags & (1 << field.ordinal())) != 0;
}
public static int unsetFieldIsNull(Field field, int oldFlags)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 950a7e3f..bc67d688 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -69,6 +69,7 @@ import accord.primitives.AbstractRanges;
import accord.primitives.AbstractUnseekableKeys;
import accord.primitives.PartialDeps;
import accord.primitives.Participants;
+import accord.primitives.Range;
import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
@@ -90,6 +91,7 @@ import org.agrona.collections.ObjectHashSet;
import static accord.local.Cleanup.Input.FULL;
import static accord.local.KeyHistory.ASYNC;
+import static accord.local.KeyHistory.NONE;
import static accord.local.KeyHistory.SYNC;
import static accord.local.RedundantStatus.Coverage.ALL;
import static accord.local.StoreParticipants.Filter.LOAD;
@@ -360,6 +362,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
break;
case Range:
// load range cfks here
+ break;
}
}
return createSafeStore(context, ranges, commands, commandsForKey);
@@ -631,6 +634,23 @@ public abstract class InMemoryCommandStore extends
CommandStore
return command;
}
+ @Override
+ protected InMemorySafeCommandsForKey ifLoadedInternal(RoutingKey key)
+ {
+ if (context.keyHistory() != NONE && context.keys().domain() ==
Range && context.keys().contains(key))
+ {
+ GlobalCommandsForKey globalCfk =
commandStore().commandsForKey.get(key);
+ if (globalCfk == null)
+ return null;
+
+ InMemorySafeCommandsForKey safeCfk =
globalCfk.createSafeReference();
+ commandsForKey.put(key, safeCfk);
+ return safeCfk;
+ }
+
+ return super.ifLoadedInternal(key);
+ }
+
@Override
protected InMemorySafeCommandsForKey getInternal(RoutingKey key)
{
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 24a29ea3..7169d60b 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -1344,6 +1344,9 @@ public abstract class Command implements ICommand
}
}
+ /**
+ * Note: we don't guarantee to maintain this once an actual uniqueHlc
is known.
+ */
public long minUniqueHlc()
{
return 0;
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index f29f376c..7cbb938b 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -527,6 +527,8 @@ public class Commands
PartialDeps partialDeps = prepareDeps(validated, participants,
command, deps);
participants = prepareParticipants(validated, participants, command);
+ // TODO (required): validate safe to fast apply against local state if
running burn test
+ // note: we may overwrite minUniqueHlc here
WaitingOn waitingOn = newSaveStatus != SaveStatus.PreApplied
? WaitingOn.none(txnId.domain(), partialDeps)
: command.hasBeen(Stable)
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index 657238ea..50beaf96 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -32,6 +32,7 @@ import accord.api.Timeouts;
import accord.api.Timeouts.RegisteredTimeout;
import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.local.Command;
+import accord.local.Command.Committed;
import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.Node;
@@ -480,7 +481,11 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
Ranges unavailable = unavailable(safeStore, command);
if (txnId.is(Write))
{
- long uniqueHlc = command.asCommitted().waitingOn().minUniqueHlc();
+ Committed committed = command.asCommitted();
+ long uniqueHlc = committed.waitingOn().minUniqueHlc();
+ if (committed.executeAt().hasDistinctHlcAndUniqueHlc())
+ uniqueHlc = committed.executeAt().uniqueHlc();
+
if (uniqueHlc > 0)
{
synchronized (this)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 33cf5816..618f121d 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -148,8 +148,11 @@ public class InMemoryJournal implements Journal
return null;
Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore,
durableBefore);
- if (cleanup == EXPUNGE)
- return null;
+ switch (cleanup)
+ {
+ case ERASE: return Command.Truncated.erased(txnId);
+ case EXPUNGE: return null;
+ }
return builder.construct(redundantBefore);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]