ifesdjeen commented on code in PR #4321:
URL: https://github.com/apache/cassandra/pull/4321#discussion_r2284905678
##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range>
tokenRange)
@Override
public void serialize(D deps, DataOutputPlus out) throws IOException
{
+ boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+ boolean rangesByTxnId = forceByTxnId ||
deps.rangeDeps.hasByTxnId();
+ out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) |
(rangesByTxnId ? RANGES_BY_TXNID : 0));
{
KeyDeps keyDeps = deps.keyDeps;
KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
- int txnIdCount = keyDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
- int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
- out.writeUnsignedVInt32(keysToTxnIdsCount);
- for (int i = 0; i < keysToTxnIdsCount; i++)
- out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
out);
+ if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps),
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+ else serializePackedXtoY(keysToTxnIds(keyDeps),
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
}
{
RangeDeps rangeDeps = deps.rangeDeps;
- int rangeCount = rangeDeps.rangeCount();
- out.writeUnsignedVInt32(rangeCount);
- for (int i = 0; i < rangeCount; i++)
- tokenRange.serialize(rangeDeps.range(i), out);
-
- int txnIdCount = rangeDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(rangeDeps.txnId(i),
out);
-
- int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
- out.writeUnsignedVInt32(rangesToTxnIdsCount);
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+ KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
out);
+ if (rangesByTxnId)
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(),
rangeDeps.rangeCount(), out);
+ else serializePackedXtoY(rangesToTxnIds(rangeDeps),
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
}
}
+ private static void serializePackedXtoY(int[] xtoy, int xCount, int
yCount, DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(xtoy.length);
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (Invariants.isParanoid())
+ {
+ if (xCount == 1)
+ {
+ Invariants.require(xtoy[0] == xtoy.length, "%d != %d",
xtoy[0], xtoy.length);
+ for (int i = 0 ; i < yCount ; ++i)
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i +
1);
+ for (int i = xCount ; i < xtoy.length ; ++i)
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+ }
+ else
+ {
+ Invariants.require(xtoy.length == 0);
+ }
+ }
+ }
+ else
+ {
+ serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+ serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
out);
+ }
+ }
+
+ private static void serializePackedInts(int[] vs, int from, int to,
int max, DataOutputPlus out) throws IOException
+ {
+ int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+ long buffer = 0L;
+ int bufferCount = 0;
+ for (int i = from; i < to; i++)
+ {
+ Invariants.require(vs[i] <= max);
+ buffer |= (long)vs[i] << bufferCount;
+ bufferCount = bufferCount + bitsPerEntry;
+ if (bufferCount >= 64)
+ {
+ out.writeLong(buffer);
+ bufferCount -= 64;
+ buffer = vs[i] >>> (bitsPerEntry - bufferCount);
+ }
+ }
+ if (bufferCount > 0)
+ out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8);
+ }
+
@Override
public D deserialize(DataInputPlus in) throws IOException
{
+ int flags = in.readUnsignedVInt32();
KeyDeps keyDeps;
{
RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int keysToTxnIdsCount = in.readUnsignedVInt32();
- int[] keysToTxnIds = new int[keysToTxnIdsCount];
- for (int i = 0; i < keysToTxnIdsCount; i++)
- keysToTxnIds[i] = in.readUnsignedVInt32();
-
- keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToKeys = null, keysToTxnIds = null;
+ if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys =
deserializePackedXtoY(txnIds.length, keys.size(), in);
+ else keysToTxnIds = deserializePackedXtoY(keys.size(),
txnIds.length, in);
+ keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds, txnIdsToKeys);
}
RangeDeps rangeDeps;
{
- int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
- Range[] ranges = new Range[rangeCount];
- for (int i = 0; i < rangeCount; i++)
- ranges[i] = tokenRange.deserialize(in);
-
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int rangesToTxnIdsCount = in.readUnsignedVInt32();
- int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- rangesToTxnIds[i] = in.readUnsignedVInt32();
-
- rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds);
+ Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToRanges = null, rangesToTxnIds = null;
+ if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges =
deserializePackedXtoY(txnIds.length, ranges.length, in);
+ else rangesToTxnIds = deserializePackedXtoY(ranges.length,
txnIds.length, in);
+ rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds, txnIdsToRanges);
}
return deserialize(keyDeps, rangeDeps, in);
}
+ private static int[] deserializePackedXtoY(int xCount, int yCount,
DataInputPlus in) throws IOException
+ {
+ int length = in.readUnsignedVInt32();
+ int[] xtoy = new int[length];
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (xCount == 1)
+ {
+ xtoy[0] = xtoy.length;
+ for (int i = 0 ; i < yCount ; ++i)
+ xtoy[1 + i] = i;
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount + i + 1;
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount;
+ }
+ else
+ {
+ Invariants.require(length == 0);
+ }
+ }
+ else
+ {
+ deserializePackedInts(xtoy, 0, xCount, xtoy.length, in);
+ deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
in);
+ }
+ return xtoy;
+ }
+
+ private static void deserializePackedInts(int[] vs, int from, int to,
int max, DataInputPlus in) throws IOException
Review Comment:
I think it's worth describing the bit packing scheme here in a 4-5 sentence
comment.
##########
.gitmodules:
##########
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/apache/cassandra-accord.git
- branch = trunk
+ url = https://github.com/belliottsmith/cassandra-accord.git
Review Comment:
nit: Should be changed back to mainline branch on commit
##########
src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java:
##########
@@ -46,42 +46,56 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
import
org.apache.cassandra.service.accord.serializers.TxnRequestSerializer.WithUnsyncedSerializer;
+import org.apache.cassandra.utils.vint.VIntCoding;
import static accord.messages.BeginRecovery.RecoverReply.Kind.Ok;
-import static
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class RecoverySerializers
{
+ static final int HAS_ROUTE = 0x1;
+ static final int HAS_EXECUTE_AT_EPOCH = 0x2;
+ static final int IS_FAST_PATH_DECIDED = 0x4;
public static final IVersionedSerializer<BeginRecovery> request = new
WithUnsyncedSerializer<BeginRecovery>()
{
@Override
public void serializeBody(BeginRecovery recover, DataOutputPlus out,
Version version) throws IOException
{
CommandSerializers.partialTxn.serialize(recover.partialTxn, out,
version);
+ int flags = (recover.route != null ? HAS_ROUTE : 0)
+ | (recover.executeAtOrTxnIdEpoch !=
recover.txnId.epoch() ? HAS_EXECUTE_AT_EPOCH : 0)
+ | (recover.isFastPathDecided ? IS_FAST_PATH_DECIDED :
0);
CommandSerializers.ballot.serialize(recover.ballot, out);
- serializeNullable(recover.route, out, KeySerializers.fullRoute);
- out.writeUnsignedVInt(recover.executeAtOrTxnIdEpoch -
recover.txnId.epoch());
+ out.writeUnsignedVInt32(flags);
+ if (recover.route != null)
+ KeySerializers.fullRoute.serialize(recover.route, out);
+ if (0 != (flags & HAS_EXECUTE_AT_EPOCH))
+ out.writeUnsignedVInt(recover.executeAtOrTxnIdEpoch -
recover.txnId.epoch());
}
@Override
public BeginRecovery deserializeBody(DataInputPlus in, Version
version, TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws
IOException
{
PartialTxn partialTxn =
CommandSerializers.partialTxn.deserialize(in, version);
Ballot ballot = CommandSerializers.ballot.deserialize(in);
- @Nullable FullRoute<?> route = deserializeNullable(in,
KeySerializers.fullRoute);
- long executeAtOrTxnIdEpoch = in.readUnsignedVInt32() +
txnId.epoch();
- return BeginRecovery.SerializationSupport.create(txnId, scope,
waitForEpoch, minEpoch, partialTxn, ballot, route, executeAtOrTxnIdEpoch);
+ int flags = in.readUnsignedVInt32();
+ FullRoute<?> route = null;
+ if (0 != (flags & HAS_ROUTE))
+ route = KeySerializers.fullRoute.deserialize(in);
+ long executeAtOrTxnIdEpoch = txnId.epoch();
+ if (0 != (flags & HAS_EXECUTE_AT_EPOCH))
+ executeAtOrTxnIdEpoch += in.readUnsignedVInt32();
+ boolean isFastPathDecided = 0 != (flags & IS_FAST_PATH_DECIDED);
+ return BeginRecovery.SerializationSupport.create(txnId, scope,
waitForEpoch, minEpoch, partialTxn, ballot, route, executeAtOrTxnIdEpoch,
isFastPathDecided);
}
@Override
public long serializedBodySize(BeginRecovery recover, Version version)
{
return
CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
+ CommandSerializers.ballot.serializedSize(recover.ballot)
- + serializedNullableSize(recover.route,
KeySerializers.fullRoute)
- +
TypeSizes.sizeofUnsignedVInt(recover.executeAtOrTxnIdEpoch -
recover.txnId.epoch());
+ + VIntCoding.computeUnsignedVIntSize(IS_FAST_PATH_DECIDED)
Review Comment:
Should we leave a comment somewhere to indicate fast path is _currently_ the
highest so we don't miss this in future?
##########
src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java:
##########
@@ -292,6 +294,16 @@ public TxnUpdate deserialize(TableMetadatasAndKeys
tablesAndKeys, DataInputPlus
return new TxnUpdate(tablesAndKeys.tables, keys, fragments, new
SerializedTxnCondition(condition), consistencyLevel, preserveTimestamps ?
PreserveTimestamp.yes : PreserveTimestamp.no);
}
+ @Override
+ public void skip(TableMetadatasAndKeys tablesAndKeys, DataInputPlus
in, Version version) throws IOException
+ {
+ in.readByte();
+ tablesAndKeys.skipKeys(in);
+ skipWithVIntLength(in);
+ skipArray(in, ByteBufferUtil.byteBufferSerializer);
+ deserializeNullable(in, consistencyLevelSerializer);
Review Comment:
Should we add `skip` for nullable? CL see does have skip, for example
##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range>
tokenRange)
@Override
public void serialize(D deps, DataOutputPlus out) throws IOException
{
+ boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+ boolean rangesByTxnId = forceByTxnId ||
deps.rangeDeps.hasByTxnId();
+ out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) |
(rangesByTxnId ? RANGES_BY_TXNID : 0));
{
KeyDeps keyDeps = deps.keyDeps;
KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
- int txnIdCount = keyDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
- int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
- out.writeUnsignedVInt32(keysToTxnIdsCount);
- for (int i = 0; i < keysToTxnIdsCount; i++)
- out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
out);
+ if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps),
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+ else serializePackedXtoY(keysToTxnIds(keyDeps),
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
}
{
RangeDeps rangeDeps = deps.rangeDeps;
- int rangeCount = rangeDeps.rangeCount();
- out.writeUnsignedVInt32(rangeCount);
- for (int i = 0; i < rangeCount; i++)
- tokenRange.serialize(rangeDeps.range(i), out);
-
- int txnIdCount = rangeDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(rangeDeps.txnId(i),
out);
-
- int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
- out.writeUnsignedVInt32(rangesToTxnIdsCount);
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+ KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
out);
+ if (rangesByTxnId)
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(),
rangeDeps.rangeCount(), out);
+ else serializePackedXtoY(rangesToTxnIds(rangeDeps),
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
}
}
+ private static void serializePackedXtoY(int[] xtoy, int xCount, int
yCount, DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(xtoy.length);
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (Invariants.isParanoid())
+ {
+ if (xCount == 1)
+ {
+ Invariants.require(xtoy[0] == xtoy.length, "%d != %d",
xtoy[0], xtoy.length);
+ for (int i = 0 ; i < yCount ; ++i)
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i +
1);
+ for (int i = xCount ; i < xtoy.length ; ++i)
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+ }
+ else
+ {
+ Invariants.require(xtoy.length == 0);
+ }
+ }
+ }
+ else
+ {
+ serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+ serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
out);
+ }
+ }
+
+ private static void serializePackedInts(int[] vs, int from, int to,
int max, DataOutputPlus out) throws IOException
+ {
+ int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+ long buffer = 0L;
+ int bufferCount = 0;
+ for (int i = from; i < to; i++)
+ {
+ Invariants.require(vs[i] <= max);
+ buffer |= (long)vs[i] << bufferCount;
+ bufferCount = bufferCount + bitsPerEntry;
+ if (bufferCount >= 64)
+ {
+ out.writeLong(buffer);
+ bufferCount -= 64;
+ buffer = vs[i] >>> (bitsPerEntry - bufferCount);
+ }
+ }
+ if (bufferCount > 0)
+ out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8);
+ }
+
@Override
public D deserialize(DataInputPlus in) throws IOException
{
+ int flags = in.readUnsignedVInt32();
KeyDeps keyDeps;
{
RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int keysToTxnIdsCount = in.readUnsignedVInt32();
- int[] keysToTxnIds = new int[keysToTxnIdsCount];
- for (int i = 0; i < keysToTxnIdsCount; i++)
- keysToTxnIds[i] = in.readUnsignedVInt32();
-
- keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToKeys = null, keysToTxnIds = null;
+ if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys =
deserializePackedXtoY(txnIds.length, keys.size(), in);
+ else keysToTxnIds = deserializePackedXtoY(keys.size(),
txnIds.length, in);
+ keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds, txnIdsToKeys);
}
RangeDeps rangeDeps;
{
- int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
- Range[] ranges = new Range[rangeCount];
- for (int i = 0; i < rangeCount; i++)
- ranges[i] = tokenRange.deserialize(in);
-
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int rangesToTxnIdsCount = in.readUnsignedVInt32();
- int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- rangesToTxnIds[i] = in.readUnsignedVInt32();
-
- rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds);
+ Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToRanges = null, rangesToTxnIds = null;
+ if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges =
deserializePackedXtoY(txnIds.length, ranges.length, in);
+ else rangesToTxnIds = deserializePackedXtoY(ranges.length,
txnIds.length, in);
+ rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds, txnIdsToRanges);
}
return deserialize(keyDeps, rangeDeps, in);
}
+ private static int[] deserializePackedXtoY(int xCount, int yCount,
DataInputPlus in) throws IOException
+ {
+ int length = in.readUnsignedVInt32();
+ int[] xtoy = new int[length];
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (xCount == 1)
+ {
+ xtoy[0] = xtoy.length;
+ for (int i = 0 ; i < yCount ; ++i)
+ xtoy[1 + i] = i;
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount + i + 1;
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount;
+ }
+ else
+ {
+ Invariants.require(length == 0);
+ }
+ }
+ else
+ {
+ deserializePackedInts(xtoy, 0, xCount, xtoy.length, in);
+ deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
in);
+ }
+ return xtoy;
+ }
+
+ private static void deserializePackedInts(int[] vs, int from, int to,
int max, DataInputPlus in) throws IOException
+ {
+ int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+ int mask = -1 >>> (32 - bitsPerEntry);
+ int remainingBytes = (bitsPerEntry * (to - from) + 7) / 8;
+ long buffer = 0L;
+ int bufferCount = 0;
+ for (int i = from; i < to; i++)
+ {
+ int v = (int)buffer & mask;
+ if (bufferCount >= bitsPerEntry)
+ {
+ bufferCount -= bitsPerEntry;
+ buffer >>>= bitsPerEntry;
+ }
+ else
+ {
+ int newBufferCount;
+ if (remainingBytes >= 8)
+ {
+ buffer = in.readLong();
+ newBufferCount = 64;
+ remainingBytes -= 8;
+ }
+ else
+ {
+ Invariants.require(remainingBytes > 0);
+ newBufferCount = remainingBytes * 8;
+ buffer = in.readLeastSignificantBytes(remainingBytes);
+ remainingBytes = 0;
+ }
+ int readExtra = bitsPerEntry - bufferCount;
+ int extraBits = (int)buffer & (mask >>> bufferCount);
+ v |= extraBits << bufferCount;
+ bufferCount = newBufferCount - readExtra;
+ buffer >>>= readExtra;
+ }
+ Invariants.require(v <= max);
+ vs[i] = v;
+ }
+ }
+
@Override
public long serializedSize(D deps)
{
- long size;
+ boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+ boolean rangesByTxnId = forceByTxnId ||
deps.rangeDeps.hasByTxnId();
+ long size = 1;
Review Comment:
Is it guaranteed to be 1 byte? Looks like we're writing vint there, maybe
add a comment here, or write it as byte?
##########
src/java/org/apache/cassandra/service/accord/serializers/DepsSerializers.java:
##########
@@ -74,164 +83,294 @@ public DepsSerializer(UnversionedSerializer<Range>
tokenRange)
@Override
public void serialize(D deps, DataOutputPlus out) throws IOException
{
+ boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+ boolean rangesByTxnId = forceByTxnId ||
deps.rangeDeps.hasByTxnId();
+ out.writeUnsignedVInt32((keysByTxnId ? KEYS_BY_TXNID : 0) |
(rangesByTxnId ? RANGES_BY_TXNID : 0));
{
KeyDeps keyDeps = deps.keyDeps;
KeySerializers.routingKeys.serialize(keyDeps.keys(), out);
- int txnIdCount = keyDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(keyDeps.txnId(i), out);
-
- int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
- out.writeUnsignedVInt32(keysToTxnIdsCount);
- for (int i = 0; i < keysToTxnIdsCount; i++)
- out.writeUnsignedVInt32(keysToTxnIds(keyDeps, i));
+
CommandSerializers.txnId.serializeArray(KeyDeps.SerializerSupport.txnIds(keyDeps),
out);
+ if (keysByTxnId) serializePackedXtoY(txnIdsToKeys(keyDeps),
keyDeps.txnIdCount(), keyDeps.keys().size(), out);
+ else serializePackedXtoY(keysToTxnIds(keyDeps),
keyDeps.keys().size(), keyDeps.txnIdCount(), out);
}
{
RangeDeps rangeDeps = deps.rangeDeps;
- int rangeCount = rangeDeps.rangeCount();
- out.writeUnsignedVInt32(rangeCount);
- for (int i = 0; i < rangeCount; i++)
- tokenRange.serialize(rangeDeps.range(i), out);
-
- int txnIdCount = rangeDeps.txnIdCount();
- out.writeUnsignedVInt32(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- CommandSerializers.txnId.serialize(rangeDeps.txnId(i),
out);
-
- int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
- out.writeUnsignedVInt32(rangesToTxnIdsCount);
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- out.writeUnsignedVInt32(rangesToTxnIds(rangeDeps, i));
+ KeySerializers.rangeArray.serialize(ranges(rangeDeps), out);
+
CommandSerializers.txnId.serializeArray(RangeDeps.SerializerSupport.txnIds(rangeDeps),
out);
+ if (rangesByTxnId)
serializePackedXtoY(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(),
rangeDeps.rangeCount(), out);
+ else serializePackedXtoY(rangesToTxnIds(rangeDeps),
rangeDeps.rangeCount(), rangeDeps.txnIdCount(), out);
}
}
+ private static void serializePackedXtoY(int[] xtoy, int xCount, int
yCount, DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(xtoy.length);
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (Invariants.isParanoid())
+ {
+ if (xCount == 1)
+ {
+ Invariants.require(xtoy[0] == xtoy.length, "%d != %d",
xtoy[0], xtoy.length);
+ for (int i = 0 ; i < yCount ; ++i)
Invariants.require(xtoy[1 + i] == i, "%d != %d", xtoy[1 + i], i);
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount + i + 1, "%d != %d", xtoy[i], xCount + i +
1);
+ for (int i = xCount ; i < xtoy.length ; ++i)
Invariants.require(xtoy[i] == 0, "%d != %d", xtoy[i], 0);
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
Invariants.require(xtoy[i] == xCount, "%d != %d", xtoy[i], xCount);
+ }
+ else
+ {
+ Invariants.require(xtoy.length == 0);
+ }
+ }
+ }
+ else
+ {
+ serializePackedInts(xtoy, 0, xCount, xtoy.length, out);
+ serializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
out);
+ }
+ }
+
+ private static void serializePackedInts(int[] vs, int from, int to,
int max, DataOutputPlus out) throws IOException
+ {
+ int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+ long buffer = 0L;
+ int bufferCount = 0;
+ for (int i = from; i < to; i++)
+ {
+ Invariants.require(vs[i] <= max);
+ buffer |= (long)vs[i] << bufferCount;
+ bufferCount = bufferCount + bitsPerEntry;
+ if (bufferCount >= 64)
+ {
+ out.writeLong(buffer);
+ bufferCount -= 64;
+ buffer = vs[i] >>> (bitsPerEntry - bufferCount);
+ }
+ }
+ if (bufferCount > 0)
+ out.writeLeastSignificantBytes(buffer, (bufferCount + 7) / 8);
+ }
+
@Override
public D deserialize(DataInputPlus in) throws IOException
{
+ int flags = in.readUnsignedVInt32();
KeyDeps keyDeps;
{
RoutingKeys keys = KeySerializers.routingKeys.deserialize(in);
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int keysToTxnIdsCount = in.readUnsignedVInt32();
- int[] keysToTxnIds = new int[keysToTxnIdsCount];
- for (int i = 0; i < keysToTxnIdsCount; i++)
- keysToTxnIds[i] = in.readUnsignedVInt32();
-
- keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToKeys = null, keysToTxnIds = null;
+ if (0 != (flags & KEYS_BY_TXNID)) txnIdsToKeys =
deserializePackedXtoY(txnIds.length, keys.size(), in);
+ else keysToTxnIds = deserializePackedXtoY(keys.size(),
txnIds.length, in);
+ keyDeps = KeyDeps.SerializerSupport.create(keys, txnIds,
keysToTxnIds, txnIdsToKeys);
}
RangeDeps rangeDeps;
{
- int rangeCount = Ints.checkedCast(in.readUnsignedVInt32());
- Range[] ranges = new Range[rangeCount];
- for (int i = 0; i < rangeCount; i++)
- ranges[i] = tokenRange.deserialize(in);
-
- int txnIdCount = in.readUnsignedVInt32();
- TxnId[] txnIds = new TxnId[txnIdCount];
- for (int i = 0; i < txnIdCount; i++)
- txnIds[i] = CommandSerializers.txnId.deserialize(in);
-
- int rangesToTxnIdsCount = in.readUnsignedVInt32();
- int[] rangesToTxnIds = new int[rangesToTxnIdsCount];
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- rangesToTxnIds[i] = in.readUnsignedVInt32();
-
- rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds);
+ Range[] ranges = KeySerializers.rangeArray.deserialize(in);
+ TxnId[] txnIds =
CommandSerializers.txnId.deserializeArray(TxnId[]::new, in);
+ int[] txnIdsToRanges = null, rangesToTxnIds = null;
+ if (0 != (flags & RANGES_BY_TXNID)) txnIdsToRanges =
deserializePackedXtoY(txnIds.length, ranges.length, in);
+ else rangesToTxnIds = deserializePackedXtoY(ranges.length,
txnIds.length, in);
+ rangeDeps = RangeDeps.SerializerSupport.create(ranges, txnIds,
rangesToTxnIds, txnIdsToRanges);
}
return deserialize(keyDeps, rangeDeps, in);
}
+ private static int[] deserializePackedXtoY(int xCount, int yCount,
DataInputPlus in) throws IOException
+ {
+ int length = in.readUnsignedVInt32();
+ int[] xtoy = new int[length];
+
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ if (xCount == 1)
+ {
+ xtoy[0] = xtoy.length;
+ for (int i = 0 ; i < yCount ; ++i)
+ xtoy[1 + i] = i;
+ }
+ else if (yCount == 1)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount + i + 1;
+ }
+ else if (yCount == 0)
+ {
+ for (int i = 0 ; i < xCount ; ++i)
+ xtoy[i] = xCount;
+ }
+ else
+ {
+ Invariants.require(length == 0);
+ }
+ }
+ else
+ {
+ deserializePackedInts(xtoy, 0, xCount, xtoy.length, in);
+ deserializePackedInts(xtoy, xCount, xtoy.length, yCount - 1,
in);
+ }
+ return xtoy;
+ }
+
+ private static void deserializePackedInts(int[] vs, int from, int to,
int max, DataInputPlus in) throws IOException
+ {
+ int bitsPerEntry = BitUtils.numberOfBitsToRepresent(max);
+ int mask = -1 >>> (32 - bitsPerEntry);
+ int remainingBytes = (bitsPerEntry * (to - from) + 7) / 8;
+ long buffer = 0L;
+ int bufferCount = 0;
+ for (int i = from; i < to; i++)
+ {
+ int v = (int)buffer & mask;
+ if (bufferCount >= bitsPerEntry)
+ {
+ bufferCount -= bitsPerEntry;
+ buffer >>>= bitsPerEntry;
+ }
+ else
+ {
+ int newBufferCount;
+ if (remainingBytes >= 8)
+ {
+ buffer = in.readLong();
+ newBufferCount = 64;
+ remainingBytes -= 8;
+ }
+ else
+ {
+ Invariants.require(remainingBytes > 0);
+ newBufferCount = remainingBytes * 8;
+ buffer = in.readLeastSignificantBytes(remainingBytes);
+ remainingBytes = 0;
+ }
+ int readExtra = bitsPerEntry - bufferCount;
+ int extraBits = (int)buffer & (mask >>> bufferCount);
+ v |= extraBits << bufferCount;
+ bufferCount = newBufferCount - readExtra;
+ buffer >>>= readExtra;
+ }
+ Invariants.require(v <= max);
+ vs[i] = v;
+ }
+ }
+
@Override
public long serializedSize(D deps)
{
- long size;
+ boolean keysByTxnId = forceByTxnId || deps.keyDeps.hasByTxnId();
+ boolean rangesByTxnId = forceByTxnId ||
deps.rangeDeps.hasByTxnId();
+ long size = 1;
{
KeyDeps keyDeps = deps.keyDeps;
- size =
KeySerializers.routingKeys.serializedSize(deps.keyDeps.keys());
- int txnIdCount = keyDeps.txnIdCount();
- size += sizeofUnsignedVInt(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- size +=
CommandSerializers.txnId.serializedSize(keyDeps.txnId(i));
-
- int keysToTxnIdsCount = keysToTxnIdsCount(keyDeps);
- size += sizeofUnsignedVInt(keysToTxnIdsCount);
- for (int i = 0; i < keysToTxnIdsCount; i++)
- size += sizeofUnsignedVInt(keysToTxnIds(keyDeps, i));
+ size +=
KeySerializers.routingKeys.serializedSize(keyDeps.keys());
+ size +=
CommandSerializers.txnId.serializedArraySize(KeyDeps.SerializerSupport.txnIds(keyDeps));
+ size += keysByTxnId ?
serializedPackedXtoYSize(txnIdsToKeys(keyDeps), keyDeps.txnIdCount(),
keyDeps.keys().size())
+ :
serializedPackedXtoYSize(keysToTxnIds(keyDeps), keyDeps.keys().size(),
keyDeps.txnIdCount());
}
-
{
RangeDeps rangeDeps = deps.rangeDeps;
- int rangeCount = rangeDeps.rangeCount();
- size += sizeofUnsignedVInt(rangeCount);
- for (int i = 0; i < rangeCount; ++i)
- size += tokenRange.serializedSize(rangeDeps.range(i));
-
- int txnIdCount = rangeDeps.txnIdCount();
- size += sizeofUnsignedVInt(txnIdCount);
- for (int i = 0; i < txnIdCount; i++)
- size +=
CommandSerializers.txnId.serializedSize(rangeDeps.txnId(i));
-
- int rangesToTxnIdsCount = rangesToTxnIdsCount(rangeDeps);
- size += sizeofUnsignedVInt(rangesToTxnIdsCount);
- for (int i = 0; i < rangesToTxnIdsCount; i++)
- size += sizeofUnsignedVInt(rangesToTxnIds(rangeDeps, i));
+ size +=
KeySerializers.rangeArray.serializedSize(ranges(rangeDeps));
+ size +=
CommandSerializers.txnId.serializedArraySize(RangeDeps.SerializerSupport.txnIds(rangeDeps));
+ size += rangesByTxnId ?
serializedPackedXtoYSize(txnIdsToRanges(rangeDeps), rangeDeps.txnIdCount(),
rangeDeps.rangeCount())
+ :
serializedPackedXtoYSize(rangesToTxnIds(rangeDeps), rangeDeps.rangeCount(),
rangeDeps.txnIdCount());
}
return size;
}
+
+ private static long serializedPackedXtoYSize(int[] xtoy, int xCount,
int yCount)
+ {
+ long size = VIntCoding.sizeOfUnsignedVInt(xtoy.length);
+ if ((xCount <= 1 || yCount <= 1) && (xtoy.length == xCount +
yCount || xCount == 0 || yCount == 0))
+ {
+ // no point serializing as can be directly inferred
+ }
+ else
+ {
+ size += serializedPackedIntsSize(xtoy, 0, xCount, xtoy.length);
+ size += serializedPackedIntsSize(xtoy, xCount, xtoy.length,
yCount - 1);
+ }
+ return size;
+ }
+
+ private static long serializedPackedIntsSize(int[] vs, int from, int
to, int max)
Review Comment:
nit: vs unused
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]