This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 115c5539840354d6af81a246d391071d2851f636
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 25 20:10:00 2025 +0100

    Follow-up to CASSANDRA-20726:
     - Fix shouldCleanup handling of erase/expunge
     - Fix CommandChange handling of minUniqueHlc being cleared
     - Don't clear minUniqueHlc when fast applying; instead simply validate 
!isWaiting
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20726
---
 modules/accord                                     |  2 +-
 .../db/compaction/CompactionIterator.java          |  6 ++--
 .../cassandra/service/accord/AccordExecutor.java   |  3 +-
 .../cassandra/service/accord/AccordJournal.java    | 36 +++++++++++++---------
 .../distributed/test/accord/AccordLoadTest.java    | 14 +++++----
 .../service/accord/AccordJournalBurnTest.java      |  3 +-
 .../db/virtual/AccordDebugKeyspaceTest.java        |  2 ++
 7 files changed, 38 insertions(+), 28 deletions(-)

diff --git a/modules/accord b/modules/accord
index 756f2a1f88..aa08b04e82 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 756f2a1f88f079e74c34fd8e0f3bb5aa98760bef
+Subproject commit aa08b04e8235d9c0b7e2bbe3bbe8773184bbbe46
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 2bba287663..399db90a7d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -119,7 +119,6 @@ import static 
org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
 import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
 import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor;
 import static 
