ifesdjeen commented on code in PR #4068:
URL: https://github.com/apache/cassandra/pull/4068#discussion_r2035781303
##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -546,60 +541,38 @@ private static ByteBuffer cellValue(Row row,
ColumnMetadata column)
public static class JournalColumns
{
- static final ClusteringComparator keyComparator =
Journal.partitionKeyAsClusteringComparator();
- static final CompositeType partitionKeyType = (CompositeType)
Journal.partitionKeyType;
- public static final ColumnMetadata store_id = getColumn(Journal,
"store_id");
- public static final ColumnMetadata type = getColumn(Journal, "type");
- public static final ColumnMetadata id = getColumn(Journal, "id");
+ public static final ColumnMetadata key = getColumn(Journal, "key");
public static final ColumnMetadata record = getColumn(Journal,
"record");
public static final ColumnMetadata user_version = getColumn(Journal,
"user_version");
public static final RegularAndStaticColumns regular = new
RegularAndStaticColumns(Columns.NONE, Columns.from(Arrays.asList(record,
user_version)));
public static DecoratedKey decorate(JournalKey key)
{
- ByteBuffer id =
ByteBuffer.allocate(CommandSerializers.txnId.serializedSize());
- CommandSerializers.txnId.serialize(key.id, id);
- id.flip();
- ByteBuffer pk = keyComparator.make(key.commandStoreId,
(byte)key.type.id, id).serializeAsPartitionKey();
- Invariants.require(getTxnId(splitPartitionKey(pk)).equals(key.id));
+ int commandStoreIdBytes =
VIntCoding.computeUnsignedVIntSize(key.commandStoreId);
+ int length = commandStoreIdBytes + 1;
+ if (key.type == JournalKey.Type.COMMAND_DIFF)
+ length += CommandSerializers.txnId.serializedSize(key.id);
+ ByteBuffer pk = ByteBuffer.allocate(length);
+ ByteBufferAccessor.instance.putUnsignedVInt32(pk, 0,
key.commandStoreId);
+ pk.put(commandStoreIdBytes, (byte)key.type.id);
+ if (key.type == JournalKey.Type.COMMAND_DIFF)
+ CommandSerializers.txnId.serializeComparable(key.id, pk,
ByteBufferAccessor.instance, commandStoreIdBytes + 1);
return Journal.partitioner.decorateKey(pk);
}
- public static ByteBuffer[] splitPartitionKey(DecoratedKey key)
- {
- return JournalColumns.partitionKeyType.split(key.getKey());
- }
-
- public static ByteBuffer[] splitPartitionKey(ByteBuffer key)
- {
- return JournalColumns.partitionKeyType.split(key);
- }
-
public static int getStoreId(DecoratedKey pk)
{
- return getStoreId(splitPartitionKey(pk));
- }
-
- public static int getStoreId(ByteBuffer[] partitionKeyComponents)
- {
- return
Int32Type.instance.compose(partitionKeyComponents[store_id.position()]);
- }
-
- public static JournalKey.Type getType(ByteBuffer[]
partitionKeyComponents)
- {
- return
JournalKey.Type.fromId(ByteType.instance.compose(partitionKeyComponents[type.position()]));
- }
-
- public static TxnId getTxnId(ByteBuffer[] partitionKeyComponents)
- {
- ByteBuffer buffer = partitionKeyComponents[id.position()];
- return CommandSerializers.txnId.deserialize(buffer,
buffer.position());
+ return VIntCoding.readUnsignedVInt32(pk.getKey(), 0);
}
public static JournalKey getJournalKey(DecoratedKey key)
{
- ByteBuffer[] parts = splitPartitionKey(key);
- return new JournalKey(getTxnId(parts), getType(parts),
getStoreId(parts));
+ ByteBuffer bb = key.getKey();
+ int storeId = ByteBufferAccessor.instance.getUnsignedVInt32(bb, 0);
+ int offset = VIntCoding.readLengthOfVInt(bb, 0);
+ JournalKey.Type type = JournalKey.Type.fromId(bb.get(offset));
+ TxnId txnId = type != JournalKey.Type.COMMAND_DIFF ? TxnId.NONE :
CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance,
offset + 1);
Review Comment:
Should we apply the same optimization (i.e. "skipping" the txnid bytes and
avoiding creating txnid objects) to JournalKeySupport?
##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -546,60 +541,38 @@ private static ByteBuffer cellValue(Row row,
ColumnMetadata column)
public static class JournalColumns
{
- static final ClusteringComparator keyComparator =
Journal.partitionKeyAsClusteringComparator();
- static final CompositeType partitionKeyType = (CompositeType)
Journal.partitionKeyType;
- public static final ColumnMetadata store_id = getColumn(Journal,
"store_id");
- public static final ColumnMetadata type = getColumn(Journal, "type");
- public static final ColumnMetadata id = getColumn(Journal, "id");
+ public static final ColumnMetadata key = getColumn(Journal, "key");
public static final ColumnMetadata record = getColumn(Journal,
"record");
public static final ColumnMetadata user_version = getColumn(Journal,
"user_version");
public static final RegularAndStaticColumns regular = new
RegularAndStaticColumns(Columns.NONE, Columns.from(Arrays.asList(record,
user_version)));
public static DecoratedKey decorate(JournalKey key)
{
- ByteBuffer id =
ByteBuffer.allocate(CommandSerializers.txnId.serializedSize());
- CommandSerializers.txnId.serialize(key.id, id);
- id.flip();
- ByteBuffer pk = keyComparator.make(key.commandStoreId,
(byte)key.type.id, id).serializeAsPartitionKey();
- Invariants.require(getTxnId(splitPartitionKey(pk)).equals(key.id));
+ int commandStoreIdBytes =
VIntCoding.computeUnsignedVIntSize(key.commandStoreId);
+ int length = commandStoreIdBytes + 1;
+ if (key.type == JournalKey.Type.COMMAND_DIFF)
+ length += CommandSerializers.txnId.serializedSize(key.id);
+ ByteBuffer pk = ByteBuffer.allocate(length);
+ ByteBufferAccessor.instance.putUnsignedVInt32(pk, 0,
key.commandStoreId);
+ pk.put(commandStoreIdBytes, (byte)key.type.id);
+ if (key.type == JournalKey.Type.COMMAND_DIFF)
+ CommandSerializers.txnId.serializeComparable(key.id, pk,
ByteBufferAccessor.instance, commandStoreIdBytes + 1);
return Journal.partitioner.decorateKey(pk);
}
- public static ByteBuffer[] splitPartitionKey(DecoratedKey key)
- {
- return JournalColumns.partitionKeyType.split(key.getKey());
- }
-
- public static ByteBuffer[] splitPartitionKey(ByteBuffer key)
- {
- return JournalColumns.partitionKeyType.split(key);
- }
-
public static int getStoreId(DecoratedKey pk)
{
- return getStoreId(splitPartitionKey(pk));
- }
-
- public static int getStoreId(ByteBuffer[] partitionKeyComponents)
- {
- return
Int32Type.instance.compose(partitionKeyComponents[store_id.position()]);
- }
-
- public static JournalKey.Type getType(ByteBuffer[]
partitionKeyComponents)
- {
- return
JournalKey.Type.fromId(ByteType.instance.compose(partitionKeyComponents[type.position()]));
- }
-
- public static TxnId getTxnId(ByteBuffer[] partitionKeyComponents)
- {
- ByteBuffer buffer = partitionKeyComponents[id.position()];
- return CommandSerializers.txnId.deserialize(buffer,
buffer.position());
+ return VIntCoding.readUnsignedVInt32(pk.getKey(), 0);
}
public static JournalKey getJournalKey(DecoratedKey key)
{
- ByteBuffer[] parts = splitPartitionKey(key);
- return new JournalKey(getTxnId(parts), getType(parts),
getStoreId(parts));
+ ByteBuffer bb = key.getKey();
+ int storeId = ByteBufferAccessor.instance.getUnsignedVInt32(bb, 0);
+ int offset = VIntCoding.readLengthOfVInt(bb, 0);
Review Comment:
Should we use `VIntCoding.computeUnsignedVIntSize(storeId);` instead?
##########
src/java/org/apache/cassandra/db/marshal/ValueAccessor.java:
##########
@@ -525,23 +525,23 @@ public static <V> int
putLeastSignificantBytes(ValueAccessor<V> accessor, V dst,
break;
case 3:
accessor.putShort(dst, offset, (short)(register >>> 8));
- accessor.putByte(dst, offset, (byte)register);
+ accessor.putByte(dst, offset + 2, (byte)register);
Review Comment:
I have looked at this code several times now, and it keeps changing subtly
because of more edge conditions; maybe it's time to cover this with tests?
##########
src/java/org/apache/cassandra/db/marshal/ByteBufferAccessor.java:
##########
@@ -318,13 +318,13 @@ public int putFloat(ByteBuffer dst, int offset, float
value)
@Override
public int putLeastSignificantBytes(ByteBuffer dst, int offset, long
register, int bytes)
{
- if (dst.remaining() < Long.BYTES)
+ int pos = dst.position() + offset;
+ if (dst.limit() - pos < Long.BYTES)
Review Comment:
Should we also add an assert that the number of bytes we want to write is
going to fit?
##########
src/java/org/apache/cassandra/service/accord/JournalKey.java:
##########
@@ -161,6 +161,11 @@ private void serializeTxnId(TxnId txnId, byte[] out)
ByteArrayUtil.putInt(out, NODE_OFFSET, txnId.node.id);
}
+ private int serializedSizeOffixedWidthTxnId()
Review Comment:
nit: `Fixed` should be capitalized, i.e. `serializedSizeOfFixedWidthTxnId`?
##########
test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java:
##########
@@ -71,4 +79,48 @@ public void txnIdSerde()
DataOutputBuffer output = new DataOutputBuffer();
qt().forAll(AccordGens.txnIds()).check(txnId ->
Serializers.testSerde(output, CommandSerializers.txnId, txnId));
}
+
+ @Test
+ public void txnIdComparable()
+ {
+ qt().withSeed(3447825112572683888L).forAll(AccordGens.txnIds(),
AccordGens.txnIds()).check(CommandSerializersTest::testComparable);
Review Comment:
Probably worth un-hardcoding the seed.
##########
src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java:
##########
@@ -315,216 +316,461 @@ private static int flags(Timestamp executeAt, boolean
nullable)
}
// TODO (expected): optimise using subset serializers, or perhaps simply
with some deduping key serializer
- public static class StoreParticipantsSerializer implements
IVersionedSerializer<StoreParticipants>
+ public static class StoreParticipantsSerializer implements
UnversionedSerializer<StoreParticipants>
{
static final int HAS_ROUTE = 0x1;
- static final int HAS_TOUCHED_EQUALS_ROUTE = 0x2;
- static final int TOUCHES_EQUALS_HAS_TOUCHED = 0x4;
- static final int OWNS_EQUALS_TOUCHES = 0x8;
- static final int EXECUTES_IS_NULL = 0x10;
- static final int EXECUTES_IS_OWNS = 0x20;
- static final int WAITSON_IS_OWNS = 0x40;
+ static final int ROUTE_EQUALS_SUPERSET = 0x2;
+ static final int HAS_TOUCHED_EQUALS_SUPERSET = 0x4;
+ static final int TOUCHES_EQUALS_HAS_TOUCHED = 0x8;
+ static final int OWNS_EQUALS_TOUCHES = 0x10;
+ static final int EXECUTES_IS_NULL = 0x20;
+ static final int EXECUTES_IS_OWNS = 0x40;
+ static final int WAITSON_IS_OWNS = 0x80;
@Override
- public void serialize(StoreParticipants t, DataOutputPlus out, Version
version) throws IOException
- {
- boolean hasRoute = t.route() != null;
- boolean hasTouchedEqualsRoute = t.route() == t.hasTouched();
- boolean touchesEqualsHasTouched = t.touches() == t.hasTouched();
- boolean ownsEqualsTouches = t.owns() == t.touches();
- boolean executesIsNull = t.executes() == null;
- boolean executesIsOwns = !executesIsNull && t.executes() ==
t.owns();
- boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == t.owns();
+ public void serialize(StoreParticipants t, DataOutputPlus out) throws
IOException
+ {
+ Participants<?> hasTouched = t.hasTouched();
+ Route<?> route = t.route();
+ Participants<?> owns = t.owns();
+ Participants<?> executes = t.executes();
+ Participants<?> touches = t.touches();
+ boolean hasRoute = route != null;
+ boolean touchesEqualsHasTouched = touches == hasTouched;
+ boolean ownsEqualsTouches = owns == touches;
+ boolean executesIsNull = executes == null;
+ boolean executesIsOwns = !executesIsNull && executes == owns;
+ boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == owns;
+ boolean encodeSubsets = hasTouched.domain() == Routable.Domain.Key;
+ Participants<?> superset = !hasRoute ? hasTouched : encodeSubsets
? route.with((Participants)hasTouched) : route;
+ boolean routeEqualsSuperset = route == superset;
+ boolean hasTouchedEqualsSuperset = hasTouched == superset;
out.writeByte((hasRoute ? HAS_ROUTE : 0)
- | (hasTouchedEqualsRoute ? HAS_TOUCHED_EQUALS_ROUTE
: 0)
+ | (routeEqualsSuperset ? ROUTE_EQUALS_SUPERSET : 0)
+ | (hasTouchedEqualsSuperset ?
HAS_TOUCHED_EQUALS_SUPERSET : 0)
| (touchesEqualsHasTouched ?
TOUCHES_EQUALS_HAS_TOUCHED : 0)
| (ownsEqualsTouches ? OWNS_EQUALS_TOUCHES : 0)
| (executesIsNull ? EXECUTES_IS_NULL : 0)
| (executesIsOwns ? EXECUTES_IS_OWNS : 0)
| (waitsOnIsOwns ? WAITSON_IS_OWNS : 0)
);
- if (hasRoute) KeySerializers.route.serialize(t.route(), out);
- if (!hasTouchedEqualsRoute)
KeySerializers.participants.serialize(t.hasTouched(), out);
- if (!touchesEqualsHasTouched)
KeySerializers.participants.serialize(t.touches(), out);
- if (!ownsEqualsTouches)
KeySerializers.participants.serialize(t.owns(), out);
- if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serialize(t.executes(), out);
- if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serialize(t.waitsOn(), out);
+
+ KeySerializers.participants.serialize(superset, out);
+ if (encodeSubsets)
+ {
+ if (hasRoute && !routeEqualsSuperset)
KeySerializers.route.serializeSubset(route, superset, out);
+ if (!hasTouchedEqualsSuperset)
KeySerializers.participants.serializeSubset(hasTouched, superset, out);
+ if (!touchesEqualsHasTouched)
KeySerializers.participants.serializeSubset(touches, superset, out);
+ if (!ownsEqualsTouches)
KeySerializers.participants.serializeSubset(owns, superset, out);
+ if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serializeSubset(executes, superset, out);
+ if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serializeSubset(t.waitsOn(), superset, out);
+ }
+ else
+ {
+ if (hasRoute && !routeEqualsSuperset)
KeySerializers.route.serialize(route, out);
+ if (!hasTouchedEqualsSuperset)
KeySerializers.participants.serialize(hasTouched, out);
+ if (!touchesEqualsHasTouched)
KeySerializers.participants.serialize(touches, out);
+ if (!ownsEqualsTouches)
KeySerializers.participants.serialize(owns, out);
+ if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serialize(executes, out);
+ if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serialize(t.waitsOn(), out);
+ }
}
- public void skip(DataInputPlus in, Version version) throws IOException
+ public void skip(DataInputPlus in) throws IOException
{
int flags = in.readByte();
- if (0 != (flags & HAS_ROUTE)) KeySerializers.route.skip(in);
- if (0 == (flags & HAS_TOUCHED_EQUALS_ROUTE))
KeySerializers.participants.skip(in);
- if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in);
- if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in);
- if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
- if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ Unseekables.UnseekablesKind kind =
KeySerializers.participants.readKind(in);
+ int supersetCount = KeySerializers.participants.countAndSkip(kind,
in);
+ boolean skipSubset = kind.domain() == Routable.Domain.Key;
+ if (skipSubset)
+ {
+ if (0 != (flags & HAS_ROUTE) && 0 == (flags &
ROUTE_EQUALS_SUPERSET)) KeySerializers.route.skipSubset(supersetCount, in);
+ if (0 == (flags & HAS_TOUCHED_EQUALS_SUPERSET))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skipSubset(supersetCount, in);
+ }
+ else
+ {
+ if (0 != (flags & HAS_ROUTE) && 0 == (flags &
ROUTE_EQUALS_SUPERSET)) KeySerializers.route.skip(in);
+ if (0 == (flags & HAS_TOUCHED_EQUALS_SUPERSET))
KeySerializers.participants.skip(in);
+ if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in);
+ if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in);
+ if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ }
}
@Override
- public StoreParticipants deserialize(DataInputPlus in, Version
version) throws IOException
+ public StoreParticipants deserialize(DataInputPlus in) throws
IOException
{
int flags = in.readByte();
- Route<?> route = 0 == (flags & HAS_ROUTE) ? null :
KeySerializers.route.deserialize(in);
- Participants<?> hasTouched = 0 != (flags &
HAS_TOUCHED_EQUALS_ROUTE) ? route : KeySerializers.participants.deserialize(in);
- Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserialize(in);
- Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserialize(in);
- Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ? null
: 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
- Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ? null :
0 != (flags & WAITSON_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
- return StoreParticipants.create(route, owns, executes, waitsOn,
touches, hasTouched);
+ Participants<?> superset =
KeySerializers.participants.deserialize(in);
+ boolean decodeSubset = superset.domain() == Routable.Domain.Key;
+ if (decodeSubset)
+ {
+ Route<?> route = 0 == (flags & HAS_ROUTE) ? null : 0 != (flags
& ROUTE_EQUALS_SUPERSET) ? (Route<?>)superset :
KeySerializers.route.deserializeSubset(superset, in);
+ Participants<?> hasTouched = 0 != (flags &
HAS_TOUCHED_EQUALS_SUPERSET) ? superset :
KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ?
null : 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ?
null : 0 != (flags & WAITSON_IS_OWNS) ? owns :
KeySerializers.participants.deserializeSubset(superset, in);
+ return StoreParticipants.create(route, owns, executes,
waitsOn, touches, hasTouched);
+ }
+ else
+ {
+ Route<?> route = 0 == (flags & HAS_ROUTE) ? null : 0 != (flags
& ROUTE_EQUALS_SUPERSET) ? (Route<?>)superset :
KeySerializers.route.deserialize(in);
+ Participants<?> hasTouched = 0 != (flags &
HAS_TOUCHED_EQUALS_SUPERSET) ? superset :
KeySerializers.participants.deserialize(in);
+ Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserialize(in);
+ Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserialize(in);
+ Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ?
null : 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
+ Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ?
null : 0 != (flags & WAITSON_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
+ return StoreParticipants.create(route, owns, executes,
waitsOn, touches, hasTouched);
+ }
}
@Override
- public long serializedSize(StoreParticipants t, Version version)
- {
- boolean hasRoute = t.route() != null;
- boolean hasTouchedEqualsRoute = t.route() == t.hasTouched();
- boolean touchesEqualsHasTouched = t.touches() == t.hasTouched();
- boolean ownsEqualsTouches = t.owns() == t.touches();
- boolean executesIsNotNullAndNotOwns = t.executes() != null &&
t.owns() != t.executes();
- long size = 1;
- if (hasRoute) size +=
KeySerializers.route.serializedSize(t.route());
- if (!hasTouchedEqualsRoute) size +=
KeySerializers.participants.serializedSize(t.hasTouched());
- if (!touchesEqualsHasTouched) size +=
KeySerializers.participants.serializedSize(t.touches());
- if (!ownsEqualsTouches) size +=
KeySerializers.participants.serializedSize(t.owns());
- if (executesIsNotNullAndNotOwns) size +=
KeySerializers.participants.serializedSize(t.executes());
+ public long serializedSize(StoreParticipants t)
+ {
+ Participants<?> hasTouched = t.hasTouched();
+ Route<?> route = t.route();
+ Participants<?> owns = t.owns();
+ Participants<?> executes = t.executes();
+ Participants<?> touches = t.touches();
+ boolean hasRoute = route != null;
+ boolean touchesEqualsHasTouched = touches == hasTouched;
+ boolean ownsEqualsTouches = owns == touches;
+ boolean executesIsNull = executes == null;
+ boolean executesIsOwns = !executesIsNull && executes == owns;
+ boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == owns;
+ boolean encodeSubsets = hasTouched.domain() == Routable.Domain.Key;
+ Participants<?> superset = !hasRoute ? hasTouched : encodeSubsets
? route.with((Participants)hasTouched) : route;
+ boolean routeEqualsSuperset = route == superset;
+ boolean hasTouchedEqualsSuperset = hasTouched == superset;
+ long size = 1 +
KeySerializers.participants.serializedSize(superset);
+ if (encodeSubsets)
+ {
+ if (hasRoute && !routeEqualsSuperset) size +=
KeySerializers.route.serializedSubsetSize(route, superset);
+ if (!hasTouchedEqualsSuperset) size +=
KeySerializers.participants.serializedSubsetSize(hasTouched, superset);
+ if (!touchesEqualsHasTouched) size +=
KeySerializers.participants.serializedSubsetSize(touches, superset);
+ if (!ownsEqualsTouches) size +=
KeySerializers.participants.serializedSubsetSize(owns, superset);
+ if (!executesIsNull && !executesIsOwns) size +=
KeySerializers.participants.serializedSubsetSize(executes, superset);
+ if (!executesIsNull && !waitsOnIsOwns) size +=
KeySerializers.participants.serializedSubsetSize(t.waitsOn(), superset);
+ }
+ else
+ {
+ if (hasRoute && !routeEqualsSuperset) size +=
KeySerializers.route.serializedSize(route);
+ if (!hasTouchedEqualsSuperset) size +=
KeySerializers.participants.serializedSize(hasTouched);
+ if (!touchesEqualsHasTouched) size +=
KeySerializers.participants.serializedSize(touches);
+ if (!ownsEqualsTouches) size +=
KeySerializers.participants.serializedSize(owns);
+ if (!executesIsNull && !executesIsOwns) size +=
KeySerializers.participants.serializedSize(executes);
+ if (!executesIsNull && !waitsOnIsOwns) size +=
KeySerializers.participants.serializedSize(t.waitsOn());
+ }
return size;
}
}
- public static class TimestampSerializer<T extends Timestamp> implements
UnversionedSerializer<T>
+ public static class VariableWidthTimestampSerializer<T extends Timestamp>
implements UnversionedSerializer<T>
{
+ private static final int NODE_SHIFT = 0;
+ private static final int NODE_MASK = 0x3;
+ private static final int NODE_MIN_LENGTH = 1;
+ private static final int FLAGS_SHIFT = NODE_SHIFT +
Integer.bitCount(NODE_MASK);
+ private static final int FLAGS_MASK = 0x1;
+ private static final int FLAGS_MIN_LENGTH = 1;
+ private static final int HLC_SHIFT = FLAGS_SHIFT +
Integer.bitCount(FLAGS_MASK);
+ private static final int HLC_MASK = 0x3;
+ private static final int HLC_MIN_LENGTH = 5;
+ private static final int EPOCH_SHIFT = HLC_SHIFT +
Integer.bitCount(HLC_MASK);
+ private static final int EPOCH_MASK = 0x3;
+ private static final int EPOCH_MIN_LENGTH = 3;
+ static final byte NULL_BYTE = (byte) 0x80;
+ static
+ {
+ Invariants.require(EPOCH_MASK << EPOCH_SHIFT >= 0);
+ }
+
interface Factory<T extends Timestamp>
{
- T create(long msb, long lsb, Node.Id node);
+ T create(long epoch, long hlc, int flags, Node.Id node);
+ }
+
+ private final VariableWidthTimestampSerializer.Factory<T> factory;
+
+ T decodeSpecial(int encodingFlags)
+ {
+ Invariants.require(encodingFlags == NULL_BYTE);
+ return null;
}
- private final TimestampSerializer.Factory<T> factory;
+ byte encodeSpecial(T value)
+ {
+ if (value != null)
+ return 0;
+ return NULL_BYTE;
+ }
- private TimestampSerializer(TimestampSerializer.Factory<T> factory)
+ private
VariableWidthTimestampSerializer(VariableWidthTimestampSerializer.Factory<T>
factory)
{
this.factory = factory;
}
@Override
public void serialize(T ts, DataOutputPlus out) throws IOException
{
- out.writeLong(ts.msb);
- out.writeLong(ts.lsb);
- TopologySerializers.nodeId.serialize(ts.node, out);
+ {
+ byte specialByte = encodeSpecial(ts);
+ if (specialByte != 0)
+ {
+ Invariants.require(specialByte < 0);
+ out.writeByte(specialByte);
+ return;
+ }
+ }
+ long epoch = ts.epoch();
+ long hlc = ts.hlc();
+ int flags = ts.flags();
+ int epochLength = length(epoch, EPOCH_MIN_LENGTH);
+ int hlcLength = length(hlc, HLC_MIN_LENGTH);
+ int flagsLength = length(flags, FLAGS_MIN_LENGTH);
+ int nodeLength = length(ts.node.id, NODE_MIN_LENGTH);
+ int encodingFlags = encodeLength(epochLength, EPOCH_SHIFT,
EPOCH_MIN_LENGTH, EPOCH_MASK)
+ | encodeLength(hlcLength, HLC_SHIFT,
HLC_MIN_LENGTH, HLC_MASK)
+ | encodeLength(flagsLength, FLAGS_SHIFT,
FLAGS_MIN_LENGTH, FLAGS_MASK)
+ | encodeLength(nodeLength, NODE_SHIFT,
NODE_MIN_LENGTH, NODE_MASK);
+ out.writeByte(encodingFlags);
+ out.writeLeastSignificantBytes(epoch, epochLength);
+ out.writeLeastSignificantBytes(hlc, hlcLength);
+ out.writeLeastSignificantBytes(flags, flagsLength);
+ out.writeLeastSignificantBytes(ts.node.id, nodeLength);
+ }
+
+ // exactly the same fundamental format as serialize(), only we
interleave the length bits with the values, maintaining ordering
+ public <V> int serializeComparable(T ts, V dst, ValueAccessor<V>
accessor, int offset)
+ {
+ int position = offset;
+ Invariants.require(encodeSpecial(ts) == 0);
+ long epoch = ts.epoch();
+ long hlc = ts.hlc();
+ int flags = ts.flags();
+ int epochLength = length(epoch, EPOCH_MIN_LENGTH);
+ int hlcLength = length(hlc, HLC_MIN_LENGTH);
+ int flagsLength = length(flags, FLAGS_MIN_LENGTH);
+ int nodeLength = length(ts.node.id, NODE_MIN_LENGTH);
+
+ long pack = packLength(epochLength, epochLength * 8,
EPOCH_MIN_LENGTH, EPOCH_MASK);
+ pack |= epoch;
+ pack <<= 5;
+ pack |= packLength(hlcLength, 3, HLC_MIN_LENGTH, HLC_MASK);
+ pack |= hlc >>> ((hlcLength*8)-3);
+ accessor.putLeastSignificantBytes(dst, position, pack, epochLength
+ 1);
+ position += epochLength + 1;
+
+ hlc <<= 3;
+ hlc |= packLength(flagsLength, 2, FLAGS_MIN_LENGTH, FLAGS_MASK);
+ hlc |= flags >>> ((flagsLength * 8) - 2);
+ accessor.putLeastSignificantBytes(dst, position, hlc, hlcLength);
+ position += hlcLength;
+
+ pack = (long)flags << (2 + nodeLength * 8);
+ pack |= packLength(nodeLength, nodeLength * 8, NODE_MIN_LENGTH,
NODE_MASK);
+ pack |= ts.node.id & 0xffffffffL;
+ accessor.putLeastSignificantBytes(dst, position, pack, flagsLength
+ nodeLength);
+ position += flagsLength + nodeLength;
+ return position - offset;
}
public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int
offset)
{
+ {
+ byte specialByte = encodeSpecial(ts);
+ if (specialByte != 0)
+ {
+ Invariants.require(specialByte < 0);
+ accessor.putByte(dst, offset, specialByte);
+ return 1;
+ }
+ }
+
+ long epoch = ts.epoch();
+ long hlc = ts.hlc();
+ int flags = ts.flags();
+ int epochLength = length(epoch, EPOCH_MIN_LENGTH);
+ int hlcLength = length(hlc, HLC_MIN_LENGTH);
+ int flagsLength = length(flags, FLAGS_MIN_LENGTH);
+ int nodeLength = length(ts.node.id, NODE_MIN_LENGTH);
+ int encodingFlags = encodeLength(epochLength, EPOCH_SHIFT,
EPOCH_MIN_LENGTH, EPOCH_MASK)
+ | encodeLength(hlcLength, HLC_SHIFT,
HLC_MIN_LENGTH, HLC_MASK)
+ | encodeLength(flagsLength, FLAGS_SHIFT,
FLAGS_MIN_LENGTH, FLAGS_MASK)
+ | encodeLength(nodeLength, NODE_SHIFT,
NODE_MIN_LENGTH, NODE_MASK);
+
int position = offset;
- position += accessor.putLong(dst, position, ts.msb);
- position += accessor.putLong(dst, position, ts.lsb);
- position += TopologySerializers.nodeId.serialize(ts.node, dst,
accessor, position);
- int size = position - offset;
- Preconditions.checkState(size == serializedSize());
- return size;
+ position += accessor.putByte(dst, position, (byte)encodingFlags);
+ position += accessor.putLeastSignificantBytes(dst, position,
epoch, epochLength);
+ position += accessor.putLeastSignificantBytes(dst, position, hlc,
hlcLength);
+ position += accessor.putLeastSignificantBytes(dst, position,
flags, flagsLength);
+ position += accessor.putLeastSignificantBytes(dst, position,
flags, nodeLength);
Review Comment:
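Should `flags` be `ts.node.id` here? This call writes `nodeLength` bytes, and
the stream-based `serialize` above writes `ts.node.id` at this point.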
##########
test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java:
##########
@@ -124,7 +124,8 @@ public void beforeTest() throws Throwable
@Test
public void testOne()
{
- long seed = System.nanoTime();
+// long seed = System.nanoTime();
+ long seed = 1L;
Review Comment:
probably worth un-hardcoding this too
##########
src/java/org/apache/cassandra/service/accord/serializers/IVersionedWithKeysSerializer.java:
##########
@@ -323,44 +277,132 @@ private void serializeLargeSubset(AbstractRanges
serialize, int serializeCount,
}
}
+ public Routables<?> deserializeSubsetInternal(Routables<?> superset,
DataInputPlus in) throws IOException
Review Comment:
I am a bit confused about why this is unused now
##########
src/java/org/apache/cassandra/service/accord/serializers/IVersionedWithKeysSerializer.java:
##########
Review Comment:
`NullableWithKeysSerializer` in this class is now unused.
##########
src/java/org/apache/cassandra/service/accord/serializers/IVersionedWithKeysSerializer.java:
##########
@@ -323,44 +277,132 @@ private void serializeLargeSubset(AbstractRanges
serialize, int serializeCount,
}
}
+ public Routables<?> deserializeSubsetInternal(Routables<?> superset,
DataInputPlus in) throws IOException
+ {
+ switch (superset.domain())
+ {
+ default: throw UnhandledEnum.unknown(superset.domain());
+ case Key: return
deserializeRoutingKeySubset((AbstractUnseekableKeys) superset, in, (ks, s) ->
ks == null ? s : RoutingKeys.of(ks));
+ case Range: return deserializeRangeSubset((AbstractRanges)
superset, in, (rs, s) -> rs == null ? s : Ranges.of(rs));
+ }
+ }
+
+ public void skipSubsetInternal(int supersetCount, DataInputPlus in)
throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ if (supersetCount <= 64)
+ return;
+
+ int deserializeCount = supersetCount - (int)encoded;
+ int count = 0;
+ while (count < deserializeCount)
+ {
+ count += in.readUnsignedVInt32();
+ in.readUnsignedVInt32();
+ }
+ }
+
+ public <T, S extends AbstractUnseekableKeys> T
deserializeRoutingKeySubset(S superset, DataInputPlus in,
BiFunction<RoutingKey[], S, T> result) throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ int supersetCount = superset.size();
+ if (encoded == 0L)
+ return result.apply(null, superset);
+ else if (supersetCount >= 64)
+ return result.apply(deserializeLargeRoutingKeySubset(in,
superset, supersetCount, (int) encoded), superset);
+ else
+ return
result.apply(deserializeSmallRoutingKeySubsetArray(encoded, superset,
supersetCount), superset);
+ }
+
+ public <T, S extends AbstractRanges> T deserializeRangeSubset(S
superset, DataInputPlus in, BiFunction<Range[], S, T> result) throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ int supersetCount = superset.size();
+ if (encoded == 0L)
+ return result.apply(null, superset);
+ else if (supersetCount >= 64)
+ return result.apply(deserializeLargeRangeSubset(in, superset,
supersetCount, (int) encoded), superset);
+ else
+ return result.apply(deserializeSmallRangeSubsetArray(encoded,
superset, supersetCount), superset);
+ }
+
@DontInline
- private Unseekables<?> deserializeLargeSubset(DataInputPlus in,
Unseekables<?> superset, int supersetCount, int delta) throws IOException
+ private Routables<?> deserializeLargeSubset(DataInputPlus in,
Routables<?> superset, int supersetCount, int delta) throws IOException
{
- int deserializeCount = supersetCount - delta;
switch (superset.domain())
{
- default: throw new AssertionError("Unhandled domain: " +
superset.domain());
+ default:
+ throw new AssertionError("Unhandled domain: " +
superset.domain());
Review Comment:
nit: use `UnhandledEnum.unknown(superset.domain())` here, as
`deserializeSubsetInternal` does?
##########
src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java:
##########
@@ -812,29 +906,115 @@ final void serializeOffsets(KS keys, int startIndex, int
endIndex, DataOutputPlu
}
@Override
- final K deserializeWithPrefix(Object prefix, int length, DataInputPlus
in) throws IOException
+ final RoutingKey deserializeWithPrefix(Object prefix, int length,
DataInputPlus in) throws IOException
{
- return keySerializer.deserializeWithPrefix(prefix, length, in);
+ return routingKey.deserializeWithPrefix(prefix, length, in);
}
@Override
- final K deserializeWithPrefix(Object prefix, int lengthIndex, int[]
lengths, DataInputPlus in) throws IOException
+ final RoutingKey deserializeWithPrefix(Object prefix, int lengthIndex,
int[] lengths, DataInputPlus in) throws IOException
{
- return keySerializer.deserializeWithPrefix(prefix,
lengths[lengthIndex], in);
+ return routingKey.deserializeWithPrefix(prefix,
lengths[lengthIndex], in);
+ }
+
+ public KS deserializeSubset(AbstractUnseekableKeys superset,
DataInputPlus in) throws IOException
+ {
+ RoutingKey[] keys = deserializeRoutingKeySubset(superset, in, (ks,
s) -> ks == null ? s.unsafeKeys() : ks);
+ return deserialize(in, keys);
+ }
+ }
+
+ public abstract static class AbstractKeyRouteSerializer<KS extends
KeyRoute> extends AbstractSearchableRoutingKeysSerializer<KS>
+ {
+ public
AbstractKeyRouteSerializer(AccordSearchableKeySerializer<RoutingKey> serializer)
+ {
+ super(serializer);
+ }
+
+ abstract KS construct(RoutingKey homeKey, RoutingKey[] keys);
+
+ @Override
+ KS deserialize(DataInputPlus in, RoutingKey[] keys) throws IOException
+ {
+ int i = in.readUnsignedVInt32();
+ RoutingKey homeKey = i == 0 ? routingKey.deserialize(in) : keys[i
- 1];
+ return construct(homeKey, keys);
+ }
+
+ @Override
+ public int countAndSkip(DataInputPlus in) throws IOException
+ {
+ int count = super.countAndSkip(in);
+ completeSkip(in);
+ return count;
+ }
+
+ public void skipSubset(int supersetCount, DataInputPlus in) throws
IOException
+ {
+ skipSubsetInternal(supersetCount, in);
+ completeSkip(in);
+ }
+
+ @Override
+ public void serialize(KS route, DataOutputPlus out) throws IOException
+ {
+ super.serialize(route, out);
+ completeSerialize(route, out);
+ }
+
+ @Override
+ public void serializeSubset(KS route, Routables<?> superset,
DataOutputPlus out) throws IOException
+ {
+ super.serializeSubset(route, superset, out);
+ completeSerialize(route, out);
+ }
+
+ @Override
+ public long serializedSize(KS route)
+ {
+ return super.serializedSize(route)
+ + completeSerializedSize(route);
+ }
+
+ public long serializedSubsetSize(KS route, Routables<?> superset)
Review Comment:
nit: add `@Override` here and on `skipSubset`
##########
src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java:
##########
@@ -315,216 +316,461 @@ private static int flags(Timestamp executeAt, boolean
nullable)
}
// TODO (expected): optimise using subset serializers, or perhaps simply
with some deduping key serializer
- public static class StoreParticipantsSerializer implements
IVersionedSerializer<StoreParticipants>
+ public static class StoreParticipantsSerializer implements
UnversionedSerializer<StoreParticipants>
{
static final int HAS_ROUTE = 0x1;
- static final int HAS_TOUCHED_EQUALS_ROUTE = 0x2;
- static final int TOUCHES_EQUALS_HAS_TOUCHED = 0x4;
- static final int OWNS_EQUALS_TOUCHES = 0x8;
- static final int EXECUTES_IS_NULL = 0x10;
- static final int EXECUTES_IS_OWNS = 0x20;
- static final int WAITSON_IS_OWNS = 0x40;
+ static final int ROUTE_EQUALS_SUPERSET = 0x2;
+ static final int HAS_TOUCHED_EQUALS_SUPERSET = 0x4;
+ static final int TOUCHES_EQUALS_HAS_TOUCHED = 0x8;
+ static final int OWNS_EQUALS_TOUCHES = 0x10;
+ static final int EXECUTES_IS_NULL = 0x20;
+ static final int EXECUTES_IS_OWNS = 0x40;
+ static final int WAITSON_IS_OWNS = 0x80;
@Override
- public void serialize(StoreParticipants t, DataOutputPlus out, Version
version) throws IOException
- {
- boolean hasRoute = t.route() != null;
- boolean hasTouchedEqualsRoute = t.route() == t.hasTouched();
- boolean touchesEqualsHasTouched = t.touches() == t.hasTouched();
- boolean ownsEqualsTouches = t.owns() == t.touches();
- boolean executesIsNull = t.executes() == null;
- boolean executesIsOwns = !executesIsNull && t.executes() ==
t.owns();
- boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == t.owns();
+ public void serialize(StoreParticipants t, DataOutputPlus out) throws
IOException
+ {
+ Participants<?> hasTouched = t.hasTouched();
+ Route<?> route = t.route();
+ Participants<?> owns = t.owns();
+ Participants<?> executes = t.executes();
+ Participants<?> touches = t.touches();
+ boolean hasRoute = route != null;
+ boolean touchesEqualsHasTouched = touches == hasTouched;
+ boolean ownsEqualsTouches = owns == touches;
+ boolean executesIsNull = executes == null;
+ boolean executesIsOwns = !executesIsNull && executes == owns;
+ boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == owns;
+ boolean encodeSubsets = hasTouched.domain() == Routable.Domain.Key;
+ Participants<?> superset = !hasRoute ? hasTouched : encodeSubsets
? route.with((Participants)hasTouched) : route;
+ boolean routeEqualsSuperset = route == superset;
+ boolean hasTouchedEqualsSuperset = hasTouched == superset;
out.writeByte((hasRoute ? HAS_ROUTE : 0)
- | (hasTouchedEqualsRoute ? HAS_TOUCHED_EQUALS_ROUTE
: 0)
+ | (routeEqualsSuperset ? ROUTE_EQUALS_SUPERSET : 0)
+ | (hasTouchedEqualsSuperset ?
HAS_TOUCHED_EQUALS_SUPERSET : 0)
| (touchesEqualsHasTouched ?
TOUCHES_EQUALS_HAS_TOUCHED : 0)
| (ownsEqualsTouches ? OWNS_EQUALS_TOUCHES : 0)
| (executesIsNull ? EXECUTES_IS_NULL : 0)
| (executesIsOwns ? EXECUTES_IS_OWNS : 0)
| (waitsOnIsOwns ? WAITSON_IS_OWNS : 0)
);
- if (hasRoute) KeySerializers.route.serialize(t.route(), out);
- if (!hasTouchedEqualsRoute)
KeySerializers.participants.serialize(t.hasTouched(), out);
- if (!touchesEqualsHasTouched)
KeySerializers.participants.serialize(t.touches(), out);
- if (!ownsEqualsTouches)
KeySerializers.participants.serialize(t.owns(), out);
- if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serialize(t.executes(), out);
- if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serialize(t.waitsOn(), out);
+
+ KeySerializers.participants.serialize(superset, out);
+ if (encodeSubsets)
+ {
+ if (hasRoute && !routeEqualsSuperset)
KeySerializers.route.serializeSubset(route, superset, out);
+ if (!hasTouchedEqualsSuperset)
KeySerializers.participants.serializeSubset(hasTouched, superset, out);
+ if (!touchesEqualsHasTouched)
KeySerializers.participants.serializeSubset(touches, superset, out);
+ if (!ownsEqualsTouches)
KeySerializers.participants.serializeSubset(owns, superset, out);
+ if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serializeSubset(executes, superset, out);
+ if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serializeSubset(t.waitsOn(), superset, out);
+ }
+ else
+ {
+ if (hasRoute && !routeEqualsSuperset)
KeySerializers.route.serialize(route, out);
+ if (!hasTouchedEqualsSuperset)
KeySerializers.participants.serialize(hasTouched, out);
+ if (!touchesEqualsHasTouched)
KeySerializers.participants.serialize(touches, out);
+ if (!ownsEqualsTouches)
KeySerializers.participants.serialize(owns, out);
+ if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serialize(executes, out);
+ if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serialize(t.waitsOn(), out);
+ }
}
- public void skip(DataInputPlus in, Version version) throws IOException
+ public void skip(DataInputPlus in) throws IOException
{
int flags = in.readByte();
- if (0 != (flags & HAS_ROUTE)) KeySerializers.route.skip(in);
- if (0 == (flags & HAS_TOUCHED_EQUALS_ROUTE))
KeySerializers.participants.skip(in);
- if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in);
- if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in);
- if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
- if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ Unseekables.UnseekablesKind kind =
KeySerializers.participants.readKind(in);
+ int supersetCount = KeySerializers.participants.countAndSkip(kind,
in);
+ boolean skipSubset = kind.domain() == Routable.Domain.Key;
+ if (skipSubset)
+ {
+ if (0 != (flags & HAS_ROUTE) && 0 == (flags &
ROUTE_EQUALS_SUPERSET)) KeySerializers.route.skipSubset(supersetCount, in);
+ if (0 == (flags & HAS_TOUCHED_EQUALS_SUPERSET))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skipSubset(supersetCount, in);
+ if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skipSubset(supersetCount, in);
+ }
+ else
+ {
+ if (0 != (flags & HAS_ROUTE) && 0 == (flags &
ROUTE_EQUALS_SUPERSET)) KeySerializers.route.skip(in);
+ if (0 == (flags & HAS_TOUCHED_EQUALS_SUPERSET))
KeySerializers.participants.skip(in);
+ if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in);
+ if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in);
+ if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in);
+ }
}
@Override
- public StoreParticipants deserialize(DataInputPlus in, Version
version) throws IOException
+ public StoreParticipants deserialize(DataInputPlus in) throws
IOException
{
int flags = in.readByte();
- Route<?> route = 0 == (flags & HAS_ROUTE) ? null :
KeySerializers.route.deserialize(in);
- Participants<?> hasTouched = 0 != (flags &
HAS_TOUCHED_EQUALS_ROUTE) ? route : KeySerializers.participants.deserialize(in);
- Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserialize(in);
- Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserialize(in);
- Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ? null
: 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
- Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ? null :
0 != (flags & WAITSON_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in);
- return StoreParticipants.create(route, owns, executes, waitsOn,
touches, hasTouched);
+ Participants<?> superset =
KeySerializers.participants.deserialize(in);
+ boolean decodeSubset = superset.domain() == Routable.Domain.Key;
+ if (decodeSubset)
+ {
+ Route<?> route = 0 == (flags & HAS_ROUTE) ? null : 0 != (flags
& ROUTE_EQUALS_SUPERSET) ? (Route<?>)superset :
KeySerializers.route.deserializeSubset(superset, in);
+ Participants<?> hasTouched = 0 != (flags &
HAS_TOUCHED_EQUALS_SUPERSET) ? superset :
KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserializeSubset(superset, in);
+ Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ?
null : 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserializeSubset(superset, in);
Review Comment:
Should we add some flags consistency invariants and check that
`EXECUTES_IS_NULL` is consistent with `EXECUTES_IS_OWNS` and `WAITSON_IS_OWNS`
(i.e. they can be true iff `executesIsNull` is false)?
##########
src/java/org/apache/cassandra/utils/vint/VIntCoding.java:
##########
@@ -308,6 +308,17 @@ public static <V> long getUnsignedVInt(V input,
ValueAccessor<V> accessor, int r
return retval;
}
+ public static <V> int readLengthOfVInt(V input, ValueAccessor<V> accessor,
int position)
Review Comment:
A super fundamental method; should we fuzz it with _a lot_ of different
values? Like really go through all byte sizes rather than generating a bunch of
longs.
##########
src/java/org/apache/cassandra/service/accord/serializers/IVersionedWithKeysSerializer.java:
##########
@@ -323,44 +277,132 @@ private void serializeLargeSubset(AbstractRanges
serialize, int serializeCount,
}
}
+ public Routables<?> deserializeSubsetInternal(Routables<?> superset,
DataInputPlus in) throws IOException
+ {
+ switch (superset.domain())
+ {
+ default: throw UnhandledEnum.unknown(superset.domain());
+ case Key: return
deserializeRoutingKeySubset((AbstractUnseekableKeys) superset, in, (ks, s) ->
ks == null ? s : RoutingKeys.of(ks));
+ case Range: return deserializeRangeSubset((AbstractRanges)
superset, in, (rs, s) -> rs == null ? s : Ranges.of(rs));
+ }
+ }
+
+ public void skipSubsetInternal(int supersetCount, DataInputPlus in)
throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ if (supersetCount <= 64)
+ return;
+
+ int deserializeCount = supersetCount - (int)encoded;
+ int count = 0;
+ while (count < deserializeCount)
+ {
+ count += in.readUnsignedVInt32();
+ in.readUnsignedVInt32();
+ }
+ }
+
+ public <T, S extends AbstractUnseekableKeys> T
deserializeRoutingKeySubset(S superset, DataInputPlus in,
BiFunction<RoutingKey[], S, T> result) throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ int supersetCount = superset.size();
+ if (encoded == 0L)
+ return result.apply(null, superset);
+ else if (supersetCount >= 64)
+ return result.apply(deserializeLargeRoutingKeySubset(in,
superset, supersetCount, (int) encoded), superset);
+ else
+ return
result.apply(deserializeSmallRoutingKeySubsetArray(encoded, superset,
supersetCount), superset);
Review Comment:
should we remove "array" from the method name here?
##########
src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java:
##########
@@ -315,216 +316,461 @@ private static int flags(Timestamp executeAt, boolean
nullable)
}
// TODO (expected): optimise using subset serializers, or perhaps simply
with some deduping key serializer
- public static class StoreParticipantsSerializer implements
IVersionedSerializer<StoreParticipants>
+ public static class StoreParticipantsSerializer implements
UnversionedSerializer<StoreParticipants>
Review Comment:
Is the TODO above now considered done?
##########
test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java:
##########
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.accord;
+import java.io.IOException;
Review Comment:
This is not the only place where we have import problems, so I'll just leave
it to you to run checkstyle before committing, since there are a few such places.
##########
src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java:
##########
@@ -387,18 +365,92 @@ public RS deserialize(DataInputPlus in) throws IOException
return result;
}
+ public RS deserializeSubset(Unseekables<?> superset, DataInputPlus in)
throws IOException
+ {
+ byte b = in.readByte();
+ UnseekablesKind kind;
+ RS result;
+ switch (b)
+ {
+ default: throw new IOException("Corrupted input: expected byte
1, 2, 3, 4 or 5; received " + b);
Review Comment:
"or 6"?
##########
src/java/org/apache/cassandra/service/accord/serializers/IVersionedWithKeysSerializer.java:
##########
@@ -323,44 +277,132 @@ private void serializeLargeSubset(AbstractRanges
serialize, int serializeCount,
}
}
+ public Routables<?> deserializeSubsetInternal(Routables<?> superset,
DataInputPlus in) throws IOException
+ {
+ switch (superset.domain())
+ {
+ default: throw UnhandledEnum.unknown(superset.domain());
+ case Key: return
deserializeRoutingKeySubset((AbstractUnseekableKeys) superset, in, (ks, s) ->
ks == null ? s : RoutingKeys.of(ks));
+ case Range: return deserializeRangeSubset((AbstractRanges)
superset, in, (rs, s) -> rs == null ? s : Ranges.of(rs));
+ }
+ }
+
+ public void skipSubsetInternal(int supersetCount, DataInputPlus in)
throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ if (supersetCount <= 64)
+ return;
+
+ int deserializeCount = supersetCount - (int)encoded;
+ int count = 0;
+ while (count < deserializeCount)
+ {
+ count += in.readUnsignedVInt32();
+ in.readUnsignedVInt32();
+ }
+ }
+
+ public <T, S extends AbstractUnseekableKeys> T
deserializeRoutingKeySubset(S superset, DataInputPlus in,
BiFunction<RoutingKey[], S, T> result) throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ int supersetCount = superset.size();
+ if (encoded == 0L)
+ return result.apply(null, superset);
+ else if (supersetCount >= 64)
+ return result.apply(deserializeLargeRoutingKeySubset(in,
superset, supersetCount, (int) encoded), superset);
+ else
+ return
result.apply(deserializeSmallRoutingKeySubsetArray(encoded, superset,
supersetCount), superset);
+ }
+
+ public <T, S extends AbstractRanges> T deserializeRangeSubset(S
superset, DataInputPlus in, BiFunction<Range[], S, T> result) throws IOException
+ {
+ long encoded = in.readUnsignedVInt();
+ int supersetCount = superset.size();
+ if (encoded == 0L)
+ return result.apply(null, superset);
+ else if (supersetCount >= 64)
+ return result.apply(deserializeLargeRangeSubset(in, superset,
supersetCount, (int) encoded), superset);
+ else
+ return result.apply(deserializeSmallRangeSubsetArray(encoded,
superset, supersetCount), superset);
+ }
+
@DontInline
- private Unseekables<?> deserializeLargeSubset(DataInputPlus in,
Unseekables<?> superset, int supersetCount, int delta) throws IOException
+ private Routables<?> deserializeLargeSubset(DataInputPlus in,
Routables<?> superset, int supersetCount, int delta) throws IOException
Review Comment:
This one is also unused; maybe we should at least cover it with tests to
make sure it works if/when we decide to use it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]