This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81bc18ad63 Enrich Durability with each phase, so we can both prune 
unapplied dependencies to mitigate replicas that are behind causing Deps 
growth, and more reliably avoid initialising recovery progress log state of 
transactions that cannot yet make progress Also fix:  - Harden AccordExecutor 
state cleanup to failures  - Handle SAVING state in AccordCache.tryEvict, as 
now possible to save for reasons besides eviction so normal to both be in evict 
queue and saving  - Infer invalid [...]
81bc18ad63 is described below

commit 81bc18ad63123311d71fbfcb99385855dc2c0944
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Aug 8 10:25:17 2025 +0100

    Enrich Durability with each phase, so we can both prune unapplied 
dependencies to mitigate replicas that are behind causing Deps growth,
    and more reliably avoid initialising recovery progress log state of 
transactions that cannot yet make progress
    Also fix:
     - Harden AccordExecutor state cleanup to failures
     - Handle SAVING state in AccordCache.tryEvict, as now possible to save for 
reasons besides eviction so normal to both be in evict queue and saving
     - Infer invalid in MaybeRecover and FetchData
     - MaybeRecover sometimes aborts before home shard knows outcome
     - Epoch sync with VisibilitySyncPoint
     - Retired implies synced
     - Don't interpret force repair as excluding nodes from Accord sync 
conditions
     - TxnData.without
    Also improve:
     - Add Topology.removedNodes
     - If Durability implies we can fetch a status, update the waiting state to 
fetch it
     - DurableBefore debug table should have searchable txnId
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20832
---
 modules/accord                                     |  2 +-
 .../cassandra/db/virtual/AccordDebugKeyspace.java  | 14 ++++----
 src/java/org/apache/cassandra/net/Verb.java        |  1 -
 .../org/apache/cassandra/repair/RepairJob.java     |  2 +-
 .../cassandra/service/accord/AccordCache.java      |  2 ++
 .../service/accord/AccordConfigurationService.java |  8 ++---
 .../cassandra/service/accord/AccordExecutor.java   | 38 +++++++++++++---------
 .../accord/AccordExecutorAbstractLockLoop.java     |  6 ++--
 .../service/accord/AccordExecutorSimple.java       |  2 +-
 .../cassandra/service/accord/AccordJournal.java    |  6 ++--
 .../service/accord/AccordMessageSink.java          |  1 -
 .../service/accord/AccordObjectSizes.java          |  4 +--
 .../cassandra/service/accord/AccordService.java    |  5 +--
 .../cassandra/service/accord/AccordTask.java       |  7 ++--
 .../cassandra/service/accord/AccordTopology.java   |  8 ++++-
 .../service/accord/repair/AccordRepair.java        | 23 +++----------
 .../accord/serializers/CommandSerializers.java     |  3 +-
 .../serializers/CommandStoreSerializers.java       | 17 ++++++----
 .../accord/serializers/InformSerializers.java      | 27 ---------------
 .../accord/serializers/SetDurableSerializers.java  |  6 ++--
 .../accord/serializers/TopologySerializers.java    | 12 +++++--
 .../cassandra/service/accord/txn/TxnData.java      |  7 ++++
 .../service/accord/txn/TxnDataKeyValue.java        |  2 +-
 .../db/virtual/AccordDebugKeyspaceTest.java        | 14 ++++----
 .../service/accord/AccordCommandStoreTest.java     |  4 +--
 .../serializers/CommandsForKeySerializerTest.java  | 12 +++----
 26 files changed, 110 insertions(+), 123 deletions(-)

diff --git a/modules/accord b/modules/accord
index 12da4693b4..b86550c154 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 12da4693b449b30d0420673579a06025b7b3e484
+Subproject commit b86550c154758e5dd223fa8101a2a83005b644a6
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index fa3101741e..efe683af71 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -129,7 +129,7 @@ import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STOR
 import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
 import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
 import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
-import static accord.local.RedundantStatus.Property.MAJORITY_APPLIED;
+import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
 import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
 import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
 import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
@@ -459,8 +459,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "  table_name text,\n" +
                         "  token_start 'TokenUtf8Type',\n" +
                         "  token_end 'TokenUtf8Type',\n" +
-                        "  majority_before text,\n" +
-                        "  universal_before text,\n" +
+                        "  quorum 'TxnIdUtf8Type',\n" +
+                        "  universal 'TxnIdUtf8Type',\n" +
                         "  PRIMARY KEY (keyspace_name, table_name, 
token_start)" +
                         ')', UTF8Type.instance));
         }
@@ -475,8 +475,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                     TableMetadata tableMetadata = tableMetadata(tableId);
                     ds.row(keyspace(tableMetadata), table(tableId, 
tableMetadata), printToken(start))
                       .column("token_end", printToken(end))
-                      .column("majority_before", 
entry.majorityBefore.toString())
-                      .column("universal_before", 
entry.universalBefore.toString());
+                      .column("quorum", entry.quorumBefore.toString())
+                      .column("universal", entry.universalBefore.toString());
                     return ds;
                 },
                 new SimpleDataSet(metadata()),
@@ -751,7 +751,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "  end_epoch bigint,\n" +
                         "  gc_before 'TxnIdUtf8Type',\n" +
                         "  shard_applied 'TxnIdUtf8Type',\n" +
