ifesdjeen commented on code in PR #4321:
URL: https://github.com/apache/cassandra/pull/4321#discussion_r2284905678


##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range> 
tokenRange)
         @Override
         public void serialize(D deps, DataOutputPlus out) throws IOException
         {
+            boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+            boolean rangesByTxnId = forceByTxnId || 
deps.rangeDeps.hasByTxnId();
+            out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) | 
(rangesByTxnId ? RANGES_BY_TXNID : 0));
             {
                 KeyDeps keyDeps = deps.keyDeps;
                 KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
-                int txnIdCount = keyDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
-                int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
-                out.writeUnsignedVInt32(keysToTxnIdsCount);
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+                
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
 out);
+                if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps), 
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+                else serializePackedXtoY(keysToTxnIds(keyDeps), 
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
             }
             {
                 RangeDeps rangeDeps = deps.rangeDeps;
-                int rangeCount = rangeDeps.rangeCount();
-                out.writeUnsignedVInt32(rangeCount);
-                for (int i = 0; i < rangeCount; i++)
-                    tokenRange.serialize(rangeDeps.range(i), out);
-
-                int txnIdCount = rangeDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(rangeDeps.txnId(i), 
out);
-
-                int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
-                out.writeUnsignedVInt32(rangesToTxnIdsCount);
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+                KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+                
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
 out);
+                if (rangesByTxnId) 
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(), 
rangeDeps.rangeCount(), out);
+                else serializePackedXtoY(rangesToTxnIds(rangeDeps), 
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
             }
         }
 
+        private static void serializePackedXtoY(int[] xtoy, int xCount, int 
yCount, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(xtoy.length);
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0))
+            {
+                // no point serializing as can be directly inferred
+                if (Invariants.isParanoid())
+                {
+                    if (xCount == 1)
+                    {
+                        Invariants.require(xtoy[0] == xtoy.length, "%d != %d", 
xtoy[0], xtoy.length);
+                        for (int i = 0 ; i < yCount ; ++i) 
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+                    }
+                    else if (yCount == 1)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i + 
1);
+                        for (int i = xCount ; i < xtoy.length ; ++i) 
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+                    }
+                    else if (yCount == 0)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+                    }
+                    else
+                    {
+                        Invariants.require(xtoy.length == 0);
+                    }
+                }
+            }
+            else
+            {
+                serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+                serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
out);
+            }
+        }
+
+        private static void serializePackedInts(int[] vs, int from, int to, 
int max, DataOutputPlus out) throws IOException
+        {
+            int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+            long buffer = 0L;
+            int bufferCount = 0;
+            for (int i = from; i < to; i++)
+            {
+                Invariants.require(vs[i] <= max);
+                buffer |= (long)vs[i] << bufferCount;
+                bufferCount = bufferCount + bitsPerEntry;
+                if (bufferCount >= 64)
+                {
+                    out.writeLong(buffer);
+                    bufferCount -= 64;
+                    buffer = vs[i] >>> (bitsPerEntry - bufferCount);
+                }
+            }
+            if (bufferCount > 0)
+                out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8);
+        }
+
         @Override
         public D deserialize(DataInputPlus in) throws IOException
         {
+            int flags = in.readUnsignedVInt32();
             KeyDeps keyDeps;
             {
                 RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int keysToTxnIdsCount = in.readUnsignedVInt32();
-                int[] keysToTxnIds = new int[keysToTxnIdsCount];
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    keysToTxnIds[i] = in.readUnsignedVInt32();
-
-                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToKeys = null, keysToTxnIds = null;
+                if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys = 
deserializePackedXtoY(txnIds.length, keys.size(), in);
+                else keysToTxnIds = deserializePackedXtoY(keys.size(), 
txnIds.length, in);
+                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds, txnIdsToKeys);
             }
 
             RangeDeps rangeDeps;
             {
-                int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
-                Range[] ranges = new Range[rangeCount];
-                for (int i = 0; i < rangeCount; i++)
-                    ranges[i] = tokenRange.deserialize(in);
-
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int rangesToTxnIdsCount = in.readUnsignedVInt32();
-                int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    rangesToTxnIds[i] = in.readUnsignedVInt32();
-
-                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds);
+                Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToRanges = null, rangesToTxnIds = null;
+                if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges = 
deserializePackedXtoY(txnIds.length, ranges.length, in);
+                else rangesToTxnIds = deserializePackedXtoY(ranges.length, 
txnIds.length, in);
+                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds, txnIdsToRanges);
             }
             return deserialize(keyDeps, rangeDeps, in);
         }
 
