This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 8aeab43d47 Migrate in memory journal to CommandChange logic shared with AccordJournal
8aeab43d47 is described below

commit 8aeab43d47274b9ff6b00eaf24ea587f92449b54
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Tue Nov 26 15:26:54 2024 +0100

    Migrate in memory journal to CommandChange logic shared with AccordJournal
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20115
---
 modules/accord                                     |   2 +-
 .../db/compaction/CompactionIterator.java          |   6 +-
 .../cassandra/service/accord/AccordCache.java      |  30 +-
 .../cassandra/service/accord/AccordCacheEntry.java |   8 +-
 .../service/accord/AccordCommandStore.java         |  14 +-
 .../cassandra/service/accord/AccordJournal.java    | 400 +++++++--
 .../accord/AccordJournalValueSerializers.java      |  24 +-
 .../cassandra/service/accord/AccordTask.java       |   2 +-
 .../service/accord/CommandsForRanges.java          |   4 +-
 .../apache/cassandra/service/accord/IJournal.java  |  40 -
 .../cassandra/service/accord/SavedCommand.java     | 980 ---------------------
 .../distributed/test/accord/AccordLoadTest.java    |   4 -
 .../service/accord/AccordJournalBurnTest.java      |   4 +-
 .../service/accord/AccordJournalOrderTest.java     |   2 +-
 ...avedCommandTest.java => CommandChangeTest.java} |  25 +-
 .../accord/SimulatedAccordCommandStore.java        |  21 +-
 16 files changed, 396 insertions(+), 1170 deletions(-)

diff --git a/modules/accord b/modules/accord
index 520cc1072d..f7b9bb8887 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 520cc1072d44a5f7617566b6667e915532b89033
+Subproject commit f7b9bb8887ed672185f269ebcbc9d11e6aeafca9
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index e11d154829..90b583235c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -92,6 +92,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordJournal;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
 import org.apache.cassandra.service.accord.AccordKeyspace;
@@ -101,7 +102,6 @@ import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.IAccordService;
 import org.apache.cassandra.service.accord.JournalKey;
-import org.apache.cassandra.service.accord.SavedCommand;
 import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.paxos.PaxosRepairHistory;
@@ -1016,7 +1016,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                     return newVersion.build().unfilteredIterator();
                 }
 
-                SavedCommand.Builder commandBuilder = (SavedCommand.Builder) 
builder;
+                AccordJournal.Builder commandBuilder = (AccordJournal.Builder) 
builder;
                 if (commandBuilder.isEmpty())
                 {
                     Invariants.checkState(rows.isEmpty());
@@ -1038,7 +1038,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                     PartitionUpdate.SimpleBuilder newVersion = 
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
 
                     Row.SimpleBuilder rowBuilder = 
newVersion.row(firstClustering);
-                    rowBuilder.add("record", 
commandBuilder.asByteBuffer(userVersion))
+                    rowBuilder.add("record", 
commandBuilder.asByteBuffer(redundantBefore, userVersion))
                               .add("user_version", userVersion);
 
                     return newVersion.build().unfilteredIterator();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 180b827531..cbdba5224f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -104,7 +104,7 @@ public class AccordCache implements CacheSize
         @Nullable V quickShrink(V value);
         // a result of null means we cannot shrink, and should save/evict as 
appropriate
         @Nullable Object fullShrink(K key, V value);
-        @Nullable V inflate(K key, Object shrunk);
+        @Nullable V inflate(AccordCommandStore commandStore, K key, Object 
shrunk);
         long estimateHeapSize(V value);
         long estimateShrunkHeapSize(Object shrunk);
         boolean validate(AccordCommandStore commandStore, K key, V value);
@@ -359,7 +359,7 @@ public class AccordCache implements CacheSize
         ToLongFunction<V> heapEstimator,
         Function<AccordCacheEntry<K, V>, S> safeRefFactory)
     {
-        return newType(keyClass, loadFunction, saveFunction, quickShrink, (i, 
j) -> j, (i, j) -> (V)j, validateFunction, heapEstimator, i -> 0, 
safeRefFactory);
+        return newType(keyClass, loadFunction, saveFunction, quickShrink, (i, 
j) -> j, (c, i, j) -> (V)j, validateFunction, heapEstimator, i -> 0, 
safeRefFactory);
     }
 
     public <K, V, S extends AccordSafeState<K, V>> Type<K, V, S> newType(
@@ -368,7 +368,7 @@ public class AccordCache implements CacheSize
         QuadFunction<AccordCommandStore, K, V, Object, Runnable> saveFunction,
         Function<V, V> quickShrink,
         BiFunction<K, V, Object> fullShrink,
-        BiFunction<K, Object, V> inflate,
+        TriFunction<AccordCommandStore, K, Object, V> inflate,
         TriFunction<AccordCommandStore, K, V, Boolean> validateFunction,
         ToLongFunction<V> heapEstimator,
         ToLongFunction<Object> shrunkHeapEstimator,
@@ -583,7 +583,7 @@ public class AccordCache implements CacheSize
                     {
                         Object shrunk = state.tryGetShrunk();
                         if (shrunk != null)
-                            evicted = adapter.inflate(key, shrunk);
+                            evicted = adapter.inflate(commandStore, key, 
shrunk);
                     }
                     catch (RuntimeException rte)
                     {
@@ -971,7 +971,7 @@ public class AccordCache implements CacheSize
         final QuadFunction<AccordCommandStore, K, V, Object, Runnable> save;
         final Function<V, V> quickShrink;
         final BiFunction<K, V, Object> shrink;
-        final BiFunction<K, Object, V> inflate;
+        final TriFunction<AccordCommandStore, K, Object, V> inflate;
         final TriFunction<AccordCommandStore, K, V, Boolean> validate;
         final ToLongFunction<V> estimateHeapSize;
         final ToLongFunction<Object> estimateShrunkHeapSize;
@@ -981,7 +981,7 @@ public class AccordCache implements CacheSize
         FunctionalAdapter(BiFunction<AccordCommandStore, K, V> load,
                           QuadFunction<AccordCommandStore, K, V, Object, 
Runnable> save,
                           Function<V, V> quickShrink, BiFunction<K, V, Object> 
shrink,
-                          BiFunction<K, Object, V> inflate,
+                          TriFunction<AccordCommandStore, K, Object, V> 
inflate,
                           TriFunction<AccordCommandStore, K, V, Boolean> 
validate,
                           ToLongFunction<V> estimateHeapSize,
                           ToLongFunction<Object> estimateShrunkHeapSize,
@@ -1030,9 +1030,9 @@ public class AccordCache implements CacheSize
         }
 
         @Override
-        public V inflate(K key, Object shrunk)
+        public V inflate(AccordCommandStore commandStore, K key, Object shrunk)
         {
-            return inflate.apply(key, shrunk);
+            return inflate.apply(commandStore, key, shrunk);
         }
 
         @Override
@@ -1096,7 +1096,7 @@ public class AccordCache implements CacheSize
         @Override public Runnable save(AccordCommandStore commandStore, K key, 
@Nullable V value, @Nullable Object shrunk) { return null; }
         @Override public V quickShrink(V value) { return null; }
         @Override public Object fullShrink(K key, V value) { return null; }
-        @Override public V inflate(K key, Object shrunk) { return null; }
+        @Override public V inflate(AccordCommandStore commandStore, K key, 
Object shrunk) { return null; }
         @Override public long estimateHeapSize(V value) { return 0; }
         @Override public long estimateShrunkHeapSize(Object shrunk) { return 
0; }
         @Override public boolean validate(AccordCommandStore commandStore, K 
key, V value) { return false; }
@@ -1136,7 +1136,7 @@ public class AccordCache implements CacheSize
         }
 
         @Override
-        public CommandsForKey inflate(RoutingKey key, Object shrunk)
+        public CommandsForKey inflate(AccordCommandStore commandStore, 
RoutingKey key, Object shrunk)
         {
             return Serialize.fromBytes(key, (ByteBuffer)shrunk);
         }
@@ -1186,7 +1186,7 @@ public class AccordCache implements CacheSize
 
             if (value == null)
             {
-                value = inflate(txnId, serialized);
+                value = inflate(commandStore, txnId, serialized);
                 if (value == null)
                     return null;
             }
@@ -1212,7 +1212,7 @@ public class AccordCache implements CacheSize
 
             try
             {
-                return SavedCommand.asSerializedDiff(null, value, 
current_version);
+                return AccordJournal.asSerializedChange(null, value, 
current_version);
             }
             catch (IOException e)
             {
@@ -1222,15 +1222,15 @@ public class AccordCache implements CacheSize
         }
 
         @Override
-        public @Nullable Command inflate(TxnId key, Object serialized)
+        public @Nullable Command inflate(AccordCommandStore commandStore, 
TxnId key, Object serialized)
         {
-            SavedCommand.Builder builder = new SavedCommand.Builder(key);
+            AccordJournal.Builder builder = new AccordJournal.Builder(key);
             ByteBuffer buffer = (ByteBuffer) serialized;
             buffer.mark();
             try (DataInputBuffer buf = new DataInputBuffer(buffer, false))
             {
                 builder.deserializeNext(buf, current_version);
-                return builder.construct();
+                return 
builder.construct(commandStore.unsafeGetRedundantBefore());
             }
             catch (UnknownTableException e)
             {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
index 4bfa4be308..4e152e1650 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
@@ -368,7 +368,7 @@ public class AccordCacheEntry<K, V> extends 
IntrusiveLinkedListNode
         if (isShrunk())
         {
             AccordCache.Type<K, V, ?> parent = owner.parent();
-            inflate(key, parent.adapter());
+            inflate(owner.commandStore, key, parent.adapter());
             updateSize(parent);
         }
 
@@ -538,17 +538,17 @@ public class AccordCacheEntry<K, V> extends 
IntrusiveLinkedListNode
         return true;
     }
 
-    private void inflate(K key, Adapter<K, V, ?> adapter)
+    private void inflate(AccordCommandStore commandStore, K key, Adapter<K, V, 
?> adapter)
     {
         Invariants.checkState(isShrunk());
         if (isNested())
         {
             Nested nested = (Nested) state;
-            nested.state = adapter.inflate(key, nested.state);
+            nested.state = adapter.inflate(commandStore, key, nested.state);
         }
         else
         {
-            state = adapter.inflate(key, state);
+            state = adapter.inflate(commandStore, key, state);
         }
         status &= ~SHRUNK;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 35e8dca12a..81ce3ac8a4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
@@ -63,7 +64,6 @@ import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.service.accord.SavedCommand.MinimalCommand;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.utils.Clock;
@@ -71,12 +71,12 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static accord.api.Journal.CommandUpdate;
 import static accord.api.Journal.FieldUpdates;
+import static accord.api.Journal.Load.MINIMAL;
 import static accord.api.Journal.Loader;
 import static accord.api.Journal.OnDone;
 import static accord.local.KeyHistory.SYNC;
 import static accord.primitives.Status.Committed;
 import static accord.utils.Invariants.checkState;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.MINIMAL;
 
 public class AccordCommandStore extends CommandStore
 {
@@ -141,7 +141,7 @@ public class AccordCommandStore extends CommandStore
     }
 
     public final String loggingId;
-    private final IJournal journal;
+    private final Journal journal;
     private final AccordExecutor executor;
     private final Executor taskExecutor;
     private final ExclusiveCaches caches;
@@ -160,7 +160,7 @@ public class AccordCommandStore extends CommandStore
                               ProgressLog.Factory progressLogFactory,
                               LocalListeners.Factory listenerFactory,
                               EpochUpdateHolder epochUpdateHolder,
-                              IJournal journal,
+                              Journal journal,
                               AccordExecutor executor)
     {
         super(id, node, agent, dataStore, progressLogFactory, listenerFactory, 
epochUpdateHolder);
@@ -300,9 +300,9 @@ public class AccordCommandStore extends CommandStore
     }
 
     @VisibleForTesting
-    public void sanityCheckCommand(Command command)
+    public void sanityCheckCommand(RedundantBefore redundantBefore, Command 
command)
     {
-        ((AccordJournal) journal).sanityCheck(id, command);
+        ((AccordJournal) journal).sanityCheck(id, redundantBefore, command);
     }
 
     CommandsForKey loadCommandsForKey(RoutableKey key)
@@ -487,7 +487,7 @@ public class AccordCommandStore extends CommandStore
         return command;
     }
 
-    public MinimalCommand loadMinimal(TxnId txnId)
+    public Command.Minimal loadMinimal(TxnId txnId)
     {
         return journal.loadMinimal(id, txnId, MINIMAL, 
unsafeGetRedundantBefore(), durableBefore());
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 6e2e57fc40..a17c3528f5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -18,19 +18,20 @@
 package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.List;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.impl.CommandChange;
 import accord.impl.ErasedSafeCommand;
 import accord.local.Cleanup;
 import accord.local.Command;
@@ -54,6 +55,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.journal.Compactor;
@@ -65,14 +67,30 @@ import org.apache.cassandra.net.MessagingService;
 import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
 import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
 import org.apache.cassandra.service.accord.api.AccordAgent;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
+import org.apache.cassandra.service.accord.serializers.DepsSerializers;
+import org.apache.cassandra.service.accord.serializers.ResultSerializers;
+import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
 import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
+import static accord.impl.CommandChange.anyFieldChanged;
+import static accord.impl.CommandChange.getFieldChanged;
+import static accord.impl.CommandChange.getFieldIsNull;
+import static accord.impl.CommandChange.getFlags;
+import static accord.impl.CommandChange.getWaitingOn;
+import static accord.impl.CommandChange.nextSetField;
+import static accord.impl.CommandChange.setFieldChanged;
+import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.toIterableSetFields;
+import static accord.impl.CommandChange.unsetIterableFields;
+import static accord.impl.CommandChange.validateFlags;
 import static accord.primitives.SaveStatus.ErasedOrVestigial;
 import static accord.primitives.Status.Truncated;
 import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
 
-public class AccordJournal implements IJournal, Shutdownable
+public class AccordJournal implements accord.api.Journal, Shutdownable
 {
     static
     {
@@ -188,7 +206,7 @@ public class AccordJournal implements IJournal, Shutdownable
     @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId);
+        Builder builder = load(commandStoreId, txnId);
         Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore, 
durableBefore);
         switch (cleanup)
         {
@@ -197,14 +215,14 @@ public class AccordJournal implements IJournal, 
Shutdownable
             case ERASE:
                 return ErasedSafeCommand.erased(txnId, ErasedOrVestigial);
         }
-        return builder.construct();
+        return builder.construct(redundantBefore);
     }
 
     @Override
-    public SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId 
txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore 
durableBefore)
+    public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load 
load, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        SavedCommand.Builder builder = loadDiffs(commandStoreId, txnId, load);
-        if (!builder.nextCalled)
+        Builder builder = loadDiffs(commandStoreId, txnId, load);
+        if (builder.isEmpty())
             return null;
 
         Cleanup cleanup = builder.shouldCleanup(node.agent(), redundantBefore, 
durableBefore);
@@ -215,11 +233,11 @@ public class AccordJournal implements IJournal, 
Shutdownable
             case ERASE:
                 return null;
         }
