http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/StreamingHistogram.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java index eb884be..b925395 100644 --- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java +++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.utils; -import java.io.DataInput; import java.io.IOException; import java.util.*; @@ -25,6 +24,7 @@ import com.google.common.base.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -182,7 +182,7 @@ public class StreamingHistogram } } - public StreamingHistogram deserialize(DataInput in) throws IOException + public StreamingHistogram deserialize(DataInputPlus in) throws IOException { int maxBinSize = in.readInt(); int size = in.readInt(); @@ -195,11 +195,11 @@ public class StreamingHistogram return new StreamingHistogram(maxBinSize, tmp); } - public long serializedSize(StreamingHistogram histogram, TypeSizes typeSizes) + public long serializedSize(StreamingHistogram histogram) { - long size = typeSizes.sizeof(histogram.maxBinSize); + long size = TypeSizes.sizeof(histogram.maxBinSize); Map<Double, Long> entries = histogram.getAsMap(); - size += typeSizes.sizeof(entries.size()); + size += TypeSizes.sizeof(entries.size()); // size of entries = size * (8(double) + 8(long)) size += entries.size() * (8L + 8L); return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/UUIDSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDSerializer.java b/src/java/org/apache/cassandra/utils/UUIDSerializer.java index 2aa2b4e..2b174fe 100644 --- a/src/java/org/apache/cassandra/utils/UUIDSerializer.java +++ b/src/java/org/apache/cassandra/utils/UUIDSerializer.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.utils; -import java.io.DataInput; import java.io.IOException; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; public class UUIDSerializer implements IVersionedSerializer<UUID> @@ -35,13 +35,13 @@ public class UUIDSerializer implements IVersionedSerializer<UUID> out.writeLong(uuid.getLeastSignificantBits()); } - public UUID deserialize(DataInput in, int version) throws IOException + public UUID deserialize(DataInputPlus in, int version) throws IOException { return new UUID(in.readLong(), in.readLong()); } public long serializedSize(UUID uuid, int version) { - return TypeSizes.NATIVE.sizeof(uuid.getMostSignificantBits()) + TypeSizes.NATIVE.sizeof(uuid.getLeastSignificantBits()); + return TypeSizes.sizeof(uuid.getMostSignificantBits()) + TypeSizes.sizeof(uuid.getLeastSignificantBits()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/IBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java index ed7e54b..3b32fdb 100644 --- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java @@ -21,8 +21,6 @@ import java.io.Closeable; import java.io.DataOutput; import java.io.IOException; -import org.apache.cassandra.db.TypeSizes; - public interface IBitSet extends Closeable { public long capacity(); @@ -46,7 +44,7 @@ public interface IBitSet extends Closeable public void serialize(DataOutput out) throws IOException; - public long serializedSize(TypeSizes type); + public long serializedSize(); public void clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java index 46c1bd0..00c3e67 100644 --- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -108,7 +108,7 @@ public class OffHeapBitSet implements IBitSet out.writeInt((int) (bytes.size() / 8)); for (long i = 0; i < bytes.size();) { - long value = ((bytes.getByte(i++) & 0xff) << 0) + long value = ((bytes.getByte(i++) & 0xff) << 0) + ((bytes.getByte(i++) & 0xff) << 8) + ((bytes.getByte(i++) & 0xff) << 16) + ((long) (bytes.getByte(i++) & 0xff) << 24) @@ -120,9 +120,9 @@ public class OffHeapBitSet implements IBitSet } } - public long serializedSize(TypeSizes type) + public long serializedSize() { - return type.sizeof((int) bytes.size()) + bytes.size(); + return TypeSizes.sizeof((int) bytes.size()) + bytes.size(); } @SuppressWarnings("resource") http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java index e793f6c..dc48e5e 100644 --- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java @@ -416,16 +416,16 @@ public class OpenBitSet implements IBitSet } } - public long serializedSize(TypeSizes type) { + public long serializedSize() { int bitLength = getNumWords(); int pageSize = getPageSize(); int pageCount = getPageCount(); - long size = type.sizeof(bitLength); // length + long size = TypeSizes.sizeof(bitLength); // length for (int p = 0; p < pageCount; p++) { long[] bits = getPage(p); for (int i = 0; i < pageSize && bitLength-- > 0; i++) - size += type.sizeof(bits[i]); // bucket + size += TypeSizes.sizeof(bits[i]); // bucket } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java deleted file mode 100644 index 663e176..0000000 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils.vint; - -import java.io.DataInput; -import java.io.IOException; - -import org.apache.cassandra.io.util.AbstractDataInput; - -/** - * Borrows idea from - * https://developers.google.com/protocol-buffers/docs/encoding#varints - * - * Should be used with EncodedDataOutputStream - * - * @deprecated Where possible use NIODataInputStream which has a more efficient implementation of buffered input - * for most read methods - */ -public class EncodedDataInputStream extends AbstractDataInput implements DataInput -{ - private DataInput input; - - public EncodedDataInputStream(DataInput input) - { - this.input = input; - } - - public int skipBytes(int n) throws IOException - { - return input.skipBytes(n); - } - - public int read() throws IOException - { - return input.readByte() & 0xFF; - } - - public void seek(long position) - { - throw new UnsupportedOperationException(); - } - - public long getPosition() - { - throw new UnsupportedOperationException(); - } - - public long getPositionLimit() - { - throw new UnsupportedOperationException(); - } - - protected long length() - { - throw new UnsupportedOperationException(); - } - - /* as all of the integer types could be decoded using VInt we can use single method vintEncode */ - - public int readInt() throws IOException - { - return (int) VIntCoding.readVInt(input); - } - - public long readLong() throws IOException - { - return VIntCoding.readVInt(input); - } - - public int readUnsignedShort() throws IOException - { - return (short) VIntCoding.readVInt(input); - } - - public short readShort() throws IOException - { - return (short) VIntCoding.readVInt(input); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java deleted file mode 100644 index 7f7613f..0000000 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils.vint; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus; - -/** - * Borrows idea from - * https://developers.google.com/protocol-buffers/docs/encoding#varints - */ -public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus -{ - private OutputStream out; - - public EncodedDataOutputStream(OutputStream out) - { - this.out = out; - } - - public void write(int b) throws IOException - { - out.write(b); - } - - public void write(byte[] b) throws IOException - { - out.write(b); - } - - public void write(byte[] b, int off, int len) throws IOException - { - out.write(b, off, len); - } - - /* as all of the integer types could be encoded using VInt we can use single method vintEncode */ - - public void writeInt(int v) throws IOException - { - writeVInt(v); - } - - public void writeLong(long v) throws IOException - { - writeVInt(v); - } - - public void writeShort(int v) throws IOException - { - writeVInt(v); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 9b4dde7..e1dd953 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -21,13 +21,10 @@ package org.apache.cassandra.db.commitlog; * */ -import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -55,14 +52,12 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.io.util.NIODataInputStream; public class CommitLogStressTest { @@ -467,11 +462,11 @@ public class CommitLogStressTest // Skip over this mutation. return; - FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size); Mutation mutation; try { - mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/AbstractSerializationsTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java index ebfa79d..636d673 100644 --- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java +++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java @@ -20,12 +20,14 @@ package org.apache.cassandra; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; -import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -59,11 +61,11 @@ public class AbstractSerializationsTester assert out.getLength() == serializer.serializedSize(obj, getVersion()); } - protected static DataInputStream getInput(String name) throws IOException + protected static DataInputStreamPlus getInput(String name) throws IOException { File f = new File("test/data/serialization/" + CUR_VER + "/" + name); assert f.exists() : f.getPath(); - return new DataInputStream(new FileInputStream(f)); + return new DataInputPlus.DataInputStreamPlus(new FileInputStream(f)); } @SuppressWarnings("resource") http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/PartitionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java index 9aadc2c..515902b 100644 --- a/test/unit/org/apache/cassandra/db/PartitionTest.java +++ b/test/unit/org/apache/cassandra/db/PartitionTest.java @@ -25,7 +25,6 @@ import java.util.Arrays; import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.db.rows.RowStats; @@ -33,6 +32,7 @@ import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.SchemaLoader; @@ -40,7 +40,6 @@ import org.apache.cassandra.Util; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.hadoop.io.DataInputBuffer; import static junit.framework.Assert.assertTrue; import static org.junit.Assert.assertEquals; @@ -79,9 +78,7 @@ public class PartitionTest DataOutputBuffer bufOut = new DataOutputBuffer(); CachedPartition.cacheSerializer.serialize(partition, bufOut); - DataInputBuffer bufIn = new DataInputBuffer(); - bufIn.reset(bufOut.getData(), 0, bufOut.getLength()); - CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn); + CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData())); assert deserialized != null; assert deserialized.metadata().cfName.equals(CF_STANDARD1); @@ -106,9 +103,7 @@ public class PartitionTest DataOutputBuffer bufOut = new DataOutputBuffer(); CachedPartition.cacheSerializer.serialize(partition, bufOut); - DataInputBuffer bufIn = new DataInputBuffer(); - bufIn.reset(bufOut.getData(), 0, bufOut.getLength()); - CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(bufIn); + CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData())); assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount()); assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/ReadMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java index bf6f23d..2475821 100644 --- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java +++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java @@ -23,9 +23,9 @@ import static org.junit.Assert.*; import java.io.*; import com.google.common.base.Predicate; + import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; @@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; @@ -140,12 +142,11 @@ public class ReadMessageTest { IVersionedSerializer<ReadCommand> rms = ReadCommand.serializer; DataOutputBuffer out = new DataOutputBuffer(); - ByteArrayInputStream bis; rms.serialize(rm, out, MessagingService.current_version); - bis = new ByteArrayInputStream(out.getData(), 0, out.getLength()); - return rms.deserialize(new DataInputStream(bis), MessagingService.current_version); + DataInputPlus dis = new NIODataInputStream(out.getData()); + return rms.deserialize(dis, MessagingService.current_version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index fed569f..3f1918c 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -18,18 +18,16 @@ */ package org.apache.cassandra.db.commitlog; -import java.io.DataInputStream; import java.io.File; import java.io.IOException; import com.google.common.base.Predicate; import org.junit.Assert; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.rows.SerializationHelper; -import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.NIODataInputStream; /** * Utility class for tests needing to examine the commitlog contents. @@ -61,11 +59,11 @@ public class CommitLogTestReplayer extends CommitLogReplayer @Override void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) { - FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size); Mutation mutation; try { - mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); Assert.assertTrue(processor.apply(mutation)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/GossipDigestTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java index 2928b12..6a8a6d3 100644 --- a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java +++ b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java @@ -20,11 +20,12 @@ package org.apache.cassandra.gms; import static org.junit.Assert.*; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; + import java.net.InetAddress; import org.apache.cassandra.net.MessagingService; @@ -48,8 +49,8 @@ public class GossipDigestTest DataOutputBuffer output = new DataOutputBuffer(); GossipDigest.serializer.serialize(expected, output, MessagingService.current_version); - ByteArrayInputStream input = new ByteArrayInputStream(output.getData(), 0, output.getLength()); - GossipDigest actual = GossipDigest.serializer.deserialize(new DataInputStream(input), MessagingService.current_version); + DataInputPlus input = new NIODataInputStream(output.getData()); + GossipDigest actual = GossipDigest.serializer.deserialize(input, MessagingService.current_version); assertEquals(0, expected.compareTo(actual)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/gms/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java index 080ae53..bab1ace 100644 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@ -20,12 +20,12 @@ package org.apache.cassandra.gms; import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.junit.Test; -import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; @@ -58,7 +58,7 @@ public class SerializationsTest extends AbstractSerializationsTester if (EXECUTE_WRITES) testEndpointStateWrite(); - DataInputStream in = getInput("gms.EndpointState.bin"); + DataInputStreamPlus in = getInput("gms.EndpointState.bin"); assert HeartBeatState.serializer.deserialize(in, getVersion()) != null; assert EndpointState.serializer.deserialize(in, getVersion()) != null; assert VersionedValue.serializer.deserialize(in, getVersion()) != null; @@ -98,7 +98,7 @@ public class SerializationsTest extends AbstractSerializationsTester testGossipDigestWrite(); int count = 0; - DataInputStream in = getInput("gms.Gossip.bin"); + DataInputStreamPlus in = getInput("gms.Gossip.bin"); while (count < Statics.Digests.size()) assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null; assert GossipDigestAck.serializer.deserialize(in, getVersion()) != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index 9c8e0fb..b7af1be 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -25,12 +25,13 @@ import java.util.Collections; import java.util.UUID; import org.junit.Test; - import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -79,7 +80,7 @@ public class SerializationsTest extends AbstractSerializationsTester if (EXECUTE_WRITES) testValidationRequestWrite(); - try (DataInputStream in = getInput("service.ValidationRequest.bin")) + try (DataInputStreamPlus in = getInput("service.ValidationRequest.bin")) { RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion()); assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST; @@ -117,7 +118,7 @@ public class SerializationsTest extends AbstractSerializationsTester if (EXECUTE_WRITES) testValidationCompleteWrite(); - try (DataInputStream in = getInput("service.ValidationComplete.bin")) + try (DataInputStreamPlus in = getInput("service.ValidationComplete.bin")) { // empty validation RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion()); @@ -169,7 +170,7 @@ public class SerializationsTest extends AbstractSerializationsTester InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2}); InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3}); - try (DataInputStream in = getInput("service.SyncRequest.bin")) + try (DataInputStreamPlus in = getInput("service.SyncRequest.bin")) { RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion()); assert message.messageType == RepairMessage.Type.SYNC_REQUEST; @@ -205,7 +206,7 @@ public class SerializationsTest extends AbstractSerializationsTester InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3}); NodePair nodes = new NodePair(src, dest); - try (DataInputStream in = getInput("service.SyncComplete.bin")) + try (DataInputStreamPlus in = getInput("service.SyncComplete.bin")) { // success RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java b/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java deleted file mode 100644 index a2cff63..0000000 --- a/test/unit/org/apache/cassandra/utils/EncodedStreamsTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils; - -import java.io.*; - -import com.google.common.collect.Iterators; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.RowUpdateBuilder; -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.vint.EncodedDataInputStream; -import org.apache.cassandra.utils.vint.EncodedDataOutputStream; - - -public class EncodedStreamsTest -{ - private static final String KEYSPACE1 = "Keyspace1"; - private static final String CF_STANDARD = "Standard1"; - private static final String CF_COUNTER = "Counter1"; - private int version = MessagingService.current_version; - - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD), - SchemaLoader.counterCFMD(KEYSPACE1, CF_COUNTER)); - } - - @Test - public void testStreams() throws IOException - { - ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream(); - EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1); - - ByteArrayOutputStream byteArrayOStream2 = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(byteArrayOStream2); - - for (short i = 0; i < 10000; i++) - { - out.writeShort(i); - odos.writeShort(i); - } - out.flush(); - odos.flush(); - - for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++) - { - out.writeInt(i); - odos.writeInt(i); - } - out.flush(); - odos.flush(); - - for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE + 10000);i++) - { - out.writeLong(i); - odos.writeLong(i); - } - out.flush(); - odos.flush(); - Assert.assertTrue(byteArrayOStream1.size() < byteArrayOStream2.size()); - - ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray()); - EncodedDataInputStream idis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1)); - - // assert reading Short - for (int i = 0; i < 10000; i++) - Assert.assertEquals(i, idis.readShort()); - - // assert reading Integer - for (int i = Short.MAX_VALUE; i < ((int)Short.MAX_VALUE + 10000); i++) - Assert.assertEquals(i, idis.readInt()); - - // assert reading Long - for (long i = Integer.MAX_VALUE; i < ((long)Integer.MAX_VALUE) + 1000; i++) - Assert.assertEquals(i, idis.readLong()); - } - - private UnfilteredRowIterator createTable() - { - CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD).metadata; - - RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key"); - - builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), "try").build(); - builder.clustering("to").add(cfm.partitionColumns().iterator().next(), "be_nice").build(); - - return builder.unfilteredIterator(); - } - - private UnfilteredRowIterator createCounterTable() - { - CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER).metadata; - RowUpdateBuilder builder = new RowUpdateBuilder(cfm, 0, "key"); - - builder.clustering("vijay").add(cfm.partitionColumns().iterator().next(), 1L).build(); - builder.clustering("wants").add(cfm.partitionColumns().iterator().next(), 1000000L).build(); - - return builder.unfilteredIterator(); - } - - @Test - public void testCFSerialization() throws IOException - { - ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream(); - EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1); - UnfilteredRowIteratorSerializer.serializer.serialize(createTable(), odos, version, 1); - - ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray()); - EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1)); - UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL); - Assert.assertTrue(Iterators.elementsEqual(partition, createTable())); - Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createTable(), version, 1, TypeSizes.VINT)); - } - - @Test - public void testCounterCFSerialization() throws IOException - { - ByteArrayOutputStream byteArrayOStream1 = new ByteArrayOutputStream(); - EncodedDataOutputStream odos = new EncodedDataOutputStream(byteArrayOStream1); - UnfilteredRowIteratorSerializer.serializer.serialize(createCounterTable(), odos, version, 1); - - ByteArrayInputStream byteArrayIStream1 = new ByteArrayInputStream(byteArrayOStream1.toByteArray()); - EncodedDataInputStream odis = new EncodedDataInputStream(new DataInputStream(byteArrayIStream1)); - UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(odis, version, SerializationHelper.Flag.LOCAL); - Assert.assertTrue(Iterators.elementsEqual(partition, createCounterTable())); - Assert.assertEquals(byteArrayOStream1.size(), (int) UnfilteredRowIteratorSerializer.serializer.serializedSize(createCounterTable(), version, 1, TypeSizes.VINT)); - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java index 01d7bd8..bc23f14 100644 --- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java @@ -21,9 +21,6 @@ package org.apache.cassandra.utils; */ -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; @@ -31,12 +28,12 @@ import java.util.Collections; import java.util.List; import org.junit.Test; - -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.NIODataInputStream; import static org.junit.Assert.assertEquals; @@ -155,12 +152,12 @@ public class IntervalTreeTest out.writeInt(i); } - public Integer deserialize(DataInput in) throws IOException + public Integer deserialize(DataInputPlus in) throws IOException { return in.readInt(); } - public long serializedSize(Integer i, TypeSizes s) + public long serializedSize(Integer i) { return 4; } @@ -172,12 +169,12 @@ public class IntervalTreeTest out.writeUTF(v); } - public String deserialize(DataInput in) throws IOException + public String deserialize(DataInputPlus in) throws IOException { return in.readUTF(); } - public long serializedSize(String v, TypeSizes s) + public long serializedSize(String v) { return v.length(); } @@ -189,7 +186,7 @@ public class IntervalTreeTest serializer.serialize(it, out, 0); - DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray())); + DataInputPlus in = new NIODataInputStream(out.toByteArray()); IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0); List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java index fe7f506..03c906c 100644 --- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java @@ -22,18 +22,18 @@ import java.math.BigInteger; import java.util.*; import com.google.common.collect.AbstractIterator; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteStreams; + import org.junit.Before; import org.junit.Test; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.MerkleTree.Hashable; import org.apache.cassandra.utils.MerkleTree.RowHash; @@ -400,7 +400,7 @@ public class MerkleTreeTest MerkleTree.serializer.serialize(mt, out, MessagingService.current_version); byte[] serialized = out.toByteArray(); - ByteArrayDataInput in = ByteStreams.newDataInput(serialized); + DataInputPlus in = new NIODataInputStream(serialized); MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version); assertHashEquals(initialhash, restored.hash(full)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java index 0775246..f3809b3 100644 --- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java @@ -23,9 +23,9 @@ import java.io.IOException; import org.junit.Assert; import org.junit.Test; - import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.service.StorageService; @@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester if (EXECUTE_WRITES) testEstimatedHistogramWrite(); - try (DataInputStream in = getInput("utils.EstimatedHistogram.bin")) + try (DataInputStreamPlus in = getInput("utils.EstimatedHistogram.bin")) { Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in)); Assert.assertNotNull(EstimatedHistogram.serializer.deserialize(in)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03f72acd/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java index 38b2f04..0ea25da 100644 --- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java +++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java @@ -24,8 +24,8 @@ import java.util.LinkedHashMap; import java.util.Map; import org.junit.Test; - import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.NIODataInputStream; import static org.junit.Assert.assertEquals; @@ -103,7 +103,7 @@ public class StreamingHistogramTest StreamingHistogram.serializer.serialize(hist, out); byte[] bytes = out.toByteArray(); - StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); + StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new NIODataInputStream(bytes)); // deserialized histogram should have following values Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);