+        private static int[] deserializePackedXtoY(int xCount, int yCount, 
DataInputPlus in) throws IOException
+        {
+            int length = in.readUnsignedVInt32();
+            int[] xtoy = new int[length];
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0))
+            {
+                // no point serializing as can be directly inferred
+                if (xCount == 1)
+                {
+                    xtoy[0] = xtoy.length;
+                    for (int i = 0 ; i < yCount ; ++i)
+                        xtoy[1 + i] = i;
+                }
+                else if (yCount == 1)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount + i + 1;
+                }
+                else if (yCount == 0)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount;
+                }
+                else
+                {
+                    Invariants.require(length == 0);
+                }
+            }
+            else
+            {
+                deserializePackedInts(xtoy, 0, xCount, xtoy.length, in);
+                deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
in);
+            }
+            return xtoy;
+        }
+
+        private static void deserializePackedInts(int[] vs, int from, int to, 
int max, DataInputPlus in) throws IOException

Review Comment:
   I think it's worth describing the bit packing scheme here in a 4-5 sentence 
comment. 



##########
.gitmodules:
##########
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/belliottsmith/cassandra-accord.git

Review Comment:
   nit: This should be changed back to the mainline repository (and the `trunk`
   branch) on commit.



##########
src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java:
##########
@@ -46,42 +46,56 @@
 import org.apache.cassandra.io.util.DataOutputPlus;
 import 
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
 import 
org.apache.cassandra.service.accord.serializers.TxnRequestSerializer.WithUnsyncedSerializer;
+import org.apache.cassandra.utils.vint.VIntCoding;
 
 import static accord.messages.BeginRecovery.RecoverReply.Kind.Ok;
