dcapwell commented on code in PR #3494:
URL: https://github.com/apache/cassandra/pull/3494#discussion_r1731820569


##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -362,6 +363,31 @@ public List<V> readAll(K id)
         return res;
     }
 
+    public void readAll(K id, Reader reader)

Review Comment:
   this is copy/paste of `org.apache.cassandra.journal.Journal#readAll just 
changing this one line
   
   `reader.read(in, segment.descriptor.userVersion);` rather than 
`res.add(valueSerializer.deserialize(holder.key, in, 
segment.descriptor.userVersion));`
   
   maybe we should refactor so we only do this once and just pass a lambda 
around?



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -510,6 +536,25 @@ public RecordPointer asyncWrite(K id, V record, 
Set<Integer> hosts)
         return recordPointer;
     }
 
+    public RecordPointer asyncWrite(K id, Writer writer, Set<Integer> hosts)

Review Comment:
   this looks like copy/paste of 
`org.apache.cassandra.journal.Journal#asyncWrite`. should we refactor?



##########
test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java:
##########
@@ -380,10 +380,10 @@ Consumer<List<Partition>> expectAccordCommandsNoChange()
         };
     }
 
-
     private static RedundantBefore redundantBefore(TxnId txnId)
     {
         Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table, 
42));
+        txnId = txnId.as(Kind.Read, Routable.Domain.Range);

Review Comment:
   why is this change needed?



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -362,6 +363,31 @@ public List<V> readAll(K id)
         return res;
     }
 