-                        "  majority_applied 'TxnIdUtf8Type',\n" +
+                        "  quorum_applied 'TxnIdUtf8Type',\n" +
                         "  locally_applied 'TxnIdUtf8Type',\n" +
                         "  locally_durable_to_command_store 
'TxnIdUtf8Type',\n" +
                         "  locally_durable_to_data_store 'TxnIdUtf8Type',\n" +
@@ -786,7 +786,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                           .column("end_epoch", entry.endEpoch)
                           .column("gc_before", 
entry.maxBound(GC_BEFORE).toString())
                           .column("shard_applied", 
entry.maxBound(SHARD_APPLIED).toString())
-                          .column("majority_applied", 
entry.maxBound(MAJORITY_APPLIED).toString())
+                          .column("quorum_applied", 
entry.maxBound(QUORUM_APPLIED).toString())
                           .column("locally_applied", 
entry.maxBound(LOCALLY_APPLIED).toString())
                           .column("locally_durable_to_command_store", 
entry.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE).toString())
                           .column("locally_durable_to_data_store", 
entry.maxBound(LOCALLY_DURABLE_TO_DATA_STORE).toString())
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 6613bf3c33..d24c9e64ad 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -340,7 +340,6 @@ public enum Verb
     ACCORD_RECOVER_AWAIT_RSP        (141, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(AwaitSerializers.recoverReply),        
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_RECOVER_AWAIT_REQ        (142, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(AwaitSerializers.recoverRequest),      
AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP),
     ACCORD_INFORM_DURABLE_REQ       (143, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(InformSerializers.durable),            
AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP                         
),
-    ACCORD_INFORM_DECIDED_REQ       (171, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(InformSerializers.decided),            
AccordService::requestHandlerOrNoop                                            
),
     ACCORD_CHECK_STATUS_RSP         (144, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(CheckStatusSerializers.reply),         
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_CHECK_STATUS_REQ         (145, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(CheckStatusSerializers.request),       
AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP                   
),
     ACCORD_FETCH_DATA_RSP           (146, P2, writeTimeout, IMMEDIATE,         
 () -> accordEmbedded(FetchSerializers.reply),               
AccordService::responseHandlerOrNoop                                           
),
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 4bebca17e4..ca6770998e 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -192,7 +192,7 @@ public class RepairJob extends AsyncFuture<RepairResult> 
implements Runnable
                         requireAllEndpoints = true;
                 }
                 logger.info("{} {}.{} starting accord repair, require all 
endpoints {}", session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily, requireAllEndpoints);
-                AccordRepair repair = new AccordRepair(ctx, cfs, 
desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints, allEndpoints);
+                AccordRepair repair = new AccordRepair(ctx, cfs, 
desc.sessionId, desc.keyspace, desc.ranges, requireAllEndpoints);
                 return repair.repair(taskExecutor).flatMap(accordRepairResult 
-> {
                     // Propagate the HLC discovered during Accord repair to 
Paxos so Paxos doesn't use ballots < Accord has already used
                     if (accordRepairResult.maxHlc != IAccordService.NO_HLC)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index c96a4e685c..991c610cc1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -267,9 +267,11 @@ public class AccordCache implements CacheSize
                 break;
             case MODIFIED:
                 node.save();
+            case SAVING: // we can be in evict queue and already be saving if 
save was requested for durability rather than eviction
                 boolean evict = node.status() == LOADED;
                 node.unlink();
                 if (evict) evict(node, true);
+                break;
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 84f348c4ae..9f3563e367 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -452,13 +452,11 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     {
         long epoch = topology.epoch();
         EpochState epochState = getOrCreateEpochState(epoch);
-        if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
-            return;
-
         synchronized (this)
         {
-            if (epochState.syncStatus != SyncStatus.NOT_STARTED)
+            if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
                 return;
+
             epochState.setSyncStatus(SyncStatus.NOTIFYING);
         }
 
@@ -651,7 +649,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     public Future<Void> unsafeLocalSyncNotified(long epoch)
     {
         AsyncPromise<Void> promise = new AsyncPromise<>();
-        getOrCreateEpochState(epoch).localSyncNotified().invoke((result, 
failure) -> {
+        getOrCreateEpochState(epoch).localSyncNotified().begin((result, 
failure) -> {
             if (failure != null) promise.tryFailure(failure);
             else promise.trySuccess(result);
         });
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 431c1639d3..2cd465c612 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -566,8 +566,8 @@ public abstract class AccordExecutor implements CacheSize, 
LoadExecutor<AccordTa
     void submitExclusive(AccordTask<?> task)
     {
         assignQueuePosition(task);
-        ++tasks;
         task.setupExclusive();
+        ++tasks;
         updateQueue(task);
         enqueueLoadsExclusive();
     }
@@ -616,21 +616,26 @@ public abstract class AccordExecutor implements 
CacheSize, LoadExecutor<AccordTa
         task.queuePosition = parent.queuePosition;
     }
 
-    void completeTaskExclusive(Task task, boolean cleanupTask)
+    void completeTaskExclusive(Task task)
     {
-        --tasks;
-
         // for integration with SequentialExecutor, we must :
         //  - first take the position so that represents the just-executed task
         //  - call cleanup to submit any following task on the relevant 
sub-queue
         //  - remove the previous task from the running collection only if 
still present (SequentialExecutor will have removed it)
         int position = task.queuePosition;
-        if (cleanupTask) task.cleanupExclusive();
-        if (running.contains(task))
-            running.remove(task);
+        try
+        {
+            task.cleanupExclusive();
+        }
+        finally
+        {
+            --tasks;
+            if (running.contains(task))
+                running.remove(task);
 
-        if (waitingForCompletion != null && 
waitingForCompletion.peek().maybeNotify - position >= 0)
-            maybeNotifyWaitingForCompletion();
+            if (waitingForCompletion != null && 
waitingForCompletion.peek().maybeNotify - position >= 0)
+                maybeNotifyWaitingForCompletion();
+        }
     }
 
     private void maybeNotifyWaitingForCompletion()
@@ -655,14 +660,15 @@ public abstract class AccordExecutor implements 
CacheSize, LoadExecutor<AccordTa
         return task == null ? min : Integer.min(task.queuePosition, min);
     }
 
-    private void cancelExclusive(AccordTask<?> task)
+    void cancelExclusive(AccordTask<?> task)
     {
         switch (task.state())
         {
             default: throw new UnhandledEnum(task.state());
             case INITIALIZED:
                 // we could be cancelled before we even reach the queue
-                task.cancelExclusive();
+                try { task.cancelExclusive(); }
+                finally { task.cleanupExclusive(); }
                 break;
 
             case SCANNING_RANGES:
@@ -671,8 +677,8 @@ public abstract class AccordExecutor implements CacheSize, 
LoadExecutor<AccordTa
             case WAITING_TO_SCAN_RANGES:
             case WAITING_TO_RUN:
                 task.unqueueIfQueued();
-                task.cancelExclusive();
-                completeTaskExclusive(task, false);
+                try { task.cancelExclusive(); }
+                finally { completeTaskExclusive(task); }
                 break;
 
             case FAILING:
@@ -715,7 +721,7 @@ public abstract class AccordExecutor implements CacheSize, 
LoadExecutor<AccordTa
         finally
         {
             task.unqueueIfQueued();
-            completeTaskExclusive(task, true);
+            completeTaskExclusive(task);
         }
     }
 
@@ -1000,9 +1006,9 @@ public abstract class AccordExecutor implements 
CacheSize, LoadExecutor<AccordTa
                 owner = null;
                 running = false;
                 task = super.poll();
+                AccordExecutor.this.running.remove(selfTask);
                 if (task != null)
                 {
-                    AccordExecutor.this.running.remove(selfTask);
                     selfTask.queuePosition = task.queuePosition;
                     waitingToRun.append(selfTask);
                 }
@@ -1241,7 +1247,7 @@ public abstract class AccordExecutor implements 
CacheSize, LoadExecutor<AccordTa
             if (queue.contains(this))
             {
                 queue.remove(this);
-                completeTaskExclusive(this, true);
+                completeTaskExclusive(this);
                 try { fail(new CancellationException()); }
                 catch (Throwable t) { agent.onUncaughtException(t); }
             }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
index 31583de8a5..2b80cbac8d 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
@@ -180,7 +180,7 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                                 }
                                 finally
                                 {
-                                    completeTaskExclusive(task, true);
+                                    completeTaskExclusive(task);
                                     clearRunning();
                                 }
                             }
@@ -234,7 +234,7 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                         {
                             Task tmp = task;
                             task = null;
-                            completeTaskExclusive(tmp, true);
+                            completeTaskExclusive(tmp);
                             clearRunning();
                         }
                         else resumeExclusive();
@@ -270,7 +270,7 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                         {
                             try { task.fail(t); }
                             catch (Throwable t2) { t.addSuppressed(t2); }
-                            try { completeTaskExclusive(task, true); }
+                            try { completeTaskExclusive(task); }
                             catch (Throwable t2) { t.addSuppressed(t2); }
                             try { agent.onUncaughtException(t); }
                             catch (Throwable t2) { /* nothing we can sensibly 
do after already reporting */ }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
index 202dcb5f86..11a1d04ca0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
@@ -96,7 +96,7 @@ class AccordExecutorSimple extends AccordExecutor
                 catch (Throwable t) { task.fail(t); }
                 finally
                 {
-                    completeTaskExclusive(task, true);
+                    completeTaskExclusive(task);
                     active = null;
                 }
             }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 08819934d0..8adeb6ca87 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -816,7 +816,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         out.writeByte(command.saveStatus().ordinal());
                         break;
                     case DURABILITY:
-                        out.writeByte(command.durability().ordinal());
+                        out.writeByte(command.durability().encoded());
                         break;
                     case ACCEPTED:
                         
CommandSerializers.ballot.serialize(command.acceptedOrCommitted(), out);
@@ -954,7 +954,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         break;
                     case DURABILITY:
                         Invariants.require(durability != null, "%s", this);
-                        out.writeByte(durability.ordinal());
+                        out.writeByte(durability.encoded());
                         break;
                     case ACCEPTED:
                         Invariants.require(acceptedOrCommitted != null, "%s", 
this);
@@ -1035,7 +1035,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     saveStatus = SaveStatus.values()[in.readByte()];
                     break;
                 case DURABILITY:
-                    durability = Durability.values()[in.readByte()];
+                    durability = Durability.forEncoded(in.readUnsignedByte());
                     break;
                 case ACCEPTED:
                     acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index c57b6a27b3..19e37f06c3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -120,7 +120,6 @@ public class AccordMessageSink implements MessageSink
             builder.put(WAIT_UNTIL_APPLIED_REQ,                   
Verb.ACCORD_WAIT_UNTIL_APPLIED_REQ);
             builder.put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ,        
Verb.ACCORD_APPLY_AND_WAIT_REQ);
             builder.put(INFORM_DURABLE_REQ,                       
Verb.ACCORD_INFORM_DURABLE_REQ);
-            builder.put(INFORM_DECIDED_REQ,                       
Verb.ACCORD_INFORM_DECIDED_REQ);
             builder.put(CHECK_STATUS_REQ,                         
Verb.ACCORD_CHECK_STATUS_REQ);
             builder.put(CHECK_STATUS_RSP,                         
Verb.ACCORD_CHECK_STATUS_RSP);
             builder.put(FETCH_DATA_REQ,                           
Verb.ACCORD_FETCH_DATA_REQ);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 02185431c6..ac112186f9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -221,7 +221,7 @@ public class AccordObjectSizes
         }
     }
 
-    private static final long EMPTY_TXN = measure(new 
PartialTxn.InMemory(null, null, null, null, null, 
TableMetadatasAndKeys.none(Domain.Key)));
+    private static final long EMPTY_TXN = measure(new 
PartialTxn.InMemory(Kind.Read, null, null, null, null, 
TableMetadatasAndKeys.none(Domain.Key)));
     public static long txn(PartialTxn txn)
     {
         long size = EMPTY_TXN;
@@ -316,7 +316,7 @@ public class AccordObjectSizes
                 builder.partialDeps(PartialDeps.NONE);
 
             if (hasTxn)
-                builder.partialTxn(new PartialTxn.InMemory(null, null, null, 
null, null, TableMetadatasAndKeys.none(Domain.Key)));
+                builder.partialTxn(new PartialTxn.InMemory(Kind.Read, null, 
null, null, null, TableMetadatasAndKeys.none(Domain.Key)));
 
             if (executes)
             {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 8db4a44784..a630a39b60 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -139,6 +139,7 @@ import static accord.local.LoadKeysFor.READ_WRITE;
 import static accord.local.durability.DurabilityService.SyncLocal.Self;
 import static accord.local.durability.DurabilityService.SyncRemote.All;
 import static accord.messages.SimpleReply.Ok;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.primitives.Txn.Kind.Write;
 import static accord.primitives.TxnId.Cardinality.cardinality;
 import static accord.topology.TopologyManager.TopologyRange;
@@ -533,7 +534,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     @Override
     public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, 
Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal 
syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit 
timeoutUnits)
     {
-        return node.durability().sync(requestedBy, minBound, ranges, include, 
syncLocal, syncRemote, timeout, timeoutUnits);
+        return node.durability().sync(requestedBy, ExclusiveSyncPoint, 
minBound, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
     }
 
     @Override
@@ -1068,7 +1069,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         long startedAt = nanoTime();
         long deadline = startedAt + timeout;
         // TODO (required): relax this requirement - too expensive
-        getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + 
epoch + ')', TxnId.minForEpoch(epoch), ranges, Self, All, 
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges, 
new LatencyRequestBookkeeping(null), startedAt, deadline, false);
+        getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + 
epoch + ')', ExclusiveSyncPoint, TxnId.minForEpoch(epoch), ranges, Self, All, 
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges, 
new LatencyRequestBookkeeping(null), startedAt, deadline, false);
     }
 
     public Params journalConfiguration()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java 
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 1153599ff1..55d5e8b1aa 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -754,9 +754,9 @@ public abstract class AccordTask<R> extends SubmittableTask 
implements Function<
 
     protected void cleanupExclusive()
     {
-        releaseResources(commandStore.cachesExclusive());
         if (state == FAILING)
             state(FAILED);
+        releaseResources(commandStore.cachesExclusive());
     }
 
     @Nullable
@@ -778,7 +778,6 @@ public abstract class AccordTask<R> extends SubmittableTask 
implements Function<
 
     public void cancelExclusive()
     {
-        releaseResources(commandStore.cachesExclusive());
         state(CANCELLED);
         if (callback != null)
             callback.accept(null, new CancellationException());
@@ -786,7 +785,7 @@ public abstract class AccordTask<R> extends SubmittableTask 
implements Function<
 
     void cancelExclusive(AccordExecutor owner)
     {
-        owner.cancel(this);
+        owner.cancelExclusive(this);
     }
 
     public State state()
@@ -839,7 +838,7 @@ public abstract class AccordTask<R> extends SubmittableTask 
implements Function<
         catch (Throwable t)
         {
             releaseResourcesSlow(caches, t);
-            throw t;
+            commandStore.agent().onUncaughtException(t);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java 
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index ebfd99fd8a..ca817e647d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.Ranges;
 import accord.topology.Shard;
@@ -307,7 +308,12 @@ public class AccordTopology
         }
 
         res.sort((a, b) -> a.range.compare(b.range));
-        return new Topology(epoch.getEpoch(), 
SortedArrayList.copyUnsorted(staleReplicas.ids(), Id[]::new), res.toArray(new 
Shard[0]));
+        List<Node.Id> removed = directory.removedNodes().stream()
+                                         .filter(n -> 
n.removedIn.equals(epoch))
+                                         .map(n -> tcmIdToAccord(n.id))
+                                         .collect(Collectors.toList());
+
+        return new Topology(epoch.getEpoch(), 
SortedArrayList.copySorted(removed, Id[]::new), 
SortedArrayList.copyUnsorted(staleReplicas.ids(), Id[]::new), res.toArray(new 
Shard[0]));
     }
 
     public static Topology createAccordTopology(ClusterMetadata metadata, 
ShardLookup lookup)
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java 
b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index e6020359b8..4cddc7c1ff 100644
--- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -22,21 +22,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.local.Node;
-import accord.local.durability.DurabilityService;
+import accord.local.durability.DurabilityService.SyncRemote;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.LatencyMetrics;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
@@ -68,29 +62,25 @@ import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordRepairTime
  */
 public class AccordRepair
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordRepair.class);
-
     private final SharedContext ctx;
     private final ColumnFamilyStore cfs;
     private final TimeUUID repairId;
 
     private final Ranges ranges;
 
