Repository: cassandra
Updated Branches:
  refs/heads/trunk 7b35e3e84 -> 5786b3204


Fix NIODataInputStream varint decoding and EOF behavior

patch by ariel; reviewed by benedict for CASSANDRA-9863


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4c9eaeb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4c9eaeb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4c9eaeb

Branch: refs/heads/trunk
Commit: c4c9eaeb131d4db2c4be3316611efb1ac2b17b23
Parents: 7b35e3e
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Wed Jul 22 17:08:16 2015 -0400
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Jul 23 23:23:16 2015 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/cache/OHCProvider.java |   3 +-
 .../apache/cassandra/db/BatchlogManager.java    |   6 +-
 .../cassandra/db/HintedHandOffManager.java      |   5 +-
 .../org/apache/cassandra/db/ReadResponse.java   |   3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   3 +-
 .../db/commitlog/CommitLogReplayer.java         |   4 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../cassandra/io/util/DataInputBuffer.java      |  68 +++++++++++++
 .../cassandra/io/util/NIODataInputStream.java   | 102 +++++--------------
 .../db/commitlog/CommitLogStressTest.java       |   5 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   6 +-
 .../apache/cassandra/db/ReadMessageTest.java    |   4 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   3 +-
 .../apache/cassandra/gms/GossipDigestTest.java  |   4 +-
 .../io/util/NIODataInputStreamTest.java         | 100 ++++++++++++++++++
 .../cassandra/utils/IntervalTreeTest.java       |   4 +-
 .../apache/cassandra/utils/MerkleTreeTest.java  |   3 +-
 .../cassandra/utils/StreamingHistogramTest.java |   6 +-
 18 files changed, 228 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 21fc7c7..b0b4521 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.caffinitas.ohc.OHCache;
@@ -171,7 +172,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
         {
             try
             {
-                NIODataInputStream in = new NIODataInputStream(buf, false);
+                NIODataInputStream in = new DataInputBuffer(buf, false);
                 boolean isSentinel = in.readBoolean();
                 if (isSentinel)
                     return new RowCacheSentinel(in.readLong());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index b6c658b..e8b76be 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
@@ -47,16 +46,15 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -318,7 +316,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         private List<Mutation> replayingMutations() throws IOException
         {
-            DataInputPlus in = new NIODataInputStream(data, true);
+            DataInputPlus in = new DataInputBuffer(data, true);
             int size = in.readInt();
             List<Mutation> mutations = new ArrayList<>(size);
             for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 4501f3c..234ab97 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
@@ -54,8 +53,8 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -410,7 +409,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 Cell cell = hint.getCell(hintColumn);
 
                 final long timestamp = cell.timestamp();
-                DataInputPlus in = new NIODataInputStream(cell.value(), true);
+                DataInputPlus in = new DataInputBuffer(cell.value(), true);
                 Mutation mutation;
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 3737a38..b9928dc 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -116,7 +117,7 @@ public abstract class ReadResponse
         {
             try
             {
-                DataInputPlus in = new DataInputPlus.DataInputStreamPlus(ByteBufferUtil.inputStream(data));
+                DataInputPlus in = new DataInputBuffer(data, true);
                 return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag);
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index df7e7ef..0957af6 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
@@ -651,7 +652,7 @@ public final class SystemKeyspace
     {
         try
         {
-            NIODataInputStream in = new NIODataInputStream(bytes, true);
+            NIODataInputStream in = new DataInputBuffer(bytes, true);
             return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index cd8c935..e22e6e3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -38,7 +38,6 @@ import com.google.common.collect.Ordering;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -50,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.NIODataInputStream;
@@ -474,7 +474,7 @@ public class CommitLogReplayer
     {
 
         final Mutation mutation;
-        try (NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size))
+        try (NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
         {
             mutation = Mutation.serializer.deserialize(bufIn,
                                                        desc.getMessagingVersion(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index e805fd2..366828a 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -31,10 +31,10 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
@@ -229,7 +229,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
 
         try
         {
-            return serializer.deserialize(new NIODataInputStream(bytes, true),
+            return serializer.deserialize(new DataInputBuffer(bytes, true),
                                           version,
                                           SerializationHelper.Flag.LOCAL,
                                           version < MessagingService.VERSION_30 ? key : null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
new file mode 100644
index 0000000..63091d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Input stream around a fixed ByteBuffer. Necessary to have this derived class to avoid NIODataInputStream's
+ * shuffling of bytes behavior in readNext()
+ *
+ */
+public class DataInputBuffer extends NIODataInputStream
+{
+
+    private static ByteBuffer slice(byte[] buffer, int offset, int length)
+    {
+        ByteBuffer buf = ByteBuffer.wrap(buffer);
+        if (offset > 0 || length < buf.capacity())
+        {
+            buf.position(offset);
+            buf.limit(offset + length);
+            buf = buf.slice();
+        }
+        return buf;
+    }
+
+    /**
+     *
+     * @param buf
+     * @param duplicate Whether or not to duplicate the buffer to ensure thread safety
+     */
+    public DataInputBuffer(ByteBuffer buf, boolean duplicate)
+    {
+        super(buf, duplicate);
+    }
+
+    public DataInputBuffer(byte[] buffer, int offset, int length)
+    {
+        super(slice(buffer, offset, length));
+    }
+
+    public DataInputBuffer(byte[] buffer)
+    {
+        super(ByteBuffer.wrap(buffer));
+    }
+
+    @Override
+    protected int readNext() throws IOException
+    {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
index edbf660..fbe24be 100644
--- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -48,7 +48,8 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
     private final ByteBuffer buf;
 
     /*
-     *  Used when wrapping a fixed buffer of data instead of a channel
+     *  Used when wrapping a fixed buffer of data instead of a channel. Should never attempt
+     *  to read from it.
      */
     private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel()
     {
@@ -67,7 +68,7 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
         @Override
         public int read(ByteBuffer dst) throws IOException
         {
-            return -1;
+            throw new AssertionError();
         }
 
     };
@@ -82,19 +83,14 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
         buf.limit(0);
     }
 
-    /**
-     *
-     * @param buf
-     * @param duplicate Whether or not to duplicate the buffer to ensure thread safety
-     */
-    public NIODataInputStream(ByteBuffer buf, boolean duplicate)
+    protected NIODataInputStream(ByteBuffer buf, boolean duplicate)
     {
         Preconditions.checkNotNull(buf);
-        Preconditions.checkArgument(buf.capacity() >= 9, "Buffer size must be large enough to accomadate a varint");
         if (duplicate)
             this.buf = buf.duplicate();
         else
             this.buf = buf;
+
         this.rbc = emptyReadableByteChannel;
     }
 
@@ -102,33 +98,11 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
      * The decision to duplicate or not really needs to conscious since it a real impact
      * in terms of thread safety so don't expose this constructor with an implicit default.
      */
-    private NIODataInputStream(ByteBuffer buf)
+    protected NIODataInputStream(ByteBuffer buf)
     {
         this(buf, false);
     }
 
-    private static ByteBuffer slice(byte buffer[], int offset, int length)
-    {
-        ByteBuffer buf = ByteBuffer.wrap(buffer);
-        if (offset > 0 || length < buf.capacity())
-        {
-            buf.position(offset);
-            buf.limit(offset + length);
-            buf = buf.slice();
-        }
-        return buf;
-    }
-
-    public NIODataInputStream(byte buffer[], int offset, int length)
-    {
-        this(slice(buffer, offset, length));
-    }
-
-    public NIODataInputStream(byte buffer[])
-    {
-        this(ByteBuffer.wrap(buffer));
-    }
-
     @Override
     public void readFully(byte[] b) throws IOException
     {
@@ -185,7 +159,7 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
     /*
      * Refill the buffer, preserving any unread bytes remaining in the buffer
      */
-    private int readNext() throws IOException
+    protected int readNext() throws IOException
     {
         Preconditions.checkState(buf.remaining() != buf.capacity());
         assert(buf.remaining() < 9);
@@ -204,9 +178,12 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
         }
         else if (buf.hasRemaining())
         {
-            ByteBuffer dup = buf.duplicate();
+            //FastByteOperations.copy failed to do the copy so inline a simple one here
+            int position = buf.position();
+            int remaining  = buf.remaining();
             buf.clear();
-            buf.put(dup);
+            for (int ii = 0; ii < remaining; ii++)
+                buf.put(buf.get(position + ii));
         }
         else
         {
@@ -223,55 +200,32 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
         return read;
     }
 
-   /*
-    * Read at least minimum bytes and throw EOF if that fails
-    */
-    private void readMinimum(int attempt, int require) throws IOException
+    /*
+     * Read the minimum number of bytes and throw EOF if the minimum could not be read
+     */
+    private void readMinimum(int minimum) throws IOException
     {
         assert(buf.remaining() < 8);
-        int remaining;
-        while ((remaining = buf.remaining()) < attempt)
+        while (buf.remaining() < minimum)
         {
             int read = readNext();
             if (read == -1)
             {
-                if (remaining < require)
-                {
-                    //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
-                    buf.position(0);
-                    buf.limit(0);
-                    throw new EOFException();
-                }
+                //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
+                buf.position(0);
+                buf.limit(0);
+                throw new EOFException();
             }
         }
     }
 
     /*
      * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available
-     * Add padding if requested and return the limit of the buffer without any padding that is added.
-     */
-    private int prepareReadPaddedPrimitive(int minimum) throws IOException
-    {
-        int limitToSet = buf.limit();
-        int position = buf.position();
-        if (limitToSet - position < minimum)
-        {
-            readMinimum(minimum, 1);
-            limitToSet = buf.limit();
-            position = buf.position();
-            if (limitToSet - position < minimum)
-                buf.limit(position + minimum);
-        }
-        return limitToSet;
-    }
-
-    /*
-     * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available
      */
     private void prepareReadPrimitive(int minimum) throws IOException
     {
         if (buf.remaining() < minimum)
-            readMinimum(minimum, minimum);
+            readMinimum(minimum);
     }
 
     @Override
@@ -351,23 +305,23 @@ public class NIODataInputStream extends InputStream implements DataInputPlus, Cl
 
     public long readUnsignedVInt() throws IOException
     {
-        byte firstByte = readByte();
+        //If 9 bytes aren't available use the slow path in VIntCoding
+        if (buf.remaining() < 9)
+            return VIntCoding.readUnsignedVInt(this);
+
+        byte firstByte = buf.get();
 
         //Bail out early if this is one byte, necessary or it fails later
         if (firstByte >= 0)
             return firstByte;
 
-        //If padding was added, the limit to set after to get rid of the padding
-        int limitToSet = prepareReadPaddedPrimitive(8);
+        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
 
         int position = buf.position();
-        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
         int extraBits = extraBytes * 8;
 
         long retval = buf.getLong(position);
         buf.position(position + extraBytes);
-        buf.limit(limitToSet);
-
 
         // truncate the bytes we read in excess of those we needed
         retval >>>= 64 - extraBits;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 8a63a27..d3ff082 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -57,7 +57,8 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
 
 public class CommitLogStressTest
 {
@@ -462,7 +463,7 @@ public class CommitLogStressTest
                 // Skip over this mutation.
                 return;
 
-            NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
+            DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size);
             Mutation mutation;
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 9e9f68f..899fee7 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -31,8 +31,8 @@ import org.apache.cassandra.db.rows.RowStats;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.SchemaLoader;
@@ -78,7 +78,7 @@ public class PartitionTest
         DataOutputBuffer bufOut = new DataOutputBuffer();
         CachedPartition.cacheSerializer.serialize(partition, bufOut);
 
-        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
+        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
 
         assert deserialized != null;
         assert deserialized.metadata().cfName.equals(CF_STANDARD1);
@@ -103,7 +103,7 @@ public class PartitionTest
         DataOutputBuffer bufOut = new DataOutputBuffer();
         CachedPartition.cacheSerializer.serialize(partition, bufOut);
 
-        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new NIODataInputStream(bufOut.getData()));
+        CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
 
         assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
         assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 3c53934..d801b32 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -38,9 +38,9 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -145,7 +145,7 @@ public class ReadMessageTest
 
         rms.serialize(rm, out, MessagingService.current_version);
 
-        DataInputPlus dis = new NIODataInputStream(out.getData());
+        DataInputPlus dis = new DataInputBuffer(out.getData());
         return rms.deserialize(dis, MessagingService.current_version);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 3f1918c..994ee19 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -27,6 +27,7 @@ import org.junit.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.NIODataInputStream;
 
 /**
@@ -59,7 +60,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     @Override
     void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
     {
-        NIODataInputStream bufIn = new NIODataInputStream(inputBuffer, 0, size);
+        NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
         Mutation mutation;
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
index 6a8a6d3..3191b03 100644
--- a/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
@@ -22,9 +22,9 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 
 import java.net.InetAddress;
 
@@ -49,7 +49,7 @@ public class GossipDigestTest
         DataOutputBuffer output = new DataOutputBuffer();
         GossipDigest.serializer.serialize(expected, output, MessagingService.current_version);
 
-        DataInputPlus input = new NIODataInputStream(output.getData());
+        DataInputPlus input = new DataInputBuffer(output.getData());
         GossipDigest actual = GossipDigest.serializer.deserialize(input, MessagingService.current_version);
         assertEquals(0, expected.compareTo(actual));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
index 11ff23a..3aad7e9 100644
--- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -745,4 +745,104 @@ public class NIODataInputStreamTest
         assertEquals(totalRead, corpus.capacity());
         assertEquals(-1, dis.read());
     }
+
+
+    /**
+     * Round-trips an unsigned vint written at every start offset {@code ii} of a ten byte
+     * buffer, for every value made of {@code 7 * zz} set low-order bits.
+     *
+     * The value is written through a {@code DataOutputBufferFixed} positioned at {@code ii}
+     * and read back through a {@code DataInputBuffer} over the same buffer, again positioned
+     * at {@code ii}. Combinations with {@code zz + ii > 10} are skipped, presumably because
+     * the encoded value would not fit in what remains of the buffer (TODO confirm against
+     * the vint encoding).
+     */
+    @Test
+    @SuppressWarnings({ "resource"})
+    public void testVIntRemainingBytes() throws Exception
+    {
+        for(int ii = 0; ii < 10; ii++)
+        {
+            for (int zz = 0; zz < 10; zz++)
+            {
+                if (zz + ii > 10)
+                    continue;
+
+                ByteBuffer buf = ByteBuffer.allocate(10);
+                buf.position(ii);
+
+                // The value depends only on zz; (1L << 0) - 1 is already 0, so no guard is
+                // needed. Guarding on the offset ii would pin the value to 0 for every zz
+                // when ii == 0 and leave non-zero values untested at offset 0.
+                long value = (1L << 7 * zz) - 1;
+
+                BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buf);
+                out.writeUnsignedVInt(value);
+
+                // Rewind to the offset the value was written at before reading it back.
+                buf.position(ii);
+                NIODataInputStream in = new DataInputBuffer(buf, false);
+
+                assertEquals(value, in.readUnsignedVInt());
+            }
+        }
+    }
+
+    /**
+     * Writes one unsigned vint into a buffer of {@code max(1, ii)} bytes, reads it back and
+     * then checks that a second read fails with {@link EOFException}.
+     *
+     * The value for iteration {@code ii} has its {@code 7 * ii} low-order bits set (zero
+     * when {@code ii} is 0).
+     */
+    @Test
+    @SuppressWarnings({ "resource"})
+    public void testVIntSmallBuffer() throws Exception
+    {
+        for (int width = 0; width < 10; width++)
+        {
+            final long expected = width > 0 ? (1L << 7 * width) - 1 : 0;
+
+            ByteBuffer backing = ByteBuffer.allocate(Math.max(1,  width));
+            BufferedDataOutputStreamPlus writer = new DataOutputBufferFixed(backing);
+            writer.writeUnsignedVInt(expected);
+
+            // Read back from the start of the buffer.
+            backing.position(0);
+            NIODataInputStream reader = new DataInputBuffer(backing, false);
+            assertEquals(expected, reader.readUnsignedVInt());
+
+            // The test expects nothing readable after the first value: a second read must
+            // raise EOFException. Any other exception propagates and fails the test.
+            boolean sawEof;
+            try
+            {
+                reader.readUnsignedVInt();
+                sawEof = false;
+            }
+            catch (EOFException e)
+            {
+                sawEof = true;
+            }
+            assertTrue(sawEof);
+        }
+    }
+
+    /**
+     * Writes one unsigned vint into a buffer of {@code max(1, ii)} bytes, copies all but the
+     * last byte (up to the buffer's limit) into a second buffer, and checks that reading a
+     * vint from that truncated copy fails with {@link EOFException}.
+     *
+     * The value for iteration {@code ii} has its {@code 7 * ii} low-order bits set (zero
+     * when {@code ii} is 0).
+     */
+    @Test
+    @SuppressWarnings({ "resource"})
+    public void testVIntTruncationEOF() throws Exception
+    {
+        for (int width = 0; width < 10; width++)
+        {
+            final long written = width > 0 ? (1L << 7 * width) - 1 : 0;
+
+            ByteBuffer full = ByteBuffer.allocate(Math.max(1,  width));
+            BufferedDataOutputStreamPlus writer = new DataOutputBufferFixed(full);
+            writer.writeUnsignedVInt(written);
+
+            full.position(0);
+
+            // Build a copy that is one byte shorter: shrink the source limit by one, bulk
+            // copy what remains, then flip the copy so it is ready for reading.
+            ByteBuffer shortened = ByteBuffer.allocate(full.capacity() - 1);
+            full.limit(full.limit() - 1);
+            shortened.put(full);
+            shortened.flip();
+
+            NIODataInputStream reader = new DataInputBuffer(shortened, false);
+
+            // Decoding the incomplete value must raise EOFException; any other exception
+            // propagates and fails the test.
+            boolean sawEof;
+            try
+            {
+                reader.readUnsignedVInt();
+                sawEof = false;
+            }
+            catch (EOFException e)
+            {
+                sawEof = true;
+            }
+            assertTrue(sawEof);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index bc23f14..7e72098 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -30,10 +30,10 @@ import java.util.List;
 import org.junit.Test;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
 
 import static org.junit.Assert.assertEquals;
 
@@ -186,7 +186,7 @@ public class IntervalTreeTest
 
         serializer.serialize(it, out, 0);
 
-        DataInputPlus in = new NIODataInputStream(out.toByteArray());
+        DataInputPlus in = new DataInputBuffer(out.toByteArray());
 
         IntervalTree<Integer, String, Interval<Integer, String>> it2 = serializer.deserialize(in, 0);
         List<Interval<Integer, String>> intervals2 = new ArrayList<Interval<Integer, String>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 03c906c..edb1fb1 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.NIODataInputStream;
@@ -400,7 +401,7 @@ public class MerkleTreeTest
         MerkleTree.serializer.serialize(mt, out, MessagingService.current_version);
         byte[] serialized = out.toByteArray();
 
-        DataInputPlus in = new NIODataInputStream(serialized);
+        DataInputPlus in = new DataInputBuffer(serialized);
         MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version);
 
         assertHashEquals(initialhash, restored.hash(full));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4c9eaeb/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index 0ea25da..b6b1882 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -17,15 +17,13 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.junit.Test;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 
 import static org.junit.Assert.assertEquals;
 
@@ -103,7 +101,7 @@ public class StreamingHistogramTest
         StreamingHistogram.serializer.serialize(hist, out);
         byte[] bytes = out.toByteArray();
 
-        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new NIODataInputStream(bytes));
+        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
 
         // deserialized histogram should have following values
         Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);

Reply via email to