+    public void readAll(K id, Reader reader)
+    {
+        EntrySerializer.EntryHolder<K> holder = new 
EntrySerializer.EntryHolder<>();
+        try (ReferencedSegments<K, V> segments = selectAndReference(id))
+        {
+            for (Segment<K, V> segment : segments.all())
+            {
+                segment.readAll(id, holder, () -> {
+                    try (DataInputBuffer in = new 
DataInputBuffer(holder.value, false))
+                    {
+                        reader.read(in, segment.descriptor.userVersion);
+                        Invariants.checkState(Objects.equals(holder.key, id),

Review Comment:
   shouldn't you do this before the `read`?  



##########
src/java/org/apache/cassandra/service/accord/SavedCommand.java:
##########
@@ -73,540 +71,427 @@ private enum HasFields
         LISTENERS
     }
 
-    public final TxnId txnId;
-
-    public final Timestamp executeAt;
-    public final SaveStatus saveStatus;
-    public final Status.Durability durability;
-
-    public final Ballot acceptedOrCommitted;
-    public final Ballot promised;
-
-    public final Route<?> route;
-    public final PartialTxn partialTxn;
-    public final PartialDeps partialDeps;
-    public final Seekables<?, ?> additionalKeysOrRanges;
-
-    public final Writes writes;
-    public final Listeners.Immutable<Command.DurableAndIdempotentListener> 
listeners;
+    public interface Writer<K> extends Journal.Writer
+    {
+        void write(DataOutputPlus out, int userVersion) throws IOException;
+        K key();
+    }
 
-    public SavedCommand(TxnId txnId,
-                        Timestamp executeAt,
-                        SaveStatus saveStatus,
-                        Status.Durability durability,
+    public static class DiffWriter implements Writer<TxnId>
+    {
+        private final Command before;
+        private final Command after;
+        private final TxnId txnId;
 
-                        Ballot acceptedOrCommitted,
-                        Ballot promised,
+        public DiffWriter(Command before, Command after)
+        {
+            this(after.txnId(), before, after);
+        }
 
-                        Route<?> route,
-                        PartialTxn partialTxn,
-                        PartialDeps partialDeps,
-                        Seekables<?, ?> additionalKeysOrRanges,
+        public DiffWriter(TxnId txnId, Command before, Command after)
+        {
+            this.txnId = txnId;
+            this.before = before;
+            this.after = after;
+        }
 
-                        Writes writes,
-                        
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
-    {
-        this.txnId = txnId;
-        this.executeAt = executeAt;
-        this.saveStatus = saveStatus;
-        this.durability = durability;
+        @VisibleForTesting
+        public Command before()
+        {
+            return before;
+        }
 
-        this.acceptedOrCommitted = acceptedOrCommitted;
-        this.promised = promised;
+        @VisibleForTesting
+        public Command after()
+        {
+            return after;
+        }
 
-        this.route = route;
-        this.partialTxn = partialTxn;
-        this.partialDeps = partialDeps;
-        this.additionalKeysOrRanges = additionalKeysOrRanges;
+        public void write(DataOutputPlus out, int userVersion) throws 
IOException
+        {
+            serialize(before, after, out, userVersion);
+        }
 
-        this.writes = writes;
-        this.listeners = listeners;
+        public TxnId key()
+        {
+            return txnId;
+        }
     }
 
-    public static SavedDiff diff(Command before, Command after)
-    {
-        if (before == after)
-            return null;
 
-        // TODO: we do not need to save `waitingOn` _every_ time.
-        Command.WaitingOn waitingOn = getWaitingOn(after);
-        return new SavedDiff(after.txnId(),
-                             ifNotEqual(before, after, Command::executeAt, 
true),
-                             ifNotEqual(before, after, Command::saveStatus, 
false),
-                             ifNotEqual(before, after, Command::durability, 
false),
-
-                             ifNotEqual(before, after, 
Command::acceptedOrCommitted, false),
-                             ifNotEqual(before, after, Command::promised, 
false),
-
-                             ifNotEqual(before, after, Command::route, true),
-                             ifNotEqual(before, after, Command::partialTxn, 
false),
-                             ifNotEqual(before, after, Command::partialDeps, 
false),
-                             ifNotEqual(before, after, 
Command::additionalKeysOrRanges, false),
-
-                             waitingOn,
-                             ifNotEqual(before, after, Command::writes, false),
-                             ifNotEqual(before, after, 
Command::durableListeners, true));
-    }
-
-    static Command reconstructFromDiff(List<LoadedDiff> diffs)
+    public static Writer<TxnId> diffWriter(Command before, Command after)
     {
-        return reconstructFromDiff(diffs, CommandSerializers.APPLIED);
+        return new DiffWriter(before, after);
     }
 
-    /**
-     * @param result is exposed because we are _not_ persisting result, since 
during loading or replay
-     *               we do not expect we will have to send a result to the 
client, and data results
-     *               can potentially contain a large number of entries, so 
it's best if they are not
-     *               written into the log.
-     */
-    @VisibleForTesting
-    static Command reconstructFromDiff(List<LoadedDiff> diffs, Result result)
+    public static void serialize(Command before, Command after, DataOutputPlus 
out, int userVersion) throws IOException
     {
-        TxnId txnId = null;
-
-        Timestamp executeAt = null;
-        SaveStatus saveStatus = null;
-        Status.Durability durability = null;
+        int flags = 0;
 
-        Ballot acceptedOrCommitted = Ballot.ZERO;
-        Ballot promised = null;
+        flags = collectFlags(before, after, Command::txnId, true, 
Fields.TXN_ID, flags);
+        flags = collectFlags(before, after, Command::executeAt, true, 
Fields.EXECUTE_AT, flags);
+        flags = collectFlags(before, after, Command::saveStatus, false, 
Fields.SAVE_STATUS, flags);
+        flags = collectFlags(before, after, Command::durability, false, 
Fields.DURABILITY, flags);
 
-        Route<?> route = null;
-        PartialTxn partialTxn = null;
-        PartialDeps partialDeps = null;
-        Seekables<?, ?> additionalKeysOrRanges = null;
+        flags = collectFlags(before, after, Command::acceptedOrCommitted, 
false, Fields.ACCEPTED, flags);
+        flags = collectFlags(before, after, Command::promised, false, 
Fields.PROMISED, flags);
 
-        WaitingOnProvider waitingOnProvider = null;
-        Writes writes = null;
-        Listeners.Immutable listeners = null;
+        flags = collectFlags(before, after, Command::route, true, 
Fields.ROUTE, flags);
+        flags = collectFlags(before, after, Command::partialTxn, false, 
Fields.PARTIAL_TXN, flags);
+        flags = collectFlags(before, after, Command::partialDeps, false, 
Fields.PARTIAL_DEPS, flags);
+        flags = collectFlags(before, after, Command::additionalKeysOrRanges, 
false, Fields.ADDITIONAL_KEYS, flags);
 
-        for (LoadedDiff diff : diffs)
+        Command.WaitingOn waitingOn = getWaitingOn(after);
+        if (waitingOn == null)
+            flags = setFieldIsNull(Fields.WAITING_ON, flags);
+        else
+            flags = setFieldChanged(Fields.WAITING_ON, flags);
+
+        flags = collectFlags(before, after, Command::writes, false, 
Fields.WRITES, flags);
+        flags = collectFlags(before, after, Command::durableListeners, true, 
Fields.LISTENERS, flags);
+
+        out.writeInt(flags);
+
+        // We encode all changed fields unless their value is null
+        if (getFieldChanged(Fields.TXN_ID, flags) && after.txnId() != null)
+            CommandSerializers.txnId.serialize(after.txnId(), out, 
userVersion);
+        if (getFieldChanged(Fields.EXECUTE_AT, flags) && after.executeAt() != 
null)
+            CommandSerializers.timestamp.serialize(after.executeAt(), out, 
userVersion);
+        if (getFieldChanged(Fields.SAVE_STATUS, flags))
+            out.writeInt(after.saveStatus().ordinal());
+        if (getFieldChanged(Fields.DURABILITY, flags) && after.durability() != 
null)
+            out.writeInt(after.durability().ordinal());
+
+        if (getFieldChanged(Fields.ACCEPTED, flags) && 
after.acceptedOrCommitted() != null)
+            CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), 
out, userVersion);
+        if (getFieldChanged(Fields.PROMISED, flags) && after.promised() != 
null)
+            CommandSerializers.ballot.serialize(after.promised(), out, 
userVersion);
+
+        if (getFieldChanged(Fields.ROUTE, flags) && after.route() != null)
+            
AccordKeyspace.LocalVersionedSerializers.route.serialize(after.route(), out); 
// TODO (required): user version
+        if (getFieldChanged(Fields.PARTIAL_TXN, flags) && after.partialTxn() 
!= null)
+            CommandSerializers.partialTxn.serialize(after.partialTxn(), out, 
userVersion);
+        if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && after.partialDeps() 
!= null)
+            DepsSerializer.partialDeps.serialize(after.partialDeps(), out, 
userVersion);
+        if (getFieldChanged(Fields.ADDITIONAL_KEYS, flags) && 
after.additionalKeysOrRanges() != null)
+            KeySerializers.seekables.serialize(after.additionalKeysOrRanges(), 
out, userVersion);
+
+        if (getFieldChanged(Fields.WAITING_ON, flags) && waitingOn != null)
         {
-            if (diff.txnId != null)
-                txnId = diff.txnId;
-            if (diff.executeAt != null)
-                executeAt = diff.executeAt;
-            if (diff.saveStatus != null)
-                saveStatus = diff.saveStatus;
-            if (diff.durability != null)
-                durability = diff.durability;
-
-            if (diff.acceptedOrCommitted != null)
-                acceptedOrCommitted = diff.acceptedOrCommitted;
-            if (diff.promised != null)
-                promised = diff.promised;
-
-            if (diff.route != null)
-                route = diff.route;
-            if (diff.partialTxn != null)
-                partialTxn = diff.partialTxn;
-            if (diff.partialDeps != null)
-                partialDeps = diff.partialDeps;
-            if (diff.additionalKeysOrRanges != null)
-                additionalKeysOrRanges = diff.additionalKeysOrRanges;
-
-            if (diff.waitingOn != null)
-                waitingOnProvider = diff.waitingOn;
-            if (diff.writes != null)
-                writes = diff.writes;
-            if (diff.listeners != null)
-                listeners = diff.listeners;
+            long size = WaitingOnSerializer.serializedSize(waitingOn);
+            ByteBuffer serialized = 
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
+            out.writeInt((int) size);
+            out.write(serialized);
         }
 
-        CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
-        if (partialTxn != null)
-            attrs.partialTxn(partialTxn);
-        if (durability != null)
-            attrs.durability(durability);
-        if (route != null)
-            attrs.route(route);
-        if (partialDeps != null &&
-            (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
-             saveStatus.known.deps != Status.KnownDeps.DepsErased &&
-             saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
-            attrs.partialDeps(partialDeps);
-        if (additionalKeysOrRanges != null)
-            attrs.additionalKeysOrRanges(additionalKeysOrRanges);
-        if (listeners != null && !listeners.isEmpty())
-            attrs.setListeners(listeners);
-
-        Command.WaitingOn waitingOn = null;
-        if (waitingOnProvider != null)
-            waitingOn = waitingOnProvider.provide(txnId, partialDeps);
-
-        Invariants.checkState(saveStatus != null,
-                              "Save status is null after applying %s", diffs);
-        switch (saveStatus.status)
+        if (getFieldChanged(Fields.WRITES, flags) && after.writes() != null)
+            CommandSerializers.writes.serialize(after.writes(), out, 
userVersion);
+
+        if (getFieldChanged(Fields.LISTENERS, flags) && 
after.durableListeners() != null)
         {
-            case NotDefined:
-                return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                              : 
Command.NotDefined.notDefined(attrs, promised);
-            case PreAccepted:
-                return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
-            case AcceptedInvalidate:
-            case Accepted:
-            case PreCommitted:
-                return Command.Accepted.accepted(attrs, saveStatus, executeAt, 
promised, acceptedOrCommitted);
-            case Committed:
-            case Stable:
-                return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
-            case PreApplied:
-            case Applied:
-                return Command.Executed.executed(attrs, saveStatus, executeAt, 
promised, acceptedOrCommitted, waitingOn, writes, result);
-            case Truncated:
-            case Invalidated:
-            default:
-                throw new IllegalStateException();
+            out.writeByte(after.durableListeners().size());
+            for (Command.DurableAndIdempotentListener listener : 
after.durableListeners())
+                
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);
         }
     }
 
-    // TODO (required): this convert function was added only because 
AsyncOperationTest was failing without it; maybe after switching to loading 
from the log we can just pass l and r directly or remove != null checks.
-    private static <OBJ, VAL> VAL ifNotEqual(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch)
+    static Command.WaitingOn getWaitingOn(Command command)
+    {
+        if (command instanceof Command.Committed)
+            return command.asCommitted().waitingOn();
+
+        return null;
+    }
+
+    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch, Fields field, int oldFlags)
     {
         VAL l = null;
         VAL r = null;
         if (lo != null) l = convert.apply(lo);
         if (ro != null) r = convert.apply(ro);
 
+        if (r == null)
+            oldFlags = setFieldIsNull(field, oldFlags);
+
         if (l == r)
-            return null;
+            return oldFlags; // no change
+
         if (l == null || r == null)
-            return r;
+            return setFieldChanged(field, oldFlags);
+
         assert allowClassMismatch || l.getClass() == r.getClass() : 
String.format("%s != %s", l.getClass(), r.getClass());
 
         if (l.equals(r))
-            return null;
+            return oldFlags; // no change
 
-        return r;
+        return setFieldChanged(field, oldFlags);
     }
 
-    static Command.WaitingOn getWaitingOn(Command command)
+    private static int setFieldChanged(Fields field, int oldFlags)
     {
-        if (command instanceof Command.Committed)
-            return command.asCommitted().waitingOn();
-
-        return null;
+        return oldFlags | (1 << (field.ordinal() + Short.SIZE));
     }
 
-    public static class SavedDiff extends SavedCommand
+    private static boolean getFieldChanged(Fields field, int oldFlags)
     {
-        public final Command.WaitingOn waitingOn;
+        return (oldFlags & (1 << (field.ordinal() + Short.SIZE))) != 0;
+    }
 
-        public SavedDiff(TxnId txnId,
-                         Timestamp executeAt,
-                         SaveStatus saveStatus,
-                         Status.Durability durability,
+    private static boolean getFieldIsNull(Fields field, int oldFlags)
+    {
+        return (oldFlags & (1 << field.ordinal())) != 0;
+    }
 
-                         Ballot acceptedOrCommitted,
-                         Ballot promised,
+    private static int setFieldIsNull(Fields field, int oldFlags)
+    {
+        return oldFlags | (1 << field.ordinal());
+    }
 
-                         Route<?> route,
-                         PartialTxn partialTxn,
-                         PartialDeps partialDeps,
-                         Seekables<?, ?> additionalKeysOrRanges,
 
-                         Command.WaitingOn waitingOn,
-                         Writes writes,
-                         
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
-        {
-            super(txnId, executeAt, saveStatus, durability, 
acceptedOrCommitted, promised, route, partialTxn, partialDeps, 
additionalKeysOrRanges, writes, listeners);
-            this.waitingOn = waitingOn;
-        }
+    public static class Builder
+    {
+        TxnId txnId = null;
 
-        @Override
-        public String toString()
-        {
-            return "SavedDiff{" +
-                   " txnId=" + txnId +
-                   ", executeAt=" + executeAt +
-                   ", saveStatus=" + saveStatus +
-                   ", durability=" + durability +
-                   ", acceptedOrCommitted=" + acceptedOrCommitted +
-                   ", promised=" + promised +
-                   ", route=" + route +
-                   ", partialTxn=" + partialTxn +
-                   ", partialDeps=" + partialDeps +
-                   ", writes=" + writes +
-                   ", waitingOn=" + waitingOn +
-                   '}';
-        }
-    }
+        Timestamp executeAt = null;
+        SaveStatus saveStatus = null;
+        Status.Durability durability = null;
 
-    public static class LoadedDiff extends SavedCommand
-    {
-        public final WaitingOnProvider waitingOn;
+        Ballot acceptedOrCommitted = Ballot.ZERO;
+        Ballot promised = null;
 
-        public LoadedDiff(TxnId txnId,
-                          Timestamp executeAt,
-                          SaveStatus saveStatus,
-                          Status.Durability durability,
+        Route<?> route = null;
+        PartialTxn partialTxn = null;
+        PartialDeps partialDeps = null;
+        Seekables<?, ?> additionalKeysOrRanges = null;
 
-                          Ballot acceptedOrCommitted,
-                          Ballot promised,
+        SavedCommand.WaitingOnProvider waitingOn = (txn, deps) -> null;
+        Writes writes = null;
+        Listeners.Immutable<?> listeners = null;
+        Result result = CommandSerializers.APPLIED;
 
-                          Route<?> route,
-                          PartialTxn partialTxn,
-                          PartialDeps partialDeps,
-                          Seekables<?, ?> additionalKeysOrRanges,
+        boolean nextCalled = false;
+        int count = 0;
 
-                          WaitingOnProvider waitingOn,
-                          Writes writes,
-                          
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
+        public int count()
         {
-            super(txnId, executeAt, saveStatus, durability, 
acceptedOrCommitted, promised, route, partialTxn, partialDeps, 
additionalKeysOrRanges, writes, listeners);
-            this.waitingOn = waitingOn;
+            return count;
         }
 
-        public String toString()
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public void deserializeNext(DataInputPlus in, int userVersion) throws 
IOException
         {
-            return "LoadedDiff{" +
-                   "waitingOn=" + waitingOn +
-                   '}';
-        }
-    }
-    
-    final static class SavedCommandSerializer implements 
ValueSerializer<JournalKey, Object>
-    {
-        @Override
-        public int serializedSize(JournalKey key, Object value, int 
userVersion)
-        {
-            SavedDiff diff = (SavedDiff) value;
-            long size = 0;
-            size += SHORT_SIZE; // flags
-
-            if (diff.txnId != null)
-                size += CommandSerializers.txnId.serializedSize(diff.txnId, 
userVersion);
-            if (diff.executeAt != null)
-                size += 
CommandSerializers.timestamp.serializedSize(diff.executeAt, userVersion);
-            if (diff.saveStatus != null)
-                size += Integer.BYTES;
-            if (diff.durability != null)
-                size += Integer.BYTES;
-
-            if (diff.acceptedOrCommitted != null)
-                size += 
CommandSerializers.ballot.serializedSize(diff.acceptedOrCommitted, userVersion);
-            if (diff.promised != null)
-                size += 
CommandSerializers.ballot.serializedSize(diff.promised, userVersion);
-
-            if (diff.route != null)
-                size += 
AccordKeyspace.LocalVersionedSerializers.route.serializedSize(diff.route);
-            if (diff.partialTxn != null)
-                CommandSerializers.partialTxn.serializedSize(diff.partialTxn, 
userVersion);
-            if (diff.partialDeps != null)
-                DepsSerializer.partialDeps.serializedSize(diff.partialDeps, 
userVersion);
-            if (diff.additionalKeysOrRanges != null)
-                
KeySerializers.seekables.serializedSize(diff.additionalKeysOrRanges, 
userVersion);
-
-            if (diff.waitingOn != null)
+            nextCalled = true;
+            count++;
+
+            final int flags = in.readInt();
+
+            if (getFieldChanged(Fields.TXN_ID, flags))
             {
-                size += Integer.BYTES;
-                size += WaitingOnSerializer.serializedSize(diff.waitingOn);
+                if (getFieldIsNull(Fields.TXN_ID, flags))
+                    txnId = null;
+                else
+                    txnId = CommandSerializers.txnId.deserialize(in, 
userVersion);
             }
 
-            if (diff.writes != null)
-                CommandSerializers.writes.serializedSize(diff.writes, 
userVersion);
-
-            if (diff.listeners != null && !diff.listeners.isEmpty())
+            if (getFieldChanged(Fields.EXECUTE_AT, flags))
             {
-                size += Byte.BYTES;
-                for (Command.DurableAndIdempotentListener listener : 
diff.listeners)
-                    size += 
AccordKeyspace.LocalVersionedSerializers.listeners.serializedSize(listener);
+                if (getFieldIsNull(Fields.EXECUTE_AT, flags))
+                    executeAt = null;
+                else
+                    executeAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
             }
-            return (int) size;
-        }
 
-        @Override
-        public void serialize(JournalKey key, Object value, DataOutputPlus 
out, int userVersion) throws IOException
-        {
-            SavedDiff diff = (SavedDiff) value;
-            int flags = getFlags(diff);
-
-            out.writeShort(flags);
-
-            if (diff.txnId != null)
-                CommandSerializers.txnId.serialize(diff.txnId, out, 
userVersion);
-            if (diff.executeAt != null)
-                CommandSerializers.timestamp.serialize(diff.executeAt, out, 
userVersion);
-            if (diff.saveStatus != null)
-                out.writeInt(diff.saveStatus.ordinal());
-            if (diff.durability != null)
-                out.writeInt(diff.durability.ordinal());
-
-            if (diff.acceptedOrCommitted != null)
-                CommandSerializers.ballot.serialize(diff.acceptedOrCommitted, 
out, userVersion);
-            if (diff.promised != null)
-                CommandSerializers.ballot.serialize(diff.promised, out, 
userVersion);
-
-            if (diff.route != null)
-                
AccordKeyspace.LocalVersionedSerializers.route.serialize(diff.route, out); // 
TODO (required): user version
-            if (diff.partialTxn != null)
-                CommandSerializers.partialTxn.serialize(diff.partialTxn, out, 
userVersion);
-            if (diff.partialDeps != null)
-                DepsSerializer.partialDeps.serialize(diff.partialDeps, out, 
userVersion);
-            if (diff.additionalKeysOrRanges != null)
-                
KeySerializers.seekables.serialize(diff.additionalKeysOrRanges, out, 
userVersion);
-
-            if (diff.waitingOn != null)
+            if (getFieldChanged(Fields.SAVE_STATUS, flags))
             {
-                long size = WaitingOnSerializer.serializedSize(diff.waitingOn);
-                ByteBuffer serialized = 
WaitingOnSerializer.serialize(diff.txnId, diff.waitingOn);
-                out.writeInt((int) size);
-                out.write(serialized);
+                if (getFieldIsNull(Fields.SAVE_STATUS, flags))
+                    saveStatus = null;
+                else
+                    saveStatus = SaveStatus.values()[in.readInt()];
+            }
+            if (getFieldChanged(Fields.DURABILITY, flags))
+            {
+                if (getFieldIsNull(Fields.DURABILITY, flags))
+                    durability = null;
+                else
+                    durability = Status.Durability.values()[in.readInt()];
             }
 
-            if (diff.writes != null)
-                CommandSerializers.writes.serialize(diff.writes, out, 
userVersion);
+            if (getFieldChanged(Fields.ACCEPTED, flags))
+            {
+                if (getFieldIsNull(Fields.ACCEPTED, flags))
+                    acceptedOrCommitted = null;
+                else
+                    acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in, userVersion);
+            }
 
-            if (diff.listeners != null && !diff.listeners.isEmpty())
+            if (getFieldChanged(Fields.PROMISED, flags))
             {
-                out.writeByte(diff.listeners.size());
-                for (Command.DurableAndIdempotentListener listener : 
diff.listeners)
-                    
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);
+                if (getFieldIsNull(Fields.PROMISED, flags))
+                    promised = null;
+                else
+                    promised = CommandSerializers.ballot.deserialize(in, 
userVersion);
             }
 
-        }
+            if (getFieldChanged(Fields.ROUTE, flags))
+            {
+                if (getFieldIsNull(Fields.ROUTE, flags))
+                    route = null;
+                else
+                    route = 
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);
+            }
 
-        private static int getFlags(SavedDiff diff)
-        {
-            int flags = 0;
-
-            if (diff.txnId != null)
-                flags = setBit(flags, HasFields.TXN_ID.ordinal());
-            if (diff.executeAt != null)
-                flags = setBit(flags, HasFields.EXECUTE_AT.ordinal());
-            if (diff.saveStatus != null)
-                flags = setBit(flags, HasFields.SAVE_STATUS.ordinal());
-            if (diff.durability != null)
-                flags = setBit(flags, HasFields.DURABILITY.ordinal());
-
-            if (diff.acceptedOrCommitted != null)
-                flags = setBit(flags, HasFields.ACCEPTED.ordinal());
-            if (diff.promised != null)
-                flags = setBit(flags, HasFields.PROMISED.ordinal());
-
-            if (diff.route != null)
-                flags = setBit(flags, HasFields.ROUTE.ordinal());
-            if (diff.partialTxn != null)
-                flags = setBit(flags, HasFields.PARTIAL_TXN.ordinal());
-            if (diff.partialDeps != null)
-                flags = setBit(flags, HasFields.PARTIAL_DEPS.ordinal());
-            if (diff.additionalKeysOrRanges != null)
-                flags = setBit(flags, HasFields.ADDITIONAL_KEYS.ordinal());
-
-            if (diff.waitingOn != null)
-                flags = setBit(flags, HasFields.WAITING_ON.ordinal());
-            if (diff.writes != null)
-                flags = setBit(flags, HasFields.WRITES.ordinal());
-            if (diff.listeners != null && !diff.listeners.isEmpty())
-                flags = setBit(flags, HasFields.LISTENERS.ordinal());
-            return flags;
-        }
+            if (getFieldChanged(Fields.PARTIAL_TXN, flags))
+            {
+                if (getFieldIsNull(Fields.PARTIAL_TXN, flags))
+                    partialTxn = null;
+                else
+                    partialTxn = CommandSerializers.partialTxn.deserialize(in, 
userVersion);
+            }
 
-        @Override
-        public Object deserialize(JournalKey key, DataInputPlus in, int 
userVersion) throws IOException
-        {
-            int flags = in.readShort();
-
-            TxnId txnId = null;
-            Timestamp executedAt = null;
-            SaveStatus saveStatus = null;
-            Status.Durability durability = null;
-
-            Ballot acceptedOrCommitted = null;
-            Ballot promised = null;
-            Route<?> route = null;
-
-            PartialTxn partialTxn = null;
-            PartialDeps partialDeps = null;
-            Seekables<?, ?> additionalKeysOrRanges = null;
-
-            WaitingOnProvider waitingOn = (txn, deps) -> null;
-            Writes writes = null;
-            Listeners.Immutable listeners = null;
-
-            if (isSet(flags, HasFields.TXN_ID.ordinal()))
-                txnId = CommandSerializers.txnId.deserialize(in, userVersion);
-            if (isSet(flags, HasFields.EXECUTE_AT.ordinal()))
-                executedAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
-            if (isSet(flags, HasFields.SAVE_STATUS.ordinal()))
-                saveStatus = SaveStatus.values()[in.readInt()];
-            if (isSet(flags, HasFields.DURABILITY.ordinal()))
-                durability = Status.Durability.values()[in.readInt()];
-
-            if (isSet(flags, HasFields.ACCEPTED.ordinal()))
-                acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in, userVersion);
-            if (isSet(flags, HasFields.PROMISED.ordinal()))
-                promised = CommandSerializers.ballot.deserialize(in, 
userVersion);
-
-            if (isSet(flags, HasFields.ROUTE.ordinal()))
-                route = 
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);
-            if (isSet(flags, HasFields.PARTIAL_TXN.ordinal()))
-                partialTxn = CommandSerializers.partialTxn.deserialize(in, 
userVersion);
-            if (isSet(flags, HasFields.PARTIAL_DEPS.ordinal()))
-                partialDeps = DepsSerializer.partialDeps.deserialize(in, 
userVersion);
-            if (isSet(flags, HasFields.ADDITIONAL_KEYS.ordinal()))
-                additionalKeysOrRanges = 
KeySerializers.seekables.deserialize(in, userVersion);
-
-            if (isSet(flags, HasFields.WAITING_ON.ordinal()))
+            if (getFieldChanged(Fields.PARTIAL_DEPS, flags))
             {
-                int size = in.readInt();
-                byte[] bytes = new byte[size];
-                in.readFully(bytes);
-                ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                waitingOn = (localTxnId, deps) -> {
-                    try
-                    {
-                        return WaitingOnSerializer.deserialize(localTxnId, 
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
-                    }
-                    catch (IOException e)
-                    {
-                        throw Throwables.unchecked(e);
-                    }
-                };
+                if (getFieldIsNull(Fields.PARTIAL_DEPS, flags))
+                    partialDeps = null;
+                else
+                    partialDeps = DepsSerializer.partialDeps.deserialize(in, 
userVersion);
             }
-            if (isSet(flags, HasFields.WRITES.ordinal()))
-                writes = CommandSerializers.writes.deserialize(in, 
userVersion);
 
-            if (isSet(flags, HasFields.LISTENERS.ordinal()))
+            if (getFieldChanged(Fields.ADDITIONAL_KEYS, flags))
             {
-                Listeners builder = Listeners.Immutable.EMPTY.mutable();
-                int cnt = in.readByte();
-                for (int i = 0; i < cnt; i++)
-                    
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));
-                listeners = new Listeners.Immutable(builder);
+                if (getFieldIsNull(Fields.ADDITIONAL_KEYS, flags))
+                    additionalKeysOrRanges = null;
+                else
+                    additionalKeysOrRanges = 
KeySerializers.seekables.deserialize(in, userVersion);
             }
 
-            return new LoadedDiff(txnId,
-                                  executedAt,
-                                  saveStatus,
-                                  durability,
+            if (getFieldChanged(Fields.WAITING_ON, flags))
+            {
+                if (getFieldIsNull(Fields.WAITING_ON, flags))
+                {
+                    waitingOn = null;
+                }
+                else
+                {
+                    int size = in.readInt();
+                    byte[] bytes = new byte[size];
+                    in.readFully(bytes);
+                    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+                    waitingOn = (localTxnId, deps) -> {
+                        try
+                        {
+                            return WaitingOnSerializer.deserialize(localTxnId, 
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
+                        }
+                        catch (IOException e)
+                        {
+                            throw Throwables.unchecked(e);
+                        }
+                    };
+                }
+            }
 
-                                  acceptedOrCommitted,
-                                  promised,
+            if (getFieldChanged(Fields.WRITES, flags))
+            {
+                if (getFieldIsNull(Fields.WRITES, flags))
+                    writes = null;
+                else
+                    writes = CommandSerializers.writes.deserialize(in, 
userVersion);
+            }
 
-                                  route,
-                                  partialTxn,
-                                  partialDeps,
-                                  additionalKeysOrRanges,
+            if (getFieldChanged(Fields.LISTENERS, flags))
+            {
+                if (getFieldIsNull(Fields.LISTENERS, flags))
+                {
+                    listeners = null;
+                }
+                else
+                {
+                    Listeners builder = Listeners.Immutable.EMPTY.mutable();
+                    int cnt = in.readByte();
+                    for (int i = 0; i < cnt; i++)
+                        
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));
+                    listeners = new Listeners.Immutable(builder);
+                }
+            }
+        }
 
-                                  waitingOn,
-                                  writes,
-                                  listeners);
+        public void forceResult(Result newValue)
+        {
+            this.result = newValue;
         }
-    }
 