-    private final boolean requireAllEndpoints;
-    private final List<InetAddressAndPort> endpoints;
+    private final SyncRemote syncRemote;
 
     private final Epoch minEpoch = ClusterMetadata.current().epoch;
 
     private volatile Throwable shouldAbort = null;
     private volatile Thread waiting;
 
-    public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, TimeUUID 
repairId, String keyspace, Collection<Range<Token>> ranges, boolean 
requireAllEndpoints, List<InetAddressAndPort> endpoints)
+    public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, TimeUUID 
repairId, String keyspace, Collection<Range<Token>> ranges, boolean 
requireAllEndpoints)
     {
         this.ctx = ctx;
         this.cfs = cfs;
         this.repairId = repairId;
-        this.requireAllEndpoints = requireAllEndpoints;
-        this.endpoints = endpoints;
+        this.syncRemote = requireAllEndpoints ? All : Quorum;
         this.ranges = AccordTopology.toAccordRanges(keyspace, ranges);
     }
 
@@ -151,9 +141,6 @@ public class AccordRepair
     private Pair<List<accord.primitives.Range>, Long> repairRange(TokenRange 
range) throws Throwable
     {
         List<accord.primitives.Range> repairedRanges = new ArrayList<>();
-        List<Node.Id> ids = endpoints == null ? null : 
endpoints.stream().map(AccordService.instance().configService()::mappedId).collect(Collectors.toList());
-        DurabilityService.SyncRemote syncRemote = requireAllEndpoints ? All : 
Quorum;
-
         if (shouldAbort != null)
             throw shouldAbort;
 
@@ -177,7 +164,7 @@ public class AccordRepair
             long timeoutNanos = getAccordRepairTimeoutNanos();
             long maxHlc = 
AccordService.getBlocking(service.maxConflict(ranges).flatMap(conflict -> {
                 Timestamp conflictMax = mergeMax(conflict, 
minForEpoch(this.minEpoch.getEpoch()));
-                return service.sync("[repairId #" + repairId + ']', 
conflictMax, Ranges.of(range), ids, NoLocal, syncRemote, timeoutNanos, 
NANOSECONDS).map(ignored -> conflictMax.hlc());
+                return service.sync("[repairId #" + repairId + ']', 
conflictMax, Ranges.of(range), null, NoLocal, syncRemote, timeoutNanos, 
NANOSECONDS).map(ignored -> conflictMax.hlc());
             }), ranges, bookkeeping, start, start + timeoutNanos);
             waiting = null;
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 4a0481aa09..9b07e707d2 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -913,7 +913,8 @@ public class CommandSerializers
 
     public static final UnversionedSerializer<SaveStatus> saveStatus = 
EncodeAsVInt32.of(SaveStatus.class);
     public static final UnversionedSerializer<Status> status = 
EncodeAsVInt32.of(Status.class);
-    public static final UnversionedSerializer<Durability> durability = 
EncodeAsVInt32.of(Durability.class);
+    public static final UnversionedSerializer<Durability> durability = 
EncodeAsVInt32.withoutNulls(Durability::encoded, Durability::forEncoded);
+    public static final UnversionedSerializer<Durability.HasOutcome> 
outcomeDurability = EncodeAsVInt32.of(Durability.HasOutcome.class);
 
     public static final IVersionedSerializer<Writes> writes = new 
IVersionedSerializer<>()
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index cf28b5a4dc..a1cb244b13 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -117,22 +117,22 @@ public class CommandStoreSerializers
         @Override
         public void serialize(DurableBefore.Entry t, DataOutputPlus out) 
throws IOException
         {
-            CommandSerializers.txnId.serialize(t.majorityBefore, out);
+            CommandSerializers.txnId.serialize(t.quorumBefore, out);
             CommandSerializers.txnId.serialize(t.universalBefore, out);
         }
 
         @Override
         public DurableBefore.Entry deserialize(DataInputPlus in) throws 
IOException
         {
-            TxnId majorityBefore = CommandSerializers.txnId.deserialize(in);
+            TxnId quorumBefore = CommandSerializers.txnId.deserialize(in);
             TxnId universalBefore = CommandSerializers.txnId.deserialize(in);
-            return new DurableBefore.Entry(majorityBefore, universalBefore);
+            return new DurableBefore.Entry(quorumBefore, universalBefore);
         }
 
         @Override
         public long serializedSize(DurableBefore.Entry t)
         {
-            return   CommandSerializers.txnId.serializedSize(t.majorityBefore)
+            return   CommandSerializers.txnId.serializedSize(t.quorumBefore)
                    + 
CommandSerializers.txnId.serializedSize(t.universalBefore);
         }
     }), DurableBefore.Entry[]::new, DurableBefore.SerializerSupport::create);
