This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 55e13755a5 Accord fixes:  - Bad ArrayBuffers recycling logic  - RX 
must ensure dependencies TRANSITIVE_VISIBLE  - Permit constructing "antiRange" 
that spans multiple prefixes  - Not computing range CommandSummary IsDep 
correctly  - Truncated commands that aren't shard durable could not repopulate 
CFK on replay, permitting recovery of another command to make an incorrect 
decision  - NPE on async persist of RX (i.e. supplying no callback)  - NPE in 
Builder.shouldCleanup when durabi [...]
55e13755a5 is described below

commit 55e13755a5906f82889a137e219c890c5785fa06
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Feb 26 11:08:22 2025 +0000

    Accord fixes:
     - Bad ArrayBuffers recycling logic
     - RX must ensure dependencies TRANSITIVE_VISIBLE
     - Permit constructing "antiRange" that spans multiple prefixes
     - Not computing range CommandSummary IsDep correctly
     - Truncated commands that aren't shard durable could not repopulate CFK on 
replay, permitting recovery of another command to make an incorrect decision
     - NPE on async persist of RX (i.e. supplying no callback)
     - NPE in Builder.shouldCleanup when durability is null
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20370
---
 modules/accord                                     |  2 +-
 .../cassandra/service/accord/AccordJournal.java    | 13 +---------
 .../service/accord/AccordObjectSizes.java          | 30 ++++++++++++++--------
 .../cassandra/service/accord/api/TokenKey.java     |  2 +-
 .../service/accord/serializers/TokenKeyTest.java   |  2 +-
 .../apache/cassandra/utils/AccordGenerators.java   | 26 ++++++++++---------
 6 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/modules/accord b/modules/accord
index a341979bd8..2b9e54004f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a341979bd8fc1d26192cd6bc1edb145e7945e2e9
+Subproject commit 2b9e54004f702c7b626e94391af21fa080292975
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 93d02f2833..4ce3e2a64c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import accord.impl.CommandChange;
 import accord.impl.CommandChange.Field;
-import accord.impl.RetiredSafeCommand;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -87,8 +86,6 @@ import static accord.impl.CommandChange.toIterableSetFields;
 import static accord.impl.CommandChange.unsetIterable;
 import static accord.impl.CommandChange.validateFlags;
 import static accord.local.Cleanup.Input.FULL;
-import static accord.primitives.SaveStatus.Erased;
-import static accord.primitives.SaveStatus.Vestigial;
 import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
 
 public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier, Shutdownable
@@ -222,15 +219,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
         Builder builder = load(commandStoreId, txnId);
-        Cleanup cleanup = builder.shouldCleanup(FULL, agent, redundantBefore, 
durableBefore);
-        switch (cleanup)
-        {
-            case VESTIGIAL:
-                return RetiredSafeCommand.erased(txnId, Vestigial);
-            case EXPUNGE:
-            case ERASE:
-                return RetiredSafeCommand.erased(txnId, Erased);
-        }
+        builder.maybeCleanup(FULL, agent, redundantBefore, durableBefore);
         return builder.construct(redundantBefore);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 9036116067..d1e507fd2c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -53,7 +53,6 @@ import accord.primitives.RoutingKeys;
 import accord.primitives.SaveStatus;
 import accord.primitives.Seekable;
 import accord.primitives.Seekables;
-import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
@@ -73,7 +72,16 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static accord.local.Command.Accepted.accepted;
+import static accord.local.Command.Committed.committed;
+import static accord.local.Command.Executed.executed;
+import static accord.local.Command.NotAcceptedWithoutDefinition.notAccepted;
+import static accord.local.Command.NotDefined.notDefined;
+import static accord.local.Command.PreAccepted.preaccepted;
+import static accord.local.Command.Truncated.*;
+import static accord.local.StoreParticipants.empty;
 import static accord.local.cfk.CommandsForKey.InternalStatus.ACCEPTED;
+import static accord.primitives.Status.Durability.NotDurable;
 import static accord.primitives.TxnId.NO_TXNIDS;
 import static org.apache.cassandra.utils.ObjectSizes.measure;
 
@@ -294,7 +302,7 @@ public class AccordObjectSizes
             Participants<?> empty = route.slice(0, 0);
             ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID)
                                        
.setParticipants(StoreParticipants.create(route, empty, executes ? empty : 
null, executes ? empty : null, empty, route))
-                                       
.durability(Status.Durability.NotDurable)
+                                       .durability(NotDurable)
                                        .executeAt(EMPTY_TXNID)
                                        .promised(Ballot.ZERO);
             if (hasDeps)
@@ -312,14 +320,15 @@ public class AccordObjectSizes
             return builder;
         }
 
