Repository: cassandra
Updated Branches:
  refs/heads/trunk 6092b01e3 -> 03f72acd5


add vInt encoding to Data(Input|Output)Plus

patch by ariel and benedict for CASSANDRA-9499


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1491a40b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1491a40b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1491a40b

Branch: refs/heads/trunk
Commit: 1491a40b7b4ea2723bcf22d870ee514b47ea901b
Parents: 6092b01
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Mon Jun 15 14:31:03 2015 -0400
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Jul 2 09:39:57 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 NOTICE.txt                                      |   5 +
 src/java/org/apache/cassandra/db/TypeSizes.java |  23 +--
 .../cassandra/io/util/AbstractDataInput.java    |  34 ++++
 .../io/util/BufferedDataOutputStreamPlus.java   |  23 ++-
 .../cassandra/io/util/DataOutputPlus.java       |  19 ++
 .../cassandra/io/util/NIODataInputStream.java   |  86 +++++++--
 .../io/util/UnbufferedDataOutputStreamPlus.java |   1 -
 .../utils/vint/EncodedDataInputStream.java      |  47 +----
 .../utils/vint/EncodedDataOutputStream.java     |  35 +---
 .../apache/cassandra/utils/vint/VIntCoding.java | 183 +++++++++++++++++++
 .../io/util/BufferedDataOutputStreamTest.java   |  95 +++++++++-
 .../io/util/NIODataInputStreamTest.java         | 120 ++++++++++--
 .../cassandra/utils/vint/VIntCodingTest.java    |  85 +++++++++
 14 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6895395..7561e4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,7 +10,7 @@
  * Change default garbage collector to G1 (CASSANDRA-7486)
  * Populate TokenMetadata early during startup (CASSANDRA-9317)
  * undeprecate cache recentHitRate (CASSANDRA-6591)