@@ -153,8 +153,11 @@ public class CommandStoreSerializers
             {
                 CommandSerializers.txnId.serialize(bound, out);
             }
-            for (int status : b.statuses)
-                out.writeShort(status);
+            for (int i = 0 ; i < b.bounds.length ; ++i)
+            {
+                out.writeShort(b.status(i * 2));
+                out.writeShort(b.status(i * 2 + 1));
+            }
         }
 
         @Override
@@ -190,7 +193,7 @@ public class CommandStoreSerializers
             {
                 size += CommandSerializers.txnId.serializedSize(bound);
             }
-            size += 2L * b.statuses.length;
+            size += 2L * 2 * b.bounds.length;
             return size;
         }
     };
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
index c3b5a84ba3..5e1ee94ea1 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/InformSerializers.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
 
-import accord.api.RoutingKey;
-import accord.messages.InformDecided;
 import accord.messages.InformDurable;
 import accord.primitives.Route;
 import accord.primitives.Status;
@@ -33,31 +31,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class InformSerializers
 {
-    public static final IVersionedSerializer<InformDecided> decided = new 
IVersionedSerializer<>()
-    {
-        @Override
-        public void serialize(InformDecided t, DataOutputPlus out, Version 
version) throws IOException
-        {
-            CommandSerializers.txnId.serialize(t.txnId, out);
-            KeySerializers.routingKey.serialize(t.homeKey, out);
-        }
-
-        @Override
-        public InformDecided deserialize(DataInputPlus in, Version version) 
throws IOException
-        {
-            TxnId txnId = CommandSerializers.txnId.deserialize(in);
-            RoutingKey homeKey = KeySerializers.routingKey.deserialize(in);
-            return new InformDecided(txnId, homeKey);
-        }
-
-        @Override
-        public long serializedSize(InformDecided t, Version version)
-        {
-            return CommandSerializers.txnId.serializedSize(t.txnId)
-                   + KeySerializers.routingKey.serializedSize(t.homeKey);
-        }
-    };
-
     public static final IVersionedSerializer<InformDurable> durable = new 
TxnRequestSerializer<>()
     {
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
index 60dbbc3c88..c855103b42 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
@@ -39,21 +39,21 @@ public class SetDurableSerializers
         public void serialize(SetShardDurable msg, DataOutputPlus out) throws 
IOException
         {
             syncPoint.serialize(msg.exclusiveSyncPoint, out);
-            CommandSerializers.durability.serialize(msg.durability, out);
+            CommandSerializers.outcomeDurability.serialize(msg.durability, 
out);
         }
 
         @Override
         public SetShardDurable deserialize(DataInputPlus in) throws IOException
         {
             return new SetShardDurable(syncPoint.deserialize(in),
-                                       
CommandSerializers.durability.deserialize(in));
+                                       
CommandSerializers.outcomeDurability.deserialize(in));
         }
 
         @Override
         public long serializedSize(SetShardDurable msg)
         {
             return syncPoint.serializedSize(msg.exclusiveSyncPoint)
-                + CommandSerializers.durability.serializedSize(msg.durability);
+                + 
CommandSerializers.outcomeDurability.serializedSize(msg.durability);
         }
     };
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index a5f4206029..3076feca66 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -146,6 +146,7 @@ public class TopologySerializers
         {
             out.writeLong(topology.epoch());
             CollectionSerializers.serializeList(topology.shards(), out, shard);
+            CollectionSerializers.serializeCollection(topology.removedIds(), 
out, TopologySerializers.nodeId);
             CollectionSerializers.serializeCollection(topology.staleIds(), 
out, TopologySerializers.nodeId);
         }
 
