http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/db/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java index aea609e..a340a94 100644 --- a/test/unit/org/apache/cassandra/db/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessageSerializer; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -68,22 +69,21 @@ public class SerializationsTest extends AbstractSerializationsTester IPartitioner part = StorageService.getPartitioner(); AbstractBounds<RowPosition> bounds = new Range<Token>(part.getRandomToken(), part.getRandomToken()).toRowBounds(); - Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage(MessagingService.current_version); - Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage(MessagingService.current_version); - Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, nonEmptyRangePred, bounds, 100).getMessage(MessagingService.current_version); - Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage(MessagingService.current_version); - Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage(MessagingService.current_version); - Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, nonEmptyRangePred, bounds, 100).getMessage(MessagingService.current_version); - - DataOutputStream dout = getOutput("db.RangeSliceCommand.bin"); - - messageSerializer.serialize(namesCmd, dout, getVersion()); - messageSerializer.serialize(emptyRangeCmd, dout, getVersion()); - messageSerializer.serialize(regRangeCmd, dout, getVersion()); - messageSerializer.serialize(namesCmdSup, dout, getVersion()); - messageSerializer.serialize(emptyRangeCmdSup, dout, getVersion()); - messageSerializer.serialize(regRangeCmdSup, dout, getVersion()); - dout.close(); + MessageOut<RangeSliceCommand> namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).createMessage(); + MessageOut<RangeSliceCommand> emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).createMessage(); + MessageOut<RangeSliceCommand> regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, nonEmptyRangePred, bounds, 100).createMessage(); + MessageOut<RangeSliceCommand> namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).createMessage(); + MessageOut<RangeSliceCommand> emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).createMessage(); + MessageOut<RangeSliceCommand> regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, nonEmptyRangePred, bounds, 100).createMessage(); + + DataOutputStream out = getOutput("db.RangeSliceCommand.bin"); + namesCmd.serialize(out, getVersion()); + emptyRangeCmd.serialize(out, getVersion()); + regRangeCmd.serialize(out, getVersion()); + namesCmdSup.serialize(out, getVersion()); + emptyRangeCmdSup.serialize(out, getVersion()); + regRangeCmdSup.serialize(out, getVersion()); + out.close(); } @Test @@ -111,8 +111,8 @@ public class SerializationsTest extends AbstractSerializationsTester SliceByNamesReadCommand.serializer().serialize(superCmd, out, getVersion()); ReadCommand.serializer().serialize(standardCmd, out, getVersion()); ReadCommand.serializer().serialize(superCmd, out, getVersion()); - messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion()); + standardCmd.createMessage().serialize(out, getVersion()); + superCmd.createMessage().serialize(out, getVersion()); out.close(); } @@ -141,8 +141,8 @@ public class SerializationsTest extends AbstractSerializationsTester SliceFromReadCommand.serializer().serialize(superCmd, out, getVersion()); ReadCommand.serializer().serialize(standardCmd, out, getVersion()); ReadCommand.serializer().serialize(superCmd, out, getVersion()); - messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion()); + standardCmd.createMessage().serialize(out, getVersion()); + superCmd.createMessage().serialize(out, getVersion()); out.close(); } @@ -205,12 +205,14 @@ public class SerializationsTest extends AbstractSerializationsTester RowMutation.serializer().serialize(standardRm, out, getVersion()); RowMutation.serializer().serialize(superRm, out, getVersion()); RowMutation.serializer().serialize(mixedRm, out, getVersion()); - messageSerializer.serialize(emptyRm.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(standardRowRm.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(superRowRm.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(standardRm.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(superRm.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(mixedRm.getMessage(getVersion()), out, getVersion()); + + emptyRm.createMessage().serialize(out, getVersion()); + standardRowRm.createMessage().serialize(out, getVersion()); + superRowRm.createMessage().serialize(out, getVersion()); + standardRm.createMessage().serialize(out, getVersion()); + superRm.createMessage().serialize(out, getVersion()); + mixedRm.createMessage().serialize(out, getVersion()); + out.close(); } @@ -245,9 +247,10 @@ public class SerializationsTest extends AbstractSerializationsTester Truncation.serializer().serialize(tr, out, getVersion()); TruncateResponse.serializer().serialize(aff, out, getVersion()); TruncateResponse.serializer().serialize(neg, out, getVersion()); - messageSerializer.serialize(tr.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), aff), out, getVersion()); - messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), neg), out, getVersion()); + + tr.createMessage().serialize(out, getVersion()); + aff.createMessage().serialize(out, getVersion()); + neg.createMessage().serialize(out, getVersion()); // todo: notice how CF names weren't validated. out.close(); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/net/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessageSerializer.java b/test/unit/org/apache/cassandra/net/MessageSerializer.java index e8b7874..be9eda5 100644 --- a/test/unit/org/apache/cassandra/net/MessageSerializer.java +++ b/test/unit/org/apache/cassandra/net/MessageSerializer.java @@ -27,6 +27,7 @@ import org.apache.cassandra.io.IVersionedSerializer; public class MessageSerializer implements IVersionedSerializer<Message> { + // TODO imitate backwards-compatibility code from OutboundTcpConnection here public void serialize(Message t, DataOutput dos, int version) throws IOException { Header.serializer().serialize(t.header, dos, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index e8ccfba..d3cd62d 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.sink.IMessageSink; import org.apache.cassandra.net.sink.SinkManager; @@ -152,7 +153,8 @@ public class RemoveTest for (InetAddress host : hosts) { - Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0], MessagingService.current_version); + // TODO how to spoof host here? + MessageOut msg = new MessageOut(StorageService.Verb.REPLICATION_FINISHED); MessagingService.instance().sendRR(msg, FBUtilities.getBroadcastAddress()); } @@ -162,6 +164,9 @@ public class RemoveTest assertTrue(tmd.getLeavingEndpoints().isEmpty()); } + /** + * sink that captures STREAM_REQUEST messages and calls finishStreamRequest on it + */ class ReplicationSink implements IMessageSink { public Message handleMessage(Message msg, String id, InetAddress to) @@ -173,5 +178,10 @@ public class RemoveTest return null; } + + public MessageOut handleMessage(MessageOut msg, String id, InetAddress to) + { + return msg; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index d736a95..6ab8685 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -52,8 +52,8 @@ public class SerializationsTest extends AbstractSerializationsTester private void testTreeRequestWrite() throws IOException { DataOutputStream out = getOutput("service.TreeRequest.bin"); - AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out, getVersion()); - messageSerializer.serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, getVersion()), out, getVersion()); + AntiEntropyService.TreeRequest.serializer.serialize(Statics.req, out, getVersion()); + Statics.req.createMessage().serialize(out, getVersion()); out.close(); } @@ -64,7 +64,7 @@ public class SerializationsTest extends AbstractSerializationsTester testTreeRequestWrite(); DataInputStream in = getInput("service.TreeRequest.bin"); - assert AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; + assert AntiEntropyService.TreeRequest.serializer.deserialize(in, getVersion()) != null; assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); } @@ -82,10 +82,10 @@ public class SerializationsTest extends AbstractSerializationsTester AntiEntropyService.Validator v1 = new AntiEntropyService.Validator(Statics.req, mt); DataOutputStream out = getOutput("service.TreeResponse.bin"); - AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, out, getVersion()); - AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, out, getVersion()); - messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v0), out, getVersion()); - messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v1), out, getVersion()); + AntiEntropyService.Validator.serializer.serialize(v0, out, getVersion()); + AntiEntropyService.Validator.serializer.serialize(v1, out, getVersion()); + v0.createMessage().serialize(out, getVersion()); + v1.createMessage().serialize(out, getVersion()); out.close(); } @@ -96,8 +96,8 @@ public class SerializationsTest extends AbstractSerializationsTester testTreeResponseWrite(); DataInputStream in = getInput("service.TreeResponse.bin"); - assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; - assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null; + assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null; + assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null; assert messageSerializer.deserialize(in, getVersion()) != null; assert messageSerializer.deserialize(in, getVersion()) != null; in.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/streaming/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java index d549457..f2ed9eb 100644 --- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java @@ -21,6 +21,17 @@ package org.apache.cassandra.streaming; */ +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.junit.Test; + import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowMutation; @@ -36,13 +47,6 @@ import org.apache.cassandra.net.MessageSerializer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.junit.Test; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.*; public class SerializationsTest extends AbstractSerializationsTester { @@ -117,7 +121,7 @@ public class SerializationsTest extends AbstractSerializationsTester StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED); DataOutputStream out = getOutput("streaming.StreamReply.bin"); StreamReply.serializer.serialize(rep, out, getVersion()); - messageSerializer.serialize(rep.getMessage(getVersion()), out, getVersion()); + rep.createMessage().serialize(out, getVersion()); out.close(); } @@ -156,9 +160,9 @@ public class SerializationsTest extends AbstractSerializationsTester StreamRequestMessage.serializer().serialize(msg0, out, getVersion()); StreamRequestMessage.serializer().serialize(msg1, out, getVersion()); StreamRequestMessage.serializer().serialize(msg2, out, getVersion()); - messageSerializer.serialize(msg0.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(msg1.getMessage(getVersion()), out, getVersion()); - messageSerializer.serialize(msg2.getMessage(getVersion()), out, getVersion()); + msg0.createMessage().serialize(out, getVersion()); + msg1.createMessage().serialize(out, getVersion()); + msg2.createMessage().serialize(out, getVersion()); out.close(); }