dcapwell commented on code in PR #3954:
URL: https://github.com/apache/cassandra/pull/3954#discussion_r1992437121
##########
src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java:
##########
@@ -55,178 +56,181 @@ static AccordTopologyUpdate
newTopology(Journal.TopologyUpdate update)
{
return new NewTopology(update);
}
- class RangesForEpochSerializer implements
IVersionedSerializer<CommandStores.RangesForEpoch>
+ class RangesForEpochSerializer implements
org.apache.cassandra.io.Serializer<CommandStores.RangesForEpoch>
{
public static final RangesForEpochSerializer instance = new
RangesForEpochSerializer();
@Override
- public void serialize(CommandStores.RangesForEpoch from,
DataOutputPlus out, int version) throws IOException
+ public void serialize(CommandStores.RangesForEpoch from,
DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(from.size());
- from.forEach((epoch, ranges) -> {
- try
- {
- out.writeLong(epoch);
- KeySerializers.ranges.serialize(ranges, out, version);
- }
- catch (Throwable t)
- {
- throw new IllegalStateException("Serialization error", t);
- }
- });
+ for (int i = 0; i < from.size(); i++)
+ {
+ out.writeLong(from.epochAtIndex(i));
+ KeySerializers.ranges.serialize(from.rangesAtIndex(i), out);
+ }
}
@Override
- public CommandStores.RangesForEpoch deserialize(DataInputPlus in, int
version) throws IOException
+ public CommandStores.RangesForEpoch deserialize(DataInputPlus in)
throws IOException
{
int size = in.readUnsignedVInt32();
Ranges[] ranges = new Ranges[size];
long[] epochs = new long[size];
for (int i = 0; i < ranges.length; i++)
{
epochs[i] = in.readLong();
- ranges[i] = KeySerializers.ranges.deserialize(in, version);
+ ranges[i] = KeySerializers.ranges.deserialize(in);
}
Invariants.require(ranges.length == epochs.length);
return new CommandStores.RangesForEpoch(epochs, ranges);
}
@Override
- public long serializedSize(CommandStores.RangesForEpoch t, int version)
+ public long serializedSize(CommandStores.RangesForEpoch from)
{
- return TypeSizes.sizeofUnsignedVInt(t.size());
+ long size = TypeSizes.sizeofUnsignedVInt(from.size());
+ for (int i = 0; i < from.size(); i++)
+ {
+ size += TypeSizes.sizeof(from.epochAtIndex(i));
+ size +=
KeySerializers.ranges.serializedSize(from.rangesAtIndex(i));
+ }
+ return size;
}
}
- class TopologyUpdateSerializer implements
IVersionedSerializer<Journal.TopologyUpdate>
+ class TopologyUpdateSerializer implements
org.apache.cassandra.io.Serializer<Journal.TopologyUpdate>
{
public static final TopologyUpdateSerializer instance = new
TopologyUpdateSerializer();
@Override
- public void serialize(Journal.TopologyUpdate from, DataOutputPlus out,
int version) throws IOException
+ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out)
throws IOException
{
out.writeUnsignedVInt32(from.commandStores.size());
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e :
from.commandStores.entrySet())
{
out.writeUnsignedVInt32(e.getKey());
- RangesForEpochSerializer.instance.serialize(e.getValue(), out,
version);
+ RangesForEpochSerializer.instance.serialize(e.getValue(), out);
}
- TopologySerializers.topology.serialize(from.local, out, version);
- TopologySerializers.topology.serialize(from.global, out, version);
+ TopologySerializers.topology.serialize(from.local, out);
+ TopologySerializers.topology.serialize(from.global, out);
}
@Override
- public Journal.TopologyUpdate deserialize(DataInputPlus in, int
version) throws IOException
+ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws
IOException
{
int commandStoresSize = in.readUnsignedVInt32();
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores =
new Int2ObjectHashMap<>();
for (int j = 0; j < commandStoresSize; j++)
{
int commandStoreId = in.readUnsignedVInt32();
- CommandStores.RangesForEpoch rangesForEpoch =
RangesForEpochSerializer.instance.deserialize(in, version);
+ CommandStores.RangesForEpoch rangesForEpoch =
RangesForEpochSerializer.instance.deserialize(in);
commandStores.put(commandStoreId, rangesForEpoch);
}
- Topology local = TopologySerializers.topology.deserialize(in,
version);
- Topology global = TopologySerializers.topology.deserialize(in,
version);
+ Topology local = TopologySerializers.topology.deserialize(in);
+ Topology global = TopologySerializers.topology.deserialize(in);
return new Journal.TopologyUpdate(commandStores, local, global);
}
@Override
- public long serializedSize(Journal.TopologyUpdate from, int version)
+ public long serializedSize(Journal.TopologyUpdate from)
{
long size =
TypeSizes.sizeofUnsignedVInt(from.commandStores.size());
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e :
from.commandStores.entrySet())
{
size += TypeSizes.sizeofUnsignedVInt(e.getKey());
- size +=
RangesForEpochSerializer.instance.serializedSize(e.getValue(), version);
+ size +=
RangesForEpochSerializer.instance.serializedSize(e.getValue());
}
- size += TopologySerializers.topology.serializedSize(from.local,
version);
- size += TopologySerializers.topology.serializedSize(from.global,
version);
+ size += TopologySerializers.topology.serializedSize(from.local);
+ size += TopologySerializers.topology.serializedSize(from.global);
return size;
}
}
- class Serializer implements IVersionedSerializer<AccordTopologyUpdate>
+ class Serializer implements
org.apache.cassandra.io.Serializer<AccordTopologyUpdate>
{
public static Serializer instance = new Serializer();
- public void serialize(AccordTopologyUpdate t, DataOutputPlus out, int
version) throws IOException
+ @Override
+ public void serialize(AccordTopologyUpdate t, DataOutputPlus out)
throws IOException
{
out.writeUnsignedVInt(t.epoch());
out.writeUnsignedVInt32(t.kind().ordinal());
switch (t.kind())
{
case NewTopology:
- TopologyUpdateSerializer.instance.serialize(((NewTopology)
t).update, out, version);
+ TopologyUpdateSerializer.instance.serialize(((NewTopology)
t).update, out);
break;
case Topologies:
TopologyImage image = (TopologyImage) t;
out.writeBoolean(image.update != null);
if (image.update != null)
-
TopologyUpdateSerializer.instance.serialize(image.update, out, version);
+
TopologyUpdateSerializer.instance.serialize(image.update, out);
if (image.syncStatus == null)
out.writeByte(Byte.MAX_VALUE);
else
out.writeByte(image.syncStatus.ordinal());
- KeySerializers.ranges.serialize(image.closed, out,
version);
- KeySerializers.ranges.serialize(image.retired, out,
version);
+ KeySerializers.ranges.serialize(image.closed, out);
+ KeySerializers.ranges.serialize(image.retired, out);
break;
default:
throw new UnhandledEnum(t.kind());
}
}
- public AccordTopologyUpdate deserialize(DataInputPlus in, int version)
throws IOException
+ @Override
+ public AccordTopologyUpdate deserialize(DataInputPlus in) throws
IOException
{
- int epoch = in.readUnsignedVInt32();
+ long epoch = in.readUnsignedVInt();
Review Comment:
bug, we read an epoch > int max which then failed in the tests. We write
long, sizeOf is long, yet read int?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]