@@ -154,8 +155,9 @@ public class TopologySerializers
         {
             long epoch = in.readLong();
             Shard[] shards = ArraySerializers.deserializeArray(in, shard, 
Shard[]::new);
+            SortedArrayList<Node.Id> removedIds = 
CollectionSerializers.deserializeSortedArrayList(in, 
TopologySerializers.nodeId, Node.Id[]::new);
             SortedArrayList<Node.Id> staleIds = 
CollectionSerializers.deserializeSortedArrayList(in, 
TopologySerializers.nodeId, Node.Id[]::new);
-            return new Topology(epoch, staleIds, shards);
+            return new Topology(epoch, removedIds, staleIds, shards);
         }
 
         @Override
@@ -164,6 +166,7 @@ public class TopologySerializers
             long size = 0;
             size += TypeSizes.LONG_SIZE; // epoch
             size += 
CollectionSerializers.serializedListSize(topology.shards(), shard);
+            size += 
CollectionSerializers.serializedCollectionSize(topology.removedIds(), 
TopologySerializers.nodeId);
             size += 
CollectionSerializers.serializedCollectionSize(topology.staleIds(), 
TopologySerializers.nodeId);
             return size;
         }
@@ -175,6 +178,7 @@ public class TopologySerializers
         public void serialize(Topology topology, DataOutputPlus out) throws 