org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey;
-import static 
org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.key;
 
 /**
  * Merge multiple iterators over the content of sstable into a "compacted" 
iterator.
@@ -1103,7 +1102,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                     case EXPUNGE:
                         return null;
                     case ERASE:
-                        return erase(partitionKey);
+                        return erase(journalKey, partitionKey);
 
                     case TRUNCATE:
                     case TRUNCATE_WITH_OUTCOME:
@@ -1137,9 +1136,10 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             return newVersion.build().unfilteredIterator();
         }
 
-        private UnfilteredRowIterator erase(DecoratedKey partitionKey) throws 
IOException
+        private UnfilteredRowIterator erase(JournalKey journalKey, 
DecoratedKey partitionKey) throws IOException
         {
             AccordCommandRowEntry entry = entries.get(entries.size() - 1);
+            entry.builder.reset(journalKey);
             entry.builder.addCleanup(false, ERASE);
             return PartitionUpdate.singleRowUpdate(AccordKeyspace.Journal, 
partitionKey, toRow(entry)).unfilteredIterator();
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 018fe7d2ef..c61f5e7a9b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -887,7 +887,7 @@ public abstract class AccordExecutor implements CacheSize, 
AccordCacheEntry.OnLo
                 else
                 {
                     selfTask.queuePosition = task.queuePosition;
-                    waitingToRun.update(task);
+                    waitingToRun.update(selfTask);
                 }
             }
         }
@@ -1173,6 +1173,7 @@ public abstract class AccordExecutor implements 
CacheSize, AccordCacheEntry.OnLo
                 TaskQueue queue = executor == null ? waitingToRun : executor;
                 if (queue.contains(this))
                 {
+                    --tasks;
                     queue.remove(this);
                     fail(new CancellationException());
                 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index b8b6709d65..db64d55da2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -225,8 +225,13 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     {
         Builder builder = load(commandStoreId, txnId);
         Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, 
durableBefore);
-        if (cleanup == Cleanup.EXPUNGE)
-            return null;
+        switch (cleanup)
+        {
+            case ERASE:
+                return Command.Truncated.erased(txnId);
+            case EXPUNGE:
+                return null;
+        }
 
         return builder.construct(redundantBefore);
     }
@@ -583,6 +588,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         
ExecuteAtSerializer.serialize(command.executesAtLeast(), out);
                         break;
                     case MIN_UNIQUE_HLC:
+                        Invariants.require(command.waitingOn().minUniqueHlc() 
!= 0);
                         
out.writeUnsignedVInt(command.waitingOn().minUniqueHlc());
                         break;
                     case SAVE_STATUS:
@@ -709,8 +715,8 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         out.writeByte(cleanup.ordinal());
                         break;
                     case EXECUTE_AT:
-                        Invariants.require(txnId != null);
-                        Invariants.require(executeAt != null);
+                        Invariants.require(txnId != null, "%s", this);
+                        Invariants.require(executeAt != null, "%s", this);
                         ExecuteAtSerializer.serialize(txnId, executeAt, out);
                         break;
                     case EXECUTES_AT_LEAST:
@@ -718,47 +724,47 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         ExecuteAtSerializer.serialize(executesAtLeast, out);
                         break;
                     case MIN_UNIQUE_HLC:
-                        Invariants.require(minUniqueHlc != 0);
+                        Invariants.require(minUniqueHlc != 0, "%s", this);
                         out.writeUnsignedVInt(minUniqueHlc);
                         break;
                     case SAVE_STATUS:
-                        Invariants.require(saveStatus != null);
+                        Invariants.require(saveStatus != null, "%s", this);
                         out.writeByte(saveStatus.ordinal());
                         break;
                     case DURABILITY:
-                        Invariants.require(durability != null);
+                        Invariants.require(durability != null, "%s", this);
                         out.writeByte(durability.ordinal());
                         break;
                     case ACCEPTED:
-                        Invariants.require(acceptedOrCommitted != null);
+                        Invariants.require(acceptedOrCommitted != null, "%s", 
this);
                         
CommandSerializers.ballot.serialize(acceptedOrCommitted, out);
                         break;
                     case PROMISED:
-                        Invariants.require(promised != null);
+                        Invariants.require(promised != null, "%s", this);
                         CommandSerializers.ballot.serialize(promised, out);
                         break;
                     case PARTICIPANTS:
-                        Invariants.require(participants != null);
+                        Invariants.require(participants != null, "%s", this);
                         
CommandSerializers.participants.serialize(participants, out);
                         break;
                     case PARTIAL_TXN:
-                        Invariants.require(partialTxn != null);
+                        Invariants.require(partialTxn != null, "%s", this);
                         CommandSerializers.partialTxn.serialize(partialTxn, 
out, userVersion);
                         break;
                     case PARTIAL_DEPS:
-                        Invariants.require(partialDeps != null);
+                        Invariants.require(partialDeps != null, "%s", this);
                         DepsSerializers.partialDeps.serialize(partialDeps, 
out);
                         break;
                     case WAITING_ON:
-                        Invariants.require(waitingOn != null);
+                        Invariants.require(waitingOn != null, "%s", this);
                         
((WaitingOnSerializer.Provider)waitingOn).reserialize(out);
                         break;
                     case WRITES:
-                        Invariants.require(writes != null);
+                        Invariants.require(writes != null, "%s", this);
                         CommandSerializers.writes.serialize(writes, out, 
userVersion);
                         break;
                     case RESULT:
-                        Invariants.require(result != null);
+                        Invariants.require(result != null, "%s", this);
                         ResultSerializers.result.serialize(result, out);
                         break;
                 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 15c195f475..2a093887b6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -73,7 +73,7 @@ public class AccordLoadTest extends AccordTestBase
                                                                             
.set("accord.shard_durability_target_splits", "64")
                                                                             
.set("accord.shard_durability_cycle", "5m")
 //                                                                            
.set("accord.ephemeral_read_enabled", "true")
-                                                                            
.set("accord.gc_delay", "5s")), 3);
+                                                                            
.set("accord.gc_delay", "30s")), 3);
     }
 
     @Ignore
@@ -100,13 +100,15 @@ public class AccordLoadTest extends AccordTestBase
             ICoordinator coordinator = cluster.coordinator(1);
             final int repairInterval = Integer.MAX_VALUE;
             final int compactionInterval = 20_000;
-            final int flushInterval = 50_000;
-            final int compactionPeriodSeconds = 1;
-            final int restartInterval = 100_000;
-            final int batchSizeLimit = 1000;
+//            final int flushInterval = 50_000;
+            final int flushInterval = 500;
+            final int compactionPeriodSeconds = 0;
+//            final int restartInterval = 100_000;
+            final int restartInterval = Integer.MAX_VALUE;
+            final int batchSizeLimit = 200;
             final long batchTime = TimeUnit.SECONDS.toNanos(10);
             final int concurrency = 100;
-            final int ratePerSecond = 2000;
+            final int ratePerSecond = 1000;
             final int keyCount = 10_000;
             final float readChance = 0.33f;
             long nextRepairAt = repairInterval;
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index eea399dd60..50cc6c65de 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -129,7 +129,7 @@ public class AccordJournalBurnTest extends BurnTestBase
     public void testOne()
     {
         long seed = System.nanoTime();
-        int operations = 1000;
+        int operations = 5000;
 
         logger.info("Seed: {}", seed);
         Cluster.trace.trace("Seed: {}", seed);
@@ -273,7 +273,6 @@ public class AccordJournalBurnTest extends BurnTestBase
                                  List<ISSTableScanner> scanners = 
selected.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 
                                  Collection<SSTableReader> newSStables;
-
                                  try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(selected, OperationType.COMPACTION);
                                       CompactionController controller = new 
CompactionController(cfs, selected, 0);
                                       CompactionIterator ci = new 
CompactionIterator(OperationType.COMPACTION,
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index a2677f3160..a3056c9a4f 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.awaitility.Awaitility;
 
+import static accord.api.ProtocolModifiers.Toggles.SendStableMessages.TO_ALL;
 import static accord.primitives.TxnId.FastPath.Unoptimised;
 import static org.apache.cassandra.Util.spinUntilSuccess;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
@@ -70,6 +71,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
         daemonInitialization();
         DatabaseDescriptor.getAccord().queue_shard_count = new 
OptionaldPositiveInt(1);
         DatabaseDescriptor.getAccord().command_store_shard_count = new 
OptionaldPositiveInt(1);
+        ProtocolModifiers.Toggles.setSendStableMessages(TO_ALL);
 
         CQLTester.setUpClass();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to