AnticompactionRequestSerializer serializedSize is incorrect. Patch by jasobrown; reviewed by pcmanus for CASSANDRA-12934
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3fd4c688 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3fd4c688 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3fd4c688 Branch: refs/heads/trunk Commit: 3fd4c68803ddf0d20d23b37d4b936258f8420209 Parents: 59b40b3 Author: Jason Brown <jasedbr...@gmail.com> Authored: Fri Nov 18 18:13:45 2016 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Mon Nov 21 06:37:56 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../repair/messages/AnticompactionRequest.java | 19 ++ .../repair/messages/CleanupMessage.java | 17 ++ .../repair/messages/PrepareMessage.java | 22 +++ .../repair/messages/SnapshotMessage.java | 16 ++ .../cassandra/repair/messages/SyncComplete.java | 19 ++ .../cassandra/repair/messages/SyncRequest.java | 21 +++ .../repair/messages/ValidationComplete.java | 18 ++ .../RepairMessageSerializationsTest.java | 187 +++++++++++++++++++ 9 files changed, 320 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bcd0b5c..e613d7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934) * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535) Merged from 2.2: * Avoid blocking gossip during pending range calculation (CASSANDRA-12281) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java 
b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java index 3e47374..a29cc87 100644 --- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.UUID; import org.apache.cassandra.dht.Range; @@ -46,6 +47,23 @@ public class AnticompactionRequest extends RepairMessage this.successfulRanges = ranges; } + @Override + public boolean equals(Object o) + { + if (!(o instanceof AnticompactionRequest)) + return false; + AnticompactionRequest other = (AnticompactionRequest)o; + return messageType == other.messageType && + parentRepairSession.equals(other.parentRepairSession) && + successfulRanges.equals(other.successfulRanges); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, parentRepairSession, successfulRanges); + } + public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest> { public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException @@ -72,6 +90,7 @@ public class AnticompactionRequest extends RepairMessage public long serializedSize(AnticompactionRequest message, int version) { long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); + size += Integer.BYTES; // count of items in successfulRanges for (Range<Token> r : message.successfulRanges) size += Range.tokenSerializer.serializedSize(r, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java 
b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java index 43a8f02..69d147a 100644 --- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java @@ -18,6 +18,7 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; +import java.util.Objects; import java.util.UUID; import org.apache.cassandra.io.util.DataInputPlus; @@ -40,6 +41,22 @@ public class CleanupMessage extends RepairMessage this.parentRepairSession = parentRepairSession; } + @Override + public boolean equals(Object o) + { + if (!(o instanceof CleanupMessage)) + return false; + CleanupMessage other = (CleanupMessage) o; + return messageType == other.messageType && + parentRepairSession.equals(other.parentRepairSession); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, parentRepairSession); + } + public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage> { public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 8909f1b..b3efeae 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; @@ -54,6 +55,27 @@ public class PrepareMessage extends RepairMessage this.isGlobal = isGlobal; } + @Override + public boolean equals(Object o) + { + if (!(o 
instanceof PrepareMessage)) + return false; + PrepareMessage other = (PrepareMessage) o; + return messageType == other.messageType && + parentRepairSession.equals(other.parentRepairSession) && + isIncremental == other.isIncremental && + isGlobal == other.isGlobal && + timestamp == other.timestamp && + cfIds.equals(other.cfIds) && + ranges.equals(other.ranges); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, cfIds, ranges); + } + public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage> { public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java index 1b15126..d4737d3 100644 --- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java @@ -18,6 +18,7 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; +import java.util.Objects; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -32,6 +33,21 @@ public class SnapshotMessage extends RepairMessage super(Type.SNAPSHOT, desc); } + @Override + public boolean equals(Object o) + { + if (!(o instanceof SnapshotMessage)) + return false; + SnapshotMessage other = (SnapshotMessage) o; + return messageType == other.messageType; + } + + @Override + public int hashCode() + { + return Objects.hash(messageType); + } + public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage> { public void serialize(SnapshotMessage message, DataOutputPlus 
out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java index 35cf5d4..178e710 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java @@ -19,6 +19,7 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; import java.net.InetAddress; +import java.util.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; @@ -53,6 +54,24 @@ public class SyncComplete extends RepairMessage this.success = success; } + @Override + public boolean equals(Object o) + { + if (!(o instanceof SyncComplete)) + return false; + SyncComplete other = (SyncComplete)o; + return messageType == other.messageType && + desc.equals(other.desc) && + success == other.success && + nodes.equals(other.nodes); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, desc, success, nodes); + } + private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete> { public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index 2c9799e..f79f482 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -22,6 +22,7 @@ import 
java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.AbstractBounds; @@ -57,6 +58,26 @@ public class SyncRequest extends RepairMessage this.ranges = ranges; } + @Override + public boolean equals(Object o) + { + if (!(o instanceof SyncRequest)) + return false; + SyncRequest req = (SyncRequest)o; + return messageType == req.messageType && + desc.equals(req.desc) && + initiator.equals(req.initiator) && + src.equals(req.src) && + dst.equals(req.dst) && + ranges.equals(req.ranges); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, desc, initiator, src, dst, ranges); + } + public static class SyncRequestSerializer implements MessageSerializer<SyncRequest> { public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java index 90be8e5..704bffb 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java @@ -18,6 +18,7 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; +import java.util.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; @@ -55,6 +56,23 @@ public class ValidationComplete extends RepairMessage return trees != null; } + @Override + public boolean equals(Object o) + { + if (!(o instanceof ValidationComplete)) + return false; + + ValidationComplete other = (ValidationComplete)o; + return messageType == other.messageType && +
desc.equals(other.desc); + } + + @Override + public int hashCode() + { + return Objects.hash(messageType, desc); + } + private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete> { public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/3fd4c688/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java new file mode 100644 index 0000000..5dbed3f --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBufferFixed; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.NodePair; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.MerkleTrees; + +public class RepairMessageSerializationsTest +{ + private static final int PROTOCOL_VERSION = MessagingService.current_version; + private static final int GC_BEFORE = 1000000; + + private static IPartitioner originalPartitioner; + + @BeforeClass + public static void before() + { + originalPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance); + } + + @AfterClass + public static void after() + { + DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner); + } + + @Test + public void validationRequestMessage() throws IOException + { + RepairJobDesc jobDesc = buildRepairJobDesc(); + ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE); + ValidationRequest deserialized = serializeRoundTrip(msg, ValidationRequest.serializer); + Assert.assertEquals(jobDesc, deserialized.desc); + } + + 
private RepairJobDesc buildRepairJobDesc() + { + List<Range<Token>> tokenRanges = buildTokenRanges(); + return new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "serializationsTestKeyspace", "repairMessages", tokenRanges); + } + + private List<Range<Token>> buildTokenRanges() + { + List<Range<Token>> tokenRanges = new ArrayList<>(4); + tokenRanges.add(new Range<>(new LongToken(1000), new LongToken(1001))); + tokenRanges.add(new Range<>(new LongToken(2000), new LongToken(2001))); + tokenRanges.add(new Range<>(new LongToken(3000), new LongToken(3001))); + tokenRanges.add(new Range<>(new LongToken(4000), new LongToken(4001))); + return tokenRanges; + } + + private <T extends RepairMessage> T serializeRoundTrip(T msg, IVersionedSerializer<T> serializer) throws IOException + { + long size = serializer.serializedSize(msg, PROTOCOL_VERSION); + + ByteBuffer buf = ByteBuffer.allocate((int)size); + DataOutputPlus out = new DataOutputBufferFixed(buf); + serializer.serialize(msg, out, PROTOCOL_VERSION); + Assert.assertEquals(size, buf.position()); + + buf.flip(); + DataInputPlus in = new DataInputBuffer(buf, false); + T deserialized = serializer.deserialize(in, PROTOCOL_VERSION); + Assert.assertEquals(msg, deserialized); + Assert.assertEquals(msg.hashCode(), deserialized.hashCode()); + return deserialized; + } + + @Test + public void validationCompleteMessage_NoMerkleTree() throws IOException + { + ValidationComplete deserialized = validationCompleteMessage(null); + Assert.assertNull(deserialized.trees); + } + + @Test + public void validationCompleteMessage_WithMerkleTree() throws IOException + { + MerkleTrees trees = new MerkleTrees(Murmur3Partitioner.instance); + trees.addMerkleTree(256, new Range<>(new LongToken(1000), new LongToken(1001))); + ValidationComplete deserialized = validationCompleteMessage(trees); + + // a simple check to make sure we got some merkle trees back. 
+ Assert.assertEquals(trees.size(), deserialized.trees.size()); + } + + private ValidationComplete validationCompleteMessage(MerkleTrees trees) throws IOException + { + RepairJobDesc jobDesc = buildRepairJobDesc(); + ValidationComplete msg = trees == null ? + new ValidationComplete(jobDesc) : + new ValidationComplete(jobDesc, trees); + ValidationComplete deserialized = serializeRoundTrip(msg, ValidationComplete.serializer); + return deserialized; + } + + @Test + public void syncRequestMessage() throws IOException + { + InetAddress initiator = InetAddress.getByName("127.0.0.1"); + InetAddress src = InetAddress.getByName("127.0.0.2"); + InetAddress dst = InetAddress.getByName("127.0.0.3"); + + SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges()); + serializeRoundTrip(msg, SyncRequest.serializer); + } + + @Test + public void syncCompleteMessage() throws IOException + { + InetAddress src = InetAddress.getByName("127.0.0.2"); + InetAddress dst = InetAddress.getByName("127.0.0.3"); + SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true); + serializeRoundTrip(msg, SyncComplete.serializer); + } + + @Test + public void antiCompactionRequestMessage() throws IOException + { + AnticompactionRequest msg = new AnticompactionRequest(UUID.randomUUID(), buildTokenRanges()); + serializeRoundTrip(msg, AnticompactionRequest.serializer); + } + + @Test + public void prepareMessage() throws IOException + { + PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new ArrayList<UUID>() {{add(UUID.randomUUID());}}, + buildTokenRanges(), true, 100000L, false); + serializeRoundTrip(msg, PrepareMessage.serializer); + } + + @Test + public void snapshotMessage() throws IOException + { + SnapshotMessage msg = new SnapshotMessage(buildRepairJobDesc()); + serializeRoundTrip(msg, SnapshotMessage.serializer); + } + + @Test + public void cleanupMessage() throws IOException + { + CleanupMessage msg = new 
CleanupMessage(UUID.randomUUID()); + serializeRoundTrip(msg, CleanupMessage.serializer); + } +}