-        final static long NOT_DEFINED = 
measure(Command.NotDefined.notDefined(attrs(false, false, false)));
-        final static long PREACCEPTED = 
measure(Command.PreAccepted.preaccepted(attrs(false, true, false), 
SaveStatus.PreAccepted));
-        final static long NOTACCEPTED = 
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false, 
false), SaveStatus.AcceptedInvalidate));
-        final static long ACCEPTED = 
measure(Command.Accepted.accepted(attrs(true, false, false), 
SaveStatus.AcceptedMedium));
-        final static long COMMITTED = 
measure(Command.Committed.committed(attrs(true, true, false), 
SaveStatus.Committed));
-        final static long EXECUTED = 
measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
-        final static long TRUNCATED = 
measure(Command.Truncated.truncated(attrs(false, false, false), 
SaveStatus.TruncatedApply,  EMPTY_TXNID, null, null));
-        final static long INVALIDATED = 
measure(Command.Truncated.invalidated(EMPTY_TXNID, attrs(false, false, 
false).participants()));
+        final static long NOT_DEFINED = measure(notDefined(attrs(false, false, 
false)));
+        final static long PREACCEPTED = measure(preaccepted(attrs(false, true, 
false), SaveStatus.PreAccepted));
+        final static long NOTACCEPTED = measure(notAccepted(attrs(false, 
false, false), SaveStatus.AcceptedInvalidate));
+        final static long ACCEPTED = measure(accepted(attrs(true, false, 
false), SaveStatus.AcceptedMedium));
+        final static long COMMITTED = measure(committed(attrs(true, true, 
false), SaveStatus.Committed));
+        final static long EXECUTED = measure(executed(attrs(true, true, true), 
SaveStatus.Applied));
+        // TODO (expected): TruncatedAwaitsOnlyDeps
+        final static long TRUNCATED = measure(vestigial(EMPTY_TXNID, 
attrs(false, false, false).participants()));
+        final static long INVALIDATED = measure(invalidated(EMPTY_TXNID, 
attrs(false, false, false).participants()));
 
         private static long emptySize(Command command)
         {
@@ -357,6 +366,7 @@ public class AccordObjectSizes
                 case TruncatedApply:
                 case TruncatedUnapplied:
                 case TruncatedApplyWithOutcome:
+                case TruncatedApplyWithOutcomeAndDeps:
                 case Vestigial:
                 case Erased:
                     return TRUNCATED;
diff --git a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java 
b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
index ea4e829fba..d29c306869 100644
--- a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
@@ -458,7 +458,7 @@ public final class TokenKey extends AccordRoutableKey 
implements RoutingKey, Ran
         {
             while (index <= escapeLimit)
             {
-                if (bytes[index] == 0 && (index == escapeLimit || bytes[index 
+ 1] <= ESCAPE_BYTE))
+                if (bytes[index] == 0 && (index == escapeLimit || (bytes[index 
+ 1] & 0xff) <= ESCAPE_BYTE))
                     return index;
                 ++index;
             }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java 
b/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
index 78f8fb15d9..9bdb85c268 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java
@@ -112,7 +112,7 @@ public class TokenKeyTest
     @Test
     public void compare()
     {
-        
qt().withSeed(0L).forAll(fromQT(partitioners().assuming(IPartitioner::accordSupported)).flatMap(partitioner
 -> routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN), 
fromQT(token(partitioner)), partitioner)))
+        
qt().forAll(fromQT(partitioners().assuming(IPartitioner::accordSupported)).flatMap(partitioner
 -> routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN), 
fromQT(token(partitioner)), partitioner)))
             .check(key -> {
                 ByteBuffer keyBytes = serializer.serialize(key);
                 for (TokenKey test : mutateAfter(key))
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 19a922450a..f97cdcf5d9 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -251,18 +251,19 @@ public class AccordGenerators
 
         public Command build(SaveStatus saveStatus)
         {
+            ICommand command = attributes(saveStatus);
             switch (saveStatus)
             {
                 default: throw new AssertionError("Unhandled saveStatus: " + 
saveStatus);
                 case Uninitialised:
                 case NotDefined:
-                    return 
Command.NotDefined.notDefined(attributes(saveStatus), Ballot.ZERO);
+                    return Command.NotDefined.notDefined(command, Ballot.ZERO);
                 case PreAccepted:
                 case PreAcceptedWithVote:
                 case PreAcceptedWithDeps:
-                    return 
Command.PreAccepted.preaccepted(attributes(saveStatus), saveStatus);
+                    return Command.PreAccepted.preaccepted(command, 
saveStatus);
                 case AcceptedInvalidate:
-                    return 
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(attributes(saveStatus));
+                    return 
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(command);
 
                 case AcceptedMedium:
                 case AcceptedMediumWithDefinition:
@@ -277,33 +278,34 @@ public class AccordGenerators
                 case PreCommittedWithDefAndDeps:
                 case PreCommittedWithDefAndFixedDeps:
                 case PreCommitted:
-                    return Command.Accepted.accepted(attributes(saveStatus), 
saveStatus);
+                    return Command.Accepted.accepted(command, saveStatus);
 
                 case Committed:
-                    return Command.Committed.committed(attributes(saveStatus), 
saveStatus);
+                    return Command.Committed.committed(command, saveStatus);
 
                 case Stable:
                 case ReadyToExecute:
-                    return Command.Committed.committed(attributes(saveStatus), 
saveStatus);
+                    return Command.Committed.committed(command, saveStatus);
 
                 case PreApplied:
                 case Applying:
                 case Applied:
-                    return Command.Executed.executed(attributes(saveStatus), 
saveStatus);
+                    return Command.Executed.executed(command, saveStatus);
 
                 case TruncatedApply:
                 case TruncatedUnapplied:
-                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, null, null, 
txnId);
-                    else return Truncated.truncated(attributes(saveStatus), 
saveStatus, executeAt, null, null);
+                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(command, saveStatus, executeAt, null, null, null, txnId);
+                    else return Truncated.truncated(command, saveStatus, 
executeAt, null, null, null, null);
 
+                case TruncatedApplyWithOutcomeAndDeps:
                 case TruncatedApplyWithOutcome:
-                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new 
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
-                    else return Truncated.truncated(attributes(saveStatus), 
saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, 
keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, new 
TxnData());
+                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(), 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new 
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
+                    else return Truncated.truncated(command, saveStatus, 
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId, 
executeAt, keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, 
new TxnData(), null);
 
                 case Erased:
                 case Vestigial:
                 case Invalidated:
-                    return Truncated.invalidated(txnId, 
attributes(saveStatus).participants());
+                    return Truncated.invalidated(txnId, 
command.participants());
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to