-    static int setBit(int value, int bit)
-    {
-        return value | (1 << bit);
-    }
+        public Command construct() throws IOException
+        {
+            if (!nextCalled)
+                return null;
+
+            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
+            if (partialTxn != null)
+                attrs.partialTxn(partialTxn);
+            if (durability != null)
+                attrs.durability(durability);
+            if (route != null)
+                attrs.route(route);
+            if (partialDeps != null &&
+                (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
+                 saveStatus.known.deps != Status.KnownDeps.DepsErased &&
+                 saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
+                attrs.partialDeps(partialDeps);
+            if (additionalKeysOrRanges != null)
+                attrs.additionalKeysOrRanges(additionalKeysOrRanges);
+            if (listeners != null && !listeners.isEmpty())
+                attrs.setListeners(listeners);
+
+            Command.WaitingOn waitingOn = null;
+            if (this.waitingOn != null)
+                waitingOn = this.waitingOn.provide(txnId, partialDeps);
+
+            switch (saveStatus.status)
+            {
+                case NotDefined:
+                    return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                                  : 
Command.NotDefined.notDefined(attrs, promised);
+                case PreAccepted:
+                    return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
+                case AcceptedInvalidate:
+                case Accepted:
+                case PreCommitted:
+                    return Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
+                case Committed:
+                case Stable:
+                    return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+                case PreApplied:
+                case Applied:
+                    return Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+                case Truncated:
+                case Invalidated:
+                default:
+                    throw new IllegalStateException();
+            }
+        }
 
-    static boolean isSet(int value, int bit)
-    {
-        return (value & (1 << bit)) != 0;
+        public String toString()
+        {
+            return "Diff {" +
+                   "txnId=" + txnId +
+                   ", executeAt=" + executeAt +
+                   ", saveStatus=" + saveStatus +
+                   ", durability=" + durability +
+                   ", acceptedOrCommitted=" + acceptedOrCommitted +
+                   ", promised=" + promised +
+                   ", route=" + route +
+                   ", partialTxn=" + partialTxn +
+                   ", partialDeps=" + partialDeps +
+                   ", additionalKeysOrRanges=" + additionalKeysOrRanges +
+                   ", waitingOn=" + waitingOn +
+                   ", writes=" + writes +
+                   ", listeners=" + listeners +
+                   '}';
+        }
     }
 
     public interface WaitingOnProvider
     {
         Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
     }
-}
\ No newline at end of file
+
+}//import java.io.IOException;//import java.nio.ByteBuffer;//import 
java.util.List;//import java.util.function.Function;////import 
com.google.common.annotations.VisibleForTesting;////import 
accord.api.Result;//import accord.local.Command;//import 
accord.local.CommonAttributes;//import accord.local.Listeners;//import 
accord.local.SaveStatus;//import accord.local.Status;//import 
accord.primitives.Ballot;//import accord.primitives.PartialDeps;//import 
accord.primitives.PartialTxn;//import accord.primitives.Route;//import 
accord.primitives.Seekables;//import accord.primitives.Timestamp;//import 
accord.primitives.TxnId;//import accord.primitives.Writes;//import 
accord.utils.Invariants;//import 
org.apache.cassandra.io.util.DataInputPlus;//import 
org.apache.cassandra.io.util.DataOutputPlus;//import 
org.apache.cassandra.journal.ValueSerializer;//import 
org.apache.cassandra.service.accord.serializers.CommandSerializers;//import 
org.apache.cassandra.service.accord.serializers.DepsSerializer;/
 /import 