IOException
         {
             out.writeUnsignedVInt(topology.epoch());
+            CollectionSerializers.serializeList(topology.removedIds(), out, 
TopologySerializers.nodeId);
             CollectionSerializers.serializeList(topology.staleIds(), out, 
TopologySerializers.nodeId);
 
             List<Shard> shards = topology.shards();
@@ -228,6 +232,7 @@ public class TopologySerializers
         public long serializedSize(Topology topology)
         {
             long size = TypeSizes.sizeofUnsignedVInt(topology.epoch());
+            size += 
CollectionSerializers.serializedListSize(topology.removedIds(), 
TopologySerializers.nodeId);
             size += 
CollectionSerializers.serializedListSize(topology.staleIds(), 
TopologySerializers.nodeId);
 
             List<Shard> shards = topology.shards();
@@ -279,7 +284,8 @@ public class TopologySerializers
         public Topology deserialize(DataInputPlus in) throws IOException
         {
             long epoch = in.readUnsignedVInt();
-            SortedArrays.SortedArrayList<Node.Id> staleNodes = 
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
 TopologySerializers.nodeId), Node.Id[]::new);
+            SortedArrays.SortedArrayList<Node.Id> removedIds = 
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
 TopologySerializers.nodeId), Node.Id[]::new);
+            SortedArrays.SortedArrayList<Node.Id> staleIds = 
SortedArrays.SortedArrayList.copySorted(CollectionSerializers.deserializeList(in,
 TopologySerializers.nodeId), Node.Id[]::new);
 
             List<TokenRange> ranges = 