-        Invariants.checkState(builder.saveStatus != null, "No saveSatus 
loaded, but next was called and cleanup was not: %s", builder);
+        Invariants.checkState(builder.saveStatus() != null, "No saveSatus 
loaded, but next was called and cleanup was not: %s", builder);
         return builder.asMinimal();
     }
 
-    @VisibleForTesting
+    @Override
     public RedundantBefore loadRedundantBefore(int store)
     {
         IdentityAccumulator<RedundantBefore> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store));
@@ -250,8 +268,8 @@ public class AccordJournal implements IJournal, Shutdownable
     @Override
     public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
     {
-        SavedCommand.Writer diff = SavedCommand.diff(update.before, 
update.after);
-        if (diff == null || status == Status.REPLAY)
+        Writer diff = Writer.make(update.before, update.after);
+        if (diff == null)
         {
             if (onFlush != null)
                 onFlush.run();
@@ -272,9 +290,6 @@ public class AccordJournal implements IJournal, Shutdownable
             @Override
             public AsyncResult<?> persist(DurableBefore addDurableBefore, 
DurableBefore newDurableBefore)
             {
-                if (status == Status.REPLAY)
-                    return AsyncResults.success(null);
-
                 AsyncResult.Settable<Void> result = AsyncResults.settable();
                 JournalKey key = new JournalKey(TxnId.NONE, 
JournalKey.Type.DURABLE_BEFORE, 0);
                 RecordPointer pointer = appendInternal(key, addDurableBefore);
@@ -315,18 +330,18 @@ public class AccordJournal implements IJournal, 
Shutdownable
             onFlush.run();
     }
 
-    @VisibleForTesting
-    public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId, 
SavedCommand.Load load)
+    private Builder loadDiffs(int commandStoreId, TxnId txnId, Load load)
     {
         JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, 
commandStoreId);
-        SavedCommand.Builder builder = new SavedCommand.Builder(txnId, load);
+        Builder builder = new Builder(txnId, load);
         journalTable.readAll(key, builder::deserializeNext);
         return builder;
     }
 
-    public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId)
+    @VisibleForTesting
+    public Builder load(int commandStoreId, TxnId txnId)
     {
-        return loadDiffs(commandStoreId, txnId, SavedCommand.Load.ALL);
+        return loadDiffs(commandStoreId, txnId, Load.ALL);
     }
 
     private <BUILDER> BUILDER readAll(JournalKey key)
@@ -351,17 +366,17 @@ public class AccordJournal implements IJournal, 
Shutdownable
         journal.closeCurrentSegmentForTestingIfNonEmpty();
     }
 
-    public void sanityCheck(int commandStoreId, Command orig)
+    public void sanityCheck(int commandStoreId, RedundantBefore 
redundantBefore, Command orig)
     {
-        SavedCommand.Builder diffs = loadDiffs(commandStoreId, orig.txnId());
-        diffs.forceResult(orig.result());
+        Builder builder = load(commandStoreId, orig.txnId());
+        builder.forceResult(orig.result());
         // We can only use strict equality if we supply result.
-        Command reconstructed = diffs.construct();
+        Command reconstructed = builder.construct(redundantBefore);
         Invariants.checkState(orig.equals(reconstructed),
                               '\n' +
                               "Original:      %s\n" +
                               "Reconstructed: %s\n" +
-                              "Diffs:         %s", orig, reconstructed, diffs);
+                              "Diffs:         %s", orig, reconstructed, 
builder);
     }
 
     @VisibleForTesting
