This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 55e13755a5 Accord fixes: - Bad ArrayBuffers recycling logic - RX
must ensure dependencies TRANSITIVE_VISIBLE - Permit constructing "antiRange"
that spans multiple prefixes - Not computing range CommandSummary IsDep
correctly - Truncated commands that aren't shard durable could not repopulate
CFK on replay, permitting recovery of another command to make an incorrect
decision - NPE on async persist of RX (i.e. supplying no callback) - NPE in
Builder.shouldCleanup when durability is null
55e13755a5 is described below
commit 55e13755a5906f82889a137e219c890c5785fa06
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Feb 26 11:08:22 2025 +0000
Accord fixes:
- Bad ArrayBuffers recycling logic
- RX must ensure dependencies TRANSITIVE_VISIBLE
- Permit constructing "antiRange" that spans multiple prefixes
- Not computing range CommandSummary IsDep correctly
- Truncated commands that aren't shard durable could not repopulate CFK on
replay, permitting recovery of another command to make an incorrect decision
- NPE on async persist of RX (i.e. supplying no callback)
- NPE in Builder.shouldCleanup when durability is null
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20370
---
modules/accord | 2 +-
.../cassandra/service/accord/AccordJournal.java | 13 +---------
.../service/accord/AccordObjectSizes.java | 30 ++++++++++++++--------
.../cassandra/service/accord/api/TokenKey.java | 2 +-
.../service/accord/serializers/TokenKeyTest.java | 2 +-
.../apache/cassandra/utils/AccordGenerators.java | 26 ++++++++++---------
6 files changed, 38 insertions(+), 37 deletions(-)
diff --git a/modules/accord b/modules/accord
index a341979bd8..2b9e54004f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a341979bd8fc1d26192cd6bc1edb145e7945e2e9
+Subproject commit 2b9e54004f702c7b626e94391af21fa080292975
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 93d02f2833..4ce3e2a64c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import accord.impl.CommandChange;
import accord.impl.CommandChange.Field;
-import accord.impl.RetiredSafeCommand;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.CommandStore;
@@ -87,8 +86,6 @@ import static accord.impl.CommandChange.toIterableSetFields;
import static accord.impl.CommandChange.unsetIterable;
import static accord.impl.CommandChange.validateFlags;
import static accord.local.Cleanup.Input.FULL;
-import static accord.primitives.SaveStatus.Erased;
-import static accord.primitives.SaveStatus.Vestigial;
import static
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier, Shutdownable
@@ -222,15 +219,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
Builder builder = load(commandStoreId, txnId);
- Cleanup cleanup = builder.shouldCleanup(FULL, agent, redundantBefore,
durableBefore);
- switch (cleanup)
- {
- case VESTIGIAL:
- return RetiredSafeCommand.erased(txnId, Vestigial);
- case EXPUNGE:
- case ERASE:
- return RetiredSafeCommand.erased(txnId, Erased);
- }
+ builder.maybeCleanup(FULL, agent, redundantBefore, durableBefore);
return builder.construct(redundantBefore);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 9036116067..d1e507fd2c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -53,7 +53,6 @@ import accord.primitives.RoutingKeys;
import accord.primitives.SaveStatus;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
-import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.Txn.Kind;
import accord.primitives.TxnId;
@@ -73,7 +72,16 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.utils.ObjectSizes;
+import static accord.local.Command.Accepted.accepted;
+import static accord.local.Command.Committed.committed;
+import static accord.local.Command.Executed.executed;
+import static accord.local.Command.NotAcceptedWithoutDefinition.notAccepted;
+import static accord.local.Command.NotDefined.notDefined;
+import static accord.local.Command.PreAccepted.preaccepted;
+import static accord.local.Command.Truncated.*;
+import static accord.local.StoreParticipants.empty;
import static accord.local.cfk.CommandsForKey.InternalStatus.ACCEPTED;
+import static accord.primitives.Status.Durability.NotDurable;
import static accord.primitives.TxnId.NO_TXNIDS;
import static org.apache.cassandra.utils.ObjectSizes.measure;
@@ -294,7 +302,7 @@ public class AccordObjectSizes
Participants<?> empty = route.slice(0, 0);
ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID)
.setParticipants(StoreParticipants.create(route, empty, executes ? empty :
null, executes ? empty : null, empty, route))
-
.durability(Status.Durability.NotDurable)
+ .durability(NotDurable)
.executeAt(EMPTY_TXNID)
.promised(Ballot.ZERO);
if (hasDeps)
@@ -312,14 +320,15 @@ public class AccordObjectSizes
return builder;
}
- final static long NOT_DEFINED =
measure(Command.NotDefined.notDefined(attrs(false, false, false)));
- final static long PREACCEPTED =
measure(Command.PreAccepted.preaccepted(attrs(false, true, false),
SaveStatus.PreAccepted));
- final static long NOTACCEPTED =
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false,
false), SaveStatus.AcceptedInvalidate));
- final static long ACCEPTED =
measure(Command.Accepted.accepted(attrs(true, false, false),
SaveStatus.AcceptedMedium));
- final static long COMMITTED =
measure(Command.Committed.committed(attrs(true, true, false),
SaveStatus.Committed));
- final static long EXECUTED =
measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
- final static long TRUNCATED =
measure(Command.Truncated.truncated(attrs(false, false, false),
SaveStatus.TruncatedApply, EMPTY_TXNID, null, null));
- final static long INVALIDATED =
measure(Command.Truncated.invalidated(EMPTY_TXNID, attrs(false, false,
false).participants()));
+ final static long NOT_DEFINED = measure(notDefined(attrs(false, false,
false)));
+ final static long PREACCEPTED = measure(preaccepted(attrs(false, true,
false), SaveStatus.PreAccepted));
+ final static long NOTACCEPTED = measure(notAccepted(attrs(false,
false, false), SaveStatus.AcceptedInvalidate));
+ final static long ACCEPTED = measure(accepted(attrs(true, false,
false), SaveStatus.AcceptedMedium));
+ final static long COMMITTED = measure(committed(attrs(true, true,
false), SaveStatus.Committed));
+ final static long EXECUTED = measure(executed(attrs(true, true, true),
SaveStatus.Applied));
+ // TODO (expected): TruncatedAwaitsOnlyDeps
+ final static long TRUNCATED = measure(vestigial(EMPTY_TXNID,
attrs(false, false, false).participants()));
+ final static long INVALIDATED = measure(invalidated(EMPTY_TXNID,
attrs(false, false, false).participants()));
private static long emptySize(Command command)
{
@@ -357,6 +366,7 @@ public class AccordObjectSizes
case TruncatedApply:
case TruncatedUnapplied:
case TruncatedApplyWithOutcome:
+ case TruncatedApplyWithOutcomeAndDeps:
case Vestigial:
case Erased:
return TRUNCATED;
diff --git a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
index ea4e829fba..d29c306869 100644
--- a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
@@ -458,7 +458,7 @@ public final class TokenKey extends AccordRoutableKey
implements RoutingKey, Ran
{
while (index <= escapeLimit)
{
- if (bytes[index] == 0 && (index == escapeLimit || bytes[index
+ 1] <= ESCAPE_BYTE))
+ if (bytes[index] == 0 && (index == escapeLimit || (bytes[index
+ 1] & 0xff) <= ESCAPE_BYTE))
return index;
++index;
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
index 78f8fb15d9..9bdb85c268 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
@@ -112,7 +112,7 @@ public class TokenKeyTest
@Test
public void compare()
{
-
qt().withSeed(0L).forAll(fromQT(partitioners().assuming(IPartitioner::accordSupported)).flatMap(partitioner
-> routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN),
fromQT(token(partitioner)), partitioner)))
+
qt().forAll(fromQT(partitioners().assuming(IPartitioner::accordSupported)).flatMap(partitioner
-> routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN),
fromQT(token(partitioner)), partitioner)))
.check(key -> {
ByteBuffer keyBytes = serializer.serialize(key);
for (TokenKey test : mutateAfter(key))
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 19a922450a..f97cdcf5d9 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -251,18 +251,19 @@ public class AccordGenerators
public Command build(SaveStatus saveStatus)
{
+ ICommand command = attributes(saveStatus);
switch (saveStatus)
{
default: throw new AssertionError("Unhandled saveStatus: " +
saveStatus);
case Uninitialised:
case NotDefined:
- return
Command.NotDefined.notDefined(attributes(saveStatus), Ballot.ZERO);
+ return Command.NotDefined.notDefined(command, Ballot.ZERO);
case PreAccepted:
case PreAcceptedWithVote:
case PreAcceptedWithDeps:
- return
Command.PreAccepted.preaccepted(attributes(saveStatus), saveStatus);
+ return Command.PreAccepted.preaccepted(command,
saveStatus);
case AcceptedInvalidate:
- return
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(attributes(saveStatus));
+ return
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(command);
case AcceptedMedium:
case AcceptedMediumWithDefinition:
@@ -277,33 +278,34 @@ public class AccordGenerators
case PreCommittedWithDefAndDeps:
case PreCommittedWithDefAndFixedDeps:
case PreCommitted:
- return Command.Accepted.accepted(attributes(saveStatus),
saveStatus);
+ return Command.Accepted.accepted(command, saveStatus);
case Committed:
- return Command.Committed.committed(attributes(saveStatus),
saveStatus);
+ return Command.Committed.committed(command, saveStatus);
case Stable:
case ReadyToExecute:
- return Command.Committed.committed(attributes(saveStatus),
saveStatus);
+ return Command.Committed.committed(command, saveStatus);
case PreApplied:
case Applying:
case Applied:
- return Command.Executed.executed(attributes(saveStatus),
saveStatus);
+ return Command.Executed.executed(command, saveStatus);
case TruncatedApply:
case TruncatedUnapplied:
- if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, null, null,
txnId);
- else return Truncated.truncated(attributes(saveStatus),
saveStatus, executeAt, null, null);
+ if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(command, saveStatus, executeAt, null, null, null, txnId);
+ else return Truncated.truncated(command, saveStatus,
executeAt, null, null, null, null);
+ case TruncatedApplyWithOutcomeAndDeps:
case TruncatedApplyWithOutcome:
- if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt,
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
- else return Truncated.truncated(attributes(saveStatus),
saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt,
keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, new
TxnData());
+ if (txnId.kind().awaitsOnlyDeps()) return
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(),
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
+ else return Truncated.truncated(command, saveStatus,
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId,
executeAt, keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null,
new TxnData(), null);
case Erased:
case Vestigial:
case Invalidated:
- return Truncated.invalidated(txnId,
attributes(saveStatus).participants());
+ return Truncated.invalidated(txnId,
command.participants());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]