CollectionSerializers.deserializeList(in, TokenRange.noTableSerializer);
 
@@ -300,7 +306,7 @@ public class TopologySerializers
                 int flags = in.readUnsignedVInt32();
                 shards[i] = Shard.SerializerSupport.create(range, nodes, 
fromSimpleBitSet(nodes, notInFastPath, Node.Id[]::new), fromSimpleBitSet(nodes, 
joining, Node.Id[]::new), new TinyEnumSet<>(flags));
             }
-            return new Topology(epoch, staleNodes, shards);
+            return new Topology(epoch, removedIds, staleIds, shards);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 74ef6b1e13..4d5ab0b6a3 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -193,7 +193,14 @@ public class TxnData extends 
Int2ObjectHashMap<TxnDataValue> implements TxnResul
             else
             {
                 if (result == null)
+                {
                     result = new TxnData();
+                    for (Map.Entry<Integer, TxnDataValue> e2 : entrySet())
+                    {
+                        if (e2.getKey() == e.getKey()) break;
+                        result.put(e2.getKey(), e2.getValue());
+                    }
+                }
                 if (newValue != null)
                     result.put(e.getKey(), newValue);
             }
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
index e18524cf5d..5e81f79f38 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataKeyValue.java
@@ -62,7 +62,7 @@ public class TxnDataKeyValue extends FilteredPartition 
implements TxnDataValue
     @Override
     public TxnDataValue without(Ranges ranges)
     {
-        return ranges.contains(new TokenKey(metadata().id, 
partitionKey().getToken())) ? this : null;
+        return ranges.contains(new TokenKey(metadata().id, 
partitionKey().getToken())) ? null : this;
     }
 
     @Override
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 19fcbfe529..1d500f497a 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -38,7 +38,7 @@ import accord.messages.TxnRequest;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.SaveStatus;
-import accord.primitives.Status;
+import accord.primitives.Status.Durability.HasOutcome;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncChains;
@@ -111,8 +111,8 @@ public class AccordDebugKeyspaceTest extends CQLTester
     private static final String QUERY_REDUNDANT_BEFORE =
         String.format("SELECT * FROM %s.%s", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
 
-    private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ =
-        String.format("SELECT * FROM %s.%s WHERE majority_applied >= ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
+    private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ =
+        String.format("SELECT * FROM %s.%s WHERE quorum_applied >= ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
 
     private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ =
         String.format("SELECT * FROM %s.%s WHERE shard_applied >= ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
@@ -199,13 +199,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
         Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new 
LongToken(1)), new TokenKey(tableId, new LongToken(100))));
         Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new 
LongToken(100)), new TokenKey(tableId, new LongToken(200))));
         
AsyncChains.getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)()
 -> "Test", safeStore -> {
-            safeStore.commandStore().markShardDurable(safeStore, syncId1, 
ranges1, Status.Durability.Universal);
-            safeStore.commandStore().markShardDurable(safeStore, syncId2, 
ranges2, Status.Durability.Majority);
+            safeStore.commandStore().markShardDurable(safeStore, syncId1, 
ranges1, HasOutcome.Universal);
+            safeStore.commandStore().markShardDurable(safeStore, syncId2, 
ranges2, HasOutcome.Quorum);
         }));
 
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE).size()).isGreaterThan(0);
-        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ,
 syncId1.toString()).size()).isEqualTo(2);