-import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class RecoverySerializers
 {
+    static final int HAS_ROUTE = 0x1;
+    static final int HAS_EXECUTE_AT_EPOCH = 0x2;
+    static final int IS_FAST_PATH_DECIDED = 0x4;
     public static final IVersionedSerializer<BeginRecovery> request = new 
WithUnsyncedSerializer<BeginRecovery>()
     {
         @Override
         public void serializeBody(BeginRecovery recover, DataOutputPlus out, 
Version version) throws IOException
         {
             CommandSerializers.partialTxn.serialize(recover.partialTxn, out, 
version);
+            int flags =   (recover.route != null ? HAS_ROUTE : 0)
+                        | (recover.executeAtOrTxnIdEpoch != 
recover.txnId.epoch() ? HAS_EXECUTE_AT_EPOCH : 0)
+                        | (recover.isFastPathDecided ? IS_FAST_PATH_DECIDED : 
0);
             CommandSerializers.ballot.serialize(recover.ballot, out);
-            serializeNullable(recover.route, out, KeySerializers.fullRoute);
-            out.writeUnsignedVInt(recover.executeAtOrTxnIdEpoch - 
recover.txnId.epoch());
+            out.writeUnsignedVInt32(flags);
+            if (recover.route != null)
+                KeySerializers.fullRoute.serialize(recover.route, out);
+            if (0 != (flags & HAS_EXECUTE_AT_EPOCH))
+                out.writeUnsignedVInt(recover.executeAtOrTxnIdEpoch - 
recover.txnId.epoch());
         }
 
         @Override
         public BeginRecovery deserializeBody(DataInputPlus in, Version 
version, TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws 
IOException
         {
             PartialTxn partialTxn = 
CommandSerializers.partialTxn.deserialize(in, version);
             Ballot ballot = CommandSerializers.ballot.deserialize(in);
-            @Nullable FullRoute<?> route = deserializeNullable(in, 
KeySerializers.fullRoute);
-            long executeAtOrTxnIdEpoch = in.readUnsignedVInt32() + 
txnId.epoch();
-            return BeginRecovery.SerializationSupport.create(txnId, scope, 
waitForEpoch, minEpoch, partialTxn, ballot, route, executeAtOrTxnIdEpoch);
+            int flags = in.readUnsignedVInt32();
+            FullRoute<?> route = null;
+            if (0 != (flags & HAS_ROUTE))
+                route = KeySerializers.fullRoute.deserialize(in);
+            long executeAtOrTxnIdEpoch = txnId.epoch();
+            if (0 != (flags & HAS_EXECUTE_AT_EPOCH))
+                executeAtOrTxnIdEpoch += in.readUnsignedVInt32();
+            boolean isFastPathDecided = 0 != (flags & IS_FAST_PATH_DECIDED);
+            return BeginRecovery.SerializationSupport.create(txnId, scope, 
waitForEpoch, minEpoch, partialTxn, ballot, route, executeAtOrTxnIdEpoch, 
isFastPathDecided);
         }
 
         @Override
         public long serializedBodySize(BeginRecovery recover, Version version)
         {
             return 
CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
                    + CommandSerializers.ballot.serializedSize(recover.ballot)
-                   + serializedNullableSize(recover.route, 
KeySerializers.fullRoute)
-                   + 
TypeSizes.sizeofUnsignedVInt(recover.executeAtOrTxnIdEpoch - 
recover.txnId.epoch());
+                   + VIntCoding.computeUnsignedVIntSize(IS_FAST_PATH_DECIDED)

Review Comment:
   Should we leave a comment somewhere to indicate that `IS_FAST_PATH_DECIDED`
   is _currently_ the highest flag bit, so we don't miss updating this size
   computation if more flags are added in future?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java:
##########
@@ -292,6 +294,16 @@ public TxnUpdate deserialize(TableMetadatasAndKeys 
tablesAndKeys, DataInputPlus
             return new TxnUpdate(tablesAndKeys.tables, keys, fragments, new 
SerializedTxnCondition(condition), consistencyLevel, preserveTimestamps ? 
PreserveTimestamp.yes : PreserveTimestamp.no);
         }
 
+        @Override
+        public void skip(TableMetadatasAndKeys tablesAndKeys, DataInputPlus 
in, Version version) throws IOException
+        {
+            in.readByte();
+            tablesAndKeys.skipKeys(in);
+            skipWithVIntLength(in);
+            skipArray(in, ByteBufferUtil.byteBufferSerializer);
+            deserializeNullable(in, consistencyLevelSerializer);

Review Comment:
   Should we add `skip` for nullable? The CL serializer does have `skip`, for
   example.



##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range> 
tokenRange)
         @Override
         public void serialize(D deps, DataOutputPlus out) throws IOException
         {
+            boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+            boolean rangesByTxnId = forceByTxnId || 
deps.rangeDeps.hasByTxnId();
+            out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) | 
(rangesByTxnId ? RANGES_BY_TXNID : 0));
             {
                 KeyDeps keyDeps = deps.keyDeps;
                 KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
-                int txnIdCount = keyDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
-                int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
-                out.writeUnsignedVInt32(keysToTxnIdsCount);
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+                
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
 out);
+                if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps), 
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+                else serializePackedXtoY(keysToTxnIds(keyDeps), 
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
             }
             {
                 RangeDeps rangeDeps = deps.rangeDeps;
-                int rangeCount = rangeDeps.rangeCount();
-                out.writeUnsignedVInt32(rangeCount);
-                for (int i = 0; i < rangeCount; i++)
-                    tokenRange.serialize(rangeDeps.range(i), out);
-
-                int txnIdCount = rangeDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(rangeDeps.txnId(i), 
out);
-
-                int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
-                out.writeUnsignedVInt32(rangesToTxnIdsCount);
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+                KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+                
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
 out);
+                if (rangesByTxnId) 
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(), 
rangeDeps.rangeCount(), out);
+                else serializePackedXtoY(rangesToTxnIds(rangeDeps), 
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
             }
         }
 