org.apache.cassandra.service.accord.serializers.KeySerializers;//import 
org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;//import 
org.apache.cassandra.utils.Throwables;////public class SavedCommand//{//    
public static final ValueSerializer<JournalKey, Object> serializer = new 
SavedCommandSerializer();////    // This enum is order-dependent//    private 
enum Fields//    {//        TXN_ID,//        EXECUTE_AT,//        
SAVE_STATUS,//        DURABILITY,//        ACCEPTED,//        PROMISED,//       
 ROUTE,//        PARTIAL_TXN,//        PARTIAL_DEPS,//        
ADDITIONAL_KEYS,//        WAITING_ON,//        WRITES,//        LISTENERS//    
}////    public final TxnId txnId;////    public final NewValue<Timestamp> 
executeAt;//    public final NewValue<SaveStatus> saveStatus;//    public final 
NewValue<Status.Durability> durability;////    public final NewValue<Ballot> 
acceptedOrCommitted;//    public final NewValue<Ballot> promised;////    public 
final NewValue
 <Route<?>> route;//    public final NewValue<PartialTxn> partialTxn;//    
public final NewValue<PartialDeps> partialDeps;//    public final 
NewValue<Seekables<?, ?>> additionalKeysOrRanges;////    public final 
NewValue<Writes> writes;//    public final 
NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>> 
listeners;////    public SavedCommand(TxnId txnId,//                        
NewValue<Timestamp> executeAt,//                        NewValue<SaveStatus> 
saveStatus,//                        NewValue<Status.Durability> 
durability,////                        NewValue<Ballot> acceptedOrCommitted,//  
                      NewValue<Ballot> promised,////                        
