This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81bc18ad63 Enrich Durability with each phase, so we can both prune
unapplied dependencies to mitigate replicas that are behind causing Deps
growth, and more reliably avoid initialising recovery progress log state of
transactions that cannot yet make progress. Also fix: - Harden AccordExecutor
state cleanup against failures - Handle SAVING state in AccordCache.tryEvict, as
it is now possible to save for reasons besides eviction, so it is normal to be both in the evict
queue and saving - Infer invalid [...]
81bc18ad63 is described below
commit 81bc18ad63123311d71fbfcb99385855dc2c0944
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Aug 8 10:25:17 2025 +0100
Enrich Durability with each phase, so we can both prune unapplied
dependencies (mitigating the Deps growth caused by replicas that are behind)
and more reliably avoid initialising recovery progress log state of
transactions that cannot yet make progress.
Also fix:
- Harden AccordExecutor state cleanup against failures
- Handle SAVING state in AccordCache.tryEvict, as it is now possible to save for
reasons besides eviction, so it is normal to be both in the evict queue and saving
- Infer invalid in MaybeRecover and FetchData
- MaybeRecover sometimes aborts before home shard knows outcome
- Epoch sync with VisibilitySyncPoint
- Retired implies synced
- Don't interpret force repair as excluding nodes from Accord sync
conditions
- TxnData.without
Also improve:
- Add Topology.removedNodes
- If Durability implies we can fetch a status, update the waiting state to
fetch it
- DurableBefore debug table should have searchable txnId
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20832
---
modules/accord | 2 +-
.../cassandra/db/virtual/AccordDebugKeyspace.java | 14 ++++----
src/java/org/apache/cassandra/net/Verb.java | 1 -
.../org/apache/cassandra/repair/RepairJob.java | 2 +-
.../cassandra/service/accord/AccordCache.java | 2 ++
.../service/accord/AccordConfigurationService.java | 8 ++---
.../cassandra/service/accord/AccordExecutor.java | 38 +++++++++++++---------
.../accord/AccordExecutorAbstractLockLoop.java | 6 ++--
.../service/accord/AccordExecutorSimple.java | 2 +-
.../cassandra/service/accord/AccordJournal.java | 6 ++--
.../service/accord/AccordMessageSink.java | 1 -
.../service/accord/AccordObjectSizes.java | 4 +--
.../cassandra/service/accord/AccordService.java | 5 +--
.../cassandra/service/accord/AccordTask.java | 7 ++--
.../cassandra/service/accord/AccordTopology.java | 8 ++++-
.../service/accord/repair/AccordRepair.java | 23 +++----------
.../accord/serializers/CommandSerializers.java | 3 +-
.../serializers/CommandStoreSerializers.java | 17 ++++++----
.../accord/serializers/InformSerializers.java | 27 ---------------
.../accord/serializers/SetDurableSerializers.java | 6 ++--
.../accord/serializers/TopologySerializers.java | 12 +++++--
.../cassandra/service/accord/txn/TxnData.java | 7 ++++
.../service/accord/txn/TxnDataKeyValue.java | 2 +-
.../db/virtual/AccordDebugKeyspaceTest.java | 14 ++++----
.../service/accord/AccordCommandStoreTest.java | 4 +--
.../serializers/CommandsForKeySerializerTest.java | 12 +++----
26 files changed, 110 insertions(+), 123 deletions(-)
diff --git a/modules/accord b/modules/accord
index 12da4693b4..b86550c154 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 12da4693b449b30d0420673579a06025b7b3e484
+Subproject commit b86550c154758e5dd223fa8101a2a83005b644a6
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index fa3101741e..efe683af71 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -129,7 +129,7 @@ import static
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STOR
import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
-import static accord.local.RedundantStatus.Property.MAJORITY_APPLIED;
+import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
@@ -459,8 +459,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" table_name text,\n" +
" token_start 'TokenUtf8Type',\n" +
" token_end 'TokenUtf8Type',\n" +
- " majority_before text,\n" +
- " universal_before text,\n" +
+ " quorum 'TxnIdUtf8Type',\n" +
+ " universal 'TxnIdUtf8Type',\n" +
" PRIMARY KEY (keyspace_name, table_name,
token_start)" +
')', UTF8Type.instance));
}
@@ -475,8 +475,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
TableMetadata tableMetadata = tableMetadata(tableId);
ds.row(keyspace(tableMetadata), table(tableId,
tableMetadata), printToken(start))
.column("token_end", printToken(end))
- .column("majority_before",
entry.majorityBefore.toString())
- .column("universal_before",
entry.universalBefore.toString());
+ .column("quorum", entry.quorumBefore.toString())
+ .column("universal", entry.universalBefore.toString());
return ds;
},
new SimpleDataSet(metadata()),
@@ -751,7 +751,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" end_epoch bigint,\n" +
" gc_before 'TxnIdUtf8Type',\n" +
" shard_applied 'TxnIdUtf8Type',\n" +
- " majority_applied 'TxnIdUtf8Type',\n" +
+ " quorum_applied 'TxnIdUtf8Type',\n" +
" locally_applied 'TxnIdUtf8Type',\n" +
" locally_durable_to_command_store
'TxnIdUtf8Type',\n" +
" locally_durable_to_data_store 'TxnIdUtf8Type',\n" +
@@ -786,7 +786,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
.column("end_epoch", entry.endEpoch)
.column("gc_before",
entry.maxBound(GC_BEFORE).toString())
.column("shard_applied",
entry.maxBound(SHARD_APPLIED).toString())
- .column("majority_applied",
entry.maxBound(MAJORITY_APPLIED).toString())
+ .column("quorum_applied",
entry.maxBound(QUORUM_APPLIED).toString())
.column("locally_applied",
entry.maxBound(LOCALLY_APPLIED).toString())
.column("locally_durable_to_command_store",
entry.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE).toString())
.column("locally_durable_to_data_store",
entry.maxBound(LOCALLY_DURABLE_TO_DATA_STORE).toString())
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 6613bf3c33..d24c9e64ad 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -340,7 +340,6 @@ public enum Verb
ACCORD_RECOVER_AWAIT_RSP (141, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(AwaitSerializers.recoverReply),
AccordService::responseHandlerOrNoop
),
ACCORD_RECOVER_AWAIT_REQ (142, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(AwaitSerializers.recoverRequest),
AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP),
ACCORD_INFORM_DURABLE_REQ (143, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(InformSerializers.durable),
AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP
),
- ACCORD_INFORM_DECIDED_REQ (171, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(InformSerializers.decided),
AccordService::requestHandlerOrNoop
),
ACCORD_CHECK_STATUS_RSP (144, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(CheckStatusSerializers.reply),
AccordService::responseHandlerOrNoop
),
ACCORD_CHECK_STATUS_REQ (145, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(CheckStatusSerializers.request),
AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP
),
ACCORD_FETCH_DATA_RSP (146, P2, writeTimeout, IMMEDIATE,
() -> accordEmbedded(FetchSerializers.reply),
AccordService::responseHandlerOrNoop
),
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 4bebca17e4..ca6770998e 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -192,7 +192,7 @@ public class RepairJob extends AsyncFuture<RepairResult>
implements Runnable
requireAllEndpoints = true;
}
logger.info("{} {}.{} starting accord repair, require all
endpoints {}", session.previewKind.logPrefix(session.getId()), desc.keyspace,
desc.columnFamily, requireAllEndpoints);
- AccordRepair repair = new AccordRepair(ctx, cfs,
desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints, allEndpoints);
+ AccordRepair repair = new AccordRepair(ctx, cfs,
desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints);
return repair.repair(taskExecutor).flatMap(accordRepairResult
-> {
// Propagate the HLC discovered during Accord repair to
Paxos so Paxos doesn't use ballots < Accord has already used
if (accordRepairResult.maxHlc != IAccordService.NO_HLC)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index c96a4e685c..991c610cc1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -267,9 +267,11 @@ public class AccordCache implements CacheSize
break;
case MODIFIED:
node.save();
+ case SAVING: // we can be in evict queue and already be saving if
save was requested for durability rather than eviction
boolean evict = node.status() == LOADED;
node.unlink();
if (evict) evict(node, true);
+ break;
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 84f348c4ae..9f3563e367 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -452,13 +452,11 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
{
long epoch = topology.epoch();
EpochState epochState = getOrCreateEpochState(epoch);
- if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
- return;
-
synchronized (this)
{
- if (epochState.syncStatus != SyncStatus.NOT_STARTED)
+ if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
return;
+
epochState.setSyncStatus(SyncStatus.NOTIFYING);
}
@@ -651,7 +649,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
public Future<Void> unsafeLocalSyncNotified(long epoch)
{
AsyncPromise<Void> promise = new AsyncPromise<>();
- getOrCreateEpochState(epoch).localSyncNotified().invoke((result,
failure) -> {
+ getOrCreateEpochState(epoch).localSyncNotified().begin((result,
failure) -> {
if (failure != null) promise.tryFailure(failure);
else promise.trySuccess(result);
});
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 431c1639d3..2cd465c612 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -566,8 +566,8 @@ public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTa
void submitExclusive(AccordTask<?> task)
{
assignQueuePosition(task);
- ++tasks;
task.setupExclusive();
+ ++tasks;
updateQueue(task);
enqueueLoadsExclusive();
}
@@ -616,21 +616,26 @@ public abstract class AccordExecutor implements
CacheSize, LoadExecutor<AccordTa
task.queuePosition = parent.queuePosition;
}
- void completeTaskExclusive(Task task, boolean cleanupTask)
+ void completeTaskExclusive(Task task)
{
- --tasks;
-
// for integration with SequentialExecutor, we must :
// - first take the position so that represents the just-executed task
// - call cleanup to submit any following task on the relevant
sub-queue
// - remove the previous task from the running collection only if
still present (SequentialExecutor will have removed it)
int position = task.queuePosition;
- if (cleanupTask) task.cleanupExclusive();
- if (running.contains(task))
- running.remove(task);
+ try
+ {
+ task.cleanupExclusive();
+ }
+ finally
+ {
+ --tasks;
+ if (running.contains(task))
+ running.remove(task);
- if (waitingForCompletion != null &&
waitingForCompletion.peek().maybeNotify - position >= 0)
- maybeNotifyWaitingForCompletion();
+ if (waitingForCompletion != null &&
waitingForCompletion.peek().maybeNotify - position >= 0)
+ maybeNotifyWaitingForCompletion();
+ }
}
private void maybeNotifyWaitingForCompletion()
@@ -655,14 +660,15 @@ public abstract class AccordExecutor implements
CacheSize, LoadExecutor<AccordTa
return task == null ? min : Integer.min(task.queuePosition, min);
}
- private void cancelExclusive(AccordTask<?> task)
+ void cancelExclusive(AccordTask<?> task)
{
switch (task.state())
{
default: throw new UnhandledEnum(task.state());
case INITIALIZED:
// we could be cancelled before we even reach the queue
- task.cancelExclusive();
+ try { task.cancelExclusive(); }
+ finally { task.cleanupExclusive(); }
break;
case SCANNING_RANGES:
@@ -671,8 +677,8 @@ public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTa
case WAITING_TO_SCAN_RANGES:
case WAITING_TO_RUN:
task.unqueueIfQueued();
- task.cancelExclusive();
- completeTaskExclusive(task, false);
+ try { task.cancelExclusive(); }
+ finally { completeTaskExclusive(task); }
break;
case FAILING:
@@ -715,7 +721,7 @@ public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTa
finally
{
task.unqueueIfQueued();
- completeTaskExclusive(task, true);
+ completeTaskExclusive(task);
}
}
@@ -1000,9 +1006,9 @@ public abstract class AccordExecutor implements
CacheSize, LoadExecutor<AccordTa
owner = null;
running = false;
task = super.poll();
+ AccordExecutor.this.running.remove(selfTask);
if (task != null)
{
- AccordExecutor.this.running.remove(selfTask);
selfTask.queuePosition = task.queuePosition;
waitingToRun.append(selfTask);
}
@@ -1241,7 +1247,7 @@ public abstract class AccordExecutor implements
CacheSize, LoadExecutor<AccordTa
if (queue.contains(this))
{
queue.remove(this);
- completeTaskExclusive(this, true);
+ completeTaskExclusive(this);
try { fail(new CancellationException()); }
catch (Throwable t) { agent.onUncaughtException(t); }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
index 31583de8a5..2b80cbac8d 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
@@ -180,7 +180,7 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
}
finally
{
- completeTaskExclusive(task, true);
+ completeTaskExclusive(task);
clearRunning();
}
}
@@ -234,7 +234,7 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
{
Task tmp = task;
task = null;
- completeTaskExclusive(tmp, true);
+ completeTaskExclusive(tmp);
clearRunning();
}
else resumeExclusive();
@@ -270,7 +270,7 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
{
try { task.fail(t); }
catch (Throwable t2) { t.addSuppressed(t2); }
- try { completeTaskExclusive(task, true); }
+ try { completeTaskExclusive(task); }
catch (Throwable t2) { t.addSuppressed(t2); }
try { agent.onUncaughtException(t); }
catch (Throwable t2) { /* nothing we can sensibly
do after already reporting */ }
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
index 202dcb5f86..11a1d04ca0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
@@ -96,7 +96,7 @@ class AccordExecutorSimple extends AccordExecutor
catch (Throwable t) { task.fail(t); }
finally
{
- completeTaskExclusive(task, true);
+ completeTaskExclusive(task);
active = null;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 08819934d0..8adeb6ca87 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -816,7 +816,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
out.writeByte(command.saveStatus().ordinal());
break;
case DURABILITY:
- out.writeByte(command.durability().ordinal());
+ out.writeByte(command.durability().encoded());
break;
case ACCEPTED:
CommandSerializers.ballot.serialize(command.acceptedOrCommitted(), out);
@@ -954,7 +954,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
break;
case DURABILITY:
Invariants.require(durability != null, "%s", this);
- out.writeByte(durability.ordinal());
+ out.writeByte(durability.encoded());
break;
case ACCEPTED:
Invariants.require(acceptedOrCommitted != null, "%s",
this);
@@ -1035,7 +1035,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
saveStatus = SaveStatus.values()[in.readByte()];
break;
case DURABILITY:
- durability = Durability.values()[in.readByte()];
+ durability = Durability.forEncoded(in.readUnsignedByte());
break;
case ACCEPTED:
acceptedOrCommitted =
CommandSerializers.ballot.deserialize(in);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index c57b6a27b3..19e37f06c3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -120,7 +120,6 @@ public class AccordMessageSink implements MessageSink
builder.put(WAIT_UNTIL_APPLIED_REQ,
Verb.ACCORD_WAIT_UNTIL_APPLIED_REQ);
builder.put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ,
Verb.ACCORD_APPLY_AND_WAIT_REQ);
builder.put(INFORM_DURABLE_REQ,
Verb.ACCORD_INFORM_DURABLE_REQ);
- builder.put(INFORM_DECIDED_REQ,
Verb.ACCORD_INFORM_DECIDED_REQ);
builder.put(CHECK_STATUS_REQ,
Verb.ACCORD_CHECK_STATUS_REQ);
builder.put(CHECK_STATUS_RSP,
Verb.ACCORD_CHECK_STATUS_RSP);
builder.put(FETCH_DATA_REQ,
Verb.ACCORD_FETCH_DATA_REQ);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 02185431c6..ac112186f9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -221,7 +221,7 @@ public class AccordObjectSizes
}
}
- private static final long EMPTY_TXN = measure(new
PartialTxn.InMemory(null, null, null, null, null,
TableMetadatasAndKeys.none(Domain.Key)));
+ private static final long EMPTY_TXN = measure(new
PartialTxn.InMemory(Kind.Read, null, null, null, null,
TableMetadatasAndKeys.none(Domain.Key)));
public static long txn(PartialTxn txn)
{
long size = EMPTY_TXN;
@@ -316,7 +316,7 @@ public class AccordObjectSizes
builder.partialDeps(PartialDeps.NONE);
if (hasTxn)
- builder.partialTxn(new PartialTxn.InMemory(null, null, null,
null, null, TableMetadatasAndKeys.none(Domain.Key)));
+ builder.partialTxn(new PartialTxn.InMemory(Kind.Read, null,
null, null, null, TableMetadatasAndKeys.none(Domain.Key)));
if (executes)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 8db4a44784..a630a39b60 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -139,6 +139,7 @@ import static accord.local.LoadKeysFor.READ_WRITE;
import static accord.local.durability.DurabilityService.SyncLocal.Self;
import static accord.local.durability.DurabilityService.SyncRemote.All;
import static accord.messages.SimpleReply.Ok;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.Write;
import static accord.primitives.TxnId.Cardinality.cardinality;
import static accord.topology.TopologyManager.TopologyRange;
@@ -533,7 +534,7 @@ public class AccordService implements IAccordService,
Shutdownable
@Override
public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound,
Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal
syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit
timeoutUnits)
{
- return node.durability().sync(requestedBy, minBound, ranges, include,
syncLocal, syncRemote, timeout, timeoutUnits);
+ return node.durability().sync(requestedBy, ExclusiveSyncPoint,
minBound, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
}
@Override
@@ -1068,7 +1069,7 @@ public class AccordService implements IAccordService,
Shutdownable
long startedAt = nanoTime();
long deadline = startedAt + timeout;
// TODO (required): relax this requirement - too expensive
- getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " +
epoch + ')', TxnId.minForEpoch(epoch), ranges, Self, All,
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges,
new LatencyRequestBookkeeping(null), startedAt, deadline, false);
+ getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " +
epoch + ')', ExclusiveSyncPoint, TxnId.minForEpoch(epoch), ranges, Self, All,
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges,
new LatencyRequestBookkeeping(null), startedAt, deadline, false);
}
public Params journalConfiguration()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 1153599ff1..55d5e8b1aa 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -754,9 +754,9 @@ public abstract class AccordTask<R> extends SubmittableTask
implements Function<
protected void cleanupExclusive()
{
- releaseResources(commandStore.cachesExclusive());
if (state == FAILING)
state(FAILED);
+ releaseResources(commandStore.cachesExclusive());
}
@Nullable
@@ -778,7 +778,6 @@ public abstract class AccordTask<R> extends SubmittableTask
implements Function<
public void cancelExclusive()
{
- releaseResources(commandStore.cachesExclusive());
state(CANCELLED);
if (callback != null)
callback.accept(null, new CancellationException());
@@ -786,7 +785,7 @@ public abstract class AccordTask<R> extends SubmittableTask
implements Function<
void cancelExclusive(AccordExecutor owner)
{
- owner.cancel(this);
+ owner.cancelExclusive(this);
}
public State state()
@@ -839,7 +838,7 @@ public abstract class AccordTask<R> extends SubmittableTask
implements Function<
catch (Throwable t)
{
releaseResourcesSlow(caches, t);
- throw t;
+ commandStore.agent().onUncaughtException(t);
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index ebfd99fd8a..ca817e647d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import accord.local.Node;
import accord.local.Node.Id;
import accord.primitives.Ranges;
import accord.topology.Shard;
@@ -307,7 +308,12 @@ public class AccordTopology
}
res.sort((a, b) -> a.range.compare(b.range));
- return new Topology(epoch.getEpoch(),
SortedArrayList.copyUnsorted(staleReplicas.ids(), Id[]::new), res.toArray(new
Shard[0]));
+ List<Node.Id> removed = directory.removedNodes().stream()
+ .filter(n ->
n.removedIn.equals(epoch))
+ .map(n -> tcmIdToAccord(n.id))
+ .collect(Collectors.toList());
+
+ return new Topology(epoch.getEpoch(),
SortedArrayList.copySorted(removed, Id[]::new),
SortedArrayList.copyUnsorted(staleReplicas.ids(), Id[]::new), res.toArray(new
Shard[0]));
}
public static Topology createAccordTopology(ClusterMetadata metadata,
ShardLookup lookup)
diff --git
a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index e6020359b8..4cddc7c1ff 100644
--- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -22,21 +22,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.local.Node;
-import accord.local.durability.DurabilityService;
+import accord.local.durability.DurabilityService.SyncRemote;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.LatencyMetrics;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.schema.Schema;
@@ -68,29 +62,25 @@ import static
org.apache.cassandra.config.DatabaseDescriptor.getAccordRepairTime
*/
public class AccordRepair
{
- private static final Logger logger =
LoggerFactory.getLogger(AccordRepair.class);
-
private final SharedContext ctx;
private final ColumnFamilyStore cfs;
private final TimeUUID repairId;
private final Ranges ranges;
- private final boolean requireAllEndpoints;
- private final List<InetAddressAndPort> endpoints;
+ private final SyncRemote syncRemote;
private final Epoch minEpoch = ClusterMetadata.current().epoch;
private volatile Throwable shouldAbort = null;
private volatile Thread waiting;
- public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, TimeUUID
repairId, String keyspace, Collection<Range<Token>> ranges, boolean
requireAllEndpoints, List<InetAddressAndPort> endpoints)
+ public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, TimeUUID
repairId, String keyspace, Collection<Range<Token>> ranges, boolean
requireAllEndpoints)
{
this.ctx = ctx;
this.cfs = cfs;
this.repairId = repairId;
- this.requireAllEndpoints = requireAllEndpoints;
- this.endpoints = endpoints;
+ this.syncRemote = requireAllEndpoints ? All : Quorum;
this.ranges = AccordTopology.toAccordRanges(keyspace, ranges);
}
@@ -151,9 +141,6 @@ public class AccordRepair
private Pair<List<accord.primitives.Range>, Long> repairRange(TokenRange
range) throws Throwable
{
List<accord.primitives.Range> repairedRanges = new ArrayList<>();
- List<Node.Id> ids = endpoints == null ? null :
endpoints.stream().map(AccordService.instance().configService()::mappedId).collect(Collectors.toList());
- DurabilityService.SyncRemote syncRemote = requireAllEndpoints ? All :
Quorum;
-
if (shouldAbort != null)
throw shouldAbort;
@@ -177,7 +164,7 @@ public class AccordRepair
long timeoutNanos = getAccordRepairTimeoutNanos();
long maxHlc =
AccordService.getBlocking(service.maxConflict(ranges).flatMap(conflict -> {
Timestamp conflictMax = mergeMax(conflict,
minForEpoch(this.minEpoch.getEpoch()));
- return service.sync("[repairId #" + repairId + ']',
conflictMax, Ranges.of(range), ids, NoLocal, syncRemote, timeoutNanos,
NANOSECONDS).map(ignored -> conflictMax.hlc());
+ return service.sync("[repairId #" + repairId + ']',
conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos,
NANOSECONDS).map(ignored -> conflictMax.hlc());
}), ranges, bookkeeping, start, start + timeoutNanos);
waiting = null;
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 4a0481aa09..9b07e707d2 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -913,7 +913,8 @@ public class CommandSerializers
public static final UnversionedSerializer<SaveStatus> saveStatus =
EncodeAsVInt32.of(SaveStatus.class);
public static final UnversionedSerializer<Status> status =
EncodeAsVInt32.of(Status.class);
- public static final UnversionedSerializer<Durability> durability =
EncodeAsVInt32.of(Durability.class);
+ public static final UnversionedSerializer<Durability> durability =
EncodeAsVInt32.withoutNulls(Durability::encoded, Durability::forEncoded);
+ public static final UnversionedSerializer<Durability.HasOutcome>
outcomeDurability = EncodeAsVInt32.of(Durability.HasOutcome.class);
public static final IVersionedSerializer<Writes> writes = new
IVersionedSerializer<>()
{
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index cf28b5a4dc..a1cb244b13 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -117,22 +117,22 @@ public class CommandStoreSerializers
@Override
public void serialize(DurableBefore.Entry t, DataOutputPlus out)
throws IOException
{
- CommandSerializers.txnId.serialize(t.majorityBefore, out);
+ CommandSerializers.txnId.serialize(t.quorumBefore, out);
CommandSerializers.txnId.serialize(t.universalBefore, out);
}
@Override
public DurableBefore.Entry deserialize(DataInputPlus in) throws
IOException
{
- TxnId majorityBefore = CommandSerializers.txnId.deserialize(in);
+ TxnId quorumBefore = CommandSerializers.txnId.deserialize(in);
TxnId universalBefore = CommandSerializers.txnId.deserialize(in);
- return new DurableBefore.Entry(majorityBefore, universalBefore);
+ return new DurableBefore.Entry(quorumBefore, universalBefore);
}
@Override
public long serializedSize(DurableBefore.Entry t)
{
- return CommandSerializers.txnId.serializedSize(t.majorityBefore)
+ return CommandSerializers.txnId.serializedSize(t.quorumBefore)
+
CommandSerializers.txnId.serializedSize(t.universalBefore);
}
}), DurableBefore.Entry[]::new, DurableBefore.SerializerSupport::create);
@@ -153,8 +153,11 @@ public class CommandStoreSerializers
{
CommandSerializers.txnId.serialize(bound, out);
}
- for (int status : b.statuses)
- out.writeShort(status);
+ for (int i = 0 ; i < b.bounds.length ; ++i)
+ {
+ out.writeShort(b.status(i * 2));
+ out.writeShort(b.status(i * 2 + 1));
+ }
}
@Override
@@ -190,7 +193,7 @@ public class CommandStoreSerializers
{
size += CommandSerializers.txnId.serializedSize(bound);
}
- size += 2L * b.statuses.length;
+ size += 2L * 2 * b.bounds.length;
return size;
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
index c3b5a84ba3..5e1ee94ea1 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
-import accord.api.RoutingKey;
-import accord.messages.InformDecided;
import accord.messages.InformDurable;
import accord.primitives.Route;
import accord.primitives.Status;
@@ -33,31 +31,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
public class InformSerializers
{
- public static final IVersionedSerializer<InformDecided> decided = new
IVersionedSerializer<>()
- {
- @Override
- public void serialize(InformDecided t, DataOutputPlus out, Version
version) throws IOException
- {
- CommandSerializers.txnId.serialize(t.txnId, out);
- KeySerializers.routingKey.serialize(t.homeKey, out);
- }
-
- @Override
- public InformDecided deserialize(DataInputPlus in, Version version)
throws IOException
- {
- TxnId txnId = CommandSerializers.txnId.deserialize(in);
- RoutingKey homeKey = KeySerializers.routingKey.deserialize(in);
- return new InformDecided(txnId, homeKey);
- }
-
- @Override
- public long serializedSize(InformDecided t, Version version)
- {
- return CommandSerializers.txnId.serializedSize(t.txnId)
- + KeySerializers.routingKey.serializedSize(t.homeKey);
- }
- };
-
public static final IVersionedSerializer<InformDurable> durable = new
TxnRequestSerializer<>()
{
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
index 60dbbc3c88..c855103b42 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
@@ -39,21 +39,21 @@ public class SetDurableSerializers
public void serialize(SetShardDurable msg, DataOutputPlus out) throws
IOException
{
syncPoint.serialize(msg.exclusiveSyncPoint, out);
- CommandSerializers.durability.serialize(msg.durability, out);
+ CommandSerializers.outcomeDurability.serialize(msg.durability,
out);
}
@Override
public SetShardDurable deserialize(DataInputPlus in) throws IOException
{
return new SetShardDurable(syncPoint.deserialize(in),
-
CommandSerializers.durability.deserialize(in));
+
CommandSerializers.outcomeDurability.deserialize(in));
}
@Override
public long serializedSize(SetShardDurable msg)
{
return syncPoint.serializedSize(msg.exclusiveSyncPoint)
- + CommandSerializers.durability.serializedSize(msg.durability);
+ +
CommandSerializers.outcomeDurability.serializedSize(msg.durability);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index a5f4206029..3076feca66 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -146,6 +146,7 @@ public class TopologySerializers
{
out.writeLong(topology.epoch());
CollectionSerializers.serializeList(topology.shards(), out, shard);
+ CollectionSerializers.serializeCollection(topology.removedIds(),
out, TopologySerializers.nodeId);
CollectionSerializers.serializeCollection(topology.staleIds(),
out, TopologySerializers.nodeId);
}
@@ -154,8 +155,9 @@ public class TopologySerializers
{
long epoch = in.readLong();
Shard[] shards = ArraySerializers.deserializeArray(in, shard,
Shard[]::new);
+ SortedArrayList<Node.Id> removedIds =
CollectionSerializers.deserializeSortedArrayList(in,
TopologySerializers.nodeId, Node.Id[]::new);
SortedArrayList<Node.Id> staleIds =
CollectionSerializers.deserializeSortedArrayList(in,
TopologySerializers.nodeId, Node.Id[]::new);
- return new Topology(epoch, staleIds, shards);
+ return new Topology(epoch, removedIds, staleIds, shards);
}
@Override
@@ -164,6 +166,7 @@ public class TopologySerializers
long size = 0;
size += TypeSizes.LONG_SIZE; // epoch
size +=
CollectionSerializers.serializedListSize(topology.shards(), shard);
+ size +=
CollectionSerializers.serializedCollectionSize(topology.removedIds(),
TopologySerializers.nodeId);
size +=
CollectionSerializers.serializedCollectionSize(topology.staleIds(),
TopologySerializers.nodeId);
return size;
}
@@ -175,6 +178,7 @@ public class TopologySerializers
public void serialize(Topology topology, DataOutputPlus out) throws
IOException
{
out.writeUnsignedVInt(topology.epoch());
+ CollectionSerializers.serializeList(topology.removedIds(), out,
TopologySerializers.nodeId);
CollectionSerializers.serializeList(topology.staleIds(), out,
TopologySerializers.nodeId);
List<Shard> shards = topology.shards();
@@ -228,6 +232,7 @@ public class TopologySerializers
public long serializedSize(Topology topology)
{
long size = TypeSizes.sizeofUnsignedVInt(topology.epoch());
+ size +=
CollectionSerializers.serializedListSize(topology.removedIds(),
TopologySerializers.nodeId);
size +=
CollectionSerializers.serializedListSize(topology.staleIds(),
TopologySerializers.nodeId);
List<Shard> shards = topology.shards();
@@ -279,7 +284,8 @@ public class TopologySerializers
public Topology deserialize(DataInputPlus in) throws IOException
{
long epoch = in.readUnsignedVInt();
- SortedArrays.SortedArrayList<Node.Id> staleNodes =
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
TopologySerializers.nodeId), Node.Id[]::new);
+ SortedArrays.SortedArrayList<Node.Id> removedIds =
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
TopologySerializers.nodeId), Node.Id[]::new);
+ SortedArrays.SortedArrayList<Node.Id> staleIds =
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
TopologySerializers.nodeId), Node.Id[]::new);
List<TokenRange> ranges =
CollectionSerializers.deserializeList(in, TokenRange.noTableSerializer);
@@ -300,7 +306,7 @@ public class TopologySerializers
int flags = in.readUnsignedVInt32();
shards[i] = Shard.SerializerSupport.create(range, nodes,
fromSimpleBitSet(nodes, notInFastPath, Node.Id[]::new), fromSimpleBitSet(nodes,
joining, Node.Id[]::new), new TinyEnumSet<>(flags));
}
- return new Topology(epoch, staleNodes, shards);
+ return new Topology(epoch, removedIds, staleIds, shards);
}
};
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 74ef6b1e13..4d5ab0b6a3 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -193,7 +193,14 @@ public class TxnData extends
Int2ObjectHashMap<TxnDataValue> implements TxnResul
else
{
if (result == null)
+ {
result = new TxnData();
+ for (Map.Entry<Integer, TxnDataValue> e2 : entrySet())
+ {
+ if (e2.getKey() == e.getKey()) break;
+ result.put(e2.getKey(), e2.getValue());
+ }
+ }
if (newValue != null)
result.put(e.getKey(), newValue);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
index e18524cf5d..5e81f79f38 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
@@ -62,7 +62,7 @@ public class TxnDataKeyValue extends FilteredPartition
implements TxnDataValue
@Override
public TxnDataValue without(Ranges ranges)
{
- return ranges.contains(new TokenKey(metadata().id,
partitionKey().getToken())) ? this : null;
+ return ranges.contains(new TokenKey(metadata().id,
partitionKey().getToken())) ? null : this;
}
@Override
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 19fcbfe529..1d500f497a 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -38,7 +38,7 @@ import accord.messages.TxnRequest;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.SaveStatus;
-import accord.primitives.Status;
+import accord.primitives.Status.Durability.HasOutcome;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.utils.async.AsyncChains;
@@ -111,8 +111,8 @@ public class AccordDebugKeyspaceTest extends CQLTester
private static final String QUERY_REDUNDANT_BEFORE =
String.format("SELECT * FROM %s.%s",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
- private static final String
QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ =
- String.format("SELECT * FROM %s.%s WHERE majority_applied >= ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
+ private static final String
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ =
+ String.format("SELECT * FROM %s.%s WHERE quorum_applied >= ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
private static final String
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ =
String.format("SELECT * FROM %s.%s WHERE shard_applied >= ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
@@ -199,13 +199,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new
LongToken(1)), new TokenKey(tableId, new LongToken(100))));
Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new
LongToken(100)), new TokenKey(tableId, new LongToken(200))));
AsyncChains.getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)()
-> "Test", safeStore -> {
- safeStore.commandStore().markShardDurable(safeStore, syncId1,
ranges1, Status.Durability.Universal);
- safeStore.commandStore().markShardDurable(safeStore, syncId2,
ranges2, Status.Durability.Majority);
+ safeStore.commandStore().markShardDurable(safeStore, syncId1,
ranges1, HasOutcome.Universal);
+ safeStore.commandStore().markShardDurable(safeStore, syncId2,
ranges2, HasOutcome.Quorum);
}));
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE).size()).isGreaterThan(0);
-
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ,
syncId1.toString()).size()).isEqualTo(2);
-
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ,
syncId2.toString()).size()).isEqualTo(1);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ,
syncId1.toString()).size()).isEqualTo(2);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ,
syncId2.toString()).size()).isEqualTo(1);
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ,
syncId1.toString()).size()).isEqualTo(1);
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ,
syncId2.toString()).size()).isEqualTo(0);
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index a44196591b..5532a21b23 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -70,7 +70,7 @@ import org.apache.cassandra.service.accord.txn.TxnUpdate;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.utils.Pair;
-import static accord.primitives.Status.Durability.Majority;
+import static accord.primitives.Status.Durability.AllQuorums;
import static com.google.common.collect.Iterables.getOnlyElement;
import static
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
import static
org.apache.cassandra.service.accord.AccordTestUtils.Commands.preaccepted;
@@ -135,7 +135,7 @@ public class AccordCommandStoreTest
Command.WaitingOn waitingOn = new
Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps, new
ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
Pair<Writes, Result> result =
AsyncChains.getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId,
txn, executeAt));
- Command expected = Command.Executed.executed(txnId,
SaveStatus.Applied, Majority, StoreParticipants.all(route),
+ Command expected = Command.Executed.executed(txnId,
SaveStatus.Applied, AllQuorums, StoreParticipants.all(route),
promised, executeAt, txn,
dependencies, accepted,
waitingOn, result.left,
ResultSerializers.APPLIED);
AccordSafeCommand safeCommand = new AccordSafeCommand(loaded(txnId,
null));
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 5129bff029..b806433dc1 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -120,7 +120,7 @@ import static
accord.api.ProtocolModifiers.Toggles.setTransitiveDependenciesAreV
import static accord.local.cfk.CommandsForKey.NO_BOUNDS_INFO;
import static accord.primitives.Known.KnownExecuteAt.ExecuteAtErased;
import static accord.primitives.Known.KnownExecuteAt.ExecuteAtUnknown;
-import static accord.primitives.Status.Durability.Majority;
+import static accord.primitives.Status.Durability.AllQuorums;
import static accord.primitives.Status.Durability.NotDurable;
import static accord.utils.Property.qt;
import static accord.utils.SortedArrays.Search.FAST;
@@ -182,7 +182,7 @@ public class CommandsForKeySerializerTest
builder.partialTxn(txn);
builder.setParticipants(StoreParticipants.all(txn.keys().toRoute(txn.keys().get(0).someIntersectingRoutingKey(null))));
- builder.durability(isDurable ? Majority : NotDurable);
+ builder.durability(isDurable ? AllQuorums : NotDurable);
if
(saveStatus.known.deps().hasPreAcceptedOrProposedOrDecidedDeps())
{
try (KeyDeps.Builder keyBuilder = KeyDeps.builder();)
@@ -196,7 +196,7 @@ public class CommandsForKeySerializerTest
builder.executeAt(executeAt);
builder.promised(ballot);
builder.acceptedOrCommitted(ballot);
- builder.durability(isDurable ? Majority : NotDurable);
+ builder.durability(isDurable ? AllQuorums : NotDurable);
if (saveStatus.compareTo(SaveStatus.Stable) >= 0 &&
!saveStatus.hasBeen(Status.Truncated))
builder.waitingOn(Command.WaitingOn.empty(txnId.domain()));
@@ -424,7 +424,7 @@ public class CommandsForKeySerializerTest
@Test
public void serde()
{
- testOne(-4567266914751633833L);
+ testOne(7082228630293368049L);
Random random = new Random();
for (int i = 0 ; i < 10000 ; ++i)
{
@@ -610,7 +610,7 @@ public class CommandsForKeySerializerTest
else unmanaged = CommandsForKey.NO_PENDING_UNMANAGED;
long maxUniqueHlc = rs.nextLong(0, Long.MAX_VALUE);
- CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, info, maxUniqueHlc, unmanaged,
TxnId.NONE, NO_BOUNDS_INFO);
+ CommandsForKey expected =
CommandsForKey.SerializerSupport.create(pk, info, maxUniqueHlc, unmanaged,
TxnId.NONE, NO_BOUNDS_INFO, true);
ByteBuffer buffer = Serialize.toBytesWithoutKey(expected);
CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);
@@ -627,7 +627,7 @@ public class CommandsForKeySerializerTest
TxnId txnId = TxnId.fromValues(11,34052499,2,1);
CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk,
new TxnInfo[] {
TxnInfo.create(txnId, InternalStatus.PREACCEPTED_WITHOUT_DEPS, true, txnId,
TxnId.NO_TXNIDS, Ballot.ZERO) },
- 0,
CommandsForKey.NO_PENDING_UNMANAGED, TxnId.NONE, NO_BOUNDS_INFO);
+ 0,
CommandsForKey.NO_PENDING_UNMANAGED, TxnId.NONE, NO_BOUNDS_INFO, true);
ByteBuffer buffer = Serialize.toBytesWithoutKey(expected);
CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org