+        private static void serializePackedXtoY(int[] xtoy, int xCount, int 
yCount, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(xtoy.length);
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0))
+            {
+                // no point serializing as can be directly inferred
+                if (Invariants.isParanoid())
+                {
+                    if (xCount == 1)
+                    {
+                        Invariants.require(xtoy[0] == xtoy.length, "%d != %d", 
xtoy[0], xtoy.length);
+                        for (int i = 0 ; i < yCount ; ++i) 
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+                    }
+                    else if (yCount == 1)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i + 
1);
+                        for (int i = xCount ; i < xtoy.length ; ++i) 
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+                    }
+                    else if (yCount == 0)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+                    }
+                    else
+                    {
+                        Invariants.require(xtoy.length == 0);
+                    }
+                }
+            }
+            else
+            {
+                serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+                serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
out);
+            }
+        }
+
+        private static void serializePackedInts(int[] vs, int from, int to, 
int max, DataOutputPlus out) throws IOException
+        {
+            int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+            long buffer = 0L;
+            int bufferCount = 0;
+            for (int i = from; i < to; i++)
+            {
+                Invariants.require(vs[i] <= max);
+                buffer |= (long)vs[i] << bufferCount;
+                bufferCount = bufferCount + bitsPerEntry;
+                if (bufferCount >= 64)
+                {
+                    out.writeLong(buffer);
+                    bufferCount -= 64;
+                    buffer = vs[i] >>> (bitsPerEntry - bufferCount);
+                }
+            }
+            if (bufferCount > 0)
+                out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8);
+        }
+
         @Override
         public D deserialize(DataInputPlus in) throws IOException
         {
+            int flags = in.readUnsignedVInt32();
             KeyDeps keyDeps;
             {
                 RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int keysToTxnIdsCount = in.readUnsignedVInt32();
-                int[] keysToTxnIds = new int[keysToTxnIdsCount];
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    keysToTxnIds[i] = in.readUnsignedVInt32();
-
-                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToKeys = null, keysToTxnIds = null;
+                if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys = 
deserializePackedXtoY(txnIds.length, keys.size(), in);
+                else keysToTxnIds = deserializePackedXtoY(keys.size(), 
txnIds.length, in);
+                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds, txnIdsToKeys);
             }
 
             RangeDeps rangeDeps;
             {
-                int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
-                Range[] ranges = new Range[rangeCount];
-                for (int i = 0; i < rangeCount; i++)
-                    ranges[i] = tokenRange.deserialize(in);
-
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int rangesToTxnIdsCount = in.readUnsignedVInt32();
-                int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    rangesToTxnIds[i] = in.readUnsignedVInt32();
-
-                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds);
+                Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToRanges = null, rangesToTxnIds = null;
+                if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges = 
deserializePackedXtoY(txnIds.length, ranges.length, in);
+                else rangesToTxnIds = deserializePackedXtoY(ranges.length, 
txnIds.length, in);
+                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds, txnIdsToRanges);
             }
             return deserialize(keyDeps, rangeDeps, in);
         }
 