@@ -391,7 +406,7 @@ public class AccordJournal implements IJournal, Shutdownable
         try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = 
journalTable.readAll())
         {
             JournalKey key;
-            SavedCommand.Builder builder = new SavedCommand.Builder();
+            Builder builder = new Builder();
 
             while ((key = iter.key()) != null)
             {
@@ -417,12 +432,12 @@ public class AccordJournal implements IJournal, 
Shutdownable
                     }
                 });
 
-                if (builder.nextCalled)
+                if (!builder.isEmpty())
                 {
-                    Command command = builder.construct();
+                    CommandStore commandStore = 
commandStores.forId(key.commandStoreId);
+                    Command command = 
builder.construct(commandStore.unsafeGetRedundantBefore());
                     Invariants.checkState(command.saveStatus() != 
SaveStatus.Uninitialised,
                                           "Found uninitialized command in the 
log: %s %s", command.toString(), builder.toString());
-                    CommandStore commandStore = 
commandStores.forId(key.commandStoreId);
                     Loader loader = commandStore.loader();
                     async(loader::load, command).get();
                     if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 
&& !command.hasBeen(Truncated))
@@ -454,65 +469,306 @@ public class AccordJournal implements IJournal, 
Shutdownable
         return future;
     }
 
-    // TODO: this is here temporarily; for debugging purposes
+    public static @Nullable ByteBuffer asSerializedChange(Command before, 
Command after, int userVersion) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            Writer writer = Writer.make(before, after);
+            if (writer == null)
+                return null;
+
+            writer.write(out, userVersion);
+            return out.asNewBuffer();
+        }
+    }
+
     @VisibleForTesting
-    public void checkAllCommands()
+    public void unsafeSetStarted()
     {
-        try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = 
journalTable.readAll())
+        status = Status.STARTED;
+    }
+
+    public static class Writer implements Journal.Writer
+    {
+        private final Command after;
+        private final int flags;
+
+        private Writer(Command after, int flags)
         {
-            IAccordService.CompactionInfo compactionInfo = 
AccordService.instance().getCompactionInfo();
-            JournalKey key;
-            SavedCommand.Builder builder = new SavedCommand.Builder();
-            while ((key = iter.key()) != null)
+            this.after = after;
+            this.flags = flags;
+        }
+
+        public static Writer make(Command before, Command after)
+        {
+            if (before == after
+                || after == null
+                || after.saveStatus() == SaveStatus.Uninitialised)
+                return null;
+
+            int flags = validateFlags(getFlags(before, after));
+            if (!anyFieldChanged(flags))
+                return null;
+
+            return new Writer(after, flags);
+        }
+
+        @Override
+        public void write(DataOutputPlus out, int userVersion) throws 
IOException
+        {
+            serialize(after, flags, out, userVersion);
+        }
+
+        private static void serialize(Command command, int flags, 
DataOutputPlus out, int userVersion) throws IOException
+        {
+            Invariants.checkState(flags != 0);
+            out.writeInt(flags);
+
+            int iterable = toIterableSetFields(flags);
+            while (iterable != 0)
             {
-                builder.reset(key.id);
-                if (key.type != JournalKey.Type.COMMAND_DIFF)
+                CommandChange.Fields field = nextSetField(iterable);
+                if (getFieldIsNull(field, flags))
                 {
-                    // TODO (required): add "skip" for the key to avoid 
getting stuck
-                    iter.readAllForKey(key, (segment, position, key1, buffer, 
hosts, userVersion) -> {});
+                    iterable = unsetIterableFields(field, iterable);
                     continue;
                 }
 
-                JournalKey finalKey = key;
-                List<RecordPointer> pointers = new ArrayList<>();
-                try
+                switch (field)
+                {
+                    case EXECUTE_AT:
+                        
CommandSerializers.timestamp.serialize(command.executeAt(), out, userVersion);
+                        break;
+                    case EXECUTES_AT_LEAST:
+                        
CommandSerializers.timestamp.serialize(command.executesAtLeast(), out, 
userVersion);
+                        break;
+                    case SAVE_STATUS:
+                        out.writeShort(command.saveStatus().ordinal());
+                        break;
+                    case DURABILITY:
+                        out.writeByte(command.durability().ordinal());
+                        break;
+                    case ACCEPTED:
+                        
CommandSerializers.ballot.serialize(command.acceptedOrCommitted(), out, 
userVersion);
+                        break;
+                    case PROMISED:
+                        
CommandSerializers.ballot.serialize(command.promised(), out, userVersion);
+                        break;
+                    case PARTICIPANTS:
+                        
CommandSerializers.participants.serialize(command.participants(), out, 
userVersion);
+                        break;
+                    case PARTIAL_TXN:
+                        
CommandSerializers.partialTxn.serialize(command.partialTxn(), out, userVersion);
+                        break;
+                    case PARTIAL_DEPS:
+                        
DepsSerializers.partialDeps.serialize(command.partialDeps(), out, userVersion);
+                        break;
+                    case WAITING_ON:
+                        Command.WaitingOn waitingOn = getWaitingOn(command);
+                        long size = 
WaitingOnSerializer.serializedSize(command.txnId(), waitingOn);
+                        ByteBuffer serialized = 
WaitingOnSerializer.serialize(command.txnId(), waitingOn);
+                        Invariants.checkState(serialized.remaining() == size);
+                        out.writeInt((int) size);
+                        out.write(serialized);
+                        break;
+                    case WRITES:
+                        CommandSerializers.writes.serialize(command.writes(), 
out, userVersion);
+                        break;
+                    case RESULT:
+                        ResultSerializers.result.serialize(command.result(), 
out, userVersion);
+                        break;
+                    case CLEANUP:
+                        throw new IllegalStateException();
+                }
+
+                iterable = unsetIterableFields(field, iterable);
+            }
+        }
+    }
+
+    public static class Builder extends CommandChange.Builder
+    {
+        public Builder()
+        {
+            super(null, Load.ALL);
+        }
+
+        public Builder(TxnId txnId)
+        {
+            super(txnId, Load.ALL);
+        }
+
+        public Builder(TxnId txnId, Load load)
+        {
+            super(txnId, load);
+        }
+        public ByteBuffer asByteBuffer(RedundantBefore redundantBefore, int 
userVersion) throws IOException
+        {
+            try (DataOutputBuffer out = new DataOutputBuffer())
+            {
+                serialize(out, redundantBefore, userVersion);
+                return out.asNewBuffer();
+            }
+        }
+
+        public Builder maybeCleanup(Cleanup cleanup)
+        {
+            super.maybeCleanup(cleanup);
+            return this;
+        }
+
+        public void serialize(DataOutputPlus out, RedundantBefore 
redundantBefore, int userVersion) throws IOException
+        {
+            Invariants.checkState(mask == 0);
+            Invariants.checkState(flags != 0);
+
+            int flags = validateFlags(this.flags);
+            Writer.serialize(construct(redundantBefore), flags, out, 
userVersion);
+        }
+
+        public void deserializeNext(DataInputPlus in, int userVersion) throws 
IOException
+        {
+            Invariants.checkState(txnId != null);
+            int flags = in.readInt();
+            Invariants.checkState(flags != 0);
+            nextCalled = true;
+            count++;
+
+            int iterable = toIterableSetFields(flags);
+            while (iterable != 0)
+            {
+                CommandChange.Fields field = nextSetField(iterable);
+                if (getFieldChanged(field, this.flags) || 
getFieldIsNull(field, mask))
+                {
+                    if (!getFieldIsNull(field, flags))
+                        skip(field, in, userVersion);
+
+                    iterable = unsetIterableFields(field, iterable);
+                    continue;
+                }
+                this.flags = setFieldChanged(field, this.flags);
+
+                if (getFieldIsNull(field, flags))
+                {
+                    this.flags = setFieldIsNull(field, this.flags);
+                }
+                else
                 {
-                    iter.readAllForKey(key, (segment, position, local, buffer, 
hosts, userVersion) -> {
-                        pointers.add(new RecordPointer(segment, position));
-                        Invariants.checkState(finalKey.equals(local));
-                        try (DataInputBuffer in = new DataInputBuffer(buffer, 
false))
+                    deserialize(field, in, userVersion);
+                }
+
+                iterable = unsetIterableFields(field, iterable);
+            }
+        }
+
+        /**
+         * Reads the payload of a single {@code field} from {@code in} and stores it in the
+         * corresponding builder member.
+         *
+         * @param field       the field whose payload comes next in the stream
+         * @param in          input positioned at the start of that payload
+         * @param userVersion version passed through to the per-field serializers
+         * @throws IOException if reading from {@code in} fails
+         */
+        private void deserialize(CommandChange.Fields field, DataInputPlus in, 
int userVersion) throws IOException
+        {
+            switch (field)
+            {
+                case EXECUTE_AT:
+                    executeAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
+                    break;
+                case EXECUTES_AT_LEAST:
+                    executeAtLeast = 
CommandSerializers.timestamp.deserialize(in, userVersion);
+                    break;
+                case SAVE_STATUS:
+                    // Read as a short and used as an index into SaveStatus.values().
+                    saveStatus = SaveStatus.values()[in.readShort()];
+                    break;
+                case DURABILITY:
+                    // Read as a single byte and used as an index into Durability.values().
+                    durability = 
accord.primitives.Status.Durability.values()[in.readByte()];
+                    break;
+                case ACCEPTED:
+                    acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in, userVersion);
+                    break;
+                case PROMISED:
+                    promised = CommandSerializers.ballot.deserialize(in, 
userVersion);
+                    break;
+                case PARTICIPANTS:
+                    participants = 
CommandSerializers.participants.deserialize(in, userVersion);
+                    break;
+                case PARTIAL_TXN:
+                    partialTxn = CommandSerializers.partialTxn.deserialize(in, 
userVersion);
+                    break;
+                case PARTIAL_DEPS:
+                    partialDeps = DepsSerializers.partialDeps.deserialize(in, 
userVersion);
+                    break;
+                case WAITING_ON:
+                    // Length-prefixed blob. The bytes are copied out of the stream now, but decoding is
+                    // deferred to the provider below because it needs the deps supplied by the caller.
+                    // NOTE(review): the same ByteBuffer instance is handed to every invocation of the
+                    // provider; confirm WaitingOnSerializer.deserialize does not advance its position.
+                    int size = in.readInt();
+
+                    byte[] waitingOnBytes = new byte[size];
+                    in.readFully(waitingOnBytes);
+                    ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes);
+                    waitingOn = (localTxnId, deps) -> {
+                        try
                         {
-                            builder.deserializeNext(in, userVersion);
+                            Invariants.nonNull(deps);
+                            return WaitingOnSerializer.deserialize(localTxnId, 
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
                         }
                         catch (IOException e)
                         {
-                            // can only throw if serializer is buggy
-                            throw new RuntimeException(e);
+                            throw Throwables.unchecked(e);
                         }
-                    });
-
-                    Cleanup cleanup = builder.shouldCleanup(node.agent(), 
compactionInfo.redundantBefores.get(key.commandStoreId), 
compactionInfo.durableBefores.get(key.commandStoreId));
-                    switch (cleanup)
-                    {
-                        case ERASE:
-                        case EXPUNGE:
-                        case EXPUNGE_PARTIAL:
-                        case VESTIGIAL:
-                            continue;
-                    }
-                    builder.construct();
-                }
-                catch (Throwable t)
-                {
-                    throw new RuntimeException(String.format("Caught an 
exception after iterating over: %s", pointers),
-                                               t);
-                }
+                    };
+                    break;
+                case WRITES:
+                    writes = CommandSerializers.writes.deserialize(in, 
userVersion);
+                    break;
+                case CLEANUP:
+                    // Only replaces the current value when the newly read one compares greater.
+                    Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
+                    if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
+                        cleanup = newCleanup;
+                    break;
+                case RESULT:
+                    result = ResultSerializers.result.deserialize(in, 
userVersion);
+                    break;
             }
         }
