Repository: parquet-mr
Updated Branches:
  refs/heads/master ad80bfe55 -> 8bbc6cb95


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java 
b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 6e593c2..1512a24 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -23,11 +23,11 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
  */
 abstract public class BytesInput {
   private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
-  private static final boolean DEBUG = false;//Log.DEBUG;
   private static final EmptyBytesInput EMPTY_BYTES_INPUT = new 
EmptyBytesInput();
 
   /**
@@ -75,14 +74,27 @@ abstract public class BytesInput {
   public static BytesInput from(InputStream in, int bytes) {
     return new StreamBytesInput(in, bytes);
   }
-  
+
   /**
-   * @param buffer
-   * @param length number of bytes to read
-   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
    */
-  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
-    return new ByteBufferBytesInput(buffer, offset, length);
+  public static BytesInput from(ByteBuffer... buffers) {
+    if (buffers.length == 1) {
+      return new ByteBufferBytesInput(buffers[0]);
+    }
+    return new BufferListBytesInput(Arrays.asList(buffers));
+  }
+
+  /**
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
+   */
+  public static BytesInput from(List<ByteBuffer> buffers) {
+    if (buffers.size() == 1) {
+      return new ByteBufferBytesInput(buffers.get(0));
+    }
+    return new BufferListBytesInput(buffers);
   }
 
   /**
@@ -208,8 +220,8 @@ abstract public class BytesInput {
    * @return a new InputStream materializing the contents of this input
    * @throws IOException
    */
-  public InputStream toInputStream() throws IOException {
-    return new ByteBufferInputStream(toByteBuffer());
+  public ByteBufferInputStream toInputStream() throws IOException {
+    return ByteBufferInputStream.wrap(toByteBuffer());
   }
 
   /**
@@ -439,7 +451,7 @@ abstract public class BytesInput {
     }
 
     public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.wrap(in, offset, length);
+      return java.nio.ByteBuffer.wrap(in, offset, length);
     }
 
     @Override
@@ -448,34 +460,31 @@ abstract public class BytesInput {
     }
 
   }
-  
-  private static class ByteBufferBytesInput extends BytesInput {
-    
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;
 
-    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
+  private static class BufferListBytesInput extends BytesInput {
+    private final List<ByteBuffer> buffers;
+    private final long length;
+
+    public BufferListBytesInput(List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+      long totalLen = 0;
+      for (ByteBuffer buffer : buffers) {
+        totalLen += buffer.remaining();
+      }
+      this.length = totalLen;
     }
 
     @Override
     public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
+      WritableByteChannel channel = Channels.newChannel(out);
+      for (ByteBuffer buffer : buffers) {
+        channel.write(buffer.duplicate());
+      }
     }
-    
+
     @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffers);
     }
 
     @Override
@@ -483,4 +492,27 @@ abstract public class BytesInput {
       return length;
     }
   }
+
+  private static class ByteBufferBytesInput extends BytesInput {
+    private final ByteBuffer buffer;
+
+    private ByteBufferBytesInput(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      Channels.newChannel(out).write(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffer);
+    }
+
+    @Override
+    public long size() {
+      return buffer.remaining();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
 
b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
new file mode 100644
index 0000000..20a142b
--- /dev/null
+++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
@@ -0,0 +1,382 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class MultiBufferInputStream extends ByteBufferInputStream {
+  private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+  private final List<ByteBuffer> buffers;
+  private final long length;
+
+  private Iterator<ByteBuffer> iterator;
+  private ByteBuffer current = EMPTY;
+  private long position = 0;
+
+  private long mark = -1;
+  private long markLimit = 0;
+  private List<ByteBuffer> markBuffers = new ArrayList<>();
+
+  MultiBufferInputStream(List<ByteBuffer> buffers) {
+    this.buffers = buffers;
+
+    long totalLen = 0;
+    for (ByteBuffer buffer : buffers) {
+      totalLen += buffer.remaining();
+    }
+    this.length = totalLen;
+
+    this.iterator = buffers.iterator();
+
+    nextBuffer();
+  }
+
+  /**
+   * Returns the position in the stream.
+   */
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    long bytesSkipped = 0;
+    while (bytesSkipped < n) {
+      if (current.remaining() > 0) {
+        long bytesToSkip = Math.min(n - bytesSkipped, current.remaining());
+        current.position(current.position() + (int) bytesToSkip);
+        bytesSkipped += bytesToSkip;
+        this.position += bytesToSkip;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesSkipped > 0 ? bytesSkipped : -1;
+      }
+    }
+
+    return bytesSkipped;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {
+    int len = out.remaining();
+    if (len <= 0) {
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    int bytesCopied = 0;
+    while (bytesCopied < len) {
+      if (current.remaining() > 0) {
+        int bytesToCopy;
+        ByteBuffer copyBuffer;
+        if (current.remaining() <= out.remaining()) {
+          // copy all of the current buffer
+          bytesToCopy = current.remaining();
+          copyBuffer = current;
+        } else {
+          // copy a slice of the current buffer
+          bytesToCopy = out.remaining();
+          copyBuffer = current.duplicate();
+          copyBuffer.limit(copyBuffer.position() + bytesToCopy);
+          current.position(copyBuffer.position() + bytesToCopy);
+        }
+
+        out.put(copyBuffer);
+        bytesCopied += bytesToCopy;
+        this.position += bytesToCopy;
+
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesCopied > 0 ? bytesCopied : -1;
+      }
+    }
+
+    return bytesCopied;
+  }
+
+  @Override
+  public ByteBuffer slice(int length) throws EOFException {
+    if (length <= 0) {
+      return EMPTY;
+    }
+
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    ByteBuffer slice;
+    if (length > current.remaining()) {
+      // a copy is needed to return a single buffer
+      // TODO: use an allocator
+      slice = ByteBuffer.allocate(length);
+      int bytesCopied = read(slice);
+      slice.flip();
+      if (bytesCopied < length) {
+        throw new EOFException();
+      }
+    } else {
+      slice = current.duplicate();
+      slice.limit(slice.position() + length);
+      current.position(slice.position() + length);
+      this.position += length;
+    }
+
+    return slice;
+  }
+
+  public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
+    if (len <= 0) {
+      return Collections.emptyList();
+    }
+
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    long bytesAccumulated = 0;
+    while (bytesAccumulated < len) {
+      if (current.remaining() > 0) {
+        // get a slice of the current buffer to return
+        // always fits in an int because remaining returns an int that is >= 0
+        int bufLen = (int) Math.min(len - bytesAccumulated, 
current.remaining());
+        ByteBuffer slice = current.duplicate();
+        slice.limit(slice.position() + bufLen);
+        buffers.add(slice);
+        bytesAccumulated += bufLen;
+
+        // update state; the bytes are considered read
+        current.position(current.position() + bufLen);
+        this.position += bufLen;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        throw new EOFException();
+      }
+    }
+
+    return buffers;
+  }
+
+  @Override
+  public List<ByteBuffer> remainingBuffers() {
+    if (position >= length) {
+      return Collections.emptyList();
+    }
+
+    try {
+      return sliceBuffers(length - position);
+    } catch (EOFException e) {
+      throw new RuntimeException(
+          "[Parquet bug] Stream is bad: incorrect bytes remaining " +
+              (length - position));
+    }
+  }
+
+  @Override
+  public int read(byte[] bytes, int off, int len) {
+    if (len <= 0) {
+      if (len < 0) {
+        throw new IndexOutOfBoundsException("Read length must be greater than 
0: " + len);
+      }
+      return 0;
+    }
+
+    if (current == null) {
+      return -1;
+    }
+
+    int bytesRead = 0;
+    while (bytesRead < len) {
+      if (current.remaining() > 0) {
+        int bytesToRead = Math.min(len - bytesRead, current.remaining());
+        current.get(bytes, off + bytesRead, bytesToRead);
+        bytesRead += bytesToRead;
+        this.position += bytesToRead;
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        return bytesRead > 0 ? bytesRead : -1;
+      }
+    }
+
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] bytes) {
+    return read(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (current == null) {
+      throw new EOFException();
+    }
+
+    while (true) {
+      if (current.remaining() > 0) {
+        this.position += 1;
+        return current.get() & 0xFF; // as unsigned
+      } else if (!nextBuffer()) {
+        // there are no more buffers
+        throw new EOFException();
+      }
+    }
+  }
+
+  @Override
+  public int available() {
+    long remaining = length - position;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int) remaining;
+    }
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    if (mark >= 0) {
+      discardMark();
+    }
+    this.mark = position;
+    this.markLimit = mark + readlimit + 1;
+    if (current != null) {
+      markBuffers.add(current.duplicate());
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (mark >= 0 && position < markLimit) {
+      this.position = mark;
+      // replace the current iterator with one that adds back the buffers that
+      // have been used since mark was called.
+      this.iterator = concat(markBuffers.iterator(), iterator);
+      discardMark();
+      nextBuffer(); // go back to the marked buffers
+    } else {
+      throw new IOException("No mark defined or has read past the previous 
mark limit");
+    }
+  }
+
+  private void discardMark() {
+    this.mark = -1;
+    this.markLimit = 0;
+    markBuffers = new ArrayList<>();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  private boolean nextBuffer() {
+    if (!iterator.hasNext()) {
+      this.current = null;
+      return false;
+    }
+
+    this.current = iterator.next().duplicate();
+
+    if (mark >= 0) {
+      if (position < markLimit) {
+        // the mark is defined and valid. save the new buffer
+        markBuffers.add(current.duplicate());
+      } else {
+        // the mark has not been used and is no longer valid
+        discardMark();
+      }
+    }
+
+    return true;
+  }
+
+  private static <E> Iterator<E> concat(Iterator<E> first, Iterator<E> second) 
{
+    return new ConcatIterator<>(first, second);
+  }
+
+  private static class ConcatIterator<E> implements Iterator<E> {
+    private final Iterator<E> first;
+    private final Iterator<E> second;
+    boolean useFirst = true;
+
+    public ConcatIterator(Iterator<E> first, Iterator<E> second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (useFirst) {
+        if (first.hasNext()) {
+          return true;
+        } else {
+          useFirst = false;
+          return second.hasNext();
+        }
+      }
+      return second.hasNext();
+    }
+
+    @Override
+    public E next() {
+      if (useFirst && !first.hasNext()) {
+        useFirst = false;
+      }
+
+      if (!useFirst && !second.hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (useFirst) {
+        return first.next();
+      }
+
+      return second.next();
+    }
+
+    @Override
+    public void remove() {
+      if (useFirst) {
+        first.remove();
+      }
+      second.remove();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
 
b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
new file mode 100644
index 0000000..999d1bb
--- /dev/null
+++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This ByteBufferInputStream does not consume the ByteBuffer being passed in, 
+ * but will create a slice of the current buffer.
+ */
+class SingleBufferInputStream extends ByteBufferInputStream {
+
+  private final ByteBuffer buffer;
+  private final long startPosition;
+  private int mark = -1;
+
+  SingleBufferInputStream(ByteBuffer buffer) {
+    // duplicate the buffer because its state will be modified
+    this.buffer = buffer.duplicate();
+    this.startPosition = buffer.position();
+  }
+
+  @Override
+  public long position() {
+    // position is relative to the start of the stream, not the buffer
+    return buffer.position() - startPosition;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (!buffer.hasRemaining()) {
+       throw new EOFException();
+    }
+    return buffer.get() & 0xFF; // as unsigned
+  }
+
+  @Override
+  public int read(byte[] bytes, int offset, int length) throws IOException {
+    if (length == 0) {
+      return 0;
+    }
+
+    int remaining = buffer.remaining();
+    if (remaining <= 0) {
+      return -1;
+    }
+
+    int bytesToRead = Math.min(buffer.remaining(), length);
+    buffer.get(bytes, offset, bytesToRead);
+
+    return bytesToRead;
+  }
+  
+  @Override
+  public long skip(long n) {
+    if (n == 0) {
+      return 0;
+    }
+
+    if (buffer.remaining() <= 0) {
+      return -1;
+    }
+
+    // buffer.remaining is an int, so this will always fit in an int
+    int bytesToSkip = (int) Math.min(buffer.remaining(), n);
+    buffer.position(buffer.position() + bytesToSkip);
+
+    return bytesToSkip;
+  }
+
+  @Override
+  public int read(ByteBuffer out) {
+    int bytesToCopy;
+    ByteBuffer copyBuffer;
+    if (buffer.remaining() <= out.remaining()) {
+      // copy all of the buffer
+      bytesToCopy = buffer.remaining();
+      copyBuffer = buffer;
+    } else {
+      // copy a slice of the current buffer
+      bytesToCopy = out.remaining();
+      copyBuffer = buffer.duplicate();
+      copyBuffer.limit(buffer.position() + bytesToCopy);
+      buffer.position(buffer.position() + bytesToCopy);
+    }
+
+    out.put(copyBuffer);
+    out.flip();
+
+    return bytesToCopy;
+  }
+
+  @Override
+  public ByteBuffer slice(int length) throws EOFException {
+    if (buffer.remaining() < length) {
+      throw new EOFException();
+    }
+
+    // length is less than remaining, so it must fit in an int
+    ByteBuffer copy = buffer.duplicate();
+    copy.limit(copy.position() + length);
+    buffer.position(buffer.position() + length);
+
+    return copy;
+  }
+
+  @Override
+  public List<ByteBuffer> sliceBuffers(long length) throws EOFException {
+    if (length == 0) {
+      return Collections.emptyList();
+    }
+
+    if (length > buffer.remaining()) {
+      throw new EOFException();
+    }
+
+    // length is less than remaining, so it must fit in an int
+    return Collections.singletonList(slice((int) length));
+  }
+
+  @Override
+  public List<ByteBuffer> remainingBuffers() {
+    if (buffer.remaining() <= 0) {
+      return Collections.emptyList();
+    }
+
+    ByteBuffer remaining = buffer.duplicate();
+    buffer.position(buffer.limit());
+
+    return Collections.singletonList(remaining);
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    this.mark = buffer.position();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (mark >= 0) {
+      buffer.position(mark);
+      this.mark = -1;
+    } else {
+      throw new IOException("No mark defined");
+    }
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  @Override
+  public int available() {
+    return buffer.remaining();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
new file mode 100644
index 0000000..7bed2a9
--- /dev/null
+++ 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
@@ -0,0 +1,597 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+public abstract class TestByteBufferInputStreams {
+
+  protected abstract ByteBufferInputStream newStream();
+  protected abstract void checkOriginalData();
+
+  @Test
+  public void testRead0() throws Exception {
+    byte[] bytes = new byte[0];
+
+    ByteBufferInputStream stream = newStream();
+
+    Assert.assertEquals("Should read 0 bytes", 0, stream.read(bytes));
+
+    int bytesRead = stream.read(new byte[100]);
+    Assert.assertTrue("Should read to end of stream", bytesRead < 100);
+
+    Assert.assertEquals("Should read 0 bytes at end of stream",
+        0, stream.read(bytes));
+  }
+
+  @Test
+  public void testReadAll() throws Exception {
+    byte[] bytes = new byte[35];
+
+    ByteBufferInputStream stream = newStream();
+
+    int bytesRead = stream.read(bytes);
+    Assert.assertEquals("Should read the entire buffer",
+        bytes.length, bytesRead);
+
+    for (int i = 0; i < bytes.length; i += 1) {
+      Assert.assertEquals("Byte i should be i", i, bytes[i]);
+      Assert.assertEquals("Should advance position", 35, stream.position());
+    }
+
+    Assert.assertEquals("Should have no more remaining content",
+        0, stream.available());
+
+    Assert.assertEquals("Should return -1 at end of stream",
+        -1, stream.read(bytes));
+
+    Assert.assertEquals("Should have no more remaining content",
+        0, stream.available());
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testSmallReads() throws Exception {
+    for (int size = 1; size < 36; size += 1) {
+      byte[] bytes = new byte[size];
+
+      ByteBufferInputStream stream = newStream();
+      long length = stream.available();
+
+      int lastBytesRead = bytes.length;
+      for (int offset = 0; offset < length; offset += bytes.length) {
+        Assert.assertEquals("Should read requested len",
+            bytes.length, lastBytesRead);
+
+        lastBytesRead = stream.read(bytes, 0, bytes.length);
+
+        Assert.assertEquals("Should advance position",
+            offset + lastBytesRead, stream.position());
+
+        // validate the bytes that were read
+        for (int i = 0; i < lastBytesRead; i += 1) {
+          Assert.assertEquals("Byte i should be i", offset + i, bytes[i]);
+        }
+      }
+
+      Assert.assertEquals("Should read fewer bytes at end of buffer",
+          length % bytes.length, lastBytesRead % bytes.length);
+
+      Assert.assertEquals("Should have no more remaining content",
+          0, stream.available());
+
+      Assert.assertEquals("Should return -1 at end of stream",
+          -1, stream.read(bytes));
+
+      Assert.assertEquals("Should have no more remaining content",
+          0, stream.available());
+    }
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testPartialBufferReads() throws Exception {
+    for (int size = 1; size < 35; size += 1) {
+      byte[] bytes = new byte[33];
+
+      ByteBufferInputStream stream = newStream();
+
+      int lastBytesRead = size;
+      for (int offset = 0; offset < bytes.length; offset += size) {
+        Assert.assertEquals("Should read requested len", size, lastBytesRead);
+
+        lastBytesRead = stream.read(
+            bytes, offset, Math.min(size, bytes.length - offset));
+
+        Assert.assertEquals("Should advance position",
+            lastBytesRead > 0 ? offset + lastBytesRead : offset,
+            stream.position());
+      }
+
+      Assert.assertEquals("Should read fewer bytes at end of buffer",
+          bytes.length % size, lastBytesRead % size);
+
+      for (int i = 0; i < bytes.length; i += 1) {
+        Assert.assertEquals("Byte i should be i", i, bytes[i]);
+      }
+
+      Assert.assertEquals("Should have no more remaining content",
+          2, stream.available());
+
+      Assert.assertEquals("Should return 2 more bytes",
+          2, stream.read(bytes));
+
+      Assert.assertEquals("Should have no more remaining content",
+          0, stream.available());
+
+      Assert.assertEquals("Should return -1 at end of stream",
+          -1, stream.read(bytes));
+
+      Assert.assertEquals("Should have no more remaining content",
+          0, stream.available());
+    }
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testReadByte() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    for (int i = 0; i < length; i += 1) {
+      Assert.assertEquals("Position should increment", i, stream.position());
+      Assert.assertEquals(i, stream.read());
+    }
+
+    assertThrows("Should throw EOFException at end of stream",
+        EOFException.class, new Callable<Integer>() {
+          @Override
+          public Integer call() throws IOException {
+            return stream.read();
+          }
+        });
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testSlice() throws Exception {
+    ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    ByteBuffer empty = stream.slice(0);
+    Assert.assertNotNull("slice(0) should produce a non-null buffer", empty);
+    Assert.assertEquals("slice(0) should produce an empty buffer",
+        0, empty.remaining());
+
+    Assert.assertEquals("Position should be at start", 0, stream.position());
+
+    int i = 0;
+    while (stream.available() > 0) {
+      int bytesToSlice = Math.min(stream.available(), 10);
+      ByteBuffer buffer = stream.slice(bytesToSlice);
+
+      for (int j = 0; j < bytesToSlice; j += 1) {
+        Assert.assertEquals("Data should be correct", i + j, buffer.get());
+      }
+
+      i += bytesToSlice;
+    }
+
+    Assert.assertEquals("Position should be at end", length, 
stream.position());
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testSliceBuffers0() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    Assert.assertEquals("Should return an empty list",
+        Collections.emptyList(), stream.sliceBuffers(0));
+  }
+
+  @Test
+  public void testWholeSliceBuffers() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+    final int length = stream.available();
+
+    List<ByteBuffer> buffers = stream.sliceBuffers(stream.available());
+
+    Assert.assertEquals("Should consume all buffers", length, 
stream.position());
+
+    assertThrows("Should throw EOFException when empty",
+        EOFException.class, new Callable<List<ByteBuffer>>() {
+          @Override
+          public List<ByteBuffer> call() throws Exception {
+            return stream.sliceBuffers(length);
+          }
+        });
+
+    ByteBufferInputStream copy = ByteBufferInputStream.wrap(buffers);
+    for (int i = 0; i < length; i += 1) {
+      Assert.assertEquals("Slice should have identical data", i, copy.read());
+    }
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testSliceBuffersCoverage() throws Exception {
+    for (int size = 1; size < 36; size += 1) {
+      ByteBufferInputStream stream = newStream();
+      int length = stream.available();
+
+      List<ByteBuffer> buffers = new ArrayList<>();
+      while (stream.available() > 0) {
+        buffers.addAll(stream.sliceBuffers(Math.min(size, 
stream.available())));
+      }
+
+      Assert.assertEquals("Should consume all content",
+          length, stream.position());
+
+      ByteBufferInputStream newStream = new MultiBufferInputStream(buffers);
+
+      for (int i = 0; i < length; i += 1) {
+        Assert.assertEquals("Data should be correct", i, newStream.read());
+      }
+    }
+
+    checkOriginalData();
+  }
+
+  @Test
+  public void testSliceBuffersModification() throws Exception {
+    ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    int sliceLength = 5;
+    List<ByteBuffer> buffers = stream.sliceBuffers(sliceLength);
+    Assert.assertEquals("Should advance the original stream",
+        length - sliceLength, stream.available());
+    Assert.assertEquals("Should advance the original stream position",
+        sliceLength, stream.position());
+
+    Assert.assertEquals("Should return a slice of the first buffer",
+        1, buffers.size());
+
+    ByteBuffer buffer = buffers.get(0);
+    Assert.assertEquals("Should have requested bytes",
+        sliceLength, buffer.remaining());
+
+    // read the buffer one past the returned limit. this should not change the
+    // next value in the original stream
+    buffer.limit(sliceLength + 1);
+    for (int i = 0; i < sliceLength + 1; i += 1) {
+      Assert.assertEquals("Should have correct data", i, buffer.get());
+    }
+
+    Assert.assertEquals("Reading a slice shouldn't advance the original 
stream",
+        sliceLength, stream.position());
+    Assert.assertEquals("Reading a slice shouldn't change the underlying data",
+        sliceLength, stream.read());
+
+    // change the underlying data buffer
+    buffer.limit(sliceLength + 2);
+    int originalValue = buffer.duplicate().get();
+    ByteBuffer undoBuffer = buffer.duplicate();
+
+    try {
+      buffer.put((byte) 255);
+
+      Assert.assertEquals(
+          "Writing to a slice shouldn't advance the original stream",
+          sliceLength + 1, stream.position());
+      Assert.assertEquals(
+          "Writing to a slice should change the underlying data",
+          255, stream.read());
+
+    } finally {
+      undoBuffer.put((byte) originalValue);
+    }
+  }
+
+  @Test
+  public void testSkip() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    while (stream.available() > 0) {
+      int bytesToSkip = Math.min(stream.available(), 10);
+      Assert.assertEquals("Should skip all, regardless of backing buffers",
+          bytesToSkip, stream.skip(bytesToSkip));
+    }
+
+    stream = newStream();
+    Assert.assertEquals(0, stream.skip(0));
+
+    int length = stream.available();
+    Assert.assertEquals("Should stop at end when out of bytes",
+        length, stream.skip(length + 10));
+    Assert.assertEquals("Should return -1 when at end",
+        -1, stream.skip(10));
+  }
+
+  @Test
+  public void testSkipFully() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    long lastPosition = 0;
+    while (stream.available() > 0) {
+      int bytesToSkip = Math.min(stream.available(), 10);
+
+      stream.skipFully(bytesToSkip);
+
+      Assert.assertEquals("Should skip all, regardless of backing buffers",
+          bytesToSkip, stream.position() - lastPosition);
+
+      lastPosition = stream.position();
+    }
+
+    final ByteBufferInputStream stream2 = newStream();
+    stream2.skipFully(0);
+    Assert.assertEquals(0, stream2.position());
+
+    final int length = stream2.available();
+    assertThrows("Should throw when out of bytes",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            stream2.skipFully(length + 10);
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testMark() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    stream.read(new byte[7]);
+    stream.mark(100);
+
+    long mark = stream.position();
+
+    byte[] expected = new byte[100];
+    int expectedBytesRead = stream.read(expected);
+
+    long end = stream.position();
+
+    stream.reset();
+
+    Assert.assertEquals("Position should return to the mark",
+        mark, stream.position());
+
+    byte[] afterReset = new byte[100];
+    int bytesReadAfterReset = stream.read(afterReset);
+
+    Assert.assertEquals("Should read the same number of bytes",
+        expectedBytesRead, bytesReadAfterReset);
+
+    Assert.assertEquals("Read should end at the same position",
+        end, stream.position());
+
+    Assert.assertArrayEquals("Content should be equal", expected, afterReset);
+  }
+
+  @Test
+  public void testMarkTwice() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    stream.read(new byte[7]);
+    stream.mark(1);
+    stream.mark(100);
+
+    long mark = stream.position();
+
+    byte[] expected = new byte[100];
+    int expectedBytesRead = stream.read(expected);
+
+    long end = stream.position();
+
+    stream.reset();
+
+    Assert.assertEquals("Position should return to the mark",
+        mark, stream.position());
+
+    byte[] afterReset = new byte[100];
+    int bytesReadAfterReset = stream.read(afterReset);
+
+    Assert.assertEquals("Should read the same number of bytes",
+        expectedBytesRead, bytesReadAfterReset);
+
+    Assert.assertEquals("Read should end at the same position",
+        end, stream.position());
+
+    Assert.assertArrayEquals("Content should be equal", expected, afterReset);
+  }
+
+  @Test
+  public void testMarkAtStart() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    stream.mark(100);
+
+    long mark = stream.position();
+
+    byte[] expected = new byte[10];
+    Assert.assertEquals("Should read 10 bytes", 10, stream.read(expected));
+
+    long end = stream.position();
+
+    stream.reset();
+
+    Assert.assertEquals("Position should return to the mark",
+        mark, stream.position());
+
+    byte[] afterReset = new byte[10];
+    Assert.assertEquals("Should read 10 bytes", 10, stream.read(afterReset));
+
+    Assert.assertEquals("Read should end at the same position",
+        end, stream.position());
+
+    Assert.assertArrayEquals("Content should be equal", expected, afterReset);
+  }
+
+  @Test
+  public void testMarkAtEnd() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    int bytesRead = stream.read(new byte[100]);
+    Assert.assertTrue("Should read to end of stream", bytesRead < 100);
+
+    stream.mark(100);
+
+    long mark = stream.position();
+
+    byte[] expected = new byte[10];
+    Assert.assertEquals("Should read 0 bytes", -1, stream.read(expected));
+
+    long end = stream.position();
+
+    stream.reset();
+
+    Assert.assertEquals("Position should return to the mark",
+        mark, stream.position());
+
+    byte[] afterReset = new byte[10];
+    Assert.assertEquals("Should read 0 bytes", -1, stream.read(afterReset));
+
+    Assert.assertEquals("Read should end at the same position",
+        end, stream.position());
+
+    Assert.assertArrayEquals("Content should be equal", expected, afterReset);
+  }
+
+  @Test
+  public void testMarkUnset() {
+    final ByteBufferInputStream stream = newStream();
+
+    assertThrows("Should throw an error for reset() without mark()",
+        IOException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            stream.reset();
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testMarkAndResetTwiceOverSameRange() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+
+    byte[] expected = new byte[6];
+    stream.mark(10);
+    Assert.assertEquals("Should read expected bytes",
+        expected.length, stream.read(expected));
+
+    stream.reset();
+    stream.mark(10);
+
+    byte[] firstRead = new byte[6];
+    Assert.assertEquals("Should read firstRead bytes",
+        firstRead.length, stream.read(firstRead));
+
+    stream.reset();
+
+    byte[] secondRead = new byte[6];
+    Assert.assertEquals("Should read secondRead bytes",
+        secondRead.length, stream.read(secondRead));
+
+    Assert.assertArrayEquals("First read should be correct",
+        expected, firstRead);
+
+    Assert.assertArrayEquals("Second read should be correct",
+        expected, secondRead);
+  }
+
+  @Test
+  public void testMarkLimit() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+
+    stream.mark(5);
+    Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5]));
+
+    stream.reset();
+
+    Assert.assertEquals("Should read 6 bytes", 6, stream.read(new byte[6]));
+
+    assertThrows("Should throw an error for reset() after limit",
+        IOException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            stream.reset();
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testMarkDoubleReset() throws Exception {
+    final ByteBufferInputStream stream = newStream();
+
+    stream.mark(5);
+    Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5]));
+
+    stream.reset();
+
+    assertThrows("Should throw an error for double reset()",
+        IOException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            stream.reset();
+            return null;
+          }
+        });
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param callable A Callable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Callable callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
new file mode 100644
index 0000000..253c986
--- /dev/null
+++ 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java
@@ -0,0 +1,141 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestMultiBufferInputStream extends TestByteBufferInputStreams {
+  private static final List<ByteBuffer> DATA = Arrays.asList(
+      ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }),
+      ByteBuffer.wrap(new byte[] { 9, 10, 11, 12 }),
+      ByteBuffer.wrap(new byte[] {  }),
+      ByteBuffer.wrap(new byte[] { 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 
24 }),
+      ByteBuffer.wrap(new byte[] { 25 }),
+      ByteBuffer.wrap(new byte[] { 26, 27, 28, 29, 30, 31, 32 }),
+      ByteBuffer.wrap(new byte[] { 33, 34 })
+  );
+
+  @Override
+  protected ByteBufferInputStream newStream() {
+    return new MultiBufferInputStream(DATA);
+  }
+
+  @Override
+  protected void checkOriginalData() {
+    for (ByteBuffer buffer : DATA) {
+      Assert.assertEquals("Position should not change", 0, buffer.position());
+      Assert.assertEquals("Limit should not change",
+          buffer.array().length, buffer.limit());
+    }
+  }
+
+  @Test
+  public void testSliceData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    // slice the stream into 3 8-byte buffers and 1 2-byte buffer
+    while (stream.available() > 0) {
+      int bytesToSlice = Math.min(stream.available(), 8);
+      buffers.add(stream.slice(bytesToSlice));
+    }
+
+    Assert.assertEquals("Position should be at end", length, 
stream.position());
+    Assert.assertEquals("Should produce 5 buffers", 5, buffers.size());
+
+    int i = 0;
+
+    // one is a view of the first buffer because it is smaller
+    ByteBuffer one = buffers.get(0);
+    Assert.assertSame("Should be a duplicate of the first array",
+        one.array(), DATA.get(0).array());
+    Assert.assertEquals(8, one.remaining());
+    Assert.assertEquals(0, one.position());
+    Assert.assertEquals(8, one.limit());
+    Assert.assertEquals(9, one.capacity());
+    for (; i < 8; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, one.get());
+    }
+
+    // two should be a copy of the next 8 bytes
+    ByteBuffer two = buffers.get(1);
+    Assert.assertEquals(8, two.remaining());
+    Assert.assertEquals(0, two.position());
+    Assert.assertEquals(8, two.limit());
+    Assert.assertEquals(8, two.capacity());
+    for (; i < 16; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, two.get());
+    }
+
+    // three is a copy of part of the 4th buffer
+    ByteBuffer three = buffers.get(2);
+    Assert.assertSame("Should be a duplicate of the fourth array",
+        three.array(), DATA.get(3).array());
+    Assert.assertEquals(8, three.remaining());
+    Assert.assertEquals(3, three.position());
+    Assert.assertEquals(11, three.limit());
+    Assert.assertEquals(12, three.capacity());
+    for (; i < 24; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, three.get());
+    }
+
+    // four should be a copy of the next 8 bytes
+    ByteBuffer four = buffers.get(3);
+    Assert.assertEquals(8, four.remaining());
+    Assert.assertEquals(0, four.position());
+    Assert.assertEquals(8, four.limit());
+    Assert.assertEquals(8, four.capacity());
+    for (; i < 32; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, four.get());
+    }
+
+    // five should be a copy of the next 8 bytes
+    ByteBuffer five = buffers.get(4);
+    Assert.assertEquals(3, five.remaining());
+    Assert.assertEquals(0, five.position());
+    Assert.assertEquals(3, five.limit());
+    Assert.assertEquals(3, five.capacity());
+    for (; i < 35; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, five.get());
+    }
+  }
+
+  @Test
+  public void testSliceBuffersData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    List<ByteBuffer> buffers = stream.sliceBuffers(stream.available());
+    List<ByteBuffer> nonEmptyBuffers = new ArrayList<>();
+    for (ByteBuffer buffer : DATA) {
+      if (buffer.remaining() > 0) {
+        nonEmptyBuffers.add(buffer);
+      }
+    }
+
+    Assert.assertEquals("Should return duplicates of all non-empty buffers",
+        nonEmptyBuffers, buffers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
new file mode 100644
index 0000000..9db23be
--- /dev/null
+++ 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
@@ -0,0 +1,130 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
+  private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] {
+          0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+          20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 });
+
+  @Override
+  protected ByteBufferInputStream newStream() {
+    return new SingleBufferInputStream(DATA);
+  }
+
+  @Override
+  protected void checkOriginalData() {
+      Assert.assertEquals("Position should not change", 0, DATA.position());
+      Assert.assertEquals("Limit should not change",
+          DATA.array().length, DATA.limit());
+  }
+
+  @Test
+  public void testSliceData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+    int length = stream.available();
+
+    List<ByteBuffer> buffers = new ArrayList<>();
+    // slice the stream into 3 8-byte buffers and 1 2-byte buffer
+    while (stream.available() > 0) {
+      int bytesToSlice = Math.min(stream.available(), 8);
+      buffers.add(stream.slice(bytesToSlice));
+    }
+
+    Assert.assertEquals("Position should be at end", length, 
stream.position());
+    Assert.assertEquals("Should produce 5 buffers", 5, buffers.size());
+
+    int i = 0;
+
+    ByteBuffer one = buffers.get(0);
+    Assert.assertSame("Should use the same backing array",
+        one.array(), DATA.array());
+    Assert.assertEquals(8, one.remaining());
+    Assert.assertEquals(0, one.position());
+    Assert.assertEquals(8, one.limit());
+    Assert.assertEquals(35, one.capacity());
+    for (; i < 8; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, one.get());
+    }
+
+    ByteBuffer two = buffers.get(1);
+    Assert.assertSame("Should use the same backing array",
+        two.array(), DATA.array());
+    Assert.assertEquals(8, two.remaining());
+    Assert.assertEquals(8, two.position());
+    Assert.assertEquals(16, two.limit());
+    Assert.assertEquals(35, two.capacity());
+    for (; i < 16; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, two.get());
+    }
+
+    // three is a copy of part of the 4th buffer
+    ByteBuffer three = buffers.get(2);
+    Assert.assertSame("Should use the same backing array",
+        three.array(), DATA.array());
+    Assert.assertEquals(8, three.remaining());
+    Assert.assertEquals(16, three.position());
+    Assert.assertEquals(24, three.limit());
+    Assert.assertEquals(35, three.capacity());
+    for (; i < 24; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, three.get());
+    }
+
+    // four should be a copy of the next 8 bytes
+    ByteBuffer four = buffers.get(3);
+    Assert.assertSame("Should use the same backing array",
+        four.array(), DATA.array());
+    Assert.assertEquals(8, four.remaining());
+    Assert.assertEquals(24, four.position());
+    Assert.assertEquals(32, four.limit());
+    Assert.assertEquals(35, four.capacity());
+    for (; i < 32; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, four.get());
+    }
+
+    // five should be a copy of the next 8 bytes
+    ByteBuffer five = buffers.get(4);
+    Assert.assertSame("Should use the same backing array",
+        five.array(), DATA.array());
+    Assert.assertEquals(3, five.remaining());
+    Assert.assertEquals(32, five.position());
+    Assert.assertEquals(35, five.limit());
+    Assert.assertEquals(35, five.capacity());
+    for (; i < 35; i += 1) {
+      Assert.assertEquals("Should produce correct values", i, five.get());
+    }
+  }
+
+  @Test
+  public void testWholeSliceBuffersData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    List<ByteBuffer> buffers = stream.sliceBuffers(stream.available());
+    Assert.assertEquals("Should return duplicates of all non-empty buffers",
+        Collections.singletonList(DATA), buffers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 87c8ac9..8d3e48d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -37,6 +37,8 @@ import static 
org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
 
+  private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
+
   private HadoopReadOptions(boolean useSignedStringMinMax,
                             boolean useStatsFilter,
                             boolean useDictionaryFilter,
@@ -45,11 +47,12 @@ public class HadoopReadOptions extends ParquetReadOptions {
                             MetadataFilter metadataFilter,
                             CompressionCodecFactory codecFactory,
                             ByteBufferAllocator allocator,
+                            int maxAllocationSize,
                             Map<String, String> properties,
                             Configuration conf) {
     super(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter, recordFilter,
-        metadataFilter, codecFactory, allocator, properties
+        metadataFilter, codecFactory, allocator, maxAllocationSize, properties
     );
     this.conf = conf;
   }
@@ -82,6 +85,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
       useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
+      withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
       String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
       if (badRecordThresh != null) {
         set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
@@ -92,7 +96,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
     public ParquetReadOptions build() {
       return new HadoopReadOptions(
           useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, properties, 
conf);
+          recordFilter, metadataFilter, codecFactory, allocator, 
maxAllocationSize, properties,
+          conf);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 5f2f0a8..4ef2460 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -38,6 +38,7 @@ public class ParquetReadOptions {
   private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+  private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 
   private final boolean useSignedStringMinMax;
   private final boolean useStatsFilter;
@@ -47,17 +48,19 @@ public class ParquetReadOptions {
   private final ParquetMetadataConverter.MetadataFilter metadataFilter;
   private final CompressionCodecFactory codecFactory;
   private final ByteBufferAllocator allocator;
+  private final int maxAllocationSize;
   private final Map<String, String> properties;
 
   ParquetReadOptions(boolean useSignedStringMinMax,
-                             boolean useStatsFilter,
-                             boolean useDictionaryFilter,
-                             boolean useRecordFilter,
-                             FilterCompat.Filter recordFilter,
-                             ParquetMetadataConverter.MetadataFilter 
metadataFilter,
-                             CompressionCodecFactory codecFactory,
-                             ByteBufferAllocator allocator,
-                             Map<String, String> properties) {
+                     boolean useStatsFilter,
+                     boolean useDictionaryFilter,
+                     boolean useRecordFilter,
+                     FilterCompat.Filter recordFilter,
+                     ParquetMetadataConverter.MetadataFilter metadataFilter,
+                     CompressionCodecFactory codecFactory,
+                     ByteBufferAllocator allocator,
+                     int maxAllocationSize,
+                     Map<String, String> properties) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
     this.useDictionaryFilter = useDictionaryFilter;
@@ -66,6 +69,7 @@ public class ParquetReadOptions {
     this.metadataFilter = metadataFilter;
     this.codecFactory = codecFactory;
     this.allocator = allocator;
+    this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
   }
 
@@ -101,6 +105,10 @@ public class ParquetReadOptions {
     return allocator;
   }
 
+  public int getMaxAllocationSize() {
+    return maxAllocationSize;
+  }
+
   public Set<String> getPropertyNames() {
     return properties.keySet();
   }
@@ -122,16 +130,17 @@ public class ParquetReadOptions {
   }
 
   public static class Builder {
-    boolean useSignedStringMinMax = false;
-    boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
-    boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
-    boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
-    FilterCompat.Filter recordFilter = null;
-    ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
+    protected boolean useSignedStringMinMax = false;
+    protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
+    protected boolean useDictionaryFilter = 
DICTIONARY_FILTERING_ENABLED_DEFAULT;
+    protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+    protected FilterCompat.Filter recordFilter = null;
+    protected ParquetMetadataConverter.MetadataFilter metadataFilter = 
NO_FILTER;
     // the page size parameter isn't used when only using the codec factory to 
get decompressors
-    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
-    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-    Map<String, String> properties = new HashMap<>();
+    protected CompressionCodecFactory codecFactory = 
HadoopCodecs.newFactory(0);
+    protected ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
+    protected Map<String, String> properties = new HashMap<>();
 
     public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
       this.useSignedStringMinMax = useSignedStringMinMax;
@@ -203,6 +212,11 @@ public class ParquetReadOptions {
       return this;
     }
 
+    public Builder withMaxAllocationInBytes(int allocationSizeInBytes) {
+      this.maxAllocationSize = allocationSizeInBytes;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -226,7 +240,7 @@ public class ParquetReadOptions {
     public ParquetReadOptions build() {
       return new ParquetReadOptions(
           useSignedStringMinMax, useStatsFilter, useDictionaryFilter, 
useRecordFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, properties);
+          recordFilter, metadataFilter, codecFactory, allocator, 
maxAllocationSize, properties);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 8befa79..31d7bba 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -115,7 +115,7 @@ public class CodecFactory implements 
CompressionCodecFactory {
 
     @Override
     public void decompress(ByteBuffer input, int compressedSize, ByteBuffer 
output, int uncompressedSize) throws IOException {
-      ByteBuffer decompressed = decompress(BytesInput.from(input, 0, 
input.remaining()), uncompressedSize).toByteBuffer();
+      ByteBuffer decompressed = decompress(BytesInput.from(input), 
uncompressedSize).toByteBuffer();
       output.put(decompressed);
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index 58e79ac..1377999 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -302,7 +302,9 @@ class DirectCodecFactory extends CodecFactory implements 
AutoCloseable {
         size = Snappy.compress(this.incoming, outgoing);
       }
 
-      return BytesInput.from(outgoing, 0, (int) size);
+      outgoing.limit(size);
+
+      return BytesInput.from(outgoing);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1ace040..6ef8a6c 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -879,23 +879,23 @@ public class ParquetFileReader implements Closeable {
    * @author Julien Le Dem
    *
    */
-  private class Chunk extends ByteBufferInputStream {
+  private class Chunk {
 
-    private final ChunkDescriptor descriptor;
+    protected final ChunkDescriptor descriptor;
+    protected final ByteBufferInputStream stream;
 
     /**
      *
      * @param descriptor descriptor for the chunk
-     * @param data contains the chunk data at offset
-     * @param offset where the chunk starts in offset
+     * @param buffers ByteBuffers that contain the chunk
      */
-    public Chunk(ChunkDescriptor descriptor, ByteBuffer data, int offset) {
-      super(data, offset, descriptor.size);
+    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers) {
       this.descriptor = descriptor;
+      this.stream = ByteBufferInputStream.wrap(buffers);
     }
 
     protected PageHeader readPageHeader() throws IOException {
-      return Util.readPageHeader(this);
+      return Util.readPageHeader(stream);
     }
 
     /**
@@ -967,7 +967,7 @@ public class ParquetFileReader implements Closeable {
             break;
           default:
             LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(), compressedPageSize);
-            this.skip(compressedPageSize);
+            stream.skipFully(compressedPageSize);
             break;
         }
       }
@@ -977,29 +977,19 @@ public class ParquetFileReader implements Closeable {
             "Expected " + descriptor.metadata.getValueCount() + " values in 
column chunk at " +
             getPath() + " offset " + 
descriptor.metadata.getFirstDataPageOffset() +
             " but got " + valuesCountReadSoFar + " values instead over " + 
pagesInChunk.size()
-            + " pages ending at file offset " + (descriptor.fileOffset + 
pos()));
+            + " pages ending at file offset " + (descriptor.fileOffset + 
stream.position()));
       }
       BytesInputDecompressor decompressor = 
options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
       return new ColumnChunkPageReader(decompressor, pagesInChunk, 
dictionaryPage);
     }
 
     /**
-     * @return the current position in the chunk
-     */
-    public int pos() {
-      return this.byteBuf.position();
-    }
-
-    /**
      * @param size the size of the page
      * @return the page
      * @throws IOException
      */
     public BytesInput readAsBytesInput(int size) throws IOException {
-      int pos = this.byteBuf.position();
-      final BytesInput r = BytesInput.from(this.byteBuf, pos, size);
-      this.byteBuf.position(pos + size);
-      return r;
+      return BytesInput.from(stream.sliceBuffers(size));
     }
 
   }
@@ -1016,44 +1006,51 @@ public class ParquetFileReader implements Closeable {
 
     /**
      * @param descriptor the descriptor of the chunk
-     * @param byteBuf contains the data of the chunk at offset
-     * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, 
int offset, SeekableInputStream f) {
-      super(descriptor, byteBuf, offset);
+    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> 
buffers, SeekableInputStream f) {
+      super(descriptor, buffers);
       this.f = f;
     }
 
     protected PageHeader readPageHeader() throws IOException {
       PageHeader pageHeader;
-      int initialPos = pos();
+      stream.mark(8192); // headers should not be larger than 8k
       try {
-        pageHeader = Util.readPageHeader(this);
+        pageHeader = Util.readPageHeader(stream);
       } catch (IOException e) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is 
truncated in the buffer.
-        this.byteBuf.position(initialPos); // resetting the buffer to the 
position before we got the error
+        stream.reset(); // resetting the buffer to the position before we got 
the error
         LOG.info("completing the column chunk to read the page header");
-        pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // 
trying again from the buffer + remainder of the stream.
+        pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f)); 
// trying again from the buffer + remainder of the stream.
       }
       return pageHeader;
     }
 
     public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos() + size > initPos + count) {
+      int available = stream.available();
+      if (size > available) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
-        int l1 = initPos + count - pos();
-        int l2 = size - l1;
-        LOG.info("completed the column chunk with {} bytes", l2);
-        return BytesInput.concat(super.readAsBytesInput(l1), 
BytesInput.copy(BytesInput.from(f, l2)));
+        int missingBytes = size - available;
+        LOG.info("completed the column chunk with {} bytes", missingBytes);
+
+        List<ByteBuffer> buffers = new ArrayList<>();
+        buffers.addAll(stream.sliceBuffers(available));
+
+        ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
+        f.readFully(lastBuffer);
+        buffers.add(lastBuffer);
+
+        return BytesInput.from(buffers);
       }
+
       return super.readAsBytesInput(size);
     }
 
@@ -1126,22 +1123,36 @@ public class ParquetFileReader implements Closeable {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
 
-      // Allocate the bytebuffer based on whether the FS can support it.
-      ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length);
-      f.readFully(chunksByteBuffer);
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        
buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
-      int currentChunkOffset = 0;
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
         if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksByteBuffer, 
currentChunkOffset));
+          result.add(new Chunk(descriptor, 
stream.sliceBuffers(descriptor.size)));
         } else {
           // because of a bug, the last chunk might be larger than 
descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, 
currentChunkOffset, f));
+          result.add(new WorkaroundChunk(descriptor, 
stream.sliceBuffers(descriptor.size), f));
         }
-        currentChunkOffset += descriptor.size;
       }
       return result ;
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 3dd17e9..9d9a72f 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -73,7 +73,7 @@ public class TestDirectCodecFactory {
       if (useOnHeapCompression) {
         compressed = c.compress(BytesInput.from(rawArr));
       } else {
-        compressed = c.compress(BytesInput.from(rawBuf, 0, 
rawBuf.remaining()));
+        compressed = c.compress(BytesInput.from(rawBuf));
       }
 
       switch (decomp) {
@@ -95,11 +95,11 @@ public class TestDirectCodecFactory {
 
         case OFF_HEAP_BYTES_INPUT: {
           final ByteBuffer buf = compressed.toByteBuffer();
-          final ByteBuffer b = allocator.allocate(buf.capacity());
+          final ByteBuffer b = allocator.allocate(buf.limit());
           try {
             b.put(buf);
             b.flip();
-            final BytesInput input = d.decompress(BytesInput.from(b, 0, 
b.capacity()), size);
+            final BytesInput input = d.decompress(BytesInput.from(b), size);
             Assert.assertArrayEquals(
                 String.format("While testing codec %s", codec),
                 input.toByteArray(), rawArr);

Reply via email to