+        private static int[] deserializePackedXtoY(int xCount, int yCount, 
DataInputPlus in) throws IOException
+        {
+            int length = in.readUnsignedVInt32();
+            int[] xtoy = new int[length];
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0))
+            {
+                // no point serializing as can be directly inferred
+                if (xCount == 1)
+                {
+                    xtoy[0] = xtoy.length;
+                    for (int i = 0 ; i < yCount ; ++i)
+                        xtoy[1 + i] = i;
+                }
+                else if (yCount == 1)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount + i + 1;
+                }
+                else if (yCount == 0)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount;
+                }
+                else
+                {
+                    Invariants.require(length == 0);
+                }
+            }
+            else
+            {
+                deserializePackedInts(xtoy, 0, xCount, xtoy.length, in);
+                deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
in);
+            }
+            return xtoy;
+        }
+
+        private static void deserializePackedInts(int[] vs, int from, int to, 
int max, DataInputPlus in) throws IOException
+        {
+            int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+            int mask = -1 >>> (32 - bitsPerEntry);
+            int remainingBytes = (bitsPerEntry * (to - from) + 7) / 8;
+            long buffer = 0L;
+            int bufferCount = 0;
+            for (int i = from; i < to; i++)
+            {
+                int v = (int)buffer & mask;
+                if (bufferCount >= bitsPerEntry)
+                {
+                    bufferCount -= bitsPerEntry;
+                    buffer >>>= bitsPerEntry;
+                }
+                else
+                {
+                    int newBufferCount;
+                    if (remainingBytes >= 8)
+                    {
+                        buffer = in.readLong();
+                        newBufferCount = 64;
+                        remainingBytes -= 8;
+                    }
+                    else
+                    {
+                        Invariants.require(remainingBytes > 0);
+                        newBufferCount = remainingBytes * 8;
+                        buffer = in.readLeastSignificantBytes(remainingBytes);
+                        remainingBytes = 0;
+                    }
+                    int readExtra = bitsPerEntry - bufferCount;
+                    int extraBits = (int)buffer & (mask >>> bufferCount);
+                    v |= extraBits << bufferCount;
+                    bufferCount = newBufferCount - readExtra;
+                    buffer >>>= readExtra;
+                }
+                Invariants.require(v <= max);
+                vs[i] = v;
+            }
+        }
+
         @Override
         public long serializedSize(D deps)
         {
-            long size;
+            boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+            boolean rangesByTxnId = forceByTxnId || 
deps.rangeDeps.hasByTxnId();
+            long size = 1;

Review Comment:
   Is it guaranteed to be 1 byte? It looks like we're writing a vint there, so
   maybe add a comment here, or write it as a byte?



##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range> 
tokenRange)
         @Override
         public void serialize(D deps, DataOutputPlus out) throws IOException
         {
+            boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+            boolean rangesByTxnId = forceByTxnId || 
deps.rangeDeps.hasByTxnId();
+            out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) | 
(rangesByTxnId ? RANGES_BY_TXNID : 0));
             {
                 KeyDeps keyDeps = deps.keyDeps;
                 KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
-                int txnIdCount = keyDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
-                int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
-                out.writeUnsignedVInt32(keysToTxnIdsCount);
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+                
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
 out);
+                if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps), 
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+                else serializePackedXtoY(keysToTxnIds(keyDeps), 
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
             }
             {
                 RangeDeps rangeDeps = deps.rangeDeps;
-                int rangeCount = rangeDeps.rangeCount();
-                out.writeUnsignedVInt32(rangeCount);
-                for (int i = 0; i < rangeCount; i++)
-                    tokenRange.serialize(rangeDeps.range(i), out);
-
-                int txnIdCount = rangeDeps.txnIdCount();
-                out.writeUnsignedVInt32(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    CommandSerializers.txnId.serialize(rangeDeps.txnId(i), 
out);
-
-                int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
-                out.writeUnsignedVInt32(rangesToTxnIdsCount);
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+                KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+                
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
 out);
+                if (rangesByTxnId) 
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(), 
rangeDeps.rangeCount(), out);
+                else serializePackedXtoY(rangesToTxnIds(rangeDeps), 
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
             }
         }
 