-    }
 
-    public void unsafeSetStarted()
-    {
-        status = Status.STARTED;
+        /**
+         * Consumes the serialized payload of {@code field} from {@code in} without storing it,
+         * leaving the stream positioned at whatever follows that payload.
+         * <p>
+         * Where no dedicated skip is used, the value is deserialized and discarded (see the
+         * TODOs below). No builder state is modified by this method.
+         *
+         * @param field       the field whose payload should be passed over
+         * @param in          input positioned at the start of that payload
+         * @param userVersion version passed through to the per-field serializers
+         * @throws IOException if reading from {@code in} fails
+         */
+        private void skip(CommandChange.Fields field, DataInputPlus in, int userVersion) throws IOException
+        {
+            switch (field)
+            {
+                case EXECUTE_AT:
+                case EXECUTES_AT_LEAST:
+                    CommandSerializers.timestamp.skip(in, userVersion);
+                    break;
+                case SAVE_STATUS:
+                    in.readShort();
+                    break;
+                case DURABILITY:
+                    in.readByte();
+                    break;
+                case ACCEPTED:
+                case PROMISED:
+                    CommandSerializers.ballot.skip(in, userVersion);
+                    break;
+                case PARTICIPANTS:
+                    // TODO (expected): skip
+                    CommandSerializers.participants.deserialize(in, userVersion);
+                    break;
+                case PARTIAL_TXN:
+                    // TODO (expected): skip
+                    CommandSerializers.partialTxn.deserialize(in, userVersion);
+                    break;
+                case PARTIAL_DEPS:
+                    // TODO (expected): skip
+                    DepsSerializers.partialDeps.deserialize(in, userVersion);
+                    break;
+                case WAITING_ON:
+                    // Length-prefixed blob: read the length, then jump over the bytes.
+                    int size = in.readInt();
+                    in.skipBytesFully(size);
+                    break;
+                case WRITES:
+                    // TODO (expected): skip
+                    CommandSerializers.writes.deserialize(in, userVersion);
+                    break;
+                case CLEANUP:
+                    in.readByte();
+                    break;
+                case RESULT:
+                    // TODO (expected): skip
+                    // The value is deliberately discarded: skip() is reached for fields that are already
+                    // populated or masked out, so assigning here would clobber what the builder holds.
+                    ResultSerializers.result.deserialize(in, userVersion);
+                    break;
+            }
+        }
     }
 }
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 7f1c71f351..a11dfb744b 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -35,8 +35,8 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.accord.serializers.CommandStoreSerializers;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
 
+import static accord.api.Journal.Load.ALL;
 import static accord.local.CommandStores.RangesForEpoch;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL;
 
 // TODO (required): test with large collection values, and perhaps split out 
some fields if they have a tendency to grow larger
 // TODO (required): alert on metadata size
@@ -56,16 +56,16 @@ public class AccordJournalValueSerializers
     }
 
     public static class CommandDiffSerializer