NewValue<Route<?>> route,//                        NewValue<PartialTxn> 
partialTxn,//                        NewValue<PartialDeps> partialDeps,//       
                 NewValue<Seekables<?, ?>> additionalKeysOrRanges,////          
              NewValue<Writes> writes,//                        NewVal
 ue<Listeners.Immutable<Command.DurableAndIdempotentListener>> listeners)//    
{//        this.txnId = txnId;//        this.executeAt = executeAt;//        
this.saveStatus = saveStatus;//        this.durability = durability;////        
this.acceptedOrCommitted = acceptedOrCommitted;//        this.promised = 
promised;////        this.route = route;//        this.partialTxn = 
partialTxn;//        this.partialDeps = partialDeps;//        
this.additionalKeysOrRanges = additionalKeysOrRanges;////        this.writes = 
writes;//        this.listeners = listeners;//    }////    // Instead of saved 
diff, just encode/serialize right away.//    // Loaded diff will have both a 
method to check if the field exists _and_ a getter//    public static SavedDiff 
diff(Command before, Command after)//    {//        if (before == after)//      
      return null;////        // TODO: we do not need to save `waitingOn` 
_every_ time.//        Command.WaitingOn waitingOn = getWaitingOn(after);//     
   return 
 new SavedDiff(after.txnId(),//                             ifNotEqual(before, 
after, Command::executeAt, true),//                             
ifNotEqual(before, after, Command::saveStatus, false),//                        
     ifNotEqual(before, after, Command::durability, false),////                 
            ifNotEqual(before, after, Command::acceptedOrCommitted, false),//   
                          ifNotEqual(before, after, Command::promised, 
false),////                             ifNotEqual(before, after, 
Command::route, true),//                             ifNotEqual(before, after, 
Command::partialTxn, false),//                             ifNotEqual(before, 
after, Command::partialDeps, false),//                             
ifNotEqual(before, after, Command::additionalKeysOrRanges, false),////          
                   new NewValue<>(waitingOn),//                             
