This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new b63d952856 More follow-up to CASSANDRA-19967 and CASSANDRA-19869
b63d952856 is described below

commit b63d95285616a385402224ce1ef45b4a5d8a98d1
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Wed Oct 2 13:35:57 2024 +0200

    More follow-up to CASSANDRA-19967 and CASSANDRA-19869
---
 modules/accord                                     |  2 +-
 .../db/compaction/CompactionIterator.java          | 42 +++++++++--
 src/java/org/apache/cassandra/journal/Journal.java | 16 +++-
 .../org/apache/cassandra/journal/Segments.java     |  8 ++
 .../apache/cassandra/journal/StaticSegment.java    |  9 ++-
 .../service/accord/AccordCommandStore.java         | 11 ++-
 .../cassandra/service/accord/AccordJournal.java    | 82 ++++++++++++++++++---
 .../service/accord/AccordSafeCommandStore.java     | 86 ++++++++++++++++++----
 .../service/accord/AccordSegmentCompactor.java     | 10 +++
 .../cassandra/service/accord/AccordService.java    | 11 ++-
 .../service/accord/CommandsForRangesLoader.java    |  2 +-
 .../cassandra/service/accord/IAccordService.java   |  8 +-
 .../cassandra/service/accord/SavedCommand.java     | 64 ++++++++++++----
 .../service/accord/async/AsyncOperation.java       | 23 +++---
 .../test/accord/AccordBootstrapTest.java           | 28 +++----
 .../distributed/test/accord/AccordLoadTest.java    | 16 +++-
 .../accord/AccordJournalCompactionTest.java        | 22 +++---
 .../compaction/CompactionAccordIteratorsTest.java  |  5 +-
 .../service/accord/AccordCommandStoreTest.java     |  5 +-
 .../cassandra/service/accord/MockJournal.java      | 24 +++---
 .../cassandra/service/accord/SavedCommandTest.java |  1 +
 21 files changed, 350 insertions(+), 125 deletions(-)

diff --git a/modules/accord b/modules/accord
index 4cf0070d60..f3782e2a98 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 4cf0070d604abd2db460a5f1c3f8cd8dc7d26696
+Subproject commit f3782e2a98004843cc3384a6983478c1128a1d6a
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 3e9a1de462..fc993e98da 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -109,6 +110,7 @@ import org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static accord.local.Cleanup.ERASE;
+import static accord.local.Cleanup.TRUNCATE;
 import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
 import static accord.local.Cleanup.shouldCleanupPartial;
 import static com.google.common.base.Preconditions.checkState;
@@ -148,6 +150,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
     private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
+    private static Object[] TRUNCATE_CLUSTERING_VALUE = new Object[] { Long.MAX_VALUE, Integer.MAX_VALUE };
 
     private final OperationType type;
     private final AbstractCompactionController controller;
@@ -806,8 +809,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     class AccordCommandsPurger extends AbstractPurger
     {
         final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+        final Int2ObjectHashMap<DurableBefore> durableBefores;
         final Int2ObjectHashMap<RangesForEpoch> ranges;
-        final DurableBefore durableBefore;
 
         int storeId;
         TxnId txnId;
@@ -817,7 +820,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             IAccordService.CompactionInfo compactionInfo = accordService.get().getCompactionInfo();
             this.redundantBefores = compactionInfo.redundantBefores;
             this.ranges = compactionInfo.ranges;
-            this.durableBefore = compactionInfo.durableBefore;
+            this.durableBefores = compactionInfo.durableBefores;
         }
 
         protected void beginPartition(UnfilteredRowIterator partition)
@@ -833,6 +836,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             updateProgress();
 
             RedundantBefore redundantBefore = redundantBefores.get(storeId);
+            DurableBefore durableBefore = durableBefores.get(storeId);
             // TODO (expected): if the store has been retired, this should return null
             if (redundantBefore == null)
                 return row;
@@ -1013,8 +1017,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     class AccordJournalPurger extends AbstractPurger
     {
         final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+        final Int2ObjectHashMap<DurableBefore> durableBefores;
         final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges;
-        final DurableBefore durableBefore;
         final ColumnMetadata recordColumn;
         final ColumnMetadata versionColumn;
         final KeySupport<JournalKey> keySupport = JournalKey.SUPPORT;
@@ -1026,6 +1030,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         Object[] lastClustering = null;
         long maxSeenTimestamp = -1;
         final int userVersion;
+        long lastDescriptor = -1;
+        int lastOffset = -1;
 
         public AccordJournalPurger(Supplier<IAccordService> serviceSupplier)
         {
@@ -1036,7 +1042,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
             this.redundantBefores = compactionInfo.redundantBefores;
             this.ranges = compactionInfo.ranges;
-            this.durableBefore = compactionInfo.durableBefore;
+            this.durableBefores = compactionInfo.durableBefores;
             ColumnFamilyStore cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL);
             this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
             this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
@@ -1050,6 +1056,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             serializer = (AccordJournalValueSerializers.FlyweightSerializer<Object, Object>) key.type.serializer;
             builder = serializer.mergerFor(key);
             maxSeenTimestamp = -1;
+            lastDescriptor = -1;
+            lastOffset = -1;
         }
 
         @Override
@@ -1096,6 +1104,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                 }
 
                RedundantBefore redundantBefore = redundantBefores.get(key.commandStoreId);
+                DurableBefore durableBefore = durableBefores.get(key.commandStoreId);
                 Cleanup cleanup = commandBuilder.shouldCleanup(redundantBefore, durableBefore);
                 if (cleanup == ERASE)
                     return PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(), maxSeenTimestamp, nowInSec).unfilteredIterator();
@@ -1107,9 +1116,16 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                         return null;
 
                     PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-                    newVersion.row(lastClustering)
-                              .add("record", commandBuilder.asByteBuffer(userVersion))
+
+                    Row.SimpleBuilder rowBuilder;
+                    if (cleanup == TRUNCATE || cleanup == TRUNCATE_WITH_OUTCOME)
+                        rowBuilder = newVersion.row(TRUNCATE_CLUSTERING_VALUE);
+                    else
+                        rowBuilder = newVersion.row(lastClustering);
+
+                    rowBuilder.add("record", commandBuilder.asByteBuffer(userVersion))
                               .add("user_version", userVersion);
+
                     return newVersion.build().unfilteredIterator();
                 }
 
@@ -1133,6 +1149,20 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             updateProgress();
             maxSeenTimestamp = row.primaryKeyLivenessInfo().timestamp();
             ByteBuffer record = row.getCell(recordColumn).buffer();
+            long descriptor = LongType.instance.compose(row.clustering().getBufferArray()[0]);
+            int offset = Int32Type.instance.compose(row.clustering().getBufferArray()[1]);
+
+            if (lastOffset != -1)
+            {
+                Invariants.checkState(descriptor >= lastDescriptor,
+                                      "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor);
+                Invariants.checkState(descriptor != lastDescriptor ||
+                                      offset > lastOffset,
+                                      "Offsets within %s were accessed out of order: %d was accessed after %s", offset, lastOffset);
+            }
+            lastDescriptor = descriptor;
+            lastOffset = offset;
+
             try (DataInputBuffer in = new DataInputBuffer(record, false))
             {
                int userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer());
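
For reference, the (descriptor, offset) ordering check that the purger above now performs can be sketched in isolation roughly as follows. This is illustrative only: the class and method names below are invented, and a plain IllegalStateException stands in for accord's Invariants.checkState.

    // Minimal sketch of the journal-row ordering invariant enforced above:
    // descriptors must be non-decreasing, and offsets strictly increasing within a descriptor.
    final class JournalRowOrderCheck
    {
        private long lastDescriptor = -1;
        private int lastOffset = -1;

        void onRow(long descriptor, int offset)
        {
            if (lastOffset != -1)
            {
                if (descriptor < lastDescriptor)
                    throw new IllegalStateException(String.format("Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor));
                if (descriptor == lastDescriptor && offset <= lastOffset)
                    throw new IllegalStateException(String.format("Offsets within %d were accessed out of order: %d was accessed after %d", descriptor, offset, lastOffset));
            }
            lastDescriptor = descriptor;
            lastOffset = offset;
        }
    }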
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index d47d801d0e..5e91c7d3d3 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -525,6 +525,8 @@ public class Journal<K, V> implements Shutdownable
         ActiveSegment<K, V>.Allocation alloc;
         while (null == (alloc = segment.allocate(entrySize, hosts)))
         {
+            if (entrySize >= (params.segmentSize() * 3) / 4)
+                throw new IllegalStateException("entrySize " + entrySize + " too large for a segmentSize of " + params.segmentSize());
             // failed to allocate; move to a new segment with enough room
             advanceSegment(segment);
             segment = currentSegment;
@@ -776,6 +778,11 @@ public class Journal<K, V> implements Shutdownable
         swapSegments(current -> current.withNewActiveSegment(activeSegment));
     }
 
+    private void removeEmptySegment(ActiveSegment<K, V> activeSegment)
+    {
+        swapSegments(current -> current.withoutEmptySegment(activeSegment));
+    }
+
     private void replaceCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment)
     {
         swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment));
@@ -869,6 +876,13 @@ public class Journal<K, V> implements Shutdownable
 
     void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
     {
+        if (activeSegment.isEmpty())
+        {
+            removeEmptySegment(activeSegment);
+            activeSegment.closeAndDiscard();
+            return;
+        }
+
         closer.execute(new CloseActiveSegmentRunnable(activeSegment));
     }
 
@@ -973,7 +987,7 @@ public class Journal<K, V> implements Shutdownable
         private StaticSegmentIterator()
         {
             this.segments = selectAndReference(Segment::isStatic);
-            this.readers = new PriorityQueue<>((o1, o2) -> keySupport.compare(o1.key(), o2.key()));
+            this.readers = new PriorityQueue<>();
             for (Segment<K, V> segment : this.segments.all())
             {
                 StaticSegment<K, V> staticSegment = (StaticSegment<K, V>)segment;
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index a779aebf23..94282e9d87 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -63,6 +63,14 @@ class Segments<K, V>
         return new Segments<>(newSegments);
     }
 
+    Segments<K, V> withoutEmptySegment(ActiveSegment<K, V> activeSegment)
+    {
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
+        Segment<K, V> oldValue = segments.remove(activeSegment.descriptor.timestamp);
+        Invariants.checkState(oldValue.asActive().isEmpty());
+        return new Segments<>(newSegments);
+    }
+
     Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment)
     {
         Invariants.checkArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index c7ac7ce410..f5f15ee13c 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -473,9 +473,12 @@ public final class StaticSegment<K, V> extends Segment<K, V>
             that.ensureHasAdvanced();
 
             int cmp = keySupport.compare(this.key(), that.key());
-            return cmp != 0
-                 ? cmp
-                 : this.descriptor.compareTo(that.descriptor);
+            if (cmp != 0)
+                return cmp;
+            cmp = Long.compare(this.descriptor.timestamp, that.descriptor.timestamp);
+            if (cmp != 0)
+                return cmp;
+            return Integer.compare(this.offset, that.offset);
         }
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index a2367e5768..418c5cd687 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -502,9 +502,8 @@ public class AccordCommandStore extends CommandStore
         // We find a set of dependencies for a range then update CommandsFor to know about them
         Ranges allRanges = safeStore.ranges().all();
         deps.keyDeps.keys().forEach(allRanges, key -> {
-            // TODO (now): batch register to minimise GC
+            // TODO (desired): batch register to minimise GC
             deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
-                // TODO (desired, efficiency): this can be made more efficient by batching by epoch
                 if (ranges.coordinates(txnId).contains(key))
                     return; // already coordinates, no need to replicate
                 if (!ranges.allBefore(txnId.epoch()).contains(key))
@@ -525,13 +524,13 @@ public class AccordCommandStore extends CommandStore
                 if (!ranges.allBefore(txnId.epoch()).intersects(range))
                     return;
 
+                // TODO (required): this is potentially not safe - it should not be persisted until we save in journal
+                //   but, preferable to retire historical transactions as a concept entirely, and rely on ExclusiveSyncPoints instead
                 diskCommandsForRanges().mergeHistoricalTransaction(txnId, Ranges.single(range).slice(allRanges), Ranges::with);
             });
         }
     }
 
-    public NavigableMap<Timestamp, Ranges> safeToRead() { return super.safeToRead(); }
-
     public void appendCommands(List<SavedCommand.DiffWriter> diffs, Runnable onFlush)
     {
         for (int i = 0; i < diffs.size(); i++)
@@ -545,7 +544,7 @@ public class AccordCommandStore extends CommandStore
     @VisibleForTesting
     public Command loadCommand(TxnId txnId)
     {
-        return journal.loadCommand(id, txnId, redundantBefore(), durableBefore());
+        return journal.loadCommand(id, txnId, unsafeGetRedundantBefore(), unsafeGetDurableBefore());
     }
 
     public interface Loader
@@ -592,7 +591,7 @@ public class AccordCommandStore extends CommandStore
                             Command local = command;
                             if (local.status() != Truncated && local.status() != Invalidated)
                             {
-                                Cleanup cleanup = Cleanup.shouldCleanup(AccordCommandStore.this, local, local.participants());
+                                Cleanup cleanup = Cleanup.shouldCleanup(local, unsafeGetRedundantBefore(), unsafeGetDurableBefore());
                                 switch (cleanup)
                                 {
                                     case NO:
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c1292d5322..eb8c0007f2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -253,18 +253,18 @@ public class AccordJournal implements IJournal, Shutdownable
     {
         RecordPointer pointer = null;
         // TODO: avoid allocating keys
-        if (fieldUpdates.redundantBefore != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.redundantBefore);
-        if (fieldUpdates.durableBefore != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.durableBefore);
-        if (fieldUpdates.bootstrapBeganAt != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.bootstrapBeganAt);
-        if (fieldUpdates.safeToRead != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.safeToRead);
-        if (fieldUpdates.rangesForEpoch != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.rangesForEpoch);
-        if (fieldUpdates.historicalTransactions != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.HISTORICAL_TRANSACTIONS, store), fieldUpdates.historicalTransactions);
+        if (fieldUpdates.addRedundantBefore != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.addRedundantBefore);
+        if (fieldUpdates.addDurableBefore != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.addDurableBefore);
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.newBootstrapBeganAt);
+        if (fieldUpdates.newSafeToRead != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.newSafeToRead);
+        if (fieldUpdates.newRangesForEpoch != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.newRangesForEpoch);
+        if (fieldUpdates.addHistoricalTransactions != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.HISTORICAL_TRANSACTIONS, store), fieldUpdates.addHistoricalTransactions);
 
         if (onFlush == null)
             return;
@@ -414,4 +414,62 @@ public class AccordJournal implements IJournal, Shutdownable
             isReplay.set(false);
         }
     }
+
+    // TODO: this is here temporarily; for debugging purposes
+    @VisibleForTesting
+    public void checkAllCommands()
+    {
+        try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll())
+        {
+            IAccordService.CompactionInfo compactionInfo = AccordService.instance().getCompactionInfo();
+            JournalKey key;
+            SavedCommand.Builder builder = new SavedCommand.Builder();
+            while ((key = iter.key()) != null)
+            {
+                builder.reset(key.id);
+                if (key.type != JournalKey.Type.COMMAND_DIFF)
+                {
+                    // TODO (required): add "skip" for the key to avoid getting stuck
+                    iter.readAllForKey(key, (segment, position, key1, buffer, hosts, userVersion) -> {});
+                    continue;
+                }
+
+                JournalKey finalKey = key;
+                List<RecordPointer> pointers = new ArrayList<>();
+                try
+                {
+                    iter.readAllForKey(key, (segment, position, local, buffer, hosts, userVersion) -> {
+                        pointers.add(new RecordPointer(segment, position));
+                        Invariants.checkState(finalKey.equals(local));
+                        try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+                        {
+                            builder.deserializeNext(in, userVersion);
+                        }
+                        catch (IOException e)
+                        {
+                            // can only throw if serializer is buggy
+                            throw new RuntimeException(e);
+                        }
+                    });
+
+                    Cleanup cleanup = builder.shouldCleanup(compactionInfo.redundantBefores.get(key.commandStoreId), compactionInfo.durableBefores.get(key.commandStoreId));
+                    switch (cleanup)
+                    {
+                        case ERASE:
+                        case EXPUNGE:
+                        case EXPUNGE_PARTIAL:
+                        case VESTIGIAL:
+                            continue;
+                    }
+                    builder.construct();
+                }
+                catch (Throwable t)
+                {
+                    throw new RuntimeException(String.format("Caught an exception after iterating over: %s", pointers),
+                                               t);
+                }
+            }
+
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index fae5e4634f..3451899ac0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -284,45 +284,82 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
     @Override
     public void upsertRedundantBefore(RedundantBefore addRedundantBefore)
     {
-        // TODO (now): this is a temporary measure, see comment on AccordJournalValueSerializers; upsert instead
+        // TODO (required): this is a temporary measure, see comment on AccordJournalValueSerializers; upsert instead
         //  when modifying, only modify together with AccordJournalValueSerializers
-        ensureFieldUpdates().redundantBefore = RedundantBefore.merge(commandStore.redundantBefore(), addRedundantBefore);
-        super.upsertRedundantBefore(addRedundantBefore);
+        ensureFieldUpdates().newRedundantBefore = ensureFieldUpdates().addRedundantBefore = RedundantBefore.merge(redundantBefore(), addRedundantBefore);
     }
 
     @Override
     public void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt)
     {
-        ensureFieldUpdates().bootstrapBeganAt = newBootstrapBeganAt;
+        ensureFieldUpdates().newBootstrapBeganAt = newBootstrapBeganAt;
         super.setBootstrapBeganAt(newBootstrapBeganAt);
     }
 
     @Override
     public void upsertDurableBefore(DurableBefore addDurableBefore)
     {
-        ensureFieldUpdates().durableBefore = addDurableBefore;
+        ensureFieldUpdates().addDurableBefore = addDurableBefore;
         super.upsertDurableBefore(addDurableBefore);
     }
 
     @Override
     public void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
     {
-        ensureFieldUpdates().safeToRead = newSafeToRead;
+        ensureFieldUpdates().newSafeToRead = newSafeToRead;
         super.setSafeToRead(newSafeToRead);
     }
 
     @Override
     public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
     {
-        ensureFieldUpdates().rangesForEpoch = rangesForEpoch.snapshot();
+        ensureFieldUpdates().newRangesForEpoch = rangesForEpoch.snapshot();
         super.setRangesForEpoch(rangesForEpoch);
         ranges = rangesForEpoch;
     }
 
+    @Override
+    public NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+    {
+        if (fieldUpdates != null && fieldUpdates.newBootstrapBeganAt != null)
+            return fieldUpdates.newBootstrapBeganAt;
+
+        return super.bootstrapBeganAt();
+    }
+
+    @Override
+    public NavigableMap<Timestamp, Ranges> safeToReadAt()
+    {
+        if (fieldUpdates != null && fieldUpdates.newSafeToRead != null)
+            return fieldUpdates.newSafeToRead;
+
+        return super.safeToReadAt();
+    }
+
+    @Override
+    public RedundantBefore redundantBefore()
+    {
+        if (fieldUpdates != null && fieldUpdates.newRedundantBefore != null)
+            return fieldUpdates.newRedundantBefore;
+
+        return super.redundantBefore();
+    }
+
+    @Override
+    public DurableBefore durableBefore()
+    {
+        if (fieldUpdates != null && fieldUpdates.newDurableBefore != null)
+            return fieldUpdates.newDurableBefore;
+
+        return super.durableBefore();
+    }
+
     @Override
     protected void registerHistoricalTransactions(Deps deps)
     {
-        ensureFieldUpdates().historicalTransactions = deps;
+        ensureFieldUpdates().addHistoricalTransactions = deps;
+        // TODO (required): it is potentially unsafe to propagate this synchronously, as if we fail to write to the journal we may be in an inconsistent state
+        //     however, we can and should retire the concept of historical transactions in favour of ExclusiveSyncPoints ensuring their deps are known
         super.registerHistoricalTransactions(deps);
     }
 
@@ -337,13 +374,34 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
         return fieldUpdates;
     }
 
+    public void postExecute()
+    {
+        if (fieldUpdates == null)
+            return;
+
+        if (fieldUpdates.newRedundantBefore != null)
+            super.unsafeSetRedundantBefore(fieldUpdates.newRedundantBefore);
+
+        if (fieldUpdates.newDurableBefore != null)
+            super.unsafeSetDurableBefore(fieldUpdates.newDurableBefore);
+
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            super.setBootstrapBeganAt(fieldUpdates.newBootstrapBeganAt);
+
+        if (fieldUpdates.newSafeToRead != null)
+            super.setSafeToRead(fieldUpdates.newSafeToRead);
+
+        if (fieldUpdates.newRangesForEpoch != null)
+            super.setRangesForEpoch(ranges);
+    }
+
     public static class FieldUpdates
     {
-        public RedundantBefore redundantBefore;
-        public DurableBefore durableBefore;
-        public NavigableMap<TxnId, Ranges> bootstrapBeganAt;
-        public NavigableMap<Timestamp, Ranges> safeToRead;
-        public RangesForEpoch.Snapshot rangesForEpoch;
-        public Deps historicalTransactions;
+        public RedundantBefore addRedundantBefore, newRedundantBefore;
+        public DurableBefore addDurableBefore, newDurableBefore;
+        public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
+        public NavigableMap<Timestamp, Ranges> newSafeToRead;
+        public RangesForEpoch.Snapshot newRangesForEpoch;
+        public Deps addHistoricalTransactions;
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
index f0c7b38c37..f94510b8b8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -99,6 +99,8 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V
                         key = reader.key();
                         serializer = (FlyweightSerializer<Object, Object>) key.type.serializer;
                         builder = serializer.mergerFor(key);
+                        lastOffset = -1;
+                        lastDescriptor = -1;
                     }
 
                     boolean advanced;
@@ -106,6 +108,14 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V
                     {
                         try (DataInputBuffer in = new DataInputBuffer(reader.record(), false))
                         {
+                            if (lastDescriptor != -1)
+                            {
+                                Invariants.checkState(reader.descriptor.timestamp >= lastDescriptor,
+                                                      "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor);
+                                Invariants.checkState(reader.descriptor.timestamp != lastDescriptor ||
+                                                      reader.offset() > lastOffset,
+                                                      "Offsets within %s were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset);
+                            }
                             serializer.deserialize(key, builder, in, reader.descriptor.userVersion);
                             lastDescriptor = reader.descriptor.timestamp;
                             lastOffset = reader.offset();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index bce8a198da..e051a53803 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -298,7 +297,7 @@ public class AccordService implements IAccordService, Shutdownable
         @Override
         public CompactionInfo getCompactionInfo()
         {
-            return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY);
+            return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>());
         }
 
         @Override
@@ -1261,17 +1260,17 @@ public class AccordService implements IAccordService, Shutdownable
     public CompactionInfo getCompactionInfo()
     {
         Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>();
+        Int2ObjectHashMap<DurableBefore> durableBefores = new Int2ObjectHashMap<>();
         Int2ObjectHashMap<RangesForEpoch> ranges = new Int2ObjectHashMap<>();
-        AtomicReference<DurableBefore> durableBefore = new AtomicReference<>(DurableBefore.EMPTY);
         AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
             synchronized (redundantBefores)
             {
-                redundantBefores.put(safeStore.commandStore().id(), safeStore.commandStore().redundantBefore());
+                redundantBefores.put(safeStore.commandStore().id(), safeStore.redundantBefore());
                 ranges.put(safeStore.commandStore().id(), safeStore.ranges());
+                durableBefores.put(safeStore.commandStore().id(), safeStore.durableBefore());
             }
-            durableBefore.set(DurableBefore.merge(durableBefore.get(), safeStore.commandStore().durableBefore()));
         }));
-        return new CompactionInfo(redundantBefores, ranges, durableBefore.get());
+        return new CompactionInfo(redundantBefores, ranges, durableBefores);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
index 6324735883..e7e1461054 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
@@ -187,7 +187,7 @@ public class CommandsForRangesLoader
     {
         //TODO (now): this logic is kinda duplicate of org.apache.cassandra.service.accord.CommandsForRange.mapReduce
         // should figure out if this can be improved... also what is correct?
-        var durableBefore = store.durableBefore();
+        var durableBefore = store.unsafeGetDurableBefore();
         NavigableMap<TxnId, Summary> map = new TreeMap<>();
         for (TxnId txnId : possibleTxns)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 1be920bc1b..e5e2d125f1 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -134,17 +134,17 @@ public interface IAccordService
 
     class CompactionInfo
     {
-        static final Supplier<CompactionInfo> NO_OP = () ->  new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY);
+        static final Supplier<CompactionInfo> NO_OP = () ->  new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>());
 
         public final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+        public final Int2ObjectHashMap<DurableBefore> durableBefores;
         public final Int2ObjectHashMap<RangesForEpoch> ranges;
-        public final DurableBefore durableBefore;
 
-        public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges, DurableBefore durableBefore)
+        public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges, Int2ObjectHashMap<DurableBefore> durableBefores)
         {
             this.redundantBefores = redundantBefores;
             this.ranges = ranges;
-            this.durableBefore = durableBefore;
+            this.durableBefores = durableBefores;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
index 1f0086b484..209208989f 100644
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
 import org.apache.cassandra.utils.Throwables;
 
 import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
 import static accord.primitives.Known.KnownDeps.DepsErased;
 import static accord.primitives.Known.KnownDeps.DepsUnknown;
 import static accord.primitives.Known.KnownDeps.NoDeps;
@@ -73,9 +74,10 @@ public class SavedCommand
         PARTIAL_DEPS,
         WAITING_ON,
         WRITES,
+        CLEANUP
         ;
 
-        static final Fields[] FIELDS = values();
+        public static final Fields[] FIELDS = values();
     }
 
     // TODO: maybe rename this and enclosing classes?
@@ -123,7 +125,6 @@ public class SavedCommand
         }
     }
 
-
     public static ByteBuffer asSerializedDiff(Command after, int userVersion) throws IOException
     {
         try (DataOutputBuffer out = new DataOutputBuffer())
@@ -304,6 +305,7 @@ public class SavedCommand
         SavedCommand.WaitingOnProvider waitingOn;
         Writes writes;
         Result result;
+        Cleanup cleanup;
 
         boolean nextCalled;
         int count;
@@ -385,14 +387,26 @@ public class SavedCommand
         public void clear()
         {
             flags = 0;
+            txnId = null;
+
             executeAt = null;
+            executeAtLeast = null;
             saveStatus = null;
             durability = null;
+
+            acceptedOrCommitted = null;
             promised = null;
+
             participants = null;
             partialTxn = null;
             partialDeps = null;
+
+            waitingOnBytes = null;
+            waitingOn = null;
             writes = null;
+            result = null;
+            cleanup = null;
+
             nextCalled = false;
             count = 0;
         }
@@ -428,17 +442,19 @@ public class SavedCommand
                 return NO;
 
             if (saveStatus == null || participants == null)
-                return Cleanup.EXPUNGE_PARTIAL;
+                return Cleanup.NO;
 
-            return Cleanup.shouldCleanup(txnId, saveStatus, durability, participants, redundantBefore, durableBefore);
+            Cleanup cleanup = Cleanup.shouldCleanup(txnId, saveStatus, durability, participants, redundantBefore, durableBefore);
+            if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
+                cleanup = this.cleanup;
+            return cleanup;
         }
 
         // TODO (expected): avoid allocating new builder
         public Builder maybeCleanup(Cleanup cleanup)
         {
-            // Do not have txnId in selected SSTables; remove
             if (saveStatus() == null)
-                return null;
+                return this;
 
             switch (cleanup)
             {
@@ -447,19 +463,23 @@ public class SavedCommand
                     return null;
 
                 case EXPUNGE_PARTIAL:
-                    return expungePartial();
+                    return expungePartial(cleanup, saveStatus, true);
+
                 case VESTIGIAL:
                 case INVALIDATE:
+                    return saveStatusOnly();
+
                 case TRUNCATE_WITH_OUTCOME:
                 case TRUNCATE:
-                    return saveStatusOnly();
+                    return expungePartial(cleanup, cleanup.appliesIfNot, cleanup == TRUNCATE_WITH_OUTCOME);
+
                 case NO:
                     return this;
                 default:
                     throw new IllegalStateException("Unknown cleanup: " + cleanup);}
         }
 
-        public Builder expungePartial()
+        public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome)
         {
             Invariants.checkState(txnId != null);
             Builder builder = new Builder(txnId);
@@ -467,12 +487,11 @@ public class SavedCommand
             builder.count++;
             builder.nextCalled = true;
 
-            // TODO: these accesses can be abstracted away
-            if (saveStatus != null)
-            {
-                builder.flags = setFieldChanged(Fields.SAVE_STATUS, builder.flags);
-                builder.saveStatus = saveStatus;
-            }
+            Invariants.checkState(saveStatus != null);
+            builder.flags = setFieldChanged(Fields.SAVE_STATUS, builder.flags);
+            builder.saveStatus = saveStatus;
+            builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags);
+            builder.cleanup = cleanup;
             if (executeAt != null)
             {
                builder.flags = setFieldChanged(Fields.EXECUTE_AT, builder.flags);
@@ -488,6 +507,11 @@ public class SavedCommand
                builder.flags = setFieldChanged(Fields.PARTICIPANTS, builder.flags);
                 builder.participants = participants;
             }
+            if (includeOutcome && builder.writes != null)
+            {
+                builder.flags = setFieldChanged(Fields.WRITES, builder.flags);
+                builder.writes = writes;
+            }
 
             return builder;
         }
@@ -554,6 +578,9 @@ public class SavedCommand
 
             if (getFieldChanged(Fields.WRITES, flags) && !getFieldIsNull(Fields.WRITES, flags))
                 CommandSerializers.writes.serialize(writes(), out, userVersion);
+
+            if (getFieldChanged(Fields.CLEANUP, flags))
+                out.writeByte(cleanup.ordinal());
         }
 
 
@@ -682,6 +709,13 @@ public class SavedCommand
                 else
                     writes = CommandSerializers.writes.deserialize(in, userVersion);
             }
+
+            if (getFieldChanged(Fields.CLEANUP, flags))
+            {
+                Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
+                if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
+                    cleanup = newCleanup;
+            }
         }
 
         public void forceResult(Result newValue)
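
As a rough illustration of the flag-guarded encoding used for the new CLEANUP field above (a single ordinal byte written only when the field-changed bit is set, with the most severe value kept when merging), here is a simplified sketch under invented names; the real flag layout lives in SavedCommand and the real enum in accord.local.Cleanup.

    import java.io.*;

    final class CleanupFieldCodecSketch
    {
        // hypothetical stand-in for accord.local.Cleanup; the ordering here is illustrative only
        enum Cleanup { NO, TRUNCATE_WITH_OUTCOME, TRUNCATE, ERASE }

        static final int CLEANUP_BIT = 1 << 10; // invented flag position

        static void write(int flags, Cleanup cleanup, DataOutput out) throws IOException
        {
            if ((flags & CLEANUP_BIT) != 0)
                out.writeByte(cleanup.ordinal()); // only serialized when the field changed
        }

        static Cleanup read(int flags, Cleanup current, DataInput in) throws IOException
        {
            if ((flags & CLEANUP_BIT) == 0)
                return current;
            Cleanup next = Cleanup.values()[in.readByte()];
            // keep the most severe cleanup seen so far, mirroring the deserializer above
            return (current == null || next.compareTo(current) > 0) ? next : current;
        }
    }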
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 62cda848a6..2c2867aa65 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -269,24 +269,23 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R
                     }
                 }
 
-                commandStore.completeOperation(safeStore);
-                context.releaseResources(commandStore);
-                state(COMPLETING);
+                boolean flushed = false;
                 if (diffs != null || safeStore.fieldUpdates() != null)
                 {
                     Runnable onFlush = () -> finish(result, null);
                     if (safeStore.fieldUpdates() != null)
-                    {
-                        if (diffs != null)
-                            appendCommands(diffs, null);
-                        commandStore.persistFieldUpdates(safeStore.fieldUpdates(), onFlush);
-                    }
-                    else
-                    {
+                        commandStore.persistFieldUpdates(safeStore.fieldUpdates(), diffs == null ? onFlush : null);
+                    if (diffs != null)
                         appendCommands(diffs, onFlush);
-                    }
-                    return false;
+                    flushed = true;
                 }
+
+                commandStore.completeOperation(safeStore);
+                context.releaseResources(commandStore);
+                state(COMPLETING);
+                if (flushed)
+                    return false;
+
             case COMPLETING:
                 finish(result, null);
             case FINISHED:
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 483f7e4f37..fa93afa2ae 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -47,9 +47,9 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordConfigurationService;
 import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
+import org.apache.cassandra.service.accord.AccordSafeCommandStore;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.streaming.StreamManager;
@@ -271,9 +271,9 @@ public class AccordBootstrapTest extends TestBaseImpl
                     });
 
                     awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> {
-                        AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore();
-                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(commandStore.bootstrapBeganAt().keySet()));
-                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(commandStore.safeToRead().keySet()));
+                        AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
+                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet()));
+                        Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet()));
 //
 //                        Assert.assertTrue(commandStore.maxBootstrapEpoch() > 0);
 //                        Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty());
@@ -316,17 +316,17 @@ public class AccordBootstrapTest extends TestBaseImpl
                        awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> {
                            if (safeStore.ranges().currentRanges().contains(partitionKey))
                            {
-                                AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore();
-                                Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty());
-                                Assert.assertFalse(commandStore.safeToRead().isEmpty());
+                                AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
+                                Assert.assertFalse(ss.bootstrapBeganAt().isEmpty());
+                                Assert.assertFalse(ss.safeToReadAt().isEmpty());
 
-                                Assert.assertEquals(1, commandStore.bootstrapBeganAt().entrySet().stream()
+                                Assert.assertEquals(1, ss.bootstrapBeganAt().entrySet().stream()
                                                                    .filter(entry -> entry.getValue().contains(partitionKey))
                                                                    .map(entry -> {
                                                                        Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
                                                                        return entry;
                                                                    }).count());
-                                Assert.assertEquals(1, commandStore.safeToRead().entrySet().stream()
+                                Assert.assertEquals(1, ss.safeToReadAt().entrySet().stream()
                                                                    .filter(entry -> entry.getValue().contains(partitionKey))
                                                                    .map(entry -> {
                                                                        Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
@@ -458,17 +458,17 @@ public class AccordBootstrapTest extends TestBaseImpl
                                                                                           safeStore -> {
                                 if (!safeStore.ranges().allAt(preMove).contains(partitionKey))
                                 {
-                                    AccordCommandStore commandStore = (AccordCommandStore) safeStore.commandStore();
-                                    Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty());
-                                    Assert.assertFalse(commandStore.safeToRead().isEmpty());
+                                    AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
+                                    Assert.assertFalse(ss.bootstrapBeganAt().isEmpty());
+                                    Assert.assertFalse(ss.safeToReadAt().isEmpty());
 
-                                    Assert.assertEquals(1, commandStore.bootstrapBeganAt().entrySet().stream()
+                                    Assert.assertEquals(1, ss.bootstrapBeganAt().entrySet().stream()
                                                                        .filter(entry -> entry.getValue().contains(partitionKey))
                                                                        .map(entry -> {
                                                                            Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
                                                                            return entry;
                                                                        }).count());
-                                    Assert.assertEquals(1, commandStore.safeToRead().entrySet().stream()
+                                    Assert.assertEquals(1, ss.safeToReadAt().entrySet().stream()
                                                                        .filter(entry -> entry.getValue().contains(partitionKey))
                                                                        .map(entry -> {
                                                                            Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 48f8903577..d9315cf2c7 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -38,7 +38,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
@@ -85,7 +84,7 @@ public class AccordLoadTest extends AccordTestBase
 
                  cluster.forEach(i -> i.runOnInstance(() -> {
                      ((AccordService) AccordService.instance()).journal().compactor().updateCompactionPeriod(1, SECONDS);
-                     ((AccordSpec.JournalSpec)((AccordService) AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
+//                     ((AccordSpec.JournalSpec)((AccordService) AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
                  }));
 
                  ICoordinator coordinator = cluster.coordinator(1);
@@ -162,14 +161,23 @@ public class AccordLoadTest extends AccordTestBase
                      {
                          nextCompactionAt += compactionInterval;
                          System.out.println("compacting accord...");
-                         cluster.forEach(i -> i.nodetool("compact", "system_accord.journal"));
+                         cluster.forEach(i -> {
+                             i.nodetool("compact", "system_accord.journal");
+                             i.runOnInstance(() -> {
+                                 ((AccordService) AccordService.instance()).journal().checkAllCommands();
+                             });
+                         });
+
                      }
 
                      if ((nextFlushAt -= batchSize) <= 0)
                      {
                          nextFlushAt += flushInterval;
                          System.out.println("flushing journal...");
-                         cluster.forEach(i -> i.runOnInstance(() -> ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty()));
+                         cluster.forEach(i -> i.runOnInstance(() -> {
+                             ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
+                             ((AccordService) AccordService.instance()).journal().checkAllCommands();
+                         }));
                      }
 
                      final Date date = new Date();
diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
index da1b9cea10..6229e8148f 100644
--- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
+++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
@@ -141,22 +141,22 @@ public class AccordJournalCompactionTest
             {
                 timestamp = timestamp.next();
                 AccordSafeCommandStore.FieldUpdates updates = new AccordSafeCommandStore.FieldUpdates();
-                updates.durableBefore = durableBeforeGen.next(rs);
+                updates.addDurableBefore = durableBeforeGen.next(rs);
                 // TODO: improve redundant before generator and re-enable
 //                updates.redundantBefore = redundantBeforeGen.next(rs);
-                updates.safeToRead = safeToReadGen.next(rs);
-                updates.rangesForEpoch = rangesForEpochGen.next(rs);
-                updates.historicalTransactions = historicalTransactionsGen.next(rs);
+                updates.newSafeToRead = safeToReadGen.next(rs);
+                updates.newRangesForEpoch = rangesForEpochGen.next(rs);
+                updates.addHistoricalTransactions = historicalTransactionsGen.next(rs);
 
                 journal.persistStoreState(1, updates, null);
 
-                redundantBeforeAccumulator.update(updates.redundantBefore);
-                durableBeforeAccumulator.update(updates.durableBefore);
-                if (updates.bootstrapBeganAt != null)
-                    bootstrapBeganAtAccumulator.update(updates.bootstrapBeganAt);
-                safeToReadAccumulator.update(updates.safeToRead);
-                rangesForEpochAccumulator.update(updates.rangesForEpoch);
-                historicalTransactionsAccumulator.update(updates.historicalTransactions);
+                redundantBeforeAccumulator.update(updates.addRedundantBefore);
+                durableBeforeAccumulator.update(updates.addDurableBefore);
+                if (updates.newBootstrapBeganAt != null)
+                    bootstrapBeganAtAccumulator.update(updates.newBootstrapBeganAt);
+                safeToReadAccumulator.update(updates.newSafeToRead);
+                rangesForEpochAccumulator.update(updates.newRangesForEpoch);
+                historicalTransactionsAccumulator.update(updates.addHistoricalTransactions);
 
                 if (i % 100 == 0)
                     journal.closeCurrentSegmentForTestingIfNonEmpty();
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 57ef025061..a329be5864 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -420,9 +420,12 @@ public class CompactionAccordIteratorsTest
         Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>();
         if (redundantBefore != null)
             redundantBefores.put(commandStore.id(), redundantBefore);
+        Int2ObjectHashMap<DurableBefore> durableBefores = new Int2ObjectHashMap<>();
+        if (durableBefore != null)
+            durableBefores.put(commandStore.id(), durableBefore);
         Int2ObjectHashMap<CommandStores.RangesForEpoch> rangesForEpochs = new Int2ObjectHashMap<>();
         rangesForEpochs.put(commandStore.id(), commandStore.unsafeRangesForEpoch());
-        when(mockAccordService.getCompactionInfo()).thenReturn(new IAccordService.CompactionInfo(redundantBefores, rangesForEpochs, durableBefore));
+        when(mockAccordService.getCompactionInfo()).thenReturn(new IAccordService.CompactionInfo(redundantBefores, rangesForEpochs, durableBefores));
         return mockAccordService;
     }
 
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index b6fca2e9cc..e41e4ca579 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -159,6 +159,7 @@ public class AccordCommandStoreTest
     {
         AtomicLong clock = new AtomicLong(0);
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
+//        SafeCommandStore safeStore =
         Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 1);
 
         PartialTxn txn = createPartialTxn(1);
@@ -172,10 +173,10 @@ public class AccordCommandStoreTest
         AccordSafeTimestampsForKey tfk = new AccordSafeTimestampsForKey(loaded(key, null));
         tfk.initialize();
 
-        TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId1, txnId1, true);
+        TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId1, txnId1, true);
         Assert.assertEquals(txnId1.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true));
 
-        TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, txnId2, txnId2, true);
+        TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId2, txnId2, true);
         Assert.assertEquals(txnId2.hlc(), AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true));
 
         Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp());
diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
index 24b911a3ff..dd7377ab58 100644
--- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java
+++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
@@ -137,18 +137,18 @@ public class MockJournal implements IJournal
     public void persistStoreState(int store, AccordSafeCommandStore.FieldUpdates fieldUpdates, Runnable onFlush)
     {
         FieldUpdates updates = fieldUpdates(store);
-        if (fieldUpdates.redundantBefore != null)
-            updates.redundantBeforeAccumulator.update(fieldUpdates.redundantBefore);
-        if (fieldUpdates.durableBefore != null)
-            updates.durableBeforeAccumulator.update(fieldUpdates.durableBefore);
-        if (fieldUpdates.bootstrapBeganAt != null)
-            updates.bootstrapBeganAtAccumulator.update(fieldUpdates.bootstrapBeganAt);
-        if (fieldUpdates.safeToRead != null)
-            updates.safeToReadAccumulator.update(fieldUpdates.safeToRead);
-        if (fieldUpdates.rangesForEpoch != null)
-            updates.rangesForEpochAccumulator.update(fieldUpdates.rangesForEpoch);
-        if (fieldUpdates.historicalTransactions != null)
-            updates.historicalTransactionsAccumulator.update(fieldUpdates.historicalTransactions);
+        if (fieldUpdates.addRedundantBefore != null)
+            updates.redundantBeforeAccumulator.update(fieldUpdates.addRedundantBefore);
+        if (fieldUpdates.addDurableBefore != null)
+            updates.durableBeforeAccumulator.update(fieldUpdates.addDurableBefore);
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            updates.bootstrapBeganAtAccumulator.update(fieldUpdates.newBootstrapBeganAt);
+        if (fieldUpdates.newSafeToRead != null)
+            updates.safeToReadAccumulator.update(fieldUpdates.newSafeToRead);
+        if (fieldUpdates.newRangesForEpoch != null)
+            updates.rangesForEpochAccumulator.update(fieldUpdates.newRangesForEpoch);
+        if (fieldUpdates.addHistoricalTransactions != null)
+            updates.historicalTransactionsAccumulator.update(fieldUpdates.addHistoricalTransactions);
 
         onFlush.run();
     }
diff --git a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
index 5627029644..99963c3af0 100644
--- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
@@ -132,6 +132,7 @@ public class SavedCommandTest
         SoftAssertions checks = new SoftAssertions();
         for (Fields field : missing)
         {
+            if (field == Fields.CLEANUP) continue;
             checks.assertThat(SavedCommand.getFieldChanged(field, flags))
                   .describedAs("field %s changed", field)
                   .isFalse();