-        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_MAJORITY_APPLIED_GEQ,
 syncId2.toString()).size()).isEqualTo(1);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ, 
syncId1.toString()).size()).isEqualTo(2);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ, 
syncId2.toString()).size()).isEqualTo(1);
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ, 
syncId1.toString()).size()).isEqualTo(1);
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ, 
syncId2.toString()).size()).isEqualTo(0);
     }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index a44196591b..5532a21b23 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -70,7 +70,7 @@ import org.apache.cassandra.service.accord.txn.TxnUpdate;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.utils.Pair;
 
-import static accord.primitives.Status.Durability.Majority;
+import static accord.primitives.Status.Durability.AllQuorums;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
 import static 
org.apache.cassandra.service.accord.AccordTestUtils.Commands.preaccepted;
@@ -135,7 +135,7 @@ public class AccordCommandStoreTest
         Command.WaitingOn waitingOn = new 
Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps, new 
ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
         Pair<Writes, Result> result = 
AsyncChains.getBlocking(AccordTestUtils.processTxnResult(commandStore, txnId, 
txn, executeAt));
 
-        Command expected = Command.Executed.executed(txnId, 
SaveStatus.Applied, Majority, StoreParticipants.all(route),
+        Command expected = Command.Executed.executed(txnId, 
SaveStatus.Applied, AllQuorums, StoreParticipants.all(route),
                                                      promised, executeAt, txn, 
dependencies, accepted,
                                                      waitingOn, result.left, 
ResultSerializers.APPLIED);
         AccordSafeCommand safeCommand = new AccordSafeCommand(loaded(txnId, 
null));
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 5129bff029..b806433dc1 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -120,7 +120,7 @@ import static 
accord.api.ProtocolModifiers.Toggles.setTransitiveDependenciesAreV
 import static accord.local.cfk.CommandsForKey.NO_BOUNDS_INFO;
 import static accord.primitives.Known.KnownExecuteAt.ExecuteAtErased;
 import static accord.primitives.Known.KnownExecuteAt.ExecuteAtUnknown;
-import static accord.primitives.Status.Durability.Majority;
+import static accord.primitives.Status.Durability.AllQuorums;
 import static accord.primitives.Status.Durability.NotDurable;
 import static accord.utils.Property.qt;
 import static accord.utils.SortedArrays.Search.FAST;
@@ -182,7 +182,7 @@ public class CommandsForKeySerializerTest
                 builder.partialTxn(txn);
 
             
builder.setParticipants(StoreParticipants.all(txn.keys().toRoute(txn.keys().get(0).someIntersectingRoutingKey(null))));
-            builder.durability(isDurable ? Majority : NotDurable);
+            builder.durability(isDurable ? AllQuorums : NotDurable);
             if 
(saveStatus.known.deps().hasPreAcceptedOrProposedOrDecidedDeps())
             {
                 try (KeyDeps.Builder keyBuilder = KeyDeps.builder();)
@@ -196,7 +196,7 @@ public class CommandsForKeySerializerTest
             builder.executeAt(executeAt);
             builder.promised(ballot);
             builder.acceptedOrCommitted(ballot);
-            builder.durability(isDurable ? Majority : NotDurable);
+            builder.durability(isDurable ? AllQuorums : NotDurable);
             if (saveStatus.compareTo(SaveStatus.Stable) >= 0 && 
!saveStatus.hasBeen(Status.Truncated))
                 builder.waitingOn(Command.WaitingOn.empty(txnId.domain()));
 
@@ -424,7 +424,7 @@ public class CommandsForKeySerializerTest
     @Test
     public void serde()
     {
-        testOne(-4567266914751633833L);
+        testOne(7082228630293368049L);
         Random random = new Random();
         for (int i = 0 ; i < 10000 ; ++i)
         {
@@ -610,7 +610,7 @@ public class CommandsForKeySerializerTest
             else unmanaged = CommandsForKey.NO_PENDING_UNMANAGED;
 
             long maxUniqueHlc = rs.nextLong(0, Long.MAX_VALUE);
-            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, info, maxUniqueHlc, unmanaged, 
TxnId.NONE, NO_BOUNDS_INFO);
+            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, info, maxUniqueHlc, unmanaged, 
TxnId.NONE, NO_BOUNDS_INFO, true);
 
             ByteBuffer buffer = Serialize.toBytesWithoutKey(expected);
             CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);
@@ -627,7 +627,7 @@ public class CommandsForKeySerializerTest
         TxnId txnId = TxnId.fromValues(11,34052499,2,1);
         CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk,
                                                      new TxnInfo[] { 
TxnInfo.create(txnId, InternalStatus.PREACCEPTED_WITHOUT_DEPS, true, txnId, 
TxnId.NO_TXNIDS, Ballot.ZERO) },
-                                                                          0, 
CommandsForKey.NO_PENDING_UNMANAGED, TxnId.NONE, NO_BOUNDS_INFO);
+                                                                          0, 
CommandsForKey.NO_PENDING_UNMANAGED, TxnId.NONE, NO_BOUNDS_INFO, true);
 
         ByteBuffer buffer = Serialize.toBytesWithoutKey(expected);
         CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to