Repository: cassandra Updated Branches: refs/heads/trunk 7b35e3e84 -> 5786b3204
Fix NIODataInputStream varint decoding and EOF behavior patch by ariel; reviewed by benedict for CASSANDRA-9863 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4c9eaeb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4c9eaeb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4c9eaeb Branch: refs/heads/trunk Commit: c4c9eaeb131d4db2c4be3316611efb1ac2b17b23 Parents: 7b35e3e Author: Ariel Weisberg <ar...@weisberg.ws> Authored: Wed Jul 22 17:08:16 2015 -0400 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 23 23:23:16 2015 +0100 ---------------------------------------------------------------------- .../org/apache/cassandra/cache/OHCProvider.java | 3 +- .../apache/cassandra/db/BatchlogManager.java | 6 +- .../cassandra/db/HintedHandOffManager.java | 5 +- .../org/apache/cassandra/db/ReadResponse.java | 3 +- .../org/apache/cassandra/db/SystemKeyspace.java | 3 +- .../db/commitlog/CommitLogReplayer.java | 4 +- .../db/partitions/PartitionUpdate.java | 4 +- .../cassandra/io/util/DataInputBuffer.java | 68 +++++++++++++ .../cassandra/io/util/NIODataInputStream.java | 102 +++++-------------- .../db/commitlog/CommitLogStressTest.java | 5 +- .../org/apache/cassandra/db/PartitionTest.java | 6 +- .../apache/cassandra/db/ReadMessageTest.java | 4 +- .../db/commitlog/CommitLogTestReplayer.java | 3 +- .../apache/cassandra/gms/GossipDigestTest.java | 4 +- .../io/util/NIODataInputStreamTest.java | 100 ++++++++++++++++++ .../cassandra/utils/IntervalTreeTest.java | 4 +- .../apache/cassandra/utils/MerkleTreeTest.java | 3 +- .../cassandra/utils/StreamingHistogramTest.java | 6 +- 18 files changed, 228 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java index 21fc7c7..b0b4521 100644 --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.partitions.CachedPartition; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.NIODataInputStream; import org.caffinitas.ohc.OHCache; @@ -171,7 +172,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> { try { - NIODataInputStream in = new NIODataInputStream(buf, false); + NIODataInputStream in = new DataInputBuffer(buf, false); boolean isSentinel = in.readBoolean(); if (isSentinel) return new RowCacheSentinel(in.readLong()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index b6c658b..e8b76be 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; @@ -47,16 +46,15 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.WriteResponseHandler; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; @@ -318,7 +316,7 @@ public class BatchlogManager implements BatchlogManagerMBean private List<Mutation> replayingMutations() throws IOException { - DataInputPlus in = new NIODataInputStream(data, true); + DataInputPlus in = new DataInputBuffer(data, true); int size = in.readInt(); List<Mutation> mutations = new ArrayList<>(size); for (int i = 0; i < size; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 4501f3c..234ab97 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; @@ -54,8 +53,8 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -410,7 +409,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean Cell cell = hint.getCell(hintColumn); final long timestamp = cell.timestamp(); - DataInputPlus in = new NIODataInputStream(cell.value(), true); + DataInputPlus in = new DataInputBuffer(cell.value(), true); Mutation mutation; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 3737a38..b9928dc 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -116,7 +117,7 @@ public abstract class ReadResponse { try { - DataInputPlus in = new DataInputPlus.DataInputStreamPlus(ByteBufferUtil.inputStream(data)); + DataInputPlus in = new DataInputBuffer(data, true); return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index df7e7ef..0957af6 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -48,6 +48,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.locator.IEndpointSnitch; @@ -651,7 +652,7 @@ public final class SystemKeyspace { try { - NIODataInputStream in = new NIODataInputStream(bytes, true); + NIODataInputStream in = new DataInputBuffer(bytes, true); return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index cd8c935..e22e6e3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -38,7 +38,6 @@ import com.google.common.collect.Ordering; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -50,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.NIODataInputStream; @@ -474,7 +474,7 @@ public class CommitLogReplayer { final Mutation mutation; - try (NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size)) + try (NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) { mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index e805fd2..366828a 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -31,10 +31,10 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.FBUtilities; @@ -229,7 +229,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition try { - return serializer.deserialize(new NIODataInputStream(bytes, true), + return serializer.deserialize(new DataInputBuffer(bytes, true), version, SerializationHelper.Flag.LOCAL, version < MessagingService.VERSION_30 ? key : null); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/io/util/DataInputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java new file mode 100644 index 0000000..63091d0 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Input stream around a fixed ByteBuffer. Necessary to have this derived class to avoid NIODataInputStream's + * shuffling of bytes behavior in readNext() + * + */ +public class DataInputBuffer extends NIODataInputStream +{ + + private static ByteBuffer slice(byte[] buffer, int offset, int length) + { + ByteBuffer buf = ByteBuffer.wrap(buffer); + if (offset > 0 || length < buf.capacity()) + { + buf.position(offset); + buf.limit(offset + length); + buf = buf.slice(); + } + return buf; + } + + /** + * + * @param buf + * @param duplicate Whether or not to duplicate the buffer to ensure thread safety + */ + public DataInputBuffer(ByteBuffer buf, boolean duplicate) + { + super(buf, duplicate); + } + + public DataInputBuffer(byte[] buffer, int offset, int length) + { + super(slice(buffer, offset, length)); + } + + public DataInputBuffer(byte[] buffer) + { + super(ByteBuffer.wrap(buffer)); + } + + @Override + protected int readNext() throws IOException + { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/io/util/NIODataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java index edbf660..fbe24be 100644 --- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java +++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java @@ -48,7 +48,8 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl private final ByteBuffer buf; /* - * Used when wrapping a fixed buffer of data instead of a channel + * Used when wrapping a fixed buffer of data instead of a channel. Should never attempt + * to read from it. */ private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel() { @@ -67,7 +68,7 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl @Override public int read(ByteBuffer dst) throws IOException { - return -1; + throw new AssertionError(); } }; @@ -82,19 +83,14 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl buf.limit(0); } - /** - * - * @param buf - * @param duplicate Whether or not to duplicate the buffer to ensure thread safety - */ - public NIODataInputStream(ByteBuffer buf, boolean duplicate) + protected NIODataInputStream(ByteBuffer buf, boolean duplicate) { Preconditions.checkNotNull(buf); - Preconditions.checkArgument(buf.capacity() >= 9, "Buffer size must be large enough to accomadate a varint"); if (duplicate) this.buf = buf.duplicate(); else this.buf = buf; + this.rbc = emptyReadableByteChannel; } @@ -102,33 +98,11 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl * The decision to duplicate or not really needs to conscious since it a real impact * in terms of thread safety so don't expose this constructor with an implicit default. */ - private NIODataInputStream(ByteBuffer buf) + protected NIODataInputStream(ByteBuffer buf) { this(buf, false); } - private static ByteBuffer slice(byte buffer[], int offset, int length) - { - ByteBuffer buf = ByteBuffer.wrap(buffer); - if (offset > 0 || length < buf.capacity()) - { - buf.position(offset); - buf.limit(offset + length); - buf = buf.slice(); - } - return buf; - } - - public NIODataInputStream(byte buffer[], int offset, int length) - { - this(slice(buffer, offset, length)); - } - - public NIODataInputStream(byte buffer[]) - { - this(ByteBuffer.wrap(buffer)); - } - @Override public void readFully(byte[] b) throws IOException { @@ -185,7 +159,7 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl /* * Refill the buffer, preserving any unread bytes remaining in the buffer */ - private int readNext() throws IOException + protected int readNext() throws IOException { Preconditions.checkState(buf.remaining() != buf.capacity()); assert(buf.remaining() < 9); @@ -204,9 +178,12 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl } else if (buf.hasRemaining()) { - ByteBuffer dup = buf.duplicate(); + //FastByteOperations.copy failed to do the copy so inline a simple one here + int position = buf.position(); + int remaining = buf.remaining(); buf.clear(); - buf.put(dup); + for (int ii = 0; ii < remaining; ii++) + buf.put(buf.get(position + ii)); } else { @@ -223,55 +200,32 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl return read; } - /* - * Read at least minimum bytes and throw EOF if that fails - */ - private void readMinimum(int attempt, int require) throws IOException + /* + * Read the minimum number of bytes and throw EOF if the minimum could not be read + */ + private void readMinimum(int minimum) throws IOException { assert(buf.remaining() < 8); - int remaining; - while ((remaining = buf.remaining()) < attempt) + while (buf.remaining() < minimum) { int read = readNext(); if (read == -1) { - if (remaining < require) - { - //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here - buf.position(0); - buf.limit(0); - throw new EOFException(); - } + //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here + buf.position(0); + buf.limit(0); + throw new EOFException(); } } } /* * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available - * Add padding if requested and return the limit of the buffer without any padding that is added. - */ - private int prepareReadPaddedPrimitive(int minimum) throws IOException - { - int limitToSet = buf.limit(); - int position = buf.position(); - if (limitToSet - position < minimum) - { - readMinimum(minimum, 1); - limitToSet = buf.limit(); - position = buf.position(); - if (limitToSet - position < minimum) - buf.limit(position + minimum); - } - return limitToSet; - } - - /* - * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available */ private void prepareReadPrimitive(int minimum) throws IOException { if (buf.remaining() < minimum) - readMinimum(minimum, minimum); + readMinimum(minimum); } @Override @@ -351,23 +305,23 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl public long readUnsignedVInt() throws IOException { - byte firstByte = readByte(); + //If 9 bytes aren't available use the slow path in VIntCoding + if (buf.remaining() < 9) + return VIntCoding.readUnsignedVInt(this); + + byte firstByte = buf.get(); //Bail out early if this is one byte, necessary or it fails later if (firstByte >= 0) return firstByte; - //If padding was added, the limit to set after to get rid of the padding - int limitToSet = prepareReadPaddedPrimitive(8); + int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); int position = buf.position(); - int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); int extraBits = extraBytes * 8; long retval = buf.getLong(position); buf.position(position + extraBytes); - buf.limit(limitToSet); - // truncate the bytes we read in excess of those we needed retval >>>= 64 - extraBits; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 8a63a27..d3ff082 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -57,7 +57,8 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.io.util.NIODataInputStream; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; public class CommitLogStressTest { @@ -462,7 +463,7 @@ public class CommitLogStressTest // Skip over this mutation. return; - NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size); + DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size); Mutation mutation; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/PartitionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java index 9e9f68f..899fee7 100644 --- a/test/unit/org/apache/cassandra/db/PartitionTest.java +++ b/test/unit/org/apache/cassandra/db/PartitionTest.java @@ -31,8 +31,8 @@ import org.apache.cassandra.db.rows.RowStats; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.SchemaLoader; @@ -78,7 +78,7 @@ public class PartitionTest DataOutputBuffer bufOut = new DataOutputBuffer(); CachedPartition.cacheSerializer.serialize(partition, bufOut); - CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData())); + CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData())); assert deserialized != null; assert deserialized.metadata().cfName.equals(CF_STANDARD1); @@ -103,7 +103,7 @@ public class PartitionTest DataOutputBuffer bufOut = new DataOutputBuffer(); CachedPartition.cacheSerializer.serialize(partition, bufOut); - CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData())); + CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData())); assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount()); assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/ReadMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java index 3c53934..d801b32 100644 --- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java +++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java @@ -38,9 +38,9 @@ import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; @@ -145,7 +145,7 @@ public class ReadMessageTest rms.serialize(rm, out, MessagingService.current_version); - DataInputPlus dis = new NIODataInputStream(out.getData()); + DataInputPlus dis = new DataInputBuffer(out.getData()); return rms.deserialize(dis, MessagingService.current_version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 3f1918c..994ee19 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -27,6 +27,7 @@ import org.junit.Assert; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.NIODataInputStream; /** @@ -59,7 +60,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer @Override void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) { - NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size); + NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); Mutation mutation; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/gms/GossipDigestTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java index 6a8a6d3..3191b03 100644 --- a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java +++ b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java @@ -22,9 +22,9 @@ import static org.junit.Assert.*; import java.io.IOException; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.NIODataInputStream; import java.net.InetAddress; @@ -49,7 +49,7 @@ public class GossipDigestTest DataOutputBuffer output = new DataOutputBuffer(); GossipDigest.serializer.serialize(expected, output, MessagingService.current_version); - DataInputPlus input = new NIODataInputStream(output.getData()); + DataInputPlus input = new DataInputBuffer(output.getData()); GossipDigest actual = GossipDigest.serializer.deserialize(input, MessagingService.current_version); assertEquals(0, expected.compareTo(actual)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java index 11ff23a..3aad7e9 100644 --- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java @@ -745,4 +745,104 @@ public class NIODataInputStreamTest assertEquals(totalRead, corpus.capacity()); assertEquals(-1, dis.read()); } + + + @Test + @SuppressWarnings({ "resource"}) + public void testVIntRemainingBytes() throws Exception + { + for(int ii = 0; ii < 10; ii++) + { + for (int zz = 0; zz < 10; zz++) + { + if (zz + ii > 10) + continue; + + ByteBuffer buf = ByteBuffer.allocate(10); + buf.position(ii); + + long value = 0; + if (ii > 0) + value = (1L << 7 * zz) - 1; + + BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buf); + out.writeUnsignedVInt(value); + + buf.position(ii); + NIODataInputStream in = new DataInputBuffer(buf, false); + + assertEquals(value, in.readUnsignedVInt()); + } + } + } + + @Test + @SuppressWarnings({ "resource"}) + public void testVIntSmallBuffer() throws Exception + { + for(int ii = 0; ii < 10; ii++) + { + ByteBuffer buf = ByteBuffer.allocate(Math.max(1, ii)); + + long value = 0; + if (ii > 0) + value = (1L << 7 * ii) - 1; + + BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buf); + out.writeUnsignedVInt(value); + + buf.position(0); + NIODataInputStream in = new DataInputBuffer(buf, false); + + assertEquals(value, in.readUnsignedVInt()); + + boolean threw = false; + try + { + in.readUnsignedVInt(); + } + catch (EOFException e) + { + threw = true; + } + assertTrue(threw); + } + } + + @Test + @SuppressWarnings({ "resource"}) + public void testVIntTruncationEOF() throws Exception + { + for(int ii = 0; ii < 10; ii++) + { + ByteBuffer buf = ByteBuffer.allocate(Math.max(1, ii)); + + long value = 0; + if (ii > 0) + value = (1L << 7 * ii) - 1; + + BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buf); + out.writeUnsignedVInt(value); + + buf.position(0); + + ByteBuffer truncated = ByteBuffer.allocate(buf.capacity() - 1); + buf.limit(buf.limit() - 1); + truncated.put(buf); + truncated.flip(); + + NIODataInputStream in = new DataInputBuffer(truncated, false); + + boolean threw = false; + try + { + in.readUnsignedVInt(); + } + catch (EOFException e) + { + threw = true; + } + assertTrue(threw); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java index bc23f14..7e72098 100644 --- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java @@ -30,10 +30,10 @@ import java.util.List; import org.junit.Test; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.NIODataInputStream; import static org.junit.Assert.assertEquals; @@ -186,7 +186,7 @@ public class IntervalTreeTest serializer.serialize(it, out, 0); - DataInputPlus in = new NIODataInputStream(out.toByteArray()); + DataInputPlus in = new DataInputBuffer(out.toByteArray()); IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0); List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java index 03c906c..edb1fb1 100644 --- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java @@ -31,6 +31,7 @@ import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.NIODataInputStream; @@ -400,7 +401,7 @@ public class MerkleTreeTest MerkleTree.serializer.serialize(mt, out, MessagingService.current_version); byte[] serialized = out.toByteArray(); - DataInputPlus in = new NIODataInputStream(serialized); + DataInputPlus in = new DataInputBuffer(serialized); MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version); assertHashEquals(initialhash, restored.hash(full)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java index 0ea25da..b6b1882 100644 --- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java +++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java @@ -17,15 +17,13 @@ */ package org.apache.cassandra.utils; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import org.junit.Test; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.NIODataInputStream; import static org.junit.Assert.assertEquals; @@ -103,7 +101,7 @@ public class StreamingHistogramTest StreamingHistogram.serializer.serialize(hist, out); byte[] bytes = out.toByteArray(); - StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new NIODataInputStream(bytes)); + StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes)); // deserialized histogram should have following values Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);