dcapwell commented on code in PR #3954:
URL: https://github.com/apache/cassandra/pull/3954#discussion_r1992437121


##########
src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java:
##########
@@ -55,178 +56,181 @@ static AccordTopologyUpdate 
newTopology(Journal.TopologyUpdate update)
     {
         return new NewTopology(update);
     }
-    class RangesForEpochSerializer implements 
IVersionedSerializer<CommandStores.RangesForEpoch>
+    class RangesForEpochSerializer implements 
org.apache.cassandra.io.Serializer<CommandStores.RangesForEpoch>
     {
         public static final RangesForEpochSerializer instance = new 
RangesForEpochSerializer();
 
         @Override
-        public void serialize(CommandStores.RangesForEpoch from, 
DataOutputPlus out, int version) throws IOException
+        public void serialize(CommandStores.RangesForEpoch from, 
DataOutputPlus out) throws IOException
         {
             out.writeUnsignedVInt32(from.size());
-            from.forEach((epoch, ranges) -> {
-                try
-                {
-                    out.writeLong(epoch);
-                    KeySerializers.ranges.serialize(ranges, out, version);
-                }
-                catch (Throwable t)
-                {
-                    throw new IllegalStateException("Serialization error", t);
-                }
-            });
+            for (int i = 0; i < from.size(); i++)
+            {
+                out.writeLong(from.epochAtIndex(i));
+                KeySerializers.ranges.serialize(from.rangesAtIndex(i), out);
+            }
         }
 
         @Override
-        public CommandStores.RangesForEpoch deserialize(DataInputPlus in, int 
version) throws IOException
+        public CommandStores.RangesForEpoch deserialize(DataInputPlus in) 
throws IOException
         {
             int size = in.readUnsignedVInt32();
             Ranges[] ranges = new Ranges[size];
             long[] epochs = new long[size];
             for (int i = 0; i < ranges.length; i++)
             {
                 epochs[i] = in.readLong();
-                ranges[i] = KeySerializers.ranges.deserialize(in, version);
+                ranges[i] = KeySerializers.ranges.deserialize(in);
             }
             Invariants.require(ranges.length == epochs.length);
             return new CommandStores.RangesForEpoch(epochs, ranges);
         }
 
         @Override
-        public long serializedSize(CommandStores.RangesForEpoch t, int version)
+        public long serializedSize(CommandStores.RangesForEpoch from)
         {
-            return TypeSizes.sizeofUnsignedVInt(t.size());
+            long size = TypeSizes.sizeofUnsignedVInt(from.size());
+            for (int i = 0; i < from.size(); i++)
+            {
+                size += TypeSizes.sizeof(from.epochAtIndex(i));
+                size += 
KeySerializers.ranges.serializedSize(from.rangesAtIndex(i));
+            }
+            return size;
         }
     }
 
-    class TopologyUpdateSerializer implements 
IVersionedSerializer<Journal.TopologyUpdate>
+    class TopologyUpdateSerializer implements 
org.apache.cassandra.io.Serializer<Journal.TopologyUpdate>
     {
         public static final TopologyUpdateSerializer instance = new 
TopologyUpdateSerializer();
 
         @Override
-        public void serialize(Journal.TopologyUpdate from, DataOutputPlus out, 
int version) throws IOException
+        public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) 
throws IOException
         {
             out.writeUnsignedVInt32(from.commandStores.size());
             for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : 
from.commandStores.entrySet())
             {
                 out.writeUnsignedVInt32(e.getKey());
-                RangesForEpochSerializer.instance.serialize(e.getValue(), out, 
version);
+                RangesForEpochSerializer.instance.serialize(e.getValue(), out);
             }
-            TopologySerializers.topology.serialize(from.local, out, version);
-            TopologySerializers.topology.serialize(from.global, out, version);
+            TopologySerializers.topology.serialize(from.local, out);
+            TopologySerializers.topology.serialize(from.global, out);
         }
 
         @Override
-        public Journal.TopologyUpdate deserialize(DataInputPlus in, int 
version) throws IOException
+        public Journal.TopologyUpdate deserialize(DataInputPlus in) throws 
IOException
         {
             int commandStoresSize = in.readUnsignedVInt32();
             Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores = 
new Int2ObjectHashMap<>();
             for (int j = 0; j < commandStoresSize; j++)
             {
                 int commandStoreId = in.readUnsignedVInt32();
-                CommandStores.RangesForEpoch rangesForEpoch = 
RangesForEpochSerializer.instance.deserialize(in, version);
+                CommandStores.RangesForEpoch rangesForEpoch = 
RangesForEpochSerializer.instance.deserialize(in);
                 commandStores.put(commandStoreId, rangesForEpoch);
             }
-            Topology local = TopologySerializers.topology.deserialize(in, 
version);
-            Topology global = TopologySerializers.topology.deserialize(in, 
version);
+            Topology local = TopologySerializers.topology.deserialize(in);
+            Topology global = TopologySerializers.topology.deserialize(in);
             return new Journal.TopologyUpdate(commandStores, local, global);
         }
 
         @Override
-        public long serializedSize(Journal.TopologyUpdate from, int version)
+        public long serializedSize(Journal.TopologyUpdate from)
         {
             long size = 
TypeSizes.sizeofUnsignedVInt(from.commandStores.size());
             for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : 
from.commandStores.entrySet())
             {
                 size += TypeSizes.sizeofUnsignedVInt(e.getKey());
-                size += 
RangesForEpochSerializer.instance.serializedSize(e.getValue(), version);
+                size += 
RangesForEpochSerializer.instance.serializedSize(e.getValue());
             }
 
-            size += TopologySerializers.topology.serializedSize(from.local, 
version);
-            size += TopologySerializers.topology.serializedSize(from.global, 
version);
+            size += TopologySerializers.topology.serializedSize(from.local);
+            size += TopologySerializers.topology.serializedSize(from.global);
             return size;
         }
     }
 
-    class Serializer implements IVersionedSerializer<AccordTopologyUpdate>
+    class Serializer implements 
org.apache.cassandra.io.Serializer<AccordTopologyUpdate>
     {
         public static Serializer instance = new Serializer();
 
-        public void serialize(AccordTopologyUpdate t, DataOutputPlus out, int 
version) throws IOException
+        @Override
+        public void serialize(AccordTopologyUpdate t, DataOutputPlus out) 
throws IOException
         {
             out.writeUnsignedVInt(t.epoch());
             out.writeUnsignedVInt32(t.kind().ordinal());
             switch (t.kind())
             {
                 case NewTopology:
-                    TopologyUpdateSerializer.instance.serialize(((NewTopology) 
t).update, out, version);
+                    TopologyUpdateSerializer.instance.serialize(((NewTopology) 
t).update, out);
                     break;
                 case Topologies:
                     TopologyImage image = (TopologyImage) t;
 
                     out.writeBoolean(image.update != null);
                     if (image.update != null)
-                        
TopologyUpdateSerializer.instance.serialize(image.update, out, version);
+                        
TopologyUpdateSerializer.instance.serialize(image.update, out);
                     if (image.syncStatus == null)
                         out.writeByte(Byte.MAX_VALUE);
                     else
                         out.writeByte(image.syncStatus.ordinal());
 
-                    KeySerializers.ranges.serialize(image.closed, out, 
version);
-                    KeySerializers.ranges.serialize(image.retired, out, 
version);
+                    KeySerializers.ranges.serialize(image.closed, out);
+                    KeySerializers.ranges.serialize(image.retired, out);
                     break;
                 default:
                     throw new UnhandledEnum(t.kind());
             }
         }
 
-        public AccordTopologyUpdate deserialize(DataInputPlus in, int version) 
throws IOException
+        @Override
+        public AccordTopologyUpdate deserialize(DataInputPlus in) throws 
IOException
         {
-            int epoch = in.readUnsignedVInt32();
+            long epoch = in.readUnsignedVInt();

Review Comment:
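   Bug: the tests produced an epoch larger than Integer.MAX_VALUE, and
   deserialization then failed. The epoch is written as a long
   (writeUnsignedVInt) and sized as a long, yet it was being read back as an
   int (readUnsignedVInt32). Reading it with readUnsignedVInt fixes the mismatch.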



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to