-    implements FlyweightSerializer<SavedCommand.Writer, SavedCommand.Builder>
+    implements FlyweightSerializer<AccordJournal.Writer, AccordJournal.Builder>
     {
         @Override
-        public SavedCommand.Builder mergerFor(JournalKey journalKey)
+        public AccordJournal.Builder mergerFor(JournalKey journalKey)
         {
-            return new SavedCommand.Builder(journalKey.id, ALL);
+            return new AccordJournal.Builder(journalKey.id, ALL);
         }
 
         @Override
-        public void serialize(JournalKey key, SavedCommand.Writer writer, 
DataOutputPlus out, int userVersion)
+        public void serialize(JournalKey key, AccordJournal.Writer writer, 
DataOutputPlus out, int userVersion)
         {
             try
             {
@@ -78,13 +78,17 @@ public class AccordJournalValueSerializers
         }
 
         @Override
-        public void reserialize(JournalKey key, SavedCommand.Builder from, 
DataOutputPlus out, int userVersion) throws IOException
+        public void reserialize(JournalKey key, AccordJournal.Builder from, 
DataOutputPlus out, int userVersion) throws IOException
         {
-            from.serialize(out, userVersion);
+            from.serialize(out,
+                           // In CompactionIterator, we are dealing with 
relatively recent records, so we do not pass redundant before here.
+                           // However, we do on load and during Journal 
SSTable compaction.
+                           RedundantBefore.EMPTY,
+                           userVersion);
         }
 
         @Override
-        public void deserialize(JournalKey journalKey, SavedCommand.Builder 
into, DataInputPlus in, int userVersion) throws IOException
+        public void deserialize(JournalKey journalKey, AccordJournal.Builder 
into, DataInputPlus in, int userVersion) throws IOException
         {
             into.deserializeNext(in, userVersion);
         }
@@ -296,8 +300,8 @@ public class AccordJournalValueSerializers
             from.forEach((epoch, ranges) -> {
                 try
                 {
-                    KeySerializers.ranges.serialize(ranges, out, 
messagingVersion);
                     out.writeLong(epoch);
+                    KeySerializers.ranges.serialize(ranges, out, 
messagingVersion);
                 }
                 catch (Throwable t)
                 {
@@ -320,8 +324,8 @@ public class AccordJournalValueSerializers
             long[] epochs = new long[size];
             for (int i = 0; i < ranges.length; i++)
             {
-                ranges[i] = KeySerializers.ranges.deserialize(in, 
messagingVersion);
                 epochs[i] = in.readLong();
+                ranges[i] = KeySerializers.ranges.deserialize(in, 
messagingVersion);
             }
             Invariants.checkState(ranges.length == epochs.length);
             into.update(new RangesForEpoch(epochs, ranges));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java 
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index ea4277391a..db36c42562 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -618,7 +618,7 @@ public abstract class AccordTask<R> extends Task implements 
Runnable, Function<S
             condition.awaitUninterruptibly();
 
             for (Command check : sanityCheck)
-                this.commandStore.sanityCheckCommand(check);
+                
this.commandStore.sanityCheckCommand(commandStore.unsafeGetRedundantBefore(), 
check);
 
             if (onFlush != null) onFlush.run();
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index b0aa948e26..29cdcb5195 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -169,7 +169,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
         {
             if (findAsDep == null)
             {
-                SavedCommand.MinimalCommand cmd = 
manager.commandStore.loadMinimal(txnId);
+                Command.Minimal cmd = manager.commandStore.loadMinimal(txnId);
                 if (cmd != null)
                     return ifRelevant(cmd);
             }
@@ -225,7 +225,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
             return ifRelevant(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(), 
cmd.participants(), cmd.partialDeps());
         }
 
-        public Summary ifRelevant(SavedCommand.MinimalCommand cmd)
+        public Summary ifRelevant(Command.Minimal cmd)
         {
             Invariants.checkState(findAsDep == null);
             return ifRelevant(cmd.txnId, cmd.executeAt, cmd.saveStatus, 
cmd.participants, null);
diff --git a/src/java/org/apache/cassandra/service/accord/IJournal.java 
b/src/java/org/apache/cassandra/service/accord/IJournal.java
deleted file mode 100644
index 66b1f65ce5..0000000000
--- a/src/java/org/apache/cassandra/service/accord/IJournal.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import accord.api.Journal;
-import accord.local.Command;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.primitives.TxnId;
-import accord.utils.PersistentField.Persister;
-
-public interface IJournal extends Journal
-{
-    // TODO (required): migrate to accord.api.Journal
-    default SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId 
txnId, SavedCommand.Load load, RedundantBefore redundantBefore, DurableBefore 
durableBefore)
-    {
-        Command command = loadCommand(commandStoreId, txnId, redundantBefore, 
durableBefore);
-        if (command == null)
-            return null;
-        return new SavedCommand.MinimalCommand(command.txnId(), 
command.saveStatus(), command.participants(), command.durability(), 
command.executeAt(), command.writes());
-    }
-
-    Persister<DurableBefore, DurableBefore> durableBeforePersister();
-}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java 
b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
deleted file mode 100644
index 9e38e5158c..0000000000
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ /dev/null
@@ -1,980 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import accord.api.Agent;
-import accord.api.Result;
-import accord.local.Cleanup;
-import accord.local.Command;
-import accord.local.CommonAttributes;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.local.StoreParticipants;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.primitives.Writes;
-import accord.utils.Invariants;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.journal.Journal;
-import org.apache.cassandra.service.accord.serializers.CommandSerializers;
-import org.apache.cassandra.service.accord.serializers.DepsSerializers;
-import org.apache.cassandra.service.accord.serializers.ResultSerializers;
-import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
-import org.apache.cassandra.utils.Throwables;
-
-import static accord.local.Cleanup.NO;
-import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
-import static accord.primitives.Known.KnownDeps.DepsErased;
-import static accord.primitives.Known.KnownDeps.DepsUnknown;
-import static accord.primitives.Known.KnownDeps.NoDeps;
-import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
-import static accord.primitives.Status.Durability.NotDurable;
-import static accord.utils.Invariants.illegalState;
-import static 
org.apache.cassandra.service.accord.SavedCommand.Fields.DURABILITY;
-import static 
org.apache.cassandra.service.accord.SavedCommand.Fields.EXECUTE_AT;
-import static 
org.apache.cassandra.service.accord.SavedCommand.Fields.PARTICIPANTS;
-import static org.apache.cassandra.service.accord.SavedCommand.Fields.RESULT;
-import static 
org.apache.cassandra.service.accord.SavedCommand.Fields.SAVE_STATUS;
-import static org.apache.cassandra.service.accord.SavedCommand.Fields.WRITES;
-import static org.apache.cassandra.service.accord.SavedCommand.Load.ALL;
-
-public class SavedCommand
-{
-    // This enum is order-dependent
-    public enum Fields
-    {
-        PARTICIPANTS, // stored first so we can index it
-        SAVE_STATUS,
-        PARTIAL_DEPS,
-        EXECUTE_AT,
-        EXECUTES_AT_LEAST,
-        DURABILITY,
-        ACCEPTED,
-        PROMISED,
-        WAITING_ON,
-        PARTIAL_TXN,
-        WRITES,
-        CLEANUP,
-        RESULT,
-        ;
-
-        public static final Fields[] FIELDS = values();
-    }
-
-    // TODO: maybe rename this and enclosing classes?
-    public static class Writer implements Journal.Writer
-    {
-        private final Command after;
-        private final TxnId txnId;
-        private final int flags;
-
-        @VisibleForTesting
-        public Writer(Command after, int flags)
-        {
-            this(after.txnId(), after, flags);
-        }
-
-        @VisibleForTesting
-        public Writer(TxnId txnId, Command after, int flags)
-        {
-            this.txnId = txnId;
-            this.after = after;
-            this.flags = flags;
-        }
-
-        @VisibleForTesting
-        public Command after()
-        {
-            return after;
-        }
-
-        public void write(DataOutputPlus out, int userVersion) throws 
IOException
-        {
-            serialize(after, flags, out, userVersion);
-        }
-
-        public TxnId key()
-        {
-            return txnId;
-        }
-    }
-
-    public static @Nullable ByteBuffer asSerializedDiff(Command before, 
Command after, int userVersion) throws IOException
-    {
-        try (DataOutputBuffer out = new DataOutputBuffer())
-        {
-            Writer writer = diff(before, after);
-            if (writer == null)
-                return null;
-
-            writer.write(out, userVersion);
-            return out.asNewBuffer();
-        }
-    }
-
-    @Nullable
-    public static Writer diff(Command before, Command after)
-    {
-        if (before == after
-            || after == null
-            || after.saveStatus() == SaveStatus.Uninitialised)
-            return null;
-
-        int flags = validateFlags(getFlags(before, after));
-        if (!anyFieldChanged(flags))
-            return null;
-
-        return new Writer(after, flags);
-    }
-
-    // TODO (required): calculate flags once
-    private static boolean anyFieldChanged(int flags)
-    {
-        return (flags >>> 16) != 0;
-    }
-
-    private static int validateFlags(int flags)
-    {
-        Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff)));
-        return flags;
-    }
-    
-    public static void serialize(Command after, int flags, DataOutputPlus out, 
int userVersion) throws IOException
-    {
-        Invariants.checkState(flags != 0);
-        out.writeInt(flags);
-
-        int iterable = toIterableSetFields(flags);
-        while (iterable != 0)
-        {
-            Fields field = nextSetField(iterable);
-            if (getFieldIsNull(field, flags))
-            {
-                iterable = unsetIterableFields(field, iterable);
-                continue;
-            }
-
-            switch (field)
-            {
-                case EXECUTE_AT:
-                    CommandSerializers.timestamp.serialize(after.executeAt(), 
out, userVersion);
-                    break;
-                case EXECUTES_AT_LEAST:
-                    
CommandSerializers.timestamp.serialize(after.executesAtLeast(), out, 
userVersion);
-                    break;
-                case SAVE_STATUS:
-                    out.writeShort(after.saveStatus().ordinal());
-                    break;
-                case DURABILITY:
-                    out.writeByte(after.durability().ordinal());
-                    break;
-                case ACCEPTED:
-                    
CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out, 
userVersion);
-                    break;
-                case PROMISED:
-                    CommandSerializers.ballot.serialize(after.promised(), out, 
userVersion);
-                    break;
-                case PARTICIPANTS:
-                    
CommandSerializers.participants.serialize(after.participants(), out, 
userVersion);
-                    break;
-                case PARTIAL_TXN:
-                    
CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion);
-                    break;
-                case PARTIAL_DEPS:
-                    DepsSerializers.partialDeps.serialize(after.partialDeps(), 
out, userVersion);
-                    break;
-                case WAITING_ON:
-                    Command.WaitingOn waitingOn = getWaitingOn(after);
-                    long size = 
WaitingOnSerializer.serializedSize(after.txnId(), waitingOn);
-                    ByteBuffer serialized = 
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
-                    Invariants.checkState(serialized.remaining() == size);
-                    out.writeInt((int) size);
-                    out.write(serialized);
-                    break;
-                case WRITES:
-                    CommandSerializers.writes.serialize(after.writes(), out, 
userVersion);
-                    break;
-                case RESULT:
-                    ResultSerializers.result.serialize(after.result(), out, 
userVersion);
-                    break;
-                case CLEANUP:
-                    throw new IllegalStateException();
-            }
-
-            iterable = unsetIterableFields(field, iterable);
-        }
-    }
-
-    @VisibleForTesting
-    public static int getFlags(Command before, Command after)
-    {
-        int flags = 0;
-
-        flags = collectFlags(before, after, Command::executeAt, true, 
Fields.EXECUTE_AT, flags);
-        flags = collectFlags(before, after, Command::executesAtLeast, true, 
Fields.EXECUTES_AT_LEAST, flags);
-        flags = collectFlags(before, after, Command::saveStatus, false, 
SAVE_STATUS, flags);
-        flags = collectFlags(before, after, Command::durability, false, 
DURABILITY, flags);
-
-        flags = collectFlags(before, after, Command::acceptedOrCommitted, 
false, Fields.ACCEPTED, flags);
-        flags = collectFlags(before, after, Command::promised, false, 
Fields.PROMISED, flags);
-
-        flags = collectFlags(before, after, Command::participants, true, 
PARTICIPANTS, flags);
-        flags = collectFlags(before, after, Command::partialTxn, false, 
Fields.PARTIAL_TXN, flags);
-        flags = collectFlags(before, after, Command::partialDeps, false, 
Fields.PARTIAL_DEPS, flags);
-
-        // TODO: waitingOn vs WaitingOnWithExecutedAt?
-        flags = collectFlags(before, after, SavedCommand::getWaitingOn, true, 
Fields.WAITING_ON, flags);
-
-        flags = collectFlags(before, after, Command::writes, false, WRITES, 
flags);
-
-        // Special-cased for Journal BurnTest integration
-        if ((before != null && before.result() != null && before.result() != 
ResultSerializers.APPLIED) ||
-            (after != null && after.result() != null && after.result() != 
ResultSerializers.APPLIED))
-        {
-            flags = collectFlags(before, after, Command::writes, false, 
RESULT, flags);
-        }
-
-        return flags;
-    }
-
-    static Command.WaitingOn getWaitingOn(Command command)
-    {
-        if (command instanceof Command.Committed)
-            return command.asCommitted().waitingOn();
-
-        return null;
-    }
-
-    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch, Fields field, int flags)
-    {
-        VAL l = null;
-        VAL r = null;
-        if (lo != null) l = convert.apply(lo);
-        if (ro != null) r = convert.apply(ro);
-
-        if (l == r)
-            return flags; // no change
-
-        if (r == null)
-            flags = setFieldIsNull(field, flags);
-
-        if (l == null || r == null)
-            return setFieldChanged(field, flags);
-
-        assert allowClassMismatch || l.getClass() == r.getClass() : 
String.format("%s != %s", l.getClass(), r.getClass());
-
-        if (l.equals(r))
-            return flags; // no change
-
-        return setFieldChanged(field, flags);
-    }
-
-    private static int setFieldChanged(Fields field, int oldFlags)
-    {
-        return oldFlags | (0x10000 << field.ordinal());
-    }
-
-    @VisibleForTesting
-    static boolean getFieldChanged(Fields field, int oldFlags)
-    {
-        return (oldFlags & (0x10000 << field.ordinal())) != 0;
-    }
-
-    static int toIterableSetFields(int flags)
-    {
-        return flags >>> 16;
-    }
-
-    static Fields nextSetField(int iterable)
-    {
-        int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
-        return i == 32 ? null : Fields.FIELDS[i];
-    }
-
-    static int unsetIterableFields(Fields field, int iterable)
-    {
-        return iterable & ~(1 << field.ordinal());
-    }
-
-    @VisibleForTesting
-    static boolean getFieldIsNull(Fields field, int oldFlags)
-    {
-        return (oldFlags & (1 << field.ordinal())) != 0;
-    }
-
-    private static int setFieldIsNull(Fields field, int oldFlags)
-    {
-        return oldFlags | (1 << field.ordinal());
-    }
-
-    public enum Load
-    {
-        ALL(0),
-        PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES),
-        MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT);
-
-        final int mask;
-
-        Load(int mask)
-        {
-            this.mask = mask;
-        }
-
-        Load(Fields ... fields)
-        {
-            int mask = -1;
-            for (Fields field : fields)
-                mask &= ~(1<< field.ordinal());
-            this.mask = mask;
-        }
-    }
-
-    public static class MinimalCommand
-    {
-        public final TxnId txnId;
-        public final SaveStatus saveStatus;
-        public final StoreParticipants participants;
-        public final Status.Durability durability;
-        public final Timestamp executeAt;
-        public final Writes writes;
-
-        public MinimalCommand(TxnId txnId, SaveStatus saveStatus, 
StoreParticipants participants, Status.Durability durability, Timestamp 
executeAt, Writes writes)
-        {
-            this.txnId = txnId;
-            this.saveStatus = saveStatus;
-            this.participants = participants;
-            this.durability = durability;
-            this.executeAt = executeAt;
-            this.writes = writes;
-        }
-    }
-
-    public static class Builder
-    {
-        final int mask;
-        int flags;
-
-        TxnId txnId;
-
-        Timestamp executeAt;
-        Timestamp executeAtLeast;
-        SaveStatus saveStatus;
-        Status.Durability durability;
-
-        Ballot acceptedOrCommitted;
-        Ballot promised;
-
-        StoreParticipants participants;
-        PartialTxn partialTxn;
-        PartialDeps partialDeps;
-
-        byte[] waitingOnBytes;
-        SavedCommand.WaitingOnProvider waitingOn;
-        Writes writes;
-        Result result;
-        Cleanup cleanup;
-
-        boolean nextCalled;
-        int count;
-
-        public Builder(TxnId txnId, Load load)
-        {
-            this.mask = load.mask;
-            init(txnId);
-        }
-
-        public Builder(TxnId txnId)
-        {
-            this(txnId, ALL);
-        }
-
-        public Builder(Load load)
-        {
-            this.mask = load.mask;
-        }
-
-        public Builder()
-        {
-            this(ALL);
-        }
-
-        public TxnId txnId()
-        {
-            return txnId;
-        }
-
-        public Timestamp executeAt()
-        {
-            return executeAt;
-        }
-
-        public Timestamp executeAtLeast()
-        {
-            return executeAtLeast;
-        }
-
-        public SaveStatus saveStatus()
-        {
-            return saveStatus;
-        }
-
-        public Status.Durability durability()
-        {
-            return durability;
-        }
-
-        public Ballot acceptedOrCommitted()
-        {
-            return acceptedOrCommitted;
-        }
-
-        public Ballot promised()
-        {
-            return promised;
-        }
-
-        public StoreParticipants participants()
-        {
-            return participants;
-        }
-
-        public PartialTxn partialTxn()
-        {
-            return partialTxn;
-        }
-
-        public PartialDeps partialDeps()
-        {
-            return partialDeps;
-        }
-
-        public SavedCommand.WaitingOnProvider waitingOn()
-        {
-            return waitingOn;
-        }
-
-        public Writes writes()
-        {
-            return writes;
-        }
-
-        public Result result()
-        {
-            return result;
-        }
-
-        public void clear()
-        {
-            flags = 0;
-            txnId = null;
-
-            executeAt = null;
-            executeAtLeast = null;
-            saveStatus = null;
-            durability = null;
-
-            acceptedOrCommitted = null;
-            promised = null;
-
-            participants = null;
-            partialTxn = null;
-            partialDeps = null;
-
-            waitingOnBytes = null;
-            waitingOn = null;
-            writes = null;
-            result = null;
-            cleanup = null;
-
-            nextCalled = false;
-            count = 0;
-        }
-
-        public void reset(TxnId txnId)
-        {
-            clear();
-            init(txnId);
-        }
-
-        public void init(TxnId txnId)
-        {
-            this.txnId = txnId;
-            durability = NotDurable;
-            acceptedOrCommitted = promised = Ballot.ZERO;
-            waitingOn = (txn, deps) -> null;
-            result = ResultSerializers.APPLIED;
-        }
-
-        public boolean isEmpty()
-        {
-            return !nextCalled;
-        }
-
-        public int count()
-        {
-            return count;
-        }
-
-        public Cleanup shouldCleanup(Agent agent, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
-        {
-            if (!nextCalled)
-                return NO;
-
-            if (saveStatus == null || participants == null)
-                return Cleanup.NO;
-
-            Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId, 
saveStatus, durability, participants, redundantBefore, durableBefore);
-            if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
-                cleanup = this.cleanup;
-            return cleanup;
-        }
-
-        // TODO (expected): avoid allocating new builder
-        public Builder maybeCleanup(Cleanup cleanup)
-        {
-            if (saveStatus() == null)
-                return this;
-
-            switch (cleanup)
-            {
-                case EXPUNGE:
-                case ERASE:
-                    return null;
-
-                case EXPUNGE_PARTIAL:
-                    return expungePartial(cleanup, saveStatus, true);
-
-                case VESTIGIAL:
-                case INVALIDATE:
-                    return saveStatusOnly();
-
-                case TRUNCATE_WITH_OUTCOME:
-                case TRUNCATE:
-                    return expungePartial(cleanup, cleanup.appliesIfNot, 
cleanup == TRUNCATE_WITH_OUTCOME);
-
-                case NO:
-                    return this;
-                default:
-                    throw new IllegalStateException("Unknown cleanup: " + 
cleanup);}
-        }
-
-        private Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, 
boolean includeOutcome)
-        {
-            Invariants.checkState(txnId != null);
-            Builder builder = new Builder(txnId, ALL);
-
-            builder.count++;
-            builder.nextCalled = true;
-
-            Invariants.checkState(saveStatus != null);
-            builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
-            builder.saveStatus = saveStatus;
-            builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags);
-            builder.cleanup = cleanup;
-            if (executeAt != null)
-            {
-                builder.flags = setFieldChanged(Fields.EXECUTE_AT, 
builder.flags);
-                builder.executeAt = executeAt;
-            }
-            if (durability != null)
-            {
-                builder.flags = setFieldChanged(DURABILITY, builder.flags);
-                builder.durability = durability;
-            }
-            if (participants != null)
-            {
-                builder.flags = setFieldChanged(PARTICIPANTS, builder.flags);
-                builder.participants = participants;
-            }
-            if (includeOutcome && builder.writes != null)
-            {
-                builder.flags = setFieldChanged(WRITES, builder.flags);
-                builder.writes = writes;
-            }
-
-            return builder;
-        }
-
-        private Builder saveStatusOnly()
-        {
-            Invariants.checkState(txnId != null);
-            Builder builder = new Builder(txnId, ALL);
-
-            builder.count++;
-            builder.nextCalled = true;
-
-            // TODO: these accesses can be abstracted away
-            if (saveStatus != null)
-            {
-                builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
-                builder.saveStatus = saveStatus;
-            }
-
-            return builder;
-        }
-
-        public ByteBuffer asByteBuffer(int userVersion) throws IOException
-        {
-            try (DataOutputBuffer out = new DataOutputBuffer())
-            {
-                serialize(out, userVersion);
-                return out.asNewBuffer();
-            }
-        }
-
-        public MinimalCommand asMinimal()
-        {
-            return new MinimalCommand(txnId, saveStatus, participants, 
durability, executeAt, writes);
-        }
-
-        public void serialize(DataOutputPlus out, int userVersion) throws 
IOException
-        {
-            Invariants.checkState(mask == 0);
-            Invariants.checkState(flags != 0);
-            out.writeInt(validateFlags(flags));
-
-            int iterable = toIterableSetFields(flags);
-            while (iterable != 0)
-            {
-                Fields field = nextSetField(iterable);
-                if (getFieldIsNull(field, flags))
-                {
-                    iterable = unsetIterableFields(field, iterable);
-                    continue;
-                }
-
-                switch (field)
-                {
-                    case EXECUTE_AT:
-                        CommandSerializers.timestamp.serialize(executeAt(), 
out, userVersion);
-                        break;
-                    case EXECUTES_AT_LEAST:
-                        
CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion);
-                        break;
-                    case SAVE_STATUS:
-                        out.writeShort(saveStatus().ordinal());
-                        break;
-                    case DURABILITY:
-                        out.writeByte(durability().ordinal());
-                        break;
-                    case ACCEPTED:
-                        
CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion);
-                        break;
-                    case PROMISED:
-                        CommandSerializers.ballot.serialize(promised(), out, 
userVersion);
-                        break;
-                    case PARTICIPANTS:
-                        
CommandSerializers.participants.serialize(participants(), out, userVersion);
-                        break;
-                    case PARTIAL_TXN:
-                        CommandSerializers.partialTxn.serialize(partialTxn(), 
out, userVersion);
-                        break;
-                    case PARTIAL_DEPS:
-                        DepsSerializers.partialDeps.serialize(partialDeps(), 
out, userVersion);
-                        break;
-                    case WAITING_ON:
-                        out.writeInt(waitingOnBytes.length);
-                        out.write(waitingOnBytes);
-                        break;
-                    case WRITES:
-                        CommandSerializers.writes.serialize(writes(), out, 
userVersion);
-                        break;
-                    case CLEANUP:
-                        out.writeByte(cleanup.ordinal());
-                        break;
-                    case RESULT:
-                        ResultSerializers.result.serialize(result(), out, 
userVersion);
-                        break;
-                }
-
-                iterable = unsetIterableFields(field, iterable);
-            }
-        }
-
-        public void deserializeNext(DataInputPlus in, int userVersion) throws 
IOException
-        {
-            Invariants.checkState(txnId != null);
-            int flags = in.readInt();
-            Invariants.checkState(flags != 0);
-            nextCalled = true;
-            count++;
-
-            int iterable = toIterableSetFields(flags);
-            while (iterable != 0)
-            {
-                Fields field = nextSetField(iterable);
-                if (getFieldChanged(field, this.flags) || 
getFieldIsNull(field, mask))
-                {
-                    if (!getFieldIsNull(field, flags))
-                        skip(field, in, userVersion);
-
-                    iterable = unsetIterableFields(field, iterable);
-                    continue;
-                }
-                this.flags = setFieldChanged(field, this.flags);
-
-                if (getFieldIsNull(field, flags))
-                {
-                    this.flags = setFieldIsNull(field, this.flags);
-                }
-                else
-                {
-                    deserialize(field, in, userVersion);
-                }
-
-                iterable = unsetIterableFields(field, iterable);
-            }
-        }
-
-        private void deserialize(Fields field, DataInputPlus in, int 
userVersion) throws IOException
-        {
-            switch (field)
-            {
-                case EXECUTE_AT:
-                    executeAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
-                    break;
-                case EXECUTES_AT_LEAST:
-                    executeAtLeast = 
CommandSerializers.timestamp.deserialize(in, userVersion);
-                    break;
-                case SAVE_STATUS:
-                    saveStatus = SaveStatus.values()[in.readShort()];
-                    break;
-                case DURABILITY:
-                    durability = Status.Durability.values()[in.readByte()];
-                    break;
-                case ACCEPTED:
-                    acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in, userVersion);
-                    break;
-                case PROMISED:
-                    promised = CommandSerializers.ballot.deserialize(in, 
userVersion);
-                    break;
-                case PARTICIPANTS:
-                    participants = 
CommandSerializers.participants.deserialize(in, userVersion);
-                    break;
-                case PARTIAL_TXN:
-                    partialTxn = CommandSerializers.partialTxn.deserialize(in, 
userVersion);
-                    break;
-                case PARTIAL_DEPS:
-                    partialDeps = DepsSerializers.partialDeps.deserialize(in, 
userVersion);
-                    break;
-                case WAITING_ON:
-                    int size = in.readInt();
-                    waitingOnBytes = new byte[size];
-                    in.readFully(waitingOnBytes);
-                    ByteBuffer buffer = ByteBuffer.wrap(waitingOnBytes);
-                    waitingOn = (localTxnId, deps) -> {
-                        try
-                        {
-                            Invariants.nonNull(deps);
-                            return WaitingOnSerializer.deserialize(localTxnId, 
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
-                        }
-                        catch (IOException e)
-                        {
-                            throw Throwables.unchecked(e);
-                        }
-                    };
-                    break;
-                case WRITES:
-                    writes = CommandSerializers.writes.deserialize(in, 
userVersion);
-                    break;
-                case CLEANUP:
-                    Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
-                    if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
-                        cleanup = newCleanup;
-                    break;
-                case RESULT:
-                    result = ResultSerializers.result.deserialize(in, 
userVersion);
-                    break;
-            }
-        }
-
-        private void skip(Fields field, DataInputPlus in, int userVersion) 
throws IOException
-        {
-            switch (field)
-            {
-                case EXECUTE_AT:
-                case EXECUTES_AT_LEAST:
-                    CommandSerializers.timestamp.skip(in, userVersion);
-                    break;
-                case SAVE_STATUS:
-                    in.readShort();
-                    break;
-                case DURABILITY:
-                    in.readByte();
-                    break;
-                case ACCEPTED:
-                case PROMISED:
-                    CommandSerializers.ballot.skip(in, userVersion);
-                    break;
-                case PARTICIPANTS:
-                    // TODO (expected): skip
-                    CommandSerializers.participants.deserialize(in, 
userVersion);
-                    break;
-                case PARTIAL_TXN:
-                    CommandSerializers.partialTxn.deserialize(in, userVersion);
-                    break;
-                case PARTIAL_DEPS:
-                    // TODO (expected): skip
-                    DepsSerializers.partialDeps.deserialize(in, userVersion);
-                    break;
-                case WAITING_ON:
-                    int size = in.readInt();
-                    in.skipBytesFully(size);
-                    break;
-                case WRITES:
-                    // TODO (expected): skip
-                    CommandSerializers.writes.deserialize(in, userVersion);
-                    break;
-                case CLEANUP:
-                    in.readByte();
-                    break;
-                case RESULT:
-                    // TODO (expected): skip
-                    result = ResultSerializers.result.deserialize(in, 
userVersion);
-                    break;
-            }
-        }
-
-        public void forceResult(Result newValue)
-        {
-            this.result = newValue;
-        }
-
-        public Command construct()
-        {
-            if (!nextCalled)
-                return null;
-
-            Invariants.checkState(txnId != null);
-            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
-            if (partialTxn != null)
-                attrs.partialTxn(partialTxn);
-            if (durability != null)
-                attrs.durability(durability);
-            if (participants != null)
-                attrs.setParticipants(participants);
-            else
-                attrs.setParticipants(StoreParticipants.empty(txnId));
-            if (partialDeps != null &&
-                (saveStatus.known.deps != NoDeps &&
-                 saveStatus.known.deps != DepsErased &&
-                 saveStatus.known.deps != DepsUnknown))
-                attrs.partialDeps(partialDeps);
-
-            switch (saveStatus.known.outcome)
-            {
-                case Erased:
-                case WasApply:
-                    writes = null;
-                    result = null;
-                    break;
-            }
-
-            Command.WaitingOn waitingOn = null;
-            if (this.waitingOn != null)
-                waitingOn = this.waitingOn.provide(txnId, partialDeps);
-
-            switch (saveStatus.status)
-            {
-                case NotDefined:
-                    return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                                  : 
Command.NotDefined.notDefined(attrs, promised);
-                case PreAccepted:
-                    return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
-                case AcceptedInvalidate:
-                case Accepted:
-                case PreCommitted:
-                    if (saveStatus == SaveStatus.AcceptedInvalidate)
-                        return 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
-                    else
-                        return Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
-                case Committed:
-                case Stable:
-                    return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
-                case PreApplied:
-                case Applied:
-                    return Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
-                case Truncated:
-                case Invalidated:
-                    return truncated(attrs, saveStatus, executeAt, 
executeAtLeast, writes, result);
-                default:
-                    throw new IllegalStateException();
-            }
-        }
-
-        private static Command.Truncated truncated(CommonAttributes.Mutable 
attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, 
Writes writes, Result result)
-        {
-            switch (status)
-            {
-                default:
-                    throw illegalState("Unhandled SaveStatus: " + status);
-                case TruncatedApplyWithOutcome:
-                case TruncatedApplyWithDeps:
-                case TruncatedApply:
-                    if (status != TruncatedApplyWithOutcome)
-                        result = null;
-                    if (attrs.txnId().kind().awaitsOnlyDeps())
-                        return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, executesAtLeast);
-                    return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, null);
-                case ErasedOrVestigial:
-                    return Command.Truncated.erasedOrVestigial(attrs.txnId(), 
attrs.participants());
-                case Erased:
-                    return Command.Truncated.erased(attrs.txnId(), 
attrs.durability(), attrs.participants());
-                case Invalidated:
-                    return Command.Truncated.invalidated(attrs.txnId());
-            }
-        }
-
-        public String toString()
-        {
-            return "Diff {" +
-                   "txnId=" + txnId +
-                   ", executeAt=" + executeAt +
-                   ", saveStatus=" + saveStatus +
-                   ", durability=" + durability +
-                   ", acceptedOrCommitted=" + acceptedOrCommitted +
-                   ", promised=" + promised +
-                   ", participants=" + participants +
-                   ", partialTxn=" + partialTxn +
-                   ", partialDeps=" + partialDeps +
-                   ", waitingOn=" + waitingOn +
-                   ", writes=" + writes +
-                   '}';
-        }
-    }
-
-    public interface WaitingOnProvider
-    {
-        Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
-    }
-}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 6ad2f09d2a..2a99a350b5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -186,9 +186,6 @@ public class AccordLoadTest extends AccordTestBase
                     System.out.println("compacting accord...");
                     cluster.forEach(i -> {
                         i.nodetool("compact", "system_accord.journal");
-                        i.runOnInstance(() -> {
-                            ((AccordService) AccordService.instance()).journal().checkAllCommands();
-                        });
                     });
                 }
 