ifNotEqual(before, after, Command::writes, false),//                            
 ifNotEqual(before
 , after, Command::durableListeners, true));//    }////    static Command 
reconstructFromDiff(List<LoadedDiff> diffs)//    {//        return 
reconstructFromDiff(diffs, CommandSerializers.APPLIED);//    }////    /**//     
* @param result is exposed because we are _not_ persisting result, since during 
loading or replay//     *               we do not expect we will have to send a 
result to the client, and data results//     *               can potentially 
contain a large number of entries, so it's best if they are not//     *         
      written into the log.//     *///    @VisibleForTesting//    static 
Command reconstructFromDiff(List<LoadedDiff> diffs, Result result)//    {//     
   TxnId txnId = null;////        Timestamp executeAt = null;//        
SaveStatus saveStatus = null;//        Status.Durability durability = null;//// 
       Ballot acceptedOrCommitted = Ballot.ZERO;//        Ballot promised = 
null;////        Route<?> route = null;//        PartialTxn partialTxn = null;//
         PartialDeps partialDeps = null;//        Seekables<?, ?> 
additionalKeysOrRanges = null;////        WaitingOnProvider waitingOnProvider = 
null;//        Writes writes = null;//        Listeners.Immutable listeners = 
null;////        for (LoadedDiff diff : diffs)//        {//            if 
(diff.txnId != null)//                txnId = diff.txnId;//            if 
(diff.executeAt != null)//                executeAt = diff.executeAt.get();//   
         if (diff.saveStatus != null)//                saveStatus = 
diff.saveStatus.get();//            if (diff.durability != null)//              
  durability = diff.durability.get();////            if 
(diff.acceptedOrCommitted != null)//                acceptedOrCommitted = 
diff.acceptedOrCommitted.get();//            if (diff.promised != null)//       
         promised = diff.promised.get();////            if (diff.route != 
null)//                route = diff.route.get();//            if 
(diff.partialTxn != null)//                partia
 lTxn = diff.partialTxn.get();//            if (diff.partialDeps != null)//     
           partialDeps = diff.partialDeps.get();//            if 
(diff.additionalKeysOrRanges != null)//                additionalKeysOrRanges = 
diff.additionalKeysOrRanges.get();////            if (diff.waitingOn != null)// 
               waitingOnProvider = diff.waitingOn.get();//            if 
(diff.writes != null)//                writes = diff.writes.get();//            
if (diff.listeners != null)//                listeners = 
diff.listeners.get();//        }////        CommonAttributes.Mutable attrs = 
new CommonAttributes.Mutable(txnId);//        if (partialTxn != null)//         
   attrs.partialTxn(partialTxn);//        if (durability != null)//            
attrs.durability(durability);//        if (route != null)//            
attrs.route(route);//        if (partialDeps != null &&//            
(saveStatus.known.deps != Status.KnownDeps.NoDeps &&//             
saveStatus.known.deps != Status.KnownDep
 s.DepsErased &&//             saveStatus.known.deps != 
Status.KnownDeps.DepsUnknown))//            attrs.partialDeps(partialDeps);//   
     if (additionalKeysOrRanges != null)//            
attrs.additionalKeysOrRanges(additionalKeysOrRanges);//        if (listeners != 
null && !listeners.isEmpty())//            attrs.setListeners(listeners);////   
     Command.WaitingOn waitingOn = null;//        if (waitingOnProvider != 
null)//            waitingOn = waitingOnProvider.provide(txnId, 
partialDeps);////        Invariants.checkState(saveStatus != null,//            
                  "Save status is null after applying %s", diffs);//        
switch (saveStatus.status)//        {//            case NotDefined://           
     return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())//                               
                               : Command.NotDefined.notDefined(attrs, 
promised);//            case PreAccepted://                return Comm
 and.PreAccepted.preAccepted(attrs, executeAt, promised);//            case 
AcceptedInvalidate://            case Accepted://            case 
PreCommitted://                return Command.Accepted.accepted(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted);//            case 
Committed://            case Stable://                return 
Command.Committed.committed(attrs, saveStatus, executeAt, promised, 
acceptedOrCommitted, waitingOn);//            case PreApplied://            
case Applied://                return Command.Executed.executed(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, 
result);//            case Truncated://            case Invalidated://          
  default://                throw new IllegalStateException();//        }//    
}////    // TODO (required): this convert function was added only because 
AsyncOperationTest was failing without it;//    //  maybe after switching to 
loading from the log we can just pass l and r directl
 y or remove != null checks.//    private static <OBJ, VAL> NewValue<VAL> 
ifNotEqual(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean 
allowClassMismatch)//    {//        VAL l = null;//        VAL r = null;//      
  if (lo != null) l = convert.apply(lo);//        if (ro != null) r = 
convert.apply(ro);////        if (l == r)//            return null; // null 
here means there was no change////        if (l == null || r == null)//         
   return NewValue.of(r);////        assert allowClassMismatch || l.getClass() 
== r.getClass() : String.format("%s != %s", l.getClass(), r.getClass());////    
    if (l.equals(r))//            return null;////        return 
NewValue.of(r);//    }//////    public static class NewValue<T>//    {//        
final T value;////        private NewValue(T value)//        {//            
this.value = value;//        }////        public boolean isNull()//        {//  
          return value == null;//        }////        public T get()//        
{//            re
 turn value;//        }////        public static <T> NewValue<T> of(T value)//  
      {//            return new NewValue<>(value);//        }////        public 
String toString()//        {//            return "" + value;//        }//    
}////    static Command.WaitingOn getWaitingOn(Command command)//    {//        
if (command instanceof Command.Committed)//            return 
command.asCommitted().waitingOn();////        return null;//    }////    public 
static class SavedDiff extends SavedCommand//    {//        public final 
NewValue<Command.WaitingOn> waitingOn;////        public SavedDiff(TxnId 
txnId,//                         NewValue<Timestamp> executeAt,//               
          NewValue<SaveStatus> saveStatus,//                         
NewValue<Status.Durability> durability,////                         
NewValue<Ballot> acceptedOrCommitted,//                         
NewValue<Ballot> promised,////                         NewValue<Route<?>> 
route,//                         NewVa
 lue<PartialTxn> partialTxn,//                         NewValue<PartialDeps> 
partialDeps,//                         NewValue<Seekables<?, ?>> 
additionalKeysOrRanges,////                         NewValue<Command.WaitingOn> 
waitingOn,//                         NewValue<Writes> writes,//                 
        NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>> 
listeners)//        {//            super(txnId, executeAt, saveStatus, 
durability, acceptedOrCommitted, promised, route, partialTxn, partialDeps, 
additionalKeysOrRanges, writes, listeners);//            this.waitingOn = 
waitingOn;//        }////        @Override//        public String toString()//  
      {//            return "SavedDiff{" +//                   " txnId=" + 
txnId +//                   ", executeAt=" + executeAt +//                   ", 
saveStatus=" + saveStatus +//                   ", durability=" + durability 
+//                   ", acceptedOrCommitted=" + acceptedOrCommitted +//        
        
    ", promised=" + promised +//                   ", route=" + route +//       
            ", partialTxn=" + partialTxn +//                   ", partialDeps=" 
+ partialDeps +//                   ", writes=" + writes +//                   
", waitingOn=" + waitingOn +//                   '}';//        }//    }////    
public static class LoadedDiff extends SavedCommand//    {//        public 
final NewValue<WaitingOnProvider> waitingOn;////        public LoadedDiff(TxnId 
txnId,//                          NewValue<Timestamp> executeAt,//              
            NewValue<SaveStatus> saveStatus,//                          
NewValue<Status.Durability> durability,////                          
NewValue<Ballot> acceptedOrCommitted,//                          
NewValue<Ballot> promised,////                          NewValue<Route<?>> 
route,//                          NewValue<PartialTxn> partialTxn,//            
              NewValue<PartialDeps> partialDeps,//                          
NewValue
 <Seekables<?, ?>> additionalKeysOrRanges,////                          
NewValue<WaitingOnProvider> waitingOn,//                          
NewValue<Writes> writes,//                          
NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>> 
listeners)//        {//            super(txnId, executeAt, saveStatus, 
durability, acceptedOrCommitted, promised, route, partialTxn, partialDeps, 
additionalKeysOrRanges, writes, listeners);//            this.waitingOn = 
waitingOn;//        }////        public String toString()//        {//          
  return "LoadedDiff{" +//                   "waitingOn=" + waitingOn +//       
            '}';//        }//    }////    final static class 