+        private static void serializePackedXtoY(int[] xtoy, int xCount, int 
yCount, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(xtoy.length);
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0))
+            {
+                // no point serializing as can be directly inferred
+                if (Invariants.isParanoid())
+                {
+                    if (xCount == 1)
+                    {
+                        Invariants.require(xtoy[0] == xtoy.length, "%d != %d", 
xtoy[0], xtoy.length);
+                        for (int i = 0 ; i < yCount ; ++i) 
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+                    }
+                    else if (yCount == 1)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i + 
1);
+                        for (int i = xCount ; i < xtoy.length ; ++i) 
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+                    }
+                    else if (yCount == 0)
+                    {
+                        for (int i = 0 ; i < xCount ; ++i) 
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+                    }
+                    else
+                    {
+                        Invariants.require(xtoy.length == 0);
+                    }
+                }
+            }
+            else
+            {
+                serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+                serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
out);
+            }
+        }
+
+        private static void serializePackedInts(int[] vs, int from, int to, 
int max, DataOutputPlus out) throws IOException // bit-packs vs[from, to) using a fixed number of bits per entry derived from max
+        {
+            int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max); // width used for every entry
+            long buffer = 0L; // accumulates packed bits, lowest bits first
+            int bufferCount = 0; // number of bits currently held in buffer
+            for (int i = from; i < to; i++)
+            {
+                Invariants.require(vs[i] <= max); // each value must respect the bound the width was derived from
+                buffer |= (long)vs[i] << bufferCount; // place the value above the bits already buffered; bits shifted past bit 63 are dropped here
+                bufferCount = bufferCount + bitsPerEntry;
+                if (bufferCount >= 64)
+                {
+                    out.writeLong(buffer); // buffer is full: emit 64 bits
+                    bufferCount -= 64; // now the count of bits of vs[i] that did not fit
+                    buffer = vs[i] >>> (bitsPerEntry - bufferCount); // restart the buffer with the high bits of vs[i] that were not emitted
+                }
+            }
+            if (bufferCount > 0)
+                out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8); // emit the leftover bits, rounded up to whole bytes
+        }
+
         @Override
         public D deserialize(DataInputPlus in) throws IOException
         {
+            int flags = in.readUnsignedVInt32(); // bit flags read first; tested below against KEYS_BY_TXNID and RANGES_BY_TXNID
             KeyDeps keyDeps;
             {
                 RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int keysToTxnIdsCount = in.readUnsignedVInt32();
-                int[] keysToTxnIds = new int[keysToTxnIdsCount];
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    keysToTxnIds[i] = in.readUnsignedVInt32();
-
-                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToKeys = null, keysToTxnIds = null; // exactly one of these is assigned below; the other stays null
+                if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys = 
deserializePackedXtoY(txnIds.length, keys.size(), in); // flag set: x = txnIds, y = keys
+                else keysToTxnIds = deserializePackedXtoY(keys.size(), 
txnIds.length, in); // flag clear: x = keys, y = txnIds
+                keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds, 
keysToTxnIds, txnIdsToKeys);
             }
 
             RangeDeps rangeDeps;
             {
-                int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
-                Range[] ranges = new Range[rangeCount];
-                for (int i = 0; i < rangeCount; i++)
-                    ranges[i] = tokenRange.deserialize(in);
-
-                int txnIdCount = in.readUnsignedVInt32();
-                TxnId[] txnIds = new TxnId[txnIdCount];
-                for (int i = 0; i < txnIdCount; i++)
-                    txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
-                int rangesToTxnIdsCount = in.readUnsignedVInt32();
-                int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    rangesToTxnIds[i] = in.readUnsignedVInt32();
-
-                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds);
+                Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+                TxnId[] txnIds = 
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+                int[] txnIdsToRanges = null, rangesToTxnIds = null; // exactly one of these is assigned below; the other stays null
+                if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges = 
deserializePackedXtoY(txnIds.length, ranges.length, in); // flag set: x = txnIds, y = ranges
+                else rangesToTxnIds = deserializePackedXtoY(ranges.length, 
txnIds.length, in); // flag clear: x = ranges, y = txnIds
+                rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds, 
rangesToTxnIds, txnIdsToRanges);
             }
             return deserialize(keyDeps, rangeDeps, in); // passes both parts and the input to the three-argument deserialize overload
         }
 
