This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f960ccb58 Follow-up to CASSANDRA-20726:
 - Fix shouldCleanup handling of erase/expunge
 - Fix CommandChange handling of minUniqueHlc being cleared
 - Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting
5f960ccb58 is described below
commit 5f960ccb58755cb6ff613021161ffad6b919b843
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 25 20:10:00 2025 +0100
Follow-up to CASSANDRA-20726:
- Fix shouldCleanup handling of erase/expunge
- Fix CommandChange handling of minUniqueHlc being cleared
- Don't clear minUniqueHlc when fast applying; instead simply validate !isWaiting
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20726
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 6 ++--
.../cassandra/service/accord/AccordExecutor.java | 3 +-
.../cassandra/service/accord/AccordJournal.java | 36 +++++++++++++---------
.../distributed/test/accord/AccordLoadTest.java | 14 +++++----
.../service/accord/AccordJournalBurnTest.java | 3 +-
.../db/virtual/AccordDebugKeyspaceTest.java | 3 ++
7 files changed, 39 insertions(+), 28 deletions(-)
diff --git a/modules/accord b/modules/accord
index 756f2a1f88..6f9d248a9b 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 756f2a1f88f079e74c34fd8e0f3bb5aa98760bef
+Subproject commit 6f9d248a9b903d5de354a030a5d2e6e7b0e34b8b
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 2bba287663..399db90a7d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -119,7 +119,6 @@ import static
org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor;
import static
org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey;
-import static
org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.key;
/**
* Merge multiple iterators over the content of sstable into a "compacted"
iterator.
@@ -1103,7 +1102,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
case EXPUNGE:
return null;
case ERASE:
- return erase(partitionKey);
+ return erase(journalKey, partitionKey);
case TRUNCATE:
case TRUNCATE_WITH_OUTCOME:
@@ -1137,9 +1136,10 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
return newVersion.build().unfilteredIterator();
}
- private UnfilteredRowIterator erase(DecoratedKey partitionKey) throws
IOException
+ private UnfilteredRowIterator erase(JournalKey journalKey,
DecoratedKey partitionKey) throws IOException
{
AccordCommandRowEntry entry = entries.get(entries.size() - 1);
+ entry.builder.reset(journalKey);
entry.builder.addCleanup(false, ERASE);
return PartitionUpdate.singleRowUpdate(AccordKeyspace.Journal,
partitionKey, toRow(entry)).unfilteredIterator();
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 018fe7d2ef..c61f5e7a9b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -887,7 +887,7 @@ public abstract class AccordExecutor implements CacheSize,
AccordCacheEntry.OnLo
else
{
selfTask.queuePosition = task.queuePosition;
- waitingToRun.update(task);
+ waitingToRun.update(selfTask);
}
}
}
@@ -1173,6 +1173,7 @@ public abstract class AccordExecutor implements
CacheSize, AccordCacheEntry.OnLo
TaskQueue queue = executor == null ? waitingToRun : executor;
if (queue.contains(this))
{
+ --tasks;
queue.remove(this);
fail(new CancellationException());
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index b8b6709d65..db64d55da2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -225,8 +225,13 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
{
Builder builder = load(commandStoreId, txnId);
Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore,
durableBefore);
- if (cleanup == Cleanup.EXPUNGE)
- return null;
+ switch (cleanup)
+ {
+ case ERASE:
+ return Command.Truncated.erased(txnId);
+ case EXPUNGE:
+ return null;
+ }
return builder.construct(redundantBefore);
}
@@ -583,6 +588,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
ExecuteAtSerializer.serialize(command.executesAtLeast(), out);
break;
case MIN_UNIQUE_HLC:
+ Invariants.require(command.waitingOn().minUniqueHlc()
!= 0);
out.writeUnsignedVInt(command.waitingOn().minUniqueHlc());
break;
case SAVE_STATUS:
@@ -709,8 +715,8 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
out.writeByte(cleanup.ordinal());
break;
case EXECUTE_AT:
- Invariants.require(txnId != null);
- Invariants.require(executeAt != null);
+ Invariants.require(txnId != null, "%s", this);
+ Invariants.require(executeAt != null, "%s", this);
ExecuteAtSerializer.serialize(txnId, executeAt, out);
break;
case EXECUTES_AT_LEAST:
@@ -718,47 +724,47 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
ExecuteAtSerializer.serialize(executesAtLeast, out);
break;
case MIN_UNIQUE_HLC:
- Invariants.require(minUniqueHlc != 0);
+ Invariants.require(minUniqueHlc != 0, "%s", this);
out.writeUnsignedVInt(minUniqueHlc);
break;
case SAVE_STATUS:
- Invariants.require(saveStatus != null);
+ Invariants.require(saveStatus != null, "%s", this);
out.writeByte(saveStatus.ordinal());
break;
case DURABILITY:
- Invariants.require(durability != null);
+ Invariants.require(durability != null, "%s", this);
out.writeByte(durability.ordinal());
break;
case ACCEPTED:
- Invariants.require(acceptedOrCommitted != null);
+ Invariants.require(acceptedOrCommitted != null, "%s",
this);
CommandSerializers.ballot.serialize(acceptedOrCommitted, out);
break;
case PROMISED:
- Invariants.require(promised != null);
+ Invariants.require(promised != null, "%s", this);
CommandSerializers.ballot.serialize(promised, out);
break;
case PARTICIPANTS:
- Invariants.require(participants != null);
+ Invariants.require(participants != null, "%s", this);
CommandSerializers.participants.serialize(participants, out);
break;
case PARTIAL_TXN:
- Invariants.require(partialTxn != null);
+ Invariants.require(partialTxn != null, "%s", this);
CommandSerializers.partialTxn.serialize(partialTxn,
out, userVersion);
break;
case PARTIAL_DEPS:
- Invariants.require(partialDeps != null);
+ Invariants.require(partialDeps != null, "%s", this);
DepsSerializers.partialDeps.serialize(partialDeps,
out);
break;
case WAITING_ON:
- Invariants.require(waitingOn != null);
+ Invariants.require(waitingOn != null, "%s", this);
((WaitingOnSerializer.Provider)waitingOn).reserialize(out);
break;
case WRITES:
- Invariants.require(writes != null);
+ Invariants.require(writes != null, "%s", this);
CommandSerializers.writes.serialize(writes, out,
userVersion);
break;
case RESULT:
- Invariants.require(result != null);
+ Invariants.require(result != null, "%s", this);
ResultSerializers.result.serialize(result, out);
break;
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 15c195f475..2a093887b6 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -73,7 +73,7 @@ public class AccordLoadTest extends AccordTestBase
.set("accord.shard_durability_target_splits", "64")
.set("accord.shard_durability_cycle", "5m")
//
.set("accord.ephemeral_read_enabled", "true")
-
.set("accord.gc_delay", "5s")), 3);
+
.set("accord.gc_delay", "30s")), 3);
}
@Ignore
@@ -100,13 +100,15 @@ public class AccordLoadTest extends AccordTestBase
ICoordinator coordinator = cluster.coordinator(1);
final int repairInterval = Integer.MAX_VALUE;
final int compactionInterval = 20_000;
- final int flushInterval = 50_000;
- final int compactionPeriodSeconds = 1;
- final int restartInterval = 100_000;
- final int batchSizeLimit = 1000;
+// final int flushInterval = 50_000;
+ final int flushInterval = 500;
+ final int compactionPeriodSeconds = 0;
+// final int restartInterval = 100_000;
+ final int restartInterval = Integer.MAX_VALUE;
+ final int batchSizeLimit = 200;
final long batchTime = TimeUnit.SECONDS.toNanos(10);
final int concurrency = 100;
- final int ratePerSecond = 2000;
+ final int ratePerSecond = 1000;
final int keyCount = 10_000;
final float readChance = 0.33f;
long nextRepairAt = repairInterval;
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index eea399dd60..50cc6c65de 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -129,7 +129,7 @@ public class AccordJournalBurnTest extends BurnTestBase
public void testOne()
{
long seed = System.nanoTime();
- int operations = 1000;
+ int operations = 5000;
logger.info("Seed: {}", seed);
Cluster.trace.trace("Seed: {}", seed);
@@ -273,7 +273,6 @@ public class AccordJournalBurnTest extends BurnTestBase
List<ISSTableScanner> scanners =
selected.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
Collection<SSTableReader> newSStables;
-
try (LifecycleTransaction txn =
cfs.getTracker().tryModify(selected, OperationType.COMPACTION);
CompactionController controller = new
CompactionController(cfs, selected, 0);
CompactionIterator ci = new
CompactionIterator(OperationType.COMPACTION,
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index a2677f3160..003872ff0e 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -53,6 +53,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.concurrent.Condition;
import org.awaitility.Awaitility;
+import static accord.api.ProtocolModifiers.Toggles.FastExec.DISABLED;
+import static accord.api.ProtocolModifiers.Toggles.SendStableMessages.TO_ALL;
import static accord.primitives.TxnId.FastPath.Unoptimised;
import static org.apache.cassandra.Util.spinUntilSuccess;
import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
@@ -70,6 +72,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
daemonInitialization();
DatabaseDescriptor.getAccord().queue_shard_count = new
OptionaldPositiveInt(1);
DatabaseDescriptor.getAccord().command_store_shard_count = new
OptionaldPositiveInt(1);
+ ProtocolModifiers.Toggles.setSendStableMessages(TO_ALL);
CQLTester.setUpClass();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]