SavedCommandSerializer implements ValueSerializer<JournalKey, Object>//    {//  
      @Override//        public int serializedSize(JournalKey key, Object 
value, int userVersion)//        {//            SavedDiff diff = (SavedDiff) 
value;//            long size = 0;////            size += SHORT_SIZE
 ; // flags////////            if (diff.txnId != null)////                size 
+= CommandSerializers.txnId.serializedSize(diff.txnId, userVersion);////        
    if (diff.executeAt != null)////                size += 
CommandSerializers.timestamp.serializedSize(diff.executeAt, userVersion);////   
         if (diff.saveStatus != null)////                size += 
Integer.BYTES;////            if (diff.durability != null)////                
size += Integer.BYTES;////////            if (diff.acceptedOrCommitted != 
null)////                size += 
CommandSerializers.ballot.serializedSize(diff.acceptedOrCommitted, 
userVersion);////            if (diff.promised != null)////                size 
+= CommandSerializers.ballot.serializedSize(diff.promised, 
userVersion);////////            if (diff.route != null)////                
size += 
AccordKeyspace.LocalVersionedSerializers.route.serializedSize(diff.route);////  
          if (diff.partialTxn != null)////                CommandSerializers.par
 tialTxn.serializedSize(diff.partialTxn, userVersion);////            if 
(diff.partialDeps != null)////                
DepsSerializer.partialDeps.serializedSize(diff.partialDeps, userVersion);////   
         if (diff.additionalKeysOrRanges != null)////                
KeySerializers.seekables.serializedSize(diff.additionalKeysOrRanges, 
userVersion);////////            if (diff.waitingOn != null)////            
{////                size += Integer.BYTES;////                size += 
WaitingOnSerializer.serializedSize(diff.waitingOn);////            }////////    
        if (diff.writes != null)////                
CommandSerializers.writes.serializedSize(diff.writes, userVersion);////////     
       if (diff.listeners != null && !diff.listeners.isEmpty())////            
{////                size += Byte.BYTES;////                for 
(Command.DurableAndIdempotentListener listener : diff.listeners)////            
        size += AccordKeyspace.LocalVersionedSerializers.listeners.serializedSi
 ze(listener);////            }//            return (int) size;//        }////  
      @Override//        public void serialize(JournalKey key, Object value, 
DataOutputPlus out, int userVersion) throws IOException//        {//            
SavedDiff diff = (SavedDiff) value;////            
out.writeShort(getSerializedFieldsFlags(diff));//            
out.writeShort(getNullFieldFlags(diff));////            if (diff.txnId != 
null)//                CommandSerializers.txnId.serialize(diff.txnId, out, 
userVersion);//            if (diff.executeAt != null && 
!diff.executeAt.isNull())//                
CommandSerializers.timestamp.serialize(diff.executeAt.get(), out, 
userVersion);//            if (diff.saveStatus != null && 
!diff.saveStatus.isNull())//                
out.writeInt(diff.saveStatus.get().ordinal());//            if (diff.durability 
!= null && !diff.durability.isNull())//                
out.writeInt(diff.durability.get().ordinal());////            if 
(diff.acceptedOrCommitted != nul
 l && !diff.acceptedOrCommitted.isNull())//                
CommandSerializers.ballot.serialize(diff.acceptedOrCommitted.get(), out, 
userVersion);//            if (diff.promised != null && 
!diff.promised.isNull())//                
CommandSerializers.ballot.serialize(diff.promised.get(), out, userVersion);//// 
           if (diff.route != null && !diff.route.isNull())//                
AccordKeyspace.LocalVersionedSerializers.route.serialize(diff.route.get(), 
out); // TODO (required): user version//            if (diff.partialTxn != null 
&& !diff.partialTxn.isNull())//                
CommandSerializers.partialTxn.serialize(diff.partialTxn.get(), out, 
userVersion);//            if (diff.partialDeps != null && 
!diff.partialDeps.isNull())//                
DepsSerializer.partialDeps.serialize(diff.partialDeps.get(), out, 
userVersion);//            if (diff.additionalKeysOrRanges != null && 
!diff.additionalKeysOrRanges.isNull())//                
KeySerializers.seekables.serialize(diff.additi
 onalKeysOrRanges.get(), out, userVersion);////            if (diff.waitingOn 
!= null && !diff.waitingOn.isNull())//            {//                long size 
= WaitingOnSerializer.serializedSize(diff.waitingOn.get());//                
ByteBuffer serialized = WaitingOnSerializer.serialize(diff.txnId, 
diff.waitingOn.get());//                out.writeInt((int) size);//             
   out.write(serialized);//            }////            if (diff.writes != null 
&& !diff.writes.isNull())//                
CommandSerializers.writes.serialize(diff.writes.get(), out, userVersion);////   
         if (diff.listeners != null && !diff.listeners.isNull())//            
{//                out.writeByte(diff.listeners.get().size());//                
for (Command.DurableAndIdempotentListener listener : diff.listeners.get())//    
                
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);//  
          }//        }////        private static int 
getSerializedFieldsFlags(Sav
 edDiff diff)//        {//            int flags = 0;////            if 
(diff.txnId != null)//                flags = setBit(flags, 
Fields.TXN_ID.ordinal());//            if (diff.executeAt != null)//            
    flags = setBit(flags, Fields.EXECUTE_AT.ordinal());//            if 
(diff.saveStatus != null)//                flags = setBit(flags, 
Fields.SAVE_STATUS.ordinal());//            if (diff.durability != null)//      
          flags = setBit(flags, Fields.DURABILITY.ordinal());////            if 
(diff.acceptedOrCommitted != null)//                flags = setBit(flags, 
Fields.ACCEPTED.ordinal());//            if (diff.promised != null)//           
     flags = setBit(flags, Fields.PROMISED.ordinal());////            if 
(diff.route != null)//                flags = setBit(flags, 
Fields.ROUTE.ordinal());//            if (diff.partialTxn != null)//            
    flags = setBit(flags, Fields.PARTIAL_TXN.ordinal());//            if 
(diff.partialDeps != null)//                flags 
 = setBit(flags, Fields.PARTIAL_DEPS.ordinal());//            if 
(diff.additionalKeysOrRanges != null)//                flags = setBit(flags, 
Fields.ADDITIONAL_KEYS.ordinal());////            if (diff.waitingOn != null)// 
               flags = setBit(flags, Fields.WAITING_ON.ordinal());//            
if (diff.writes != null)//                flags = setBit(flags, 
Fields.WRITES.ordinal());//            if (diff.listeners != null)//            
    flags = setBit(flags, Fields.LISTENERS.ordinal());//            return 
flags;//        }////        private static int getNullFieldFlags(SavedDiff 
diff)//        {//            int flags = 0;////            if (diff.executeAt 
!= null && diff.executeAt.isNull())//                flags = setBit(flags, 
Fields.EXECUTE_AT.ordinal());//            if (diff.saveStatus != null && 
diff.saveStatus.isNull())//                flags = setBit(flags, 
Fields.SAVE_STATUS.ordinal());//            if (diff.durability != null && 
diff.durability.isNull())//      
           flags = setBit(flags, Fields.DURABILITY.ordinal());////            
if (diff.acceptedOrCommitted != null && diff.acceptedOrCommitted.isNull())//    
            flags = setBit(flags, Fields.ACCEPTED.ordinal());//            if 