+        private static int[] deserializePackedXtoY(int xCount, int yCount, 
DataInputPlus in) throws IOException // reads the length prefix, then either infers the array contents or unpacks them from the input
+        {
+            int length = in.readUnsignedVInt32(); // length prefix written by serializePackedXtoY
+            int[] xtoy = new int[length];
+
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0)) // must stay identical to the test in serializePackedXtoY, which writes no payload when it holds
+            {
+                // no point serializing as can be directly inferred
+                if (xCount == 1)
+                {
+                    xtoy[0] = xtoy.length; // NOTE(review): the test above also holds for yCount == 0 with any length, so this assumes length >= 1 - confirm
+                    for (int i = 0 ; i < yCount ; ++i)
+                        xtoy[1 + i] = i;
+                }
+                else if (yCount == 1)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount + i + 1; // entries past xCount keep the default value 0
+                }
+                else if (yCount == 0)
+                {
+                    for (int i = 0 ; i < xCount ; ++i)
+                        xtoy[i] = xCount; // NOTE(review): assumes length >= xCount, which the test above does not itself enforce - confirm
+                }
+                else
+                {
+                    Invariants.require(length == 0);
+                }
+            }
+            else
+            {
+                deserializePackedInts(xtoy, 0, xCount, xtoy.length, in); // entries [0, xCount), packed with max = xtoy.length
+                deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1, 
in); // entries [xCount, xtoy.length), packed with max = yCount - 1
+            }
+            return xtoy;
+        }
+
+        private static void deserializePackedInts(int[] vs, int from, int to, 
int max, DataInputPlus in) throws IOException // fills vs[from, to) from fixed-width bit-packed input; the width is derived from max
+        {
+            int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max); // width of every entry; same derivation as serializePackedInts
+            int mask = -1 >>> (32 - bitsPerEntry); // low bitsPerEntry bits set; NOTE(review): Java masks the shift distance, so bitsPerEntry == 0 gives -1 rather than 0 - confirm this is harmless
+            int remainingBytes = (bitsPerEntry * (to - from) + 7) / 8; // total payload bytes for these entries, rounded up to whole bytes
+            long buffer = 0L; // unread bits, lowest bits first
+            int bufferCount = 0; // number of unread bits held in buffer
+            for (int i = from; i < to; i++)
+            {
+                int v = (int)buffer & mask; // take whatever bits are buffered; completed below if there are too few
+                if (bufferCount >= bitsPerEntry)
+                {
+                    bufferCount -= bitsPerEntry; // the whole entry was buffered: consume it
+                    buffer >>>= bitsPerEntry;
+                }
+                else
+                {
+                    int newBufferCount; // number of bits fetched by the read below
+                    if (remainingBytes >= 8)
+                    {
+                        buffer = in.readLong();
+                        newBufferCount = 64;
+                        remainingBytes -= 8;
+                    }
+                    else
+                    {
+                        Invariants.require(remainingBytes > 0); // more bits are needed, so there must be payload left
+                        newBufferCount = remainingBytes * 8;
+                        buffer = in.readLeastSignificantBytes(remainingBytes); // final partial read of the tail bytes
+                        remainingBytes = 0;
+                    }
+                    int readExtra = bitsPerEntry - bufferCount; // bits of this entry still missing
+                    int extraBits = (int)buffer & (mask >>> bufferCount); // the missing bits, taken from the bottom of the fresh data
+                    v |= extraBits << bufferCount; // place them above the bits taken before the read
+                    bufferCount = newBufferCount - readExtra; // unread bits left after completing this entry
+                    buffer >>>= readExtra;
+                }
+                Invariants.require(v <= max); // decoded value must respect the declared bound
+                vs[i] = v;
+            }
+        }
+
         @Override
         public long serializedSize(D deps)
         {
-            long size;
+            boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+            boolean rangesByTxnId = forceByTxnId || 
deps.rangeDeps.hasByTxnId(); // same two decisions serialize makes before writing its flags value
+            long size = 1; // NOTE(review): presumably the flags value serialize writes with writeUnsignedVInt32, assumed to encode in one byte - confirm
             {
                 KeyDeps keyDeps = deps.keyDeps;
-                size = 
KeySerializers.routingKeys.serializedSize(deps.keyDeps.keys());
-                int txnIdCount = keyDeps.txnIdCount();
-                size += sizeofUnsignedVInt(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    size += 
CommandSerializers.txnId.serializedSize(keyDeps.txnId(i));
-
-                int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
-                size += sizeofUnsignedVInt(keysToTxnIdsCount);
-                for (int i = 0; i < keysToTxnIdsCount; i++)
-                    size += sizeofUnsignedVInt(keysToTxnIds(keyDeps, i));
+                size += 
KeySerializers.routingKeys.serializedSize(keyDeps.keys());
+                size += 
CommandSerializers.txnId.serializedArraySize(KeyDeps.SerializerSupport.txnIds(keyDeps));
+                size += keysByTxnId ? 
serializedPackedXtoYSize(txnIdsToKeys(keyDeps), keyDeps.txnIdCount(), 
keyDeps.keys().size())
+                                    : 
serializedPackedXtoYSize(keysToTxnIds(keyDeps), keyDeps.keys().size(), 
keyDeps.txnIdCount()); // sizes whichever mapping direction keysByTxnId selects
             }
-
             {
                 RangeDeps rangeDeps = deps.rangeDeps;
-                int rangeCount = rangeDeps.rangeCount();
-                size += sizeofUnsignedVInt(rangeCount);
-                for (int i = 0; i < rangeCount; ++i)
-                    size += tokenRange.serializedSize(rangeDeps.range(i));
-
-                int txnIdCount = rangeDeps.txnIdCount();
-                size += sizeofUnsignedVInt(txnIdCount);
-                for (int i = 0; i < txnIdCount; i++)
-                    size += 
CommandSerializers.txnId.serializedSize(rangeDeps.txnId(i));
-
-                int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
-                size += sizeofUnsignedVInt(rangesToTxnIdsCount);
-                for (int i = 0; i < rangesToTxnIdsCount; i++)
-                    size += sizeofUnsignedVInt(rangesToTxnIds(rangeDeps, i));
+                size += 
KeySerializers.rangeArray.serializedSize(ranges(rangeDeps));
+                size += 
CommandSerializers.txnId.serializedArraySize(RangeDeps.SerializerSupport.txnIds(rangeDeps));
+                size += rangesByTxnId ? 
serializedPackedXtoYSize(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(), 
rangeDeps.rangeCount())
+                                      : 
serializedPackedXtoYSize(rangesToTxnIds(rangeDeps), rangeDeps.rangeCount(), 
rangeDeps.txnIdCount()); // sizes whichever mapping direction rangesByTxnId selects
             }
             return size;
         }
+
+        private static long serializedPackedXtoYSize(int[] xtoy, int xCount, 
int yCount) // size counterpart of serializePackedXtoY: follows the same branches, summing sizes instead of writing
+        {
+            long size = VIntCoding.sizeOfUnsignedVInt(xtoy.length); // the length prefix is always counted
+            if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount + 
yCount || xCount == 0 || yCount == 0)) // same test as serializePackedXtoY, which writes no payload when it holds
+            {
+                // no point serializing as can be directly inferred
+            }
+            else
+            {
+                size += serializedPackedIntsSize(xtoy, 0, xCount, xtoy.length); // entries [0, xCount), max = xtoy.length
+                size += serializedPackedIntsSize(xtoy, xCount, xtoy.length, 
yCount - 1); // entries [xCount, xtoy.length), max = yCount - 1
+            }
+            return size;
+        }
+
+        private static long serializedPackedIntsSize(int[] vs, int from, int 
to, int max)

Review Comment:
   nit: the `vs` parameter of `serializedPackedIntsSize` is unused



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to