@@ -198,7 +195,6 @@ public class AccordLoadTest extends AccordTestBase
                     System.out.println("flushing journal...");
                     cluster.forEach(i -> i.runOnInstance(() -> {
                         ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
-                        ((AccordService) AccordService.instance()).journal().checkAllCommands();
                     }));
                 }
 
diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 25f9c61d38..9e41e0add9 100644
--- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -136,7 +136,7 @@ public class AccordJournalBurnTest extends BurnTestBase
                  operations,
                  10 + random.nextInt(30),
                  new RandomDelayQueue.Factory(random).get(),
-                 (node) -> {
+                 (node, agent) -> {
                      try
                      {
                          File directory = new File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet())));
@@ -175,6 +175,4 @@ public class AccordJournalBurnTest extends BurnTestBase
             throw SimulationException.wrap(seed, t);
         }
     }
-
-
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 34bb215c0a..3c655133fa 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -89,7 +89,7 @@ public class AccordJournalOrderTest
         Runnable check = () -> {
             for (JournalKey key : res.keySet())
             {
-                SavedCommand.Builder diffs = accordJournal.loadDiffs(key.commandStoreId, key.id);
+                AccordJournal.Builder diffs = accordJournal.load(key.commandStoreId, key.id);
                 Assert.assertEquals(String.format("%d != %d for key %s", diffs.count(), res.get(key).intValue(), key),
                                     diffs.count(), res.get(key).intValue());
             }
diff --git a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
similarity index 87%
rename from test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
rename to test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index 1760286ba3..2878c5750a 100644
--- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -26,7 +26,9 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import accord.impl.CommandChange;
 import accord.local.Command;
+import accord.local.RedundantBefore;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 import accord.utils.Gen;
@@ -39,17 +41,17 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.SavedCommand.Fields;
-import org.apache.cassandra.service.accord.SavedCommand.Load;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.utils.AccordGenerators;
 import org.assertj.core.api.SoftAssertions;
 
+import static accord.api.Journal.*;
+import static accord.impl.CommandChange.*;
+import static accord.impl.CommandChange.getFlags;
 import static accord.utils.Property.qt;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
-import static org.apache.cassandra.service.accord.SavedCommand.getFlags;
 
-public class SavedCommandTest
+public class CommandChangeTest
 {
     private static final EnumSet<Fields> ALL = EnumSet.allOf(Fields.class);
 
@@ -97,13 +99,14 @@ public class SavedCommandTest
                     if (saveStatus == SaveStatus.TruncatedApplyWithDeps) continue;
                     out.clear();
                     Command orig = cmdBuilder.build(saveStatus);
-                    SavedCommand.serialize(orig, getFlags(null, orig), out, userVersion);
-                    SavedCommand.Builder builder = new SavedCommand.Builder(orig.txnId(), Load.ALL);
+
+                    AccordJournal.Writer.make(null, orig).write(out, userVersion);
+                    AccordJournal.Builder builder = new AccordJournal.Builder(orig.txnId(), Load.ALL);
                     builder.deserializeNext(new DataInputBuffer(out.unsafeGetBufferAndFlip(), false), userVersion);
                     // We are not persisting the result, so force it for strict equality
                     builder.forceResult(orig.result());
 
-                    Command reconstructed = builder.construct();
+                    Command reconstructed = builder.construct(RedundantBefore.EMPTY);
 
                     checks.assertThat(reconstructed)
                           .describedAs("lhs=expected\nrhs=actual\n%s", new LazyToString(() -> ReflectionUtils.recursiveEquals(orig, reconstructed).toString()))
@@ -119,10 +122,10 @@ public class SavedCommandTest
         SoftAssertions checks = new SoftAssertions();
         for (Fields field : missing)
         {
-            checks.assertThat(SavedCommand.getFieldChanged(field, flags))
+            checks.assertThat(CommandChange.getFieldChanged(field, flags))
                   .describedAs("field %s changed", field).
                   isTrue();
-            checks.assertThat(SavedCommand.getFieldIsNull(field, flags))
+            checks.assertThat(CommandChange.getFieldIsNull(field, flags))
                   .describedAs("field %s not null", field)
                   .isFalse();
         }
@@ -135,11 +138,11 @@ public class SavedCommandTest
         for (Fields field : missing)
         {
             if (field == Fields.CLEANUP) continue;
-            checks.assertThat(SavedCommand.getFieldChanged(field, flags))
+            checks.assertThat(CommandChange.getFieldChanged(field, flags))
                   .describedAs("field %s changed", field)
                   .isFalse();
             // Is null flag can not be set on a field that has not changed
-            checks.assertThat(SavedCommand.getFieldIsNull(field, flags))
+            checks.assertThat(CommandChange.getFieldIsNull(field, flags))
                   .describedAs("field %s not null", field)
                   .isFalse();
         }
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index c376c46fbf..65b315a7e2 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RemoteListeners;
@@ -39,6 +40,7 @@ import accord.impl.DefaultLocalListeners;
 import accord.impl.DefaultTimeouts;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TestAgent;
+import accord.impl.basic.InMemoryJournal;
 import accord.impl.basic.SimulatedFault;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -68,7 +70,6 @@ import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.utils.Gens;
-import accord.utils.PersistentField;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
@@ -105,7 +106,7 @@ public class SimulatedAccordCommandStore implements AutoCloseable
     public final Node.Id nodeId;
     public final Topology topology;
     public final Topologies topologies;
-    public final IJournal journal;
+    public final Journal journal;
     public final ScheduledExecutorPlus unorderedScheduled;
     public final List<String> evictions = new ArrayList<>();
     public Predicate<Throwable> ignoreExceptions = ignore -> false;
@@ -185,7 +186,6 @@ public class SimulatedAccordCommandStore implements AutoCloseable
             }
         };
 
-        this.journal = new InMemoryJournal(nodeId);
         TestAgent.RethrowAgent agent = new TestAgent.RethrowAgent()
         {
             @Override
@@ -201,6 +201,8 @@ public class SimulatedAccordCommandStore implements AutoCloseable
                 super.onUncaughtException(t);
             }
         };
+
+        this.journal = new InMemoryJournal(nodeId, agent);
         this.commandStore = new AccordCommandStore(0,
                                                    storeService,
                                                    agent,
@@ -246,19 +248,6 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         });
     }
 
-    private final class InMemoryJournal extends accord.impl.basic.InMemoryJournal implements IJournal
-    {
-        public InMemoryJournal(Node.Id id)
-        {
-            super(id);
-        }
-
-        public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister()
-        {
-            throw new IllegalArgumentException("Not implemented");
-        }
-    }
-
     private <K, V> void updateLoadFunction(AccordCache.Type<K, V, ?> i, FunctionWrapper wrapper)
     {
         i.unsafeSetLoadFunction(wrapper.wrap(i.unsafeGetLoadFunction()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to