(diff.promised != null && diff.promised.isNull())//                flags = 
setBit(flags, Fields.PROMISED.ordinal());////            if (diff.route != null 
&& diff.route.isNull())//                flags = setBit(flags, 
Fields.ROUTE.ordinal());//            if (diff.partialTxn != null && 
diff.partialTxn.isNull())//                flags = setBit(flags, 
Fields.PARTIAL_TXN.ordinal());//            if (diff.partialDeps != null && 
diff.partialDeps.isNull())//                flags = setBit(flags, 
Fields.PARTIAL_DEPS.ordinal());//            if (diff.additionalKeysOrRanges != 
null && diff.additionalKeysOrRanges.isNull())//                flags = 
setBit(flags, Fields.ADDITIONAL_KEYS.ordinal());////            if 
(diff.waitingOn != null && diff.waitingOn.isNull()
 )//                flags = setBit(flags, Fields.WAITING_ON.ordinal());//       
     if (diff.writes != null && diff.writes.isNull())//                flags = 
setBit(flags, Fields.WRITES.ordinal());//            if (diff.listeners != null 
&& diff.listeners.isNull())//                flags = setBit(flags, 
Fields.LISTENERS.ordinal());//            return flags;//        }//////        
@Override//        public Object deserialize(JournalKey key, DataInputPlus in, 
int userVersion) throws IOException//        {//            int hasFieldFlags = 
in.readShort();//            int nullFields = in.readShort();////            
TxnId txnId = null;//            Timestamp executedAt = null;//            
SaveStatus saveStatus = null;//            Status.Durability durability = 
null;////            Ballot acceptedOrCommitted = null;//            Ballot 
promised = null;//            Route<?> route = null;////            PartialTxn 
partialTxn = null;//            PartialDeps partialDeps = null;//       
      Seekables<?, ?> additionalKeysOrRanges = null;////            
WaitingOnProvider waitingOn = (txn, deps) -> null;//            Writes writes = 
null;//            Listeners.Immutable listeners = null;////            if 
(isSet(hasFieldFlags, Fields.TXN_ID.ordinal()) &&//                
!isSet(nullFields, Fields.TXN_ID.ordinal()))//                txnId = 
CommandSerializers.txnId.deserialize(in, userVersion);//            if 
(isSet(hasFieldFlags, Fields.EXECUTE_AT.ordinal()) &&//                
!isSet(nullFields, Fields.EXECUTE_AT.ordinal()))//                executedAt = 
CommandSerializers.timestamp.deserialize(in, userVersion);//            if 
(isSet(hasFieldFlags, Fields.SAVE_STATUS.ordinal()) &&//                
!isSet(nullFields, Fields.SAVE_STATUS.ordinal()))//                saveStatus = 
SaveStatus.values()[in.readInt()];//            if (isSet(hasFieldFlags, 
Fields.DURABILITY.ordinal()) &&//                !isSet(nullFields, 
Fields.DURABILITY.ordinal()))//                du
 rability = Status.Durability.values()[in.readInt()];////            if 
(isSet(hasFieldFlags, Fields.ACCEPTED.ordinal()) &&//                
!isSet(nullFields, Fields.ACCEPTED.ordinal()))//                
acceptedOrCommitted = CommandSerializers.ballot.deserialize(in, userVersion);// 
           if (isSet(hasFieldFlags, Fields.PROMISED.ordinal()) &&//             
   !isSet(nullFields, Fields.PROMISED.ordinal()))//                promised = 
CommandSerializers.ballot.deserialize(in, userVersion);////            if 
(isSet(hasFieldFlags, Fields.ROUTE.ordinal()) &&//                
!isSet(nullFields, Fields.ROUTE.ordinal()))//                route = 
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);//            if 
(isSet(hasFieldFlags, Fields.PARTIAL_TXN.ordinal()) &&//                
!isSet(nullFields, Fields.PARTIAL_TXN.ordinal()))//                partialTxn = 
CommandSerializers.partialTxn.deserialize(in, userVersion);//            if 
(isSet(hasFieldFlags, Fields.PARTIAL_D
 EPS.ordinal()) &&//                !isSet(nullFields, 
Fields.PARTIAL_DEPS.ordinal()))//                partialDeps = 
DepsSerializer.partialDeps.deserialize(in, userVersion);//            if 
(isSet(hasFieldFlags, Fields.ADDITIONAL_KEYS.ordinal()) &&//                
!isSet(nullFields, Fields.ADDITIONAL_KEYS.ordinal()))//                
additionalKeysOrRanges = KeySerializers.seekables.deserialize(in, 
userVersion);////            if (isSet(hasFieldFlags, 
Fields.WAITING_ON.ordinal()) &&//                !isSet(nullFields, 
Fields.WAITING_ON.ordinal()))//            {//                int size = 
in.readInt();//                byte[] bytes = new byte[size];//                
in.readFully(bytes);//                ByteBuffer buffer = 
ByteBuffer.wrap(bytes);//                waitingOn = (localTxnId, deps) -> {//  
                  try//                    {//                        return 
WaitingOnSerializer.deserialize(localTxnId, deps.keyDeps.keys(), 
deps.rangeDeps, deps.directKeyDeps, buff
 er);//                    }//                    catch (IOException e)//       
             {//                        throw Throwables.unchecked(e);//        
            }//                };//            }//            if 
(isSet(hasFieldFlags, Fields.WRITES.ordinal()) &&//                
!isSet(nullFields, Fields.WRITES.ordinal()))//                writes = 
CommandSerializers.writes.deserialize(in, userVersion);////            if 
(isSet(hasFieldFlags, Fields.LISTENERS.ordinal()) &&//                
!isSet(nullFields, Fields.LISTENERS.ordinal()))//            {//                
Listeners builder = Listeners.Immutable.EMPTY.mutable();//                int 
cnt = in.readByte();//                for (int i = 0; i < cnt; i++)//           
         
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));//
                listeners = new Listeners.Immutable(builder);//            
}////            return new LoadedDiff(txnId,//                                 
 execut
 edAt == null && !isSet(nullFields, Fields.EXECUTE_AT.ordinal()) ? null : 
NewValue.of(executedAt),//                                  saveStatus == null 
&& !isSet(nullFields, Fields.SAVE_STATUS.ordinal()) ? null : 
NewValue.of(saveStatus),//                                  durability == null 
&& !isSet(nullFields, Fields.DURABILITY.ordinal()) ? null : 
NewValue.of(durability),////                                  
acceptedOrCommitted == null && !isSet(nullFields, Fields.ACCEPTED.ordinal()) ? 
null : NewValue.of(acceptedOrCommitted),//                                  
promised == null && !isSet(nullFields, Fields.PROMISED.ordinal()) ? null : 
NewValue.of(promised),////                                  route == null && 
!isSet(nullFields, Fields.ROUTE.ordinal()) ? null : NewValue.of(route),//       
                           partialTxn == null && !isSet(nullFields, 
Fields.PARTIAL_TXN.ordinal()) ? null : NewValue.of(partialTxn),//               
                   partialDeps == null && !isSet
 (nullFields, Fields.PARTIAL_DEPS.ordinal()) ? null : 
NewValue.of(partialDeps),//                                  
additionalKeysOrRanges == null && !isSet(nullFields, 
Fields.ADDITIONAL_KEYS.ordinal()) ? null : 
NewValue.of(additionalKeysOrRanges),////                                  
waitingOn == null && !isSet(nullFields, Fields.WAITING_ON.ordinal()) ? null : 
NewValue.of(waitingOn),//                                  writes == null && 
!isSet(nullFields, Fields.WRITES.ordinal()) ? null : NewValue.of(writes),//     
                             listeners == null && !isSet(nullFields, 
Fields.LISTENERS.ordinal()) ? null : NewValue.of(listeners));//        }//    
}////    static int setBit(int value, int bit)//    {//        return value | 
(1 << bit);//    }////    static boolean isSet(int value, int bit)//    {//     
   return (value & (1 << bit)) != 0;//    }////    public interface 
WaitingOnProvider//    {//        Command.WaitingOn provide(TxnId txnId, 
PartialDeps deps);//    }//}

Review Comment:
   think you want to remove this...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to