-
+ * Add support for selectively varint encoding fields (CASSANDRA-9499)
 
 2.2.0-rc2
  * (cqlsh) Allow setting the initial connection timeout (CASSANDRA-9601)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index a71d822..0ad792f 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -74,3 +74,8 @@ OHC
 (https://github.com/snazy/ohc)
 Java Off-Heap-Cache, licensed under APLv2
 Copyright 2014-2015 Robert Stupp, Germany.
+
+Protocol buffers for varint encoding
+https://developers.google.com/protocol-buffers/
+Copyright 2008 Google Inc.  All rights reserved.
+BSD 3-clause

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java 
b/src/java/org/apache/cassandra/db/TypeSizes.java
index efae762..79d5774 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import org.apache.cassandra.utils.vint.VIntCoding;
+
 public abstract class TypeSizes
 {
     public static final TypeSizes NATIVE = new NativeDBTypeSizes();
@@ -106,26 +108,7 @@ public abstract class TypeSizes
 
         public int sizeofVInt(long i)
         {
-            if (i >= -112 && i <= 127)
-                return 1;
-
-            int size = 0;
-            int len = -112;
-            if (i < 0)
-            {
-                i ^= -1L; // take one's complement'
-                len = -120;
-            }
-            long tmp = i;
-            while (tmp != 0)
-            {
-                tmp = tmp >> 8;
-                len--;
-            }
-            size++;
-            len = (len < -120) ? -(len + 120) : -(len + 112);
-            size += len;
-            return size;
+            return VIntCoding.computeVIntSize(i);
         }
 
         public int sizeof(long i)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java 
b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index 588540d..935a06d 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 
+import org.apache.cassandra.utils.vint.VIntCoding;
+
 public abstract class AbstractDataInput extends InputStream implements 
DataInput
 {
     public abstract void seek(long position) throws IOException;
@@ -265,6 +267,38 @@ public abstract class AbstractDataInput extends 
InputStream implements DataInput
     }
 
     /**
+     * Reads a varint encoded integer from the current position in this file. 
Blocks until
+     * the end of the varint is reached, the end of the file is reached, or an 
exception is
+     * thrown.
+     *
+     * @return the next varint value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public long readVInt() throws IOException
+    {
+        return VIntCoding.readVInt(this);
+    }
+
+    /**
+     * Reads an unsigned varint encoded integer from the current position in 
this file. Blocks until
+     * the end of the varint is reached, the end of the file is reached, or an 
exception is
+     * thrown.
+     *
+     * @return the next unsigned varint value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public long readUnsignedVInt() throws IOException
+    {
+        return VIntCoding.readUnsignedVInt(this);
+    }
+
+    /**
      * Reads a 16-bit short from the current position in this file. Blocks 
until
      * two bytes have been read, the end of the file is reached or an exception
      * is thrown.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java 
b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 5669a8d..b6f3231 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -29,7 +29,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.memory.MemoryUtil;
-
+import org.apache.cassandra.utils.vint.VIntCoding;
 
 /**
  * An implementation of the DataOutputStreamPlus interface using a ByteBuffer 
to stage writes
@@ -214,6 +214,27 @@ public class BufferedDataOutputStreamPlus extends 
DataOutputStreamPlus
     }
 
     @Override
+    public void writeVInt(long value) throws IOException
+    {
+        writeUnsignedVInt(VIntCoding.encodeZigZag64(value));
+    }
+
+    @Override
+    public void writeUnsignedVInt(long value) throws IOException
+    {
+        int size = VIntCoding.computeUnsignedVIntSize(value);
+        if (size == 1)
+        {
+            ensureRemaining(1);
+            buffer.put((byte) value);
+            return;
+        }
+
+        ensureRemaining(size);
+        buffer.put(VIntCoding.encodeVInt(value, size), 0, size);
+    }
+
+    @Override
     public void writeFloat(float v) throws IOException
     {
         ensureRemaining(4);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java 
b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index f63c1e5..f6a3648 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.cassandra.utils.vint.VIntCoding;
+
 import com.google.common.base.Function;
 
 /**
@@ -40,4 +42,21 @@ public interface DataOutputPlus extends DataOutput
      * and forget to flush
      */
     <R> R applyToChannel(Function<WritableByteChannel, R> c) throws 
IOException;
+
+    default void writeVInt(long i) throws IOException
+    {
+        VIntCoding.writeVInt(i, this);
+    }
+
+    /**
+     * Think hard before opting for an unsigned encoding. Is this going to 
bite someone because some day
+     * they might need to pass in a sentinel value using negative numbers? Is 
the risk worth it
+     * to save a few bytes?
+     *
+     * Signed, not a fan of unsigned values in protocols and formats
+     */
+    default void writeUnsignedVInt(long i) throws IOException
+    {
+        VIntCoding.writeUnsignedVInt(i, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java 
b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
index 94ba9ed..4816379 100644
--- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -27,6 +27,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 
+import org.apache.cassandra.utils.vint.VIntCoding;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -50,7 +52,7 @@ public class NIODataInputStream extends InputStream 
implements DataInput, Closea
     public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
     {
         Preconditions.checkNotNull(rbc);
-        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be 
large enough to accomadate a long/double");
+        Preconditions.checkArgument(bufferSize >= 9, "Buffer size must be 
large enough to accomadate a varint");
         this.rbc = rbc;
         buf = ByteBuffer.allocateDirect(bufferSize);
         buf.position(0);
@@ -116,7 +118,7 @@ public class NIODataInputStream extends InputStream 
implements DataInput, Closea
     private int readNext() throws IOException
     {
         Preconditions.checkState(buf.remaining() != buf.capacity());
-        assert(buf.remaining() < 8);
+        assert(buf.remaining() < 9);
 
         /*
          * If there is data already at the start of the buffer, move the 
position to the end
@@ -151,31 +153,55 @@ public class NIODataInputStream extends InputStream 
implements DataInput, Closea
         return read;
     }
 
-    /*
-     * Read at least minimum bytes and throw EOF if that fails
-     */
-    private void readMinimum(int minimum) throws IOException
+   /*
+    * Read at least minimum bytes and throw EOF if that fails
+    */
+    private void readMinimum(int attempt, int require) throws IOException
     {
         assert(buf.remaining() < 8);
-        while (buf.remaining() < minimum)
+        int remaining;
+        while ((remaining = buf.remaining()) < attempt)
         {
             int read = readNext();
             if (read == -1)
             {
-                //DataInputStream consumes the bytes even if it doesn't get 
the entire value, match the behavior here
-                buf.position(0);
-                buf.limit(0);
-                throw new EOFException();
+                if (remaining < require)
+                {
+                    //DataInputStream consumes the bytes even if it doesn't 
get the entire value, match the behavior here
+                    buf.position(0);
+                    buf.limit(0);
+                    throw new EOFException();
+                }
             }
         }
     }
 
     /*
-     * Ensure the buffer contains the minimum number of readable bytes
+     * Ensure the buffer contains the minimum number of readable bytes, throws 
EOF if enough bytes aren't available
+     * Add padding if requested and return the limit of the buffer without any 
padding that is added.
+     */
+    private int prepareReadPaddedPrimitive(int minimum) throws IOException
+    {
+        int limitToSet = buf.limit();
+        int position = buf.position();
+        if (limitToSet - position < minimum)
+        {
+            readMinimum(minimum, 1);
+            limitToSet = buf.limit();
+            position = buf.position();
+            if (limitToSet - position < minimum)
+                buf.limit(position + minimum);
+        }
+        return limitToSet;
+    }
+
+    /*
+     * Ensure the buffer contains the minimum number of readable bytes, throws 
EOF if enough bytes aren't available
      */
     private void prepareReadPrimitive(int minimum) throws IOException
     {
-        if (buf.remaining() < minimum) readMinimum(minimum);
+        if (buf.remaining() < minimum)
+            readMinimum(minimum, minimum);
     }
 
     @Override
@@ -248,6 +274,40 @@ public class NIODataInputStream extends InputStream 
implements DataInput, Closea
         return buf.getLong();
     }
 
+    public long readVInt() throws IOException
+    {
+        return VIntCoding.decodeZigZag64(readUnsignedVInt());
+    }
+
+    public long readUnsignedVInt() throws IOException
+    {
+        byte firstByte = readByte();
+
+        //Bail out early if this is one byte, necessary or it fails later
+        if (firstByte >= 0)
+            return firstByte;
+
+        //If padding was added, the limit to set after to get rid of the 
padding
+        int limitToSet = prepareReadPaddedPrimitive(8);
+
+        int position = buf.position();
+        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
+        int extraBits = extraBytes * 8;
+
+        long retval = buf.getLong(position);
+        buf.position(position + extraBytes);
+        buf.limit(limitToSet);
+
+
+        // truncate the bytes we read in excess of those we needed
+        retval >>>= 64 - extraBits;
+        // remove the non-value bits from the first byte
+        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
+        // shift the first byte up to its correct position
+        retval |= (long) firstByte << extraBits;
+        return retval;
+    }
+
     @Override
     public float readFloat() throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java 
b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
index 9137ba2..b8f0884 100644
--- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -23,7 +23,6 @@ import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java 
b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
index bee8ab0..663e176 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -27,6 +27,9 @@ import org.apache.cassandra.io.util.AbstractDataInput;
  * https://developers.google.com/protocol-buffers/docs/encoding#varints
  *
  * Should be used with EncodedDataOutputStream
+ *
+ * @deprecated Where possible use NIODataInputStream which has a more 
efficient implementation of buffered input
+ *             for most read methods
  */
 public class EncodedDataInputStream extends AbstractDataInput implements 
DataInput
 {
@@ -71,55 +74,21 @@ public class EncodedDataInputStream extends 
AbstractDataInput implements DataInp
 
     public int readInt() throws IOException
     {
-        return (int) vintDecode();
+        return (int) VIntCoding.readVInt(input);
     }
 
     public long readLong() throws IOException
     {
-        return vintDecode();
+        return VIntCoding.readVInt(input);
     }
 
     public int readUnsignedShort() throws IOException
     {
-        return (short) vintDecode();
-    }
-    
-    public short readShort() throws IOException
-    {
-        return (short) vintDecode();
+        return (short) VIntCoding.readVInt(input);
     }
 
-    private long vintDecode() throws IOException
-    {
-        byte firstByte = input.readByte();
-        int len = vintDecodeSize(firstByte);
-        if (len == 1)
-            return firstByte;
-        long i = 0;
-        for (int idx = 0; idx < len - 1; idx++)
-        {
-            byte b = input.readByte();
-            i = i << 8;
-            i = i | (b & 0xFF);
-        }
-        return (vintIsNegative(firstByte) ? (i ^ -1L) : i);
-    }
-
-    private int vintDecodeSize(byte value)
-    {
-        if (value >= -112)
-        {
-            return 1;
-        }
-        else if (value < -120)
-        {
-            return -119 - value;
-        }
-        return -111 - value;
-    }
-
-    private boolean vintIsNegative(byte value)
+    public short readShort() throws IOException
     {
-        return value < -120 || (value >= -112 && value < 0);
+        return (short) VIntCoding.readVInt(input);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java 
b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
index fe43ff2..7f7613f 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -54,45 +54,16 @@ public class EncodedDataOutputStream extends 
UnbufferedDataOutputStreamPlus
 
     public void writeInt(int v) throws IOException
     {
-        vintEncode(v);
+        writeVInt(v);
     }
 
     public void writeLong(long v) throws IOException
     {
-        vintEncode(v);
+        writeVInt(v);
     }
 
     public void writeShort(int v) throws IOException
     {
-        vintEncode(v);
-    }
-
-    private void vintEncode(long i) throws IOException
-    {
-        if (i >= -112 && i <= 127)
-        {
-            writeByte((byte) i);
-            return;
-        }
-        int len = -112;
-        if (i < 0)
-        {
-            i ^= -1L; // take one's complement'
-            len = -120;
-        }
-        long tmp = i;
-        while (tmp != 0)
-        {
-            tmp = tmp >> 8;
-            len--;
-        }
-        writeByte((byte) len);
-        len = (len < -120) ? -(len + 120) : -(len + 112);
-        for (int idx = len; idx != 0; idx--)
-        {
-            int shiftbits = (idx - 1) * 8;
-            long mask = 0xFFL << shiftbits;
-            writeByte((byte) ((i & mask) >> shiftbits));
-        }
+        writeVInt(v);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java 
b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
new file mode 100644
index 0000000..0ac4124
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Protocol Buffers - Google's data interchange format
+// Copyright 2008 Google Inc.  All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+package org.apache.cassandra.utils.vint;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import net.nicoulaj.compilecommand.annotations.Inline;
+
+/**
+ * Borrows idea from
+ * https://developers.google.com/protocol-buffers/docs/encoding#varints
+ */
+public class VIntCoding
+{
+
+    public static long readUnsignedVInt(DataInput input) throws IOException {
+        int firstByte = input.readByte();
+
+        //Bail out early if this is one byte, necessary or it fails later
+        if (firstByte >= 0)
+            return firstByte;
+
+        int size = numberOfExtraBytesToRead(firstByte);
+        long retval = firstByte & firstByteValueMask(size);;
+        for (int ii = 0; ii < size; ii++)
+        {
+            byte b = input.readByte();
+            retval <<= 8;
+            retval |= b & 0xff;
+        }
+
+        return retval;
+    }
+
+    public static long readVInt(DataInput input) throws IOException {
+        return decodeZigZag64(readUnsignedVInt(input));
+    }
+
+    // & this with the first byte to give the value part for a given 
extraBytesToRead encoded in the byte
+    public static int firstByteValueMask(int extraBytesToRead)
+    {
+        // by including the known 0bit in the mask, we can use this for 
encodeExtraBytesToRead
+        return 0xff >> extraBytesToRead;
+    }
+
+    public static int encodeExtraBytesToRead(int extraBytesToRead)
+    {
+        // because we have an extra bit in the value mask, we just need to 
invert it
+        return ~firstByteValueMask(extraBytesToRead);
+    }
+
+    public static int numberOfExtraBytesToRead(int firstByte)
+    {
+        // we count number of set upper bits; so if we simply invert all of 
the bits, we're golden
+        // this is aided by the fact that we only work with negative numbers, 
so when upcast to an int all
+        // of the new upper bits are also set, so by inverting we set all of 
them to zero
+        return Integer.numberOfLeadingZeros(~firstByte) - 24;
+    }
+
+    protected static final ThreadLocal<byte[]> encodingBuffer = new 
ThreadLocal<byte[]>()
+    {
+        @Override
+        public byte[] initialValue()
+        {
+            return new byte[9];
+        }
+    };
+
+    public static void writeUnsignedVInt(long value, DataOutput output) throws 
IOException {
+        int size = VIntCoding.computeUnsignedVIntSize(value);
+        if (size == 1)
+        {
+            output.write((int)value);
+            return;
+        }
+
+        output.write(VIntCoding.encodeVInt(value, size), 0, size);
+    }
+
+    @Inline
+    public static byte[] encodeVInt(long value, int size) {
+        byte encodingSpace[] = encodingBuffer.get();
+        int extraBytes = size - 1;
+
+        for (int i = extraBytes ; i >= 0; --i)
+        {
+            encodingSpace[i] = (byte) value;
+            value >>= 8;
+        }
+        encodingSpace[0] |= VIntCoding.encodeExtraBytesToRead(extraBytes);
+        return encodingSpace;
+    }
+
+    public static void writeVInt(long value, DataOutput output) throws 
IOException {
+        writeUnsignedVInt(encodeZigZag64(value), output);
+    }
+
+    /**
+     * Decode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 64-bit integer, stored in a signed int because
+     *          Java has no explicit unsigned support.
+     * @return A signed 64-bit integer.
+     */
+    public static long decodeZigZag64(final long n) {
+        return (n >>> 1) ^ -(n & 1);
+    }
+
+    /**
+     * Encode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 64-bit integer.
+     * @return An unsigned 64-bit integer, stored in a signed int because
+     *         Java has no explicit unsigned support.
+     */
+    public static long encodeZigZag64(final long n) {
+        // Note:  the right-shift must be arithmetic
+        return (n << 1) ^ (n >> 63);
+    }
+
+    /** Compute the number of bytes that would be needed to encode a varint. */
+    public static int computeVIntSize(final long param) {
+        return computeUnsignedVIntSize(encodeZigZag64(param));
+    }
+
+    /** Compute the number of bytes that would be needed to encode an unsigned 
varint. */
+    public static int computeUnsignedVIntSize(final long value) {
+        int magnitude = Long.numberOfLeadingZeros(value | 1); // | with 1 to 
ensure magntiude <= 63, so (63 - 1) / 7 <= 8
+        return 9 - ((magnitude - 1) / 7);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java 
b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index d0819fe..acef1ec 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -8,10 +8,16 @@ import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.cassandra.utils.vint.VIntCoding;
 import org.junit.Test;
 
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedInteger;
+import com.google.common.primitives.UnsignedLong;
+
 import static org.junit.Assert.*;
 
 public class BufferedDataOutputStreamTest
@@ -185,7 +191,7 @@ public class BufferedDataOutputStreamTest
         int action = 0;
         while (generated.size() < 1024 * 1024 * 8)
         {
-            action = r.nextInt(19);
+            action = r.nextInt(21);
 
             //System.out.println("Action " + action + " iteration " + 
iteration);
             iteration++;
@@ -362,6 +368,20 @@ public class BufferedDataOutputStreamTest
                 }
                 break;
             }
+            case 19:
+            {
+                long val = r.nextLong();
+                VIntCoding.writeVInt(val, dosp);
+                ndosp.writeVInt(val);
+                break;
+            }
+            case 20:
+            {
+                long val = r.nextLong();
+                VIntCoding.writeUnsignedVInt(val, dosp);
+                ndosp.writeUnsignedVInt(val);
+                break;
+            }
             default:
                 fail("Shouldn't reach here");
             }
@@ -442,4 +462,77 @@ public class BufferedDataOutputStreamTest
         }
         return count;
     }
+
+    /*
+     * Add values to the array with a bit set in every position
+     */
+    public static long[] enrich(long vals[])
+    {
+        long retval[] = Arrays.copyOf(vals, vals.length + 64);
+        for (int ii = 0; ii < 64; ii++)
+            retval[vals.length + ii] = 1L << ii;
+        return retval;
+   }
+
+    @Test
+    public void testVInt() throws Exception
+    {
+        setUp();
+        long testValues[] = new long[] {
+                0, 1, -1
+                ,Long.MIN_VALUE, Long.MIN_VALUE + 1, Long.MAX_VALUE, 
Long.MAX_VALUE - 1
+                ,Integer.MIN_VALUE, Integer.MIN_VALUE + 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE - 1
+                ,Short.MIN_VALUE, Short.MIN_VALUE + 1, Short.MAX_VALUE, 
Short.MAX_VALUE - 1
+                ,Byte.MIN_VALUE, Byte.MIN_VALUE + 1, Byte.MAX_VALUE, 
Byte.MAX_VALUE - 1 };
+        testValues = enrich(testValues);
+
+        int expectedSize = 0;
+        for (long v : testValues)
+        {
+            expectedSize += VIntCoding.computeVIntSize(v);
+            ndosp.writeVInt(v);
+        }
+
+        ndosp.flush();
+
+        @SuppressWarnings("resource")
+        ByteBufferDataInput bbdi = new 
ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
+
+        assertEquals(expectedSize, generated.toByteArray().length);
+
+        for (long v : testValues)
+        {
+            assertEquals(v, bbdi.readVInt());
+        }
+    }
+
+    @Test
+    public void testUnsignedVInt() throws Exception
+    {
+        setUp();
+        long testValues[] = new long[] { //-1 };
+                0, 1
+                , UnsignedLong.MAX_VALUE.longValue(), 
UnsignedLong.MAX_VALUE.longValue() - 1, UnsignedLong.MAX_VALUE.longValue() + 1
+                , UnsignedInteger.MAX_VALUE.longValue(), 
UnsignedInteger.MAX_VALUE.longValue() - 1, 
UnsignedInteger.MAX_VALUE.longValue() + 1
+                , UnsignedBytes.MAX_VALUE, UnsignedBytes.MAX_VALUE - 1, 
UnsignedBytes.MAX_VALUE + 1
+                , 65536, 65536 - 1, 65536 + 1 };
+        testValues = enrich(testValues);
+
+        int expectedSize = 0;
+        for (long v : testValues)
+        {
+            expectedSize += VIntCoding.computeUnsignedVIntSize(v);
+            ndosp.writeUnsignedVInt(v);
+        }
+
+        ndosp.flush();
+
+        @SuppressWarnings("resource")
+        ByteBufferDataInput bbdi = new 
ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
+
+        assertEquals(expectedSize, generated.toByteArray().length);
+
+        for (long v : testValues)
+            assertEquals(v, bbdi.readUnsignedVInt());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java 
b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
index a19346b..11ff23a 100644
--- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -18,6 +18,9 @@ import org.apache.cassandra.io.util.NIODataInputStream;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedInteger;
+import com.google.common.primitives.UnsignedLong;
 
 import static org.junit.Assert.*;
 
@@ -126,7 +129,7 @@ public class NIODataInputStreamTest
 
     }
 
-    NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 
8);
+    NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 
9);
 
     @Test(expected = IOException.class)
     public void testResetThrows() throws Exception
@@ -212,7 +215,7 @@ public class NIODataInputStreamTest
         fos.write(new byte[10]);
         fos.seek(0);
 
-        is = new NIODataInputStream(fos.getChannel(), 8);
+        is = new NIODataInputStream(fos.getChannel(), 9);
 
         int remaining = 10;
         assertEquals(10, is.available());
@@ -226,6 +229,31 @@ public class NIODataInputStreamTest
         assertEquals(0, is.available());
     }
 
+    private static ReadableByteChannel wrap(final byte bytes[])
+    {
+        final ByteBuffer buf = ByteBuffer.wrap(bytes);
+        return new ReadableByteChannel()
+        {
+
+            @Override
+            public boolean isOpen() {return false;}
+
+            @Override
+            public void close() throws IOException {}
+
+            @Override
+            public int read(ByteBuffer dst) throws IOException
+            {
+                int read = Math.min(dst.remaining(), buf.remaining());
+                buf.limit(buf.position() + read);
+                dst.put(buf);
+                buf.limit(buf.capacity());
+                return read == 0 ? -1 : read;
+            }
+
+        };
+    }
+
     @SuppressWarnings("resource")
     @Test
     public void testReadUTF() throws Exception
@@ -244,28 +272,84 @@ public class NIODataInputStreamTest
         daos.writeUTF(BufferedDataOutputStreamTest.threeByte);
         daos.writeUTF(BufferedDataOutputStreamTest.fourByte);
 
-        NIODataInputStream is = new NIODataInputStream(new 
ReadableByteChannel()
+        NIODataInputStream is = new 
NIODataInputStream(wrap(baos.toByteArray()), 4096);
+
+        assertEquals(simple, is.readUTF());
+        assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF());
+        assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF());
+        assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF());
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    public void testReadVInt() throws Exception {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStreamPlus daos = new WrappedDataOutputStreamPlus(baos);
+
+        long values[] = new long[] {
+                0, 1, -1,
+                Long.MIN_VALUE, Long.MIN_VALUE + 1, Long.MAX_VALUE, 
Long.MAX_VALUE - 1,
+                Integer.MIN_VALUE, Integer.MIN_VALUE + 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE - 1,
+                Short.MIN_VALUE, Short.MIN_VALUE + 1, Short.MAX_VALUE, 
Short.MAX_VALUE - 1,
+                Byte.MIN_VALUE, Byte.MIN_VALUE + 1, Byte.MAX_VALUE, 
Byte.MAX_VALUE - 1 };
+        values = BufferedDataOutputStreamTest.enrich(values);
+
+        for (long v : values)
+            daos.writeVInt(v);
+
+        daos.flush();
+
+        NIODataInputStream is = new 
NIODataInputStream(wrap(baos.toByteArray()), 9);
+
+        for (long v : values)
+            assertEquals(v, is.readVInt());
+
+        boolean threw = false;
+        try
         {
+            is.readVInt();
+        }
+        catch (EOFException e)
+        {
+            threw = true;
+        }
+        assertTrue(threw);
+    }
 
-            @Override
-            public boolean isOpen() {return false;}
+    @SuppressWarnings("resource")
+    @Test
+    public void testReadUnsignedVInt() throws Exception {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStreamPlus daos = new WrappedDataOutputStreamPlus(baos);
 
-            @Override
-            public void close() throws IOException {}
+        long values[] = new long[] {
+                0, 1
+                , UnsignedLong.MAX_VALUE.longValue(), 
UnsignedLong.MAX_VALUE.longValue() - 1, UnsignedLong.MAX_VALUE.longValue() + 1
+                , UnsignedInteger.MAX_VALUE.longValue(), 
UnsignedInteger.MAX_VALUE.longValue() - 1, 
UnsignedInteger.MAX_VALUE.longValue() + 1
+                , UnsignedBytes.MAX_VALUE, UnsignedBytes.MAX_VALUE - 1, 
UnsignedBytes.MAX_VALUE + 1
+                , 65536, 65536 - 1, 65536 + 1 };
+        values = BufferedDataOutputStreamTest.enrich(values);
 
-            @Override
-            public int read(ByteBuffer dst) throws IOException
-            {
-                dst.put(baos.toByteArray());
-                return baos.toByteArray().length;
-            }
+        for (long v : values)
+            daos.writeUnsignedVInt(v);
 
-        }, 4096);
+        daos.flush();
 
-        assertEquals(simple, is.readUTF());
-        assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF());
-        assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF());
-        assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF());
+        NIODataInputStream is = new 
NIODataInputStream(wrap(baos.toByteArray()), 9);
+
+        for (long v : values)
+            assertEquals(v, is.readUnsignedVInt());
+
+        boolean threw = false;
+        try
+        {
+            is.readUnsignedVInt();
+        }
+        catch (EOFException e)
+        {
+            threw = true;
+        }
+        assertTrue(threw);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java 
b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
new file mode 100644
index 0000000..f08b181
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.vint;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class VIntCodingTest
+{
+
+    @Test
+    public void testComputeSize() throws Exception
+    {
+        assertEncodedAtExpectedSize(0L, 1);
+
+        for (int size = 1 ; size < 8 ; size++)
+        {
+            assertEncodedAtExpectedSize((1L << 7 * size) - 1, size);
+            assertEncodedAtExpectedSize(1L << 7 * size, size + 1);
+        }
+        Assert.assertEquals(9, 
VIntCoding.computeUnsignedVIntSize(Long.MAX_VALUE));
+    }
+
+    private void assertEncodedAtExpectedSize(long value, int expectedSize) 
throws Exception
+    {
+        Assert.assertEquals(expectedSize, 
VIntCoding.computeUnsignedVIntSize(value));
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        VIntCoding.writeUnsignedVInt(value, dos);
+        dos.flush();
+        Assert.assertEquals( expectedSize, baos.toByteArray().length);
+
+        DataOutputBuffer dob = new DataOutputBuffer();
+        dob.writeUnsignedVInt(value);
+        Assert.assertEquals( expectedSize, dob.buffer().remaining());
+        dob.close();
+    }
+
+    @Test
+    public void testReadExtraBytesCount()
+    {
+        for (int i = 1 ; i < 8 ; i++)
+            Assert.assertEquals(i, VIntCoding.numberOfExtraBytesToRead((byte) 
((0xFF << (8 - i)) & 0xFF)));
+    }
+
+    /*
+     * Quick sanity check that 1 byte encodes up to 127 as expected
+     */
+    @Test
+    public void testOneByteCapacity() throws Exception {
+        int biggestOneByte = 127;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        VIntCoding.writeUnsignedVInt(biggestOneByte, dos);
+        dos.flush();
+        Assert.assertEquals( 1, baos.toByteArray().length);
+
+        DataOutputBuffer dob = new DataOutputBuffer();
+        dob.writeUnsignedVInt(biggestOneByte);
+        Assert.assertEquals( 1, dob.buffer().remaining());
+        dob.close();
+    }
+}

Reply via email to