This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 115c5539840354d6af81a246d391071d2851f636 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Wed Jun 25 20:10:00 2025 +0100 Follow-up to CASSANDRA-20726: - Fix shouldCleanup handling of erase/expunge - Fix CommandChange handling of minUniqueHlc being cleared - Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20726 --- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 6 ++-- .../cassandra/service/accord/AccordExecutor.java | 3 +- .../cassandra/service/accord/AccordJournal.java | 36 +++++++++++++--------- .../distributed/test/accord/AccordLoadTest.java | 14 +++++---- .../service/accord/AccordJournalBurnTest.java | 3 +- .../db/virtual/AccordDebugKeyspaceTest.java | 2 ++ 7 files changed, 38 insertions(+), 28 deletions(-) diff --git a/modules/accord b/modules/accord index 756f2a1f88..aa08b04e82 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 756f2a1f88f079e74c34fd8e0f3bb5aa98760bef +Subproject commit aa08b04e8235d9c0b7e2bbe3bbe8773184bbbe46 diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 2bba287663..399db90a7d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -119,7 +119,6 @@ import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy; import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging; import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor; import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey; -import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.key; /** * Merge multiple iterators over the content of sstable into a "compacted" iterator. @@ -1103,7 +1102,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte case EXPUNGE: return null; case ERASE: - return erase(partitionKey); + return erase(journalKey, partitionKey); case TRUNCATE: case TRUNCATE_WITH_OUTCOME: @@ -1137,9 +1136,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return newVersion.build().unfilteredIterator(); } - private UnfilteredRowIterator erase(DecoratedKey partitionKey) throws IOException + private UnfilteredRowIterator erase(JournalKey journalKey, DecoratedKey partitionKey) throws IOException { AccordCommandRowEntry entry = entries.get(entries.size() - 1); + entry.builder.reset(journalKey); entry.builder.addCleanup(false, ERASE); return PartitionUpdate.singleRowUpdate(AccordKeyspace.Journal, partitionKey, toRow(entry)).unfilteredIterator(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java index 018fe7d2ef..c61f5e7a9b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java @@ -887,7 +887,7 @@ public abstract class AccordExecutor implements CacheSize, AccordCacheEntry.OnLo else { selfTask.queuePosition = task.queuePosition; - waitingToRun.update(task); + waitingToRun.update(selfTask); } } } @@ -1173,6 +1173,7 @@ public abstract class AccordExecutor implements CacheSize, AccordCacheEntry.OnLo TaskQueue queue = executor == null ? waitingToRun : executor; if (queue.contains(this)) { + --tasks; queue.remove(this); fail(new CancellationException()); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index b8b6709d65..db64d55da2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -225,8 +225,13 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier { Builder builder = load(commandStoreId, txnId); Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, durableBefore); - if (cleanup == Cleanup.EXPUNGE) - return null; + switch (cleanup) + { + case ERASE: + return Command.Truncated.erased(txnId); + case EXPUNGE: + return null; + } return builder.construct(redundantBefore); } @@ -583,6 +588,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier ExecuteAtSerializer.serialize(command.executesAtLeast(), out); break; case MIN_UNIQUE_HLC: + Invariants.require(command.waitingOn().minUniqueHlc() != 0); out.writeUnsignedVInt(command.waitingOn().minUniqueHlc()); break; case SAVE_STATUS: @@ -709,8 +715,8 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier out.writeByte(cleanup.ordinal()); break; case EXECUTE_AT: - Invariants.require(txnId != null); - Invariants.require(executeAt != null); + Invariants.require(txnId != null, "%s", this); + Invariants.require(executeAt != null, "%s", this); ExecuteAtSerializer.serialize(txnId, executeAt, out); break; case EXECUTES_AT_LEAST: @@ -718,47 +724,47 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier ExecuteAtSerializer.serialize(executesAtLeast, out); break; case MIN_UNIQUE_HLC: - Invariants.require(minUniqueHlc != 0); + Invariants.require(minUniqueHlc != 0, "%s", this); out.writeUnsignedVInt(minUniqueHlc); break; case SAVE_STATUS: - Invariants.require(saveStatus != null); + Invariants.require(saveStatus != null, "%s", this); out.writeByte(saveStatus.ordinal()); break; case DURABILITY: - Invariants.require(durability != null); + Invariants.require(durability != null, "%s", this); out.writeByte(durability.ordinal()); break; case ACCEPTED: - Invariants.require(acceptedOrCommitted != null); + Invariants.require(acceptedOrCommitted != null, "%s", this); CommandSerializers.ballot.serialize(acceptedOrCommitted, out); break; case PROMISED: - Invariants.require(promised != null); + Invariants.require(promised != null, "%s", this); CommandSerializers.ballot.serialize(promised, out); break; case PARTICIPANTS: - Invariants.require(participants != null); + Invariants.require(participants != null, "%s", this); CommandSerializers.participants.serialize(participants, out); break; case PARTIAL_TXN: - Invariants.require(partialTxn != null); + Invariants.require(partialTxn != null, "%s", this); CommandSerializers.partialTxn.serialize(partialTxn, out, userVersion); break; case PARTIAL_DEPS: - Invariants.require(partialDeps != null); + Invariants.require(partialDeps != null, "%s", this); DepsSerializers.partialDeps.serialize(partialDeps, out); break; case WAITING_ON: - Invariants.require(waitingOn != null); + Invariants.require(waitingOn != null, "%s", this); ((WaitingOnSerializer.Provider)waitingOn).reserialize(out); break; case WRITES: - Invariants.require(writes != null); + Invariants.require(writes != null, "%s", this); CommandSerializers.writes.serialize(writes, out, userVersion); break; case RESULT: - Invariants.require(result != null); + Invariants.require(result != null, "%s", this); ResultSerializers.result.serialize(result, out); break; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index 15c195f475..2a093887b6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -73,7 +73,7 @@ public class AccordLoadTest extends AccordTestBase .set("accord.shard_durability_target_splits", "64") .set("accord.shard_durability_cycle", "5m") // .set("accord.ephemeral_read_enabled", "true") - .set("accord.gc_delay", "5s")), 3); + .set("accord.gc_delay", "30s")), 3); } @Ignore @@ -100,13 +100,15 @@ public class AccordLoadTest extends AccordTestBase ICoordinator coordinator = cluster.coordinator(1); final int repairInterval = Integer.MAX_VALUE; final int compactionInterval = 20_000; - final int flushInterval = 50_000; - final int compactionPeriodSeconds = 1; - final int restartInterval = 100_000; - final int batchSizeLimit = 1000; +// final int flushInterval = 50_000; + final int flushInterval = 500; + final int compactionPeriodSeconds = 0; +// final int restartInterval = 100_000; + final int restartInterval = Integer.MAX_VALUE; + final int batchSizeLimit = 200; final long batchTime = TimeUnit.SECONDS.toNanos(10); final int concurrency = 100; - final int ratePerSecond = 2000; + final int ratePerSecond = 1000; final int keyCount = 10_000; final float readChance = 0.33f; long nextRepairAt = repairInterval; diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index eea399dd60..50cc6c65de 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -129,7 +129,7 @@ public class AccordJournalBurnTest extends BurnTestBase public void testOne() { long seed = System.nanoTime(); - int operations = 1000; + int operations = 5000; logger.info("Seed: {}", seed); Cluster.trace.trace("Seed: {}", seed); @@ -273,7 +273,6 @@ public class AccordJournalBurnTest extends BurnTestBase List<ISSTableScanner> scanners = selected.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); Collection<SSTableReader> newSStables; - try (LifecycleTransaction txn = cfs.getTracker().tryModify(selected, OperationType.COMPACTION); CompactionController controller = new CompactionController(cfs, selected, 0); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index a2677f3160..a3056c9a4f 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -53,6 +53,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.Condition; import org.awaitility.Awaitility; +import static accord.api.ProtocolModifiers.Toggles.SendStableMessages.TO_ALL; import static accord.primitives.TxnId.FastPath.Unoptimised; import static org.apache.cassandra.Util.spinUntilSuccess; import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn; @@ -70,6 +71,7 @@ public class AccordDebugKeyspaceTest extends CQLTester daemonInitialization(); DatabaseDescriptor.getAccord().queue_shard_count = new OptionaldPositiveInt(1); DatabaseDescriptor.getAccord().command_store_shard_count = new OptionaldPositiveInt(1); + ProtocolModifiers.Toggles.setSendStableMessages(TO_ALL); CQLTester.setUpClass(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
