PARQUET-1142: Add alternatives to Hadoop classes in the API

This updates the read and write paths to avoid using Hadoop classes where 
possible.

* Adds a generic compression interface, `CompressionCodecFactory`
* Adds `OutputFile` and `PositionOutputStream`
* Adds classes to help implementations wrap input and output streams: 
`DelegatingSeekableInputStream` and `DelegatingPositionOutputStream`
* Adds `ParquetReadOptions` to avoid passing options with `Configuration`
* Updates the read and write APIs to use new abstractions instead of Hadoop

Author: Ryan Blue <b...@apache.org>

Closes #429 from rdblue/PARQUET-1142-add-hadoop-alternatives and squashes the 
following commits:

21500337b [Ryan Blue] PARQUET-1142: Fix NPE when not filtering with new read 
API.
35eddd735 [Ryan Blue] PARQUET-1142: Fix problems from Gabor's review.
da391b0d4 [Ryan Blue] PARQUET-1142: Fix binary incompatibilities.
2e3d693ab [Ryan Blue] PARQUET-1142: Update the read and write paths to use new 
files and streams.
8d57e089f [Ryan Blue] PARQUET-1142: Add OutputFile and PositionOutputStream.
42908a95e [Ryan Blue] PARQUET-1142: Extract non-Hadoop API from CodecFactory.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bfd9b4d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bfd9b4d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bfd9b4d

Branch: refs/heads/master
Commit: 8bfd9b4d8f4fb0a2b522c9328f67eb642066306b
Parents: 81f4801
Author: Ryan Blue <b...@apache.org>
Authored: Wed Dec 13 11:27:54 2017 -0800
Committer: Ryan Blue <b...@apache.org>
Committed: Wed Dec 13 11:27:54 2017 -0800

----------------------------------------------------------------------
 parquet-common/pom.xml                          |   6 +
 .../org/apache/parquet/bytes/BytesInput.java    | 486 +++++++++++
 .../bytes/CapacityByteArrayOutputStream.java    | 337 ++++++++
 .../bytes/ConcatenatingByteArrayCollector.java  |  63 ++
 .../bytes/LittleEndianDataInputStream.java      | 424 +++++++++
 .../bytes/LittleEndianDataOutputStream.java     | 220 +++++
 .../compression/CompressionCodecFactory.java    |  47 +
 .../CompressionCodecNotSupportedException.java  |  38 +
 .../hadoop/metadata/CompressionCodecName.java   |  98 +++
 .../io/DelegatingPositionOutputStream.java      |  63 ++
 .../io/DelegatingSeekableInputStream.java       | 171 ++++
 .../java/org/apache/parquet/io/InputFile.java   |   9 +-
 .../java/org/apache/parquet/io/OutputFile.java  |  34 +
 .../apache/parquet/io/PositionOutputStream.java |  39 +
 .../org/apache/parquet/io/MockInputStream.java  |  56 ++
 .../io/TestDelegatingSeekableInputStream.java   | 861 +++++++++++++++++++
 .../org/apache/parquet/bytes/BytesInput.java    | 486 -----------
 .../bytes/CapacityByteArrayOutputStream.java    | 337 --------
 .../bytes/ConcatenatingByteArrayCollector.java  |  63 --
 .../bytes/LittleEndianDataInputStream.java      | 424 ---------
 .../bytes/LittleEndianDataOutputStream.java     | 220 -----
 .../org/apache/parquet/HadoopReadOptions.java   |  98 +++
 .../org/apache/parquet/ParquetReadOptions.java  | 232 +++++
 .../parquet/filter2/compat/RowGroupFilter.java  |   4 +
 .../converter/ParquetMetadataConverter.java     |  22 +-
 .../org/apache/parquet/hadoop/CodecFactory.java |  26 +-
 .../hadoop/ColumnChunkPageReadStore.java        |   6 +-
 .../parquet/hadoop/DirectCodecFactory.java      |  12 +-
 .../hadoop/InternalParquetRecordReader.java     |  34 +-
 .../parquet/hadoop/ParquetFileReader.java       | 254 +++---
 .../parquet/hadoop/ParquetFileWriter.java       | 147 ++--
 .../parquet/hadoop/ParquetInputFormat.java      |   3 -
 .../parquet/hadoop/ParquetOutputFormat.java     |   5 +-
 .../apache/parquet/hadoop/ParquetReader.java    | 174 +++-
 .../parquet/hadoop/ParquetRecordReader.java     |  26 +-
 .../apache/parquet/hadoop/ParquetWriter.java    |  50 +-
 .../hadoop/UnmaterializableRecordCounter.java   |  15 +
 .../CompressionCodecNotSupportedException.java  |  36 -
 .../hadoop/metadata/CompressionCodecName.java   |  98 ---
 .../hadoop/util/H1SeekableInputStream.java      | 101 +--
 .../hadoop/util/H2SeekableInputStream.java      |  20 +-
 .../parquet/hadoop/util/HadoopCodecs.java       |  39 +
 .../parquet/hadoop/util/HadoopOutputFile.java   | 100 +++
 .../hadoop/util/HadoopPositionOutputStream.java |  66 ++
 .../parquet/hadoop/util/HadoopStreams.java      |  15 +
 .../TestInputOutputFormatWithPadding.java       |   6 +-
 .../parquet/hadoop/TestParquetFileWriter.java   |   1 +
 .../hadoop/util/MockHadoopInputStream.java      |  87 ++
 .../parquet/hadoop/util/MockInputStream.java    |  87 --
 .../hadoop/util/TestHadoop1ByteBufferReads.java | 761 ----------------
 .../hadoop/util/TestHadoop2ByteBufferReads.java |  30 +-
 .../parquet/tools/command/MergeCommand.java     |   3 +-
 pom.xml                                         |   9 +-
 53 files changed, 4158 insertions(+), 2891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index b0357ba..7ae6068 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -37,6 +37,12 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-format</artifactId>
+      <version>${parquet.format.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>${slf4j.version}</version>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java 
b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
new file mode 100644
index 0000000..6e593c2
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -0,0 +1,486 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A source of bytes capable of writing itself to an output.
+ * A BytesInput should be consumed right away.
+ * It is not a container.
+ * For example if it is referring to a stream,
+ * subsequent BytesInput reads from the stream will be incorrect
+ * if the previous has not been consumed.
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class BytesInput {
+  private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
+  private static final boolean DEBUG = false;//Log.DEBUG;
+  private static final EmptyBytesInput EMPTY_BYTES_INPUT = new 
EmptyBytesInput();
+
+  /**
+   * logically concatenate the provided inputs
+   * @param inputs the inputs to concatenate
+   * @return a concatenated input
+   */
+  public static BytesInput concat(BytesInput... inputs) {
+    return new SequenceBytesIn(Arrays.asList(inputs));
+  }
+
+  /**
+   * logically concatenate the provided inputs
+   * @param inputs the inputs to concatenate
+   * @return a concatenated input
+   */
+  public static BytesInput concat(List<BytesInput> inputs) {
+    return new SequenceBytesIn(inputs);
+  }
+
+  /**
+   * @param in the stream to read the bytes from
+   * @param bytes number of bytes to read
+   * @return a BytesInput that will read that number of bytes from the stream
+   */
+  public static BytesInput from(InputStream in, int bytes) {
+    return new StreamBytesInput(in, bytes);
+  }
+  
+  /**
+   * @param buffer the ByteBuffer to read from, starting at position offset
+   * @param length number of bytes to read
+   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   */
+  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+    return new ByteBufferBytesInput(buffer, offset, length);
+  }
+
+  /**
+   *
+   * @param in the byte array to wrap
+   * @return a BytesInput that will write the given bytes
+   */
+  public static BytesInput from(byte[] in) {
+    LOG.debug("BytesInput from array of {} bytes", in.length);
+    return new ByteArrayBytesInput(in, 0 , in.length);
+  }
+
+  public static BytesInput from(byte[] in, int offset, int length) {
+    LOG.debug("BytesInput from array of {} bytes", length);
+    return new ByteArrayBytesInput(in, offset, length);
+  }
+
+  /**
+   * @param intValue the int to write
+   * @return a BytesInput that will write 4 bytes in little endian
+   */
+  public static BytesInput fromInt(int intValue) {
+    return new IntBytesInput(intValue);
+  }
+
+  /**
+   * @param intValue the int to write
+   * @return a BytesInput that will write var int
+   */
+  public static BytesInput fromUnsignedVarInt(int intValue) {
+    return new UnsignedVarIntBytesInput(intValue);
+  }
+
+  /**
+   *
+   * @param intValue the int to write
+   */
+  public static BytesInput fromZigZagVarInt(int intValue) {
+    int zigZag = (intValue << 1) ^ (intValue >> 31);
+    return new UnsignedVarIntBytesInput(zigZag);
+  }
+
+  /**
+   * @param longValue the long to write
+   * @return a BytesInput that will write var long
+   */
+  public static BytesInput fromUnsignedVarLong(long longValue) {
+    return new UnsignedVarLongBytesInput(longValue);
+  }
+
+  /**
+   *
+   * @param longValue the long to write
+   */
+  public static BytesInput fromZigZagVarLong(long longValue) {
+    long zigZag = (longValue << 1) ^ (longValue >> 63);
+    return new UnsignedVarLongBytesInput(zigZag);
+  }
+
+  /**
+   * @param arrayOut the stream whose buffered content will be written
+   * @return a BytesInput that will write the content of the buffer
+   */
+  public static BytesInput from(CapacityByteArrayOutputStream arrayOut) {
+    return new CapacityBAOSBytesInput(arrayOut);
+  }
+
+  /**
+   * @param baos - stream to wrap into a BytesInput
+   * @return a BytesInput that will write the content of the buffer
+   */
+  public static BytesInput from(ByteArrayOutputStream baos) {
+    return new BAOSBytesInput(baos);
+  }
+
+  /**
+   * @return an empty bytes input
+   */
+  public static BytesInput empty() {
+    return EMPTY_BYTES_INPUT;
+  }
+
+  /**
+   * copies the input into a new byte array
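+   * @param bytesInput the input to copy
+   * @return a BytesInput wrapping the byte array copy of the input
+   * @throws IOException if materializing the input as a byte array fails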
+   */
+  public static BytesInput copy(BytesInput bytesInput) throws IOException {
+    return from(bytesInput.toByteArray());
+  }
+
+  /**
+   * writes the bytes into a stream
+   * @param out the stream to write the bytes to
+   * @throws IOException if writing to the stream fails
+   */
+  abstract public void writeAllTo(OutputStream out) throws IOException;
+
+  /**
+   *
+   * @return a new byte array materializing the contents of this input
+   * @throws IOException
+   */
+  public byte[] toByteArray() throws IOException {
+    BAOS baos = new BAOS((int)size());
+    this.writeAllTo(baos);
+    LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
+    return baos.getBuf();
+  }
+
+  /**
+   *
+   * @return a new ByteBuffer materializing the contents of this input
+   * @throws IOException
+   */
+  public ByteBuffer toByteBuffer() throws IOException {
+    return ByteBuffer.wrap(toByteArray());
+  }
+
+  /**
+   *
+   * @return a new InputStream materializing the contents of this input
+   * @throws IOException
+   */
+  public InputStream toInputStream() throws IOException {
+    return new ByteBufferInputStream(toByteBuffer());
+  }
+
+  /**
+   *
+   * @return the size in bytes that would be written
+   */
+  abstract public long size();
+
+  private static final class BAOS extends ByteArrayOutputStream {
+    private BAOS(int size) {
+      super(size);
+    }
+
+    public byte[] getBuf() {
+      return this.buf;
+    }
+  }
+
+  private static class StreamBytesInput extends BytesInput {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BytesInput.StreamBytesInput.class);
+    private final InputStream in;
+    private final int byteCount;
+
+    private StreamBytesInput(InputStream in, int byteCount) {
+      super();
+      this.in = in;
+      this.byteCount = byteCount;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      LOG.debug("write All {} bytes", byteCount);
+      // TODO: more efficient
+      out.write(this.toByteArray());
+    }
+
+    public byte[] toByteArray() throws IOException {
+      LOG.debug("read all {} bytes", byteCount);
+      byte[] buf = new byte[byteCount];
+      new DataInputStream(in).readFully(buf);
+      return buf;
+    }
+
+    @Override
+    public long size() {
+      return byteCount;
+    }
+
+  }
+
+  private static class SequenceBytesIn extends BytesInput {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BytesInput.SequenceBytesIn.class);
+
+    private final List<BytesInput> inputs;
+    private final long size;
+
+    private SequenceBytesIn(List<BytesInput> inputs) {
+      this.inputs = inputs;
+      long total = 0;
+      for (BytesInput input : inputs) {
+        total += input.size();
+      }
+      this.size = total;
+    }
+
+    @SuppressWarnings("unused")
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      for (BytesInput input : inputs) {
+
+        LOG.debug("write {} bytes to out", input.size());
+        if (input instanceof SequenceBytesIn) LOG.debug("{");
+        input.writeAllTo(out);
+        if (input instanceof SequenceBytesIn) LOG.debug("}");
+      }
+    }
+
+    @Override
+    public long size() {
+      return size;
+    }
+
+  }
+
+  private static class IntBytesInput extends BytesInput {
+
+    private final int intValue;
+
+    public IntBytesInput(int intValue) {
+      this.intValue = intValue;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      BytesUtils.writeIntLittleEndian(out, intValue);
+    }
+
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.allocate(4).putInt(0, intValue);
+    }
+
+    @Override
+    public long size() {
+      return 4;
+    }
+
+  }
+
+  private static class UnsignedVarIntBytesInput extends BytesInput {
+
+    private final int intValue;
+
+    public UnsignedVarIntBytesInput(int intValue) {
+      this.intValue = intValue;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      BytesUtils.writeUnsignedVarInt(intValue, out);
+    }
+
+    public ByteBuffer toByteBuffer() throws IOException {
+      ByteBuffer ret = ByteBuffer.allocate((int) size());
+      BytesUtils.writeUnsignedVarInt(intValue, ret);
+      return ret;
+    }
+
+    @Override
+    public long size() {
+      int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
+      return s == 0 ? 1 : s;
+    }
+  }
+
+  private static class UnsignedVarLongBytesInput extends BytesInput {
+
+    private final long longValue;
+
+    public UnsignedVarLongBytesInput(long longValue) {
+      this.longValue = longValue;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      BytesUtils.writeUnsignedVarLong(longValue, out);
+    }
+
+    @Override
+    public long size() {
+      int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
+      return s == 0 ? 1 : s;
+    }
+  }
+
+  private static class EmptyBytesInput extends BytesInput {
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+    }
+
+    @Override
+    public long size() {
+      return 0;
+    }
+
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.allocate(0);
+    }
+
+  }
+
+  private static class CapacityBAOSBytesInput extends BytesInput {
+
+    private final CapacityByteArrayOutputStream arrayOut;
+
+    private CapacityBAOSBytesInput(CapacityByteArrayOutputStream arrayOut) {
+      this.arrayOut = arrayOut;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      arrayOut.writeTo(out);
+    }
+
+    @Override
+    public long size() {
+      return arrayOut.size();
+    }
+
+  }
+
+  private static class BAOSBytesInput extends BytesInput {
+
+    private final ByteArrayOutputStream arrayOut;
+
+    private BAOSBytesInput(ByteArrayOutputStream arrayOut) {
+      this.arrayOut = arrayOut;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      arrayOut.writeTo(out);
+    }
+
+    @Override
+    public long size() {
+      return arrayOut.size();
+    }
+
+  }
+
+  private static class ByteArrayBytesInput extends BytesInput {
+
+    private final byte[] in;
+    private final int offset;
+    private final int length;
+
+    private ByteArrayBytesInput(byte[] in, int offset, int length) {
+      this.in = in;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      out.write(in, offset, length);
+    }
+
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.wrap(in, offset, length);
+    }
+
+    @Override
+    public long size() {
+      return length;
+    }
+
+  }
+  
+  private static class ByteBufferBytesInput extends BytesInput {
+    
+    private final ByteBuffer byteBuf;
+    private final int length;
+    private final int offset;
+
+    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
+      this.byteBuf = byteBuf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      byteBuf.position(offset);
+      ByteBuffer tempBuf = byteBuf.slice();
+      tempBuf.limit(length);
+      outputChannel.write(tempBuf);
+    }
+    
+    @Override
+    public ByteBuffer toByteBuffer() throws IOException {
+      byteBuf.position(offset);
+      ByteBuffer buf = byteBuf.slice();
+      buf.limit(length);
+      return buf;
+    }
+
+    @Override
+    public long size() {
+      return length;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
 
b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
new file mode 100644
index 0000000..92674d4
--- /dev/null
+++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -0,0 +1,337 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import static java.lang.Math.max;
+import static java.lang.Math.pow;
+import static java.lang.String.format;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.OutputStreamCloseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy 
for growing that does not involve copying.
+ * Where ByteArrayOutputStream is backed by a single array that "grows" by 
copying into a new larger array, this output
+ * stream grows by allocating a new array (slab) and adding it to a list of 
previous arrays.
+ *
+ * Each new slab is allocated to be the same size as all the previous slabs 
combined, so these allocations become
+ * exponentially less frequent, just like ByteArrayOutputStream, with one 
difference. This output stream accepts a
+ * max capacity hint, which is a hint describing the max amount of data that 
will be written to this stream. As the
+ * total size of this stream nears this max, this stream starts to grow 
linearly instead of exponentially.
+ * So new slabs are allocated to be 1/5th of the max capacity hint,
+ * instead of equal to the total previous size of all slabs. This is useful 
because it prevents allocating roughly
+ * twice the needed space when a new slab is added just before the stream is 
done being used.
+ *
+ * When reusing this stream it will adjust the initial slab size based on the 
previous data size, aiming for fewer
+ * allocations, with the assumption that a similar amount of data will be 
written to this stream on re-use.
+ * See ({@link CapacityByteArrayOutputStream#reset()}).
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class CapacityByteArrayOutputStream extends OutputStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CapacityByteArrayOutputStream.class);
+  private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
+
+  private int initialSlabSize;
+  private final int maxCapacityHint;
+  private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
+
+  private ByteBuffer currentSlab;
+  private int currentSlabIndex;
+  private int bytesAllocated = 0;
+  private int bytesUsed = 0;
+  private ByteBufferAllocator allocator;
+
+  /**
+   * Return an initial slab size such that a CapacityByteArrayOutputStream 
constructed with it
+   * will end up allocating targetNumSlabs in order to reach targetCapacity. 
This aims to be
+   * a balance between the overhead of creating new slabs and wasting memory 
by eagerly making
+   * initial slabs too big.
+   *
+   * Note that targetCapacity here need not match maxCapacityHint in the 
constructor of
+   * CapacityByteArrayOutputStream, though often that would make sense.
+   *
+   * @param minSlabSize no matter what we shouldn't make slabs any smaller 
than this
+   * @param targetCapacity after we've allocated targetNumSlabs how much 
capacity should we have?
+   * @param targetNumSlabs how many slabs should it take to reach 
targetCapacity?
+   */
+  public static int initialSlabSizeHeuristic(int minSlabSize, int 
targetCapacity, int targetNumSlabs) {
+    // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double 
targetNumSlabs times
+    // before reaching the targetCapacity
+    // e.g. for a page size of 1MB and 10 target slabs we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
+  }
+
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+    return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, 
new HeapByteBufferAllocator());
+  }
+
+  /**
+   * Construct a CapacityByteArrayOutputStream configured such that its 
initial slab size is
+   * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == 
maxCapacityHint
+   */
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs, 
ByteBufferAllocator allocator) {
+
+    return new CapacityByteArrayOutputStream(
+        initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
+        maxCapacityHint, allocator);
+  }
+
+  /**
+   * Defaults maxCapacityHint to 1MB
+   * @param initialSlabSize the size to make the first slab
+   * @deprecated use {@link 
CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, 
ByteBufferAllocator)}
+   */
+  @Deprecated
+  public CapacityByteArrayOutputStream(int initialSlabSize) {
+    this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
+  }
+
+  /**
+   * Defaults maxCapacityHint to 1MB
+   * @param initialSlabSize the size to make the first slab
+   * @deprecated use {@link 
CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, 
ByteBufferAllocator)}
+   */
+  @Deprecated
+  public CapacityByteArrayOutputStream(int initialSlabSize, 
ByteBufferAllocator allocator) {
+    this(initialSlabSize, 1024 * 1024, allocator);
+  }
+
+  /**
+   * @param initialSlabSize the size to make the first slab
+   * @param maxCapacityHint a hint (not guarantee) of the max amount of data 
written to this stream
+   * @deprecated use {@link 
CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, 
ByteBufferAllocator)}
+   */
+  @Deprecated
+  public CapacityByteArrayOutputStream(int initialSlabSize, int 
maxCapacityHint) {
+    this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
+  }
+
+  /**
+   * @param initialSlabSize the size to make the first slab
+   * @param maxCapacityHint a hint (not guarantee) of the max amount of data 
written to this stream
+   */
+  public CapacityByteArrayOutputStream(int initialSlabSize, int 
maxCapacityHint, ByteBufferAllocator allocator) {
+    checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
+    checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
+    checkArgument(maxCapacityHint >= initialSlabSize, 
String.format("maxCapacityHint can't be less than initialSlabSize %d %d", 
initialSlabSize, maxCapacityHint));
+    this.initialSlabSize = initialSlabSize;
+    this.maxCapacityHint = maxCapacityHint;
+    this.allocator = allocator;
+    reset();
+  }
+
+  /**
+   * the new slab is guaranteed to be at least minimumSize
+   * @param minimumSize the size of the data we want to copy in the new slab
+   */
+  private void addSlab(int minimumSize) {
+    int nextSlabSize;
+
+    if (bytesUsed == 0) {
+      nextSlabSize = initialSlabSize;
+    } else if (bytesUsed > maxCapacityHint / 5) {
+      // to avoid an overhead of up to twice the needed size, we grow linearly 
when approaching target page size
+      nextSlabSize = maxCapacityHint / 5;
+    } else {
+      // double the size every time
+      nextSlabSize = bytesUsed;
+    }
+
+    if (nextSlabSize < minimumSize) {
+      LOG.debug("slab size {} too small for value of size {}. Bumping up slab 
size", nextSlabSize, minimumSize);
+      nextSlabSize = minimumSize;
+    }
+
+    LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), 
nextSlabSize);
+
+    this.currentSlab = allocator.allocate(nextSlabSize);
+    this.slabs.add(currentSlab);
+    this.bytesAllocated += nextSlabSize;
+    this.currentSlabIndex = 0;
+  }
+
+  @Override
+  public void write(int b) {
+    if (!currentSlab.hasRemaining()) {
+      addSlab(1);
+    }
+    currentSlab.put(currentSlabIndex, (byte) b);
+    currentSlabIndex += 1;
+    currentSlab.position(currentSlabIndex);
+    bytesUsed += 1;
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) - b.length > 0)) {
+      throw new IndexOutOfBoundsException(
+          String.format("Given byte array of size %d, with requested 
length(%d) and offset(%d)", b.length, len, off));
+    }
+    if (len >= currentSlab.remaining()) {
+      final int length1 = currentSlab.remaining();
+      currentSlab.put(b, off, length1);
+      bytesUsed += length1;
+      currentSlabIndex += length1;
+      final int length2 = len - length1;
+      addSlab(length2);
+      currentSlab.put(b, off + length1, length2);
+      currentSlabIndex = length2;
+      bytesUsed += length2;
+    } else {
+      currentSlab.put(b, off, len);
+      currentSlabIndex += len;
+      bytesUsed += len;
+    }
+  }
+
+  private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws 
IOException {
+    if (buf.hasArray()) {
+      out.write(buf.array(), buf.arrayOffset(), len);
+    } else {
+      // The OutputStream interface only takes a byte[], unfortunately this 
means that a ByteBuffer
+      // not backed by a byte array must be copied to fulfil this interface
+      byte[] copy = new byte[len];
+      buf.flip();
+      buf.get(copy);
+      out.write(copy);
+    }
+  }
+
+  /**
+   * Writes the complete contents of this buffer to the specified output 
stream argument. The output
+   * stream's write method (<code>out.write(slab, 0, slab.length)</code>) will 
be called once per slab.
+   *
+   * @param      out   the output stream to which to write the data.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public void writeTo(OutputStream out) throws IOException {
+    for (int i = 0; i < slabs.size() - 1; i++) {
+      writeToOutput(out, slabs.get(i), slabs.get(i).position());
+    }
+    writeToOutput(out, currentSlab, currentSlabIndex);
+  }
+
+  /**
+   * @return The total size in bytes of data written to this stream.
+   */
+  public long size() {
+    return bytesUsed;
+  }
+
+  /**
+   *
+   * @return The total size in bytes currently allocated for this stream.
+   */
+  public int getCapacity() {
+    return bytesAllocated;
+  }
+
+  /**
+   * When re-using an instance with reset, it will adjust slab size based on 
previous data size.
+   * The intent is to reuse the same instance for the same type of data (for 
example, the same column).
+   * The assumption is that the size in the buffer will be consistent.
+   */
+  public void reset() {
+    // readjust slab size.
+    // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the 
same size
+    this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
+    LOG.debug("initial slab of size {}", initialSlabSize);
+    for (ByteBuffer slab : slabs) {
+      allocator.release(slab);
+    }
+    this.slabs.clear();
+    this.bytesAllocated = 0;
+    this.bytesUsed = 0;
+    this.currentSlab = EMPTY_SLAB;
+    this.currentSlabIndex = 0;
+  }
+
+  /**
+   * @return the index of the last value written to this stream, which
+   * can be passed to {@link #setByte(long, byte)} in order to change it
+   */
+  public long getCurrentIndex() {
+    checkArgument(bytesUsed > 0, "This is an empty stream");
+    return bytesUsed - 1;
+  }
+
+  /**
+   * Replace the byte stored at position index in this stream with value
+   *
+   * @param index which byte to replace
+   * @param value the value to replace it with
+   */
+  public void setByte(long index, byte value) {
+    checkArgument(index < bytesUsed, "Index: " + index + " is >= the current 
size of: " + bytesUsed);
+
+    long seen = 0;
+    for (int i = 0; i < slabs.size(); i++) {
+      ByteBuffer slab = slabs.get(i);
+      if (index < seen + slab.limit()) {
+        // ok found index
+        slab.put((int)(index-seen), value);
+        break;
+      }
+      seen += slab.limit();
+    }
+  }
+
+  /**
+   * @param prefix  a prefix to be used for every new line in the string
+   * @return a text representation of the memory usage of this structure
+   */
+  public String memUsageString(String prefix) {
+    return format("%s %s %d slabs, %,d bytes", prefix, 
getClass().getSimpleName(), slabs.size(), getCapacity());
+  }
+
+  /**
+   * @return the total number of allocated slabs
+   */
+  int getSlabCount() {
+    return slabs.size();
+  }
+
+  @Override
+  public void close() {
+    for (ByteBuffer slab : slabs) {
+      allocator.release(slab);
+    }
+    try {
+      super.close();
+    }catch(IOException e){
+      throw new OutputStreamCloseException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
----------------------------------------------------------------------
diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
 
b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
new file mode 100644
index 0000000..d333168
--- /dev/null
+++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * A {@link BytesInput} that gathers the content of other {@link BytesInput}
+ * instances as byte arrays and writes them out again in the order in which
+ * they were collected.
+ */
+public class ConcatenatingByteArrayCollector extends BytesInput {
+  // one entry per collect() call, kept in insertion order
+  private final List<byte[]> slabs = new ArrayList<byte[]>();
+  // running total of the lengths of all arrays in slabs
+  private long size = 0;
+
+  /**
+   * Converts the given input to a byte array and appends it to this collector.
+   *
+   * @param bytesInput the bytes to append
+   * @throws IOException if the conversion to a byte array fails
+   */
+  public void collect(BytesInput bytesInput) throws IOException {
+    byte[] bytes = bytesInput.toByteArray();
+    slabs.add(bytes);
+    size += bytes.length;
+  }
+
+  /**
+   * Drops everything collected so far and sets the size back to zero.
+   */
+  public void reset() {
+    size = 0;
+    slabs.clear();
+  }
+
+  /**
+   * Writes every collected array to the given stream, oldest first.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void writeAllTo(OutputStream out) throws IOException {
+    for (byte[] slab : slabs) {
+      out.write(slab);
+    }
+  }
+
+  /**
+   * @return the total number of bytes collected since the last reset
+   */
+  @Override
+  public long size() {
+    return size;
+  }
+
+  /**
+   * @param prefix  a prefix to be used for every new line in the string
+   * @return a text representation of the memory usage of this structure
+   */
+  public String memUsageString(String prefix) {
+    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
new file mode 100644
index 0000000..a092753
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
@@ -0,0 +1,424 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Based on DataInputStream but little endian and without the String/char methods
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class LittleEndianDataInputStream extends InputStream {
+
+  private final InputStream in;
+
+  /**
+   * Creates a LittleEndianDataInputStream that uses the specified
+   * underlying InputStream.
+   *
+   * @param  in   the specified input stream
+   */
+  public LittleEndianDataInputStream(InputStream in) {
+    this.in = in;
+  }
+
+  /**
+   * Reads exactly <code>b.length</code> bytes from the contained input
+   * stream into the given array.
+   *
+   * @param      b   the buffer into which the data is read.
+   * @exception  EOFException  if this input stream reaches the end before
+   *             reading all the bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final void readFully(byte[] b) throws IOException {
+    readFully(b, 0, b.length);
+  }
+
+  /**
+   * Reads exactly <code>len</code> bytes from the contained input stream
+   * into the given array, starting at offset <code>off</code>.
+   *
+   * @param      b     the buffer into which the data is read.
+   * @param      off   the start offset of the data.
+   * @param      len   the number of bytes to read.
+   * @exception  IndexOutOfBoundsException  if <code>len</code> is negative.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading all the bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final void readFully(byte[] b, int off, int len) throws IOException {
+    if (len < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    int n = 0;
+    while (n < len) {
+      // a single read may deliver fewer bytes than asked for, so keep requesting the rest
+      int count = in.read(b, off + n, len - n);
+      if (count < 0) {
+        throw new EOFException();
+      }
+      n += count;
+    }
+  }
+
+  /**
+   * Skips up to <code>n</code> bytes of the contained input stream by calling
+   * its <code>skip</code> method until either <code>n</code> bytes were
+   * skipped or a call skips nothing.
+   *
+   * @param      n   the number of bytes to be skipped.
+   * @return     the actual number of bytes skipped.
+   * @exception  IOException  if the contained input stream fails to skip.
+   */
+  public final int skipBytes(int n) throws IOException {
+    int total = 0;
+    int cur = 0;
+
+    while ((total<n) && ((cur = (int) in.skip(n-total)) > 0)) {
+      total += cur;
+    }
+
+    return total;
+  }
+
+  /**
+   * @return the result of <code>read()</code> on the contained input stream
+   * @throws IOException if the contained input stream fails to read
+   * @see java.io.InputStream#read()
+   */
+  @Override
+  public int read() throws IOException {
+    return in.read();
+  }
+
+  /**
+   * @return the hash code of the contained input stream
+   * @see java.lang.Object#hashCode()
+   */
+  @Override
+  public int hashCode() {
+    return in.hashCode();
+  }
+
+  /**
+   * @param b the buffer into which the data is read
+   * @return the result of <code>read(b)</code> on the contained input stream
+   * @throws IOException if the contained input stream fails to read
+   * @see java.io.InputStream#read(byte[])
+   */
+  @Override
+  public int read(byte[] b) throws IOException {
+    return in.read(b);
+  }
+
+  /**
+   * Delegates to <code>equals</code> of the contained input stream, i.e. it is
+   * the contained stream, not this wrapper, that is compared with
+   * <code>obj</code>.
+   * NOTE(review): because of the delegation, comparing a wrapper with itself
+   * asks the contained stream whether it equals the wrapper -- confirm that
+   * callers expect this.
+   *
+   * @param obj the object to compare the contained input stream with
+   * @return the result of <code>equals(obj)</code> on the contained input stream
+   * @see java.lang.Object#equals(java.lang.Object)
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return in.equals(obj);
+  }
+
+  /**
+   * @param b the buffer into which the data is read
+   * @param off the start offset in <code>b</code>
+   * @param len the maximum number of bytes to read
+   * @return the result of <code>read(b, off, len)</code> on the contained input stream
+   * @throws IOException if the contained input stream fails to read
+   * @see java.io.InputStream#read(byte[], int, int)
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return in.read(b, off, len);
+  }
+
+  /**
+   * @param n the number of bytes to skip
+   * @return the result of <code>skip(n)</code> on the contained input stream
+   * @throws IOException if the contained input stream fails to skip
+   * @see java.io.InputStream#skip(long)
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    return in.skip(n);
+  }
+
+  /**
+   * @return the result of <code>available()</code> on the contained input stream
+   * @throws IOException if the contained input stream throws
+   * @see java.io.InputStream#available()
+   */
+  @Override
+  public int available() throws IOException {
+    return in.available();
+  }
+
+  /**
+   * Closes the contained input stream.
+   *
+   * @throws IOException if the contained input stream fails to close
+   * @see java.io.InputStream#close()
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  /**
+   * Forwards the mark request to the contained input stream.
+   *
+   * @param readlimit the read limit handed to the contained input stream
+   * @see java.io.InputStream#mark(int)
+   */
+  @Override
+  public void mark(int readlimit) {
+    in.mark(readlimit);
+  }
+
+  /**
+   * Forwards the reset request to the contained input stream.
+   *
+   * @throws IOException if the contained input stream fails to reset
+   * @see java.io.InputStream#reset()
+   */
+  @Override
+  public void reset() throws IOException {
+    in.reset();
+  }
+
+  /**
+   * @return the result of <code>markSupported()</code> on the contained input stream
+   * @see java.io.InputStream#markSupported()
+   */
+  @Override
+  public boolean markSupported() {
+    return in.markSupported();
+  }
+
+  /**
+   * Reads one byte from the contained input stream and interprets it as a
+   * boolean.
+   *
+   * @return     <code>true</code> if the byte read is non-zero,
+   *             <code>false</code> otherwise.
+   * @exception  EOFException  if this input stream has reached the end.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final boolean readBoolean() throws IOException {
+    int ch = in.read();
+    if (ch < 0) {
+      throw new EOFException();
+    }
+    return (ch != 0);
+  }
+
+  /**
+   * Reads one byte from the contained input stream.
+   *
+   * @return     the next byte of this input stream as a signed 8-bit
+   *             <code>byte</code>.
+   * @exception  EOFException  if this input stream has reached the end.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final byte readByte() throws IOException {
+    int ch = in.read();
+    if (ch < 0) {
+      throw new EOFException();
+    }
+    return (byte)(ch);
+  }
+
+  /**
+   * Reads one byte from the contained input stream.
+   *
+   * @return     the next byte of this input stream, interpreted as an
+   *             unsigned 8-bit number.
+   * @exception  EOFException  if this input stream has reached the end.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final int readUnsignedByte() throws IOException {
+    int ch = in.read();
+    if (ch < 0) {
+      throw new EOFException();
+    }
+    return ch;
+  }
+
+  /**
+   * Reads two bytes from the contained input stream, low byte first.
+   *
+   * @return     the next two bytes of this input stream, interpreted as a
+   *             signed 16-bit number.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading two bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final short readShort() throws IOException {
+    int ch2 = in.read();
+    int ch1 = in.read();
+    // read() yields -1 at end of stream, so a negative OR means a byte was missing
+    if ((ch1 | ch2) < 0) {
+      throw new EOFException();
+    }
+    return (short)((ch1 << 8) + (ch2 << 0));
+  }
+
+  /**
+   * Reads two bytes from the contained input stream, low byte first.
+   *
+   * @return     the next two bytes of this input stream, interpreted as an
+   *             unsigned 16-bit integer.
+   * @exception  EOFException  if this input stream reaches the end before
+   *             reading two bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final int readUnsignedShort() throws IOException {
+    int ch2 = in.read();
+    int ch1 = in.read();
+    if ((ch1 | ch2) < 0) {
+      throw new EOFException();
+    }
+    return (ch1 << 8) + (ch2 << 0);
+  }
+
+  /**
+   * Reads four bytes from the contained input stream, low byte first.
+   *
+   * @return     the next four bytes of this input stream, interpreted as an
+   *             <code>int</code>.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading four bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final int readInt() throws IOException {
+    // TODO: has this been benchmarked against two alternate implementations?
+    // 1) Integer.reverseBytes(in.readInt())
+    // 2) keep a member byte[4], wrapped by an IntBuffer with appropriate endianness set,
+    //    and call IntBuffer.get()
+    // Both seem like they might be faster.
+    int ch4 = in.read();
+    int ch3 = in.read();
+    int ch2 = in.read();
+    int ch1 = in.read();
+    if ((ch1 | ch2 | ch3 | ch4) < 0) {
+      throw new EOFException();
+    }
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  // scratch space reused by readLong(); it is instance state written on every call
+  private final byte[] readBuffer = new byte[8];
+
+  /**
+   * Reads eight bytes from the contained input stream, low byte first.
+   *
+   * @return     the next eight bytes of this input stream, interpreted as a
+   *             <code>long</code>.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading eight bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   */
+  public final long readLong() throws IOException {
+    // TODO: see perf question above in readInt
+    readFully(readBuffer, 0, 8);
+    // the top byte keeps its sign; the others are masked to their unsigned value
+    return (((long)readBuffer[7] << 56) +
+        ((long)(readBuffer[6] & 255) << 48) +
+        ((long)(readBuffer[5] & 255) << 40) +
+        ((long)(readBuffer[4] & 255) << 32) +
+        ((long)(readBuffer[3] & 255) << 24) +
+        ((readBuffer[2] & 255) << 16) +
+        ((readBuffer[1] & 255) <<  8) +
+        ((readBuffer[0] & 255) <<  0));
+  }
+
+  /**
+   * Reads four bytes through {@link #readInt()} and converts them to a float.
+   *
+   * @return     the next four bytes of this input stream, interpreted as a
+   *             <code>float</code>.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading four bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   * @see        java.lang.Float#intBitsToFloat(int)
+   */
+  public final float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  /**
+   * Reads eight bytes through {@link #readLong()} and converts them to a double.
+   *
+   * @return     the next eight bytes of this input stream, interpreted as a
+   *             <code>double</code>.
+   * @exception  EOFException  if this input stream reaches the end before
+   *               reading eight bytes.
+   * @exception  IOException   if the contained input stream fails to read.
+   * @see        java.lang.Double#longBitsToDouble(long)
+   */
+  public final double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
new file mode 100644
index 0000000..9d4a8a9
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -0,0 +1,220 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import org.apache.parquet.IOExceptionUtils;
+import org.apache.parquet.ParquetRuntimeException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Based on DataOutputStream but in little endian and without the String/char methods
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class LittleEndianDataOutputStream extends OutputStream {
+
+  private final OutputStream out;
+
+  /**
+   * Creates a new data output stream to write data to the specified
+   * underlying output stream.
+   *
+   * @param   out   the underlying output stream, to be saved for later
+   *                use.
+   */
+  public LittleEndianDataOutputStream(OutputStream out) {
+    this.out = out;
+  }
+
+  /**
+   * Writes the specified byte (the low eight bits of the argument
+   * <code>b</code>) to the underlying output stream.
+   * <p>
+   * Implements the <code>write</code> method of <code>OutputStream</code>.
+   *
+   * @param      b   the <code>byte</code> to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  @Override
+  public void write(int b) throws IOException {
+    out.write(b);
+  }
+
+  /**
+   * Writes <code>len</code> bytes from the specified byte array
+   * starting at offset <code>off</code> to the underlying output stream.
+   *
+   * @param      b     the data.
+   * @param      off   the start offset in the data.
+   * @param      len   the number of bytes to write.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    out.write(b, off, len);
+  }
+
+  /**
+   * Flushes this data output stream by calling the <code>flush</code>
+   * method of its underlying output stream.
+   *
+   * @exception  IOException  if an I/O error occurs.
+   * @see        java.io.OutputStream#flush()
+   */
+  @Override
+  public void flush() throws IOException {
+    out.flush();
+  }
+
+  /**
+   * Writes a <code>boolean</code> to the underlying output stream as
+   * a 1-byte value. The value <code>true</code> is written out as the
+   * value <code>(byte)1</code>; the value <code>false</code> is
+   * written out as the value <code>(byte)0</code>.
+   *
+   * @param      v   a <code>boolean</code> value to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public final void writeBoolean(boolean v) throws IOException {
+    out.write(v ? 1 : 0);
+  }
+
+  /**
+   * Writes out a <code>byte</code> to the underlying output stream as
+   * a 1-byte value.
+   *
+   * @param      v   a <code>byte</code> value to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public final void writeByte(int v) throws IOException {
+    out.write(v);
+  }
+
+  /**
+   * Writes a <code>short</code> to the underlying output stream as two
+   * bytes, low byte first.
+   *
+   * @param      v   a <code>short</code> to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public final void writeShort(int v) throws IOException {
+    out.write((v >>> 0) & 0xFF);
+    out.write((v >>> 8) & 0xFF);
+  }
+
+  /**
+   * Writes an <code>int</code> to the underlying output stream as four
+   * bytes, low byte first.
+   *
+   * @param      v   an <code>int</code> to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public final void writeInt(int v) throws IOException {
+    // TODO: see note in LittleEndianDataInputStream: maybe faster
+    // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer
+    // approach
+    out.write((v >>>  0) & 0xFF);
+    out.write((v >>>  8) & 0xFF);
+    out.write((v >>> 16) & 0xFF);
+    out.write((v >>> 24) & 0xFF);
+  }
+
+  // scratch space reused by writeLong() so the eight bytes go out in one write call
+  private final byte[] writeBuffer = new byte[8];
+
+  /**
+   * Writes a <code>long</code> to the underlying output stream as eight
+   * bytes, low byte first.
+   *
+   * @param      v   a <code>long</code> to be written.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  public final void writeLong(long v) throws IOException {
+    writeBuffer[7] = (byte)(v >>> 56);
+    writeBuffer[6] = (byte)(v >>> 48);
+    writeBuffer[5] = (byte)(v >>> 40);
+    writeBuffer[4] = (byte)(v >>> 32);
+    writeBuffer[3] = (byte)(v >>> 24);
+    writeBuffer[2] = (byte)(v >>> 16);
+    writeBuffer[1] = (byte)(v >>>  8);
+    writeBuffer[0] = (byte)(v >>>  0);
+    out.write(writeBuffer, 0, 8);
+  }
+
+  /**
+   * Converts the float argument to an <code>int</code> using the
+   * <code>floatToIntBits</code> method in class <code>Float</code>,
+   * and then writes that <code>int</code> value to the underlying
+   * output stream as a 4-byte quantity, low byte first.
+   *
+   * @param      v   a <code>float</code> value to be written.
+   * @exception  IOException  if an I/O error occurs.
+   * @see        java.lang.Float#floatToIntBits(float)
+   */
+  public final void writeFloat(float v) throws IOException {
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  /**
+   * Converts the double argument to a <code>long</code> using the
+   * <code>doubleToLongBits</code> method in class <code>Double</code>,
+   * and then writes that <code>long</code> value to the underlying
+   * output stream as an 8-byte quantity, low byte first.
+   *
+   * @param      v   a <code>double</code> value to be written.
+   * @exception  IOException  if an I/O error occurs.
+   * @see        java.lang.Double#doubleToLongBits(double)
+   */
+  public final void writeDouble(double v) throws IOException {
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  /**
+   * Closes the underlying output stream through
+   * <code>IOExceptionUtils.closeQuietly</code>; no checked exception is
+   * declared here.
+   * NOTE(review): how a failure to close is reported depends on that
+   * helper -- confirm against its implementation.
+   */
+  @Override
+  public void close() {
+    IOExceptionUtils.closeQuietly(out);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
new file mode 100644
index 0000000..5b1b657
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.compression;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Hands out the compressor and decompressor that belong to a
+ * {@link CompressionCodecName}.
+ */
+public interface CompressionCodecFactory {
+
+  /**
+   * @param codecName the codec to compress with
+   * @return a compressor for the given codec
+   */
+  BytesInputCompressor getCompressor(CompressionCodecName codecName);
+
+  /**
+   * @param codecName the codec to decompress with
+   * @return a decompressor for the given codec
+   */
+  BytesInputDecompressor getDecompressor(CompressionCodecName codecName);
+
+  /**
+   * Releases this factory.
+   * NOTE(review): presumably frees whatever the handed-out compressors and
+   * decompressors hold -- confirm against the implementations.
+   */
+  void release();
+
+  /**
+   * Compresses {@link BytesInput} instances with one codec.
+   */
+  interface BytesInputCompressor {
+    /**
+     * @param bytes the bytes to compress
+     * @return the compressed bytes
+     * @throws IOException if compression fails
+     */
+    BytesInput compress(BytesInput bytes) throws IOException;
+
+    /**
+     * @return the codec this compressor belongs to
+     */
+    CompressionCodecName getCodecName();
+
+    /**
+     * Releases this compressor.
+     */
+    void release();
+  }
+
+  /**
+   * Decompresses bytes, supplied either as {@link BytesInput} or as
+   * {@link ByteBuffer}, with one codec.
+   */
+  interface BytesInputDecompressor {
+    /**
+     * @param bytes the compressed bytes
+     * @param uncompressedSize the size of the data once decompressed
+     * @return the decompressed bytes
+     * @throws IOException if decompression fails
+     */
+    BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
+
+    /**
+     * @param input the buffer holding the compressed bytes
+     * @param compressedSize the size of the compressed data
+     * @param output the buffer that receives the decompressed bytes
+     * @param uncompressedSize the size of the data once decompressed
+     * @throws IOException if decompression fails
+     */
+    void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
+
+    /**
+     * Releases this decompressor.
+     */
+    void release();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
new file mode 100644
index 0000000..bf2da32
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * This exception will be thrown when the codec is not supported by parquet, meaning there is no
+ * matching codec defined in {@link CompressionCodecName}
+ */
+public class CompressionCodecNotSupportedException extends RuntimeException {
+  private final Class codecClass;
+
+  /**
+   * Builds the exception with the message "codec not supported: " followed by
+   * the name of the given class.
+   *
+   * @param codecClass the codec class that is not supported; must not be null
+   *                   because its name is read for the message
+   */
+  public CompressionCodecNotSupportedException(Class codecClass) {
+    super("codec not supported: " + codecClass.getName());
+    this.codecClass = codecClass;
+  }
+
+  /**
+   * @return the codec class this exception was created with
+   */
+  public Class getCodecClass() {
+    return codecClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
new file mode 100644
index 0000000..8cdede0
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
+import java.util.Locale;
+
+/**
+ * The compression codecs known to Parquet.
+ * <p>
+ * Each constant ties together three values: the name of the Hadoop codec class
+ * that implements it ({@code null} for {@link #UNCOMPRESSED}), the matching
+ * {@link CompressionCodec} from the Parquet format, and a file extension.
+ */
+public enum CompressionCodecName {
+  UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
+  SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", 
CompressionCodec.SNAPPY, ".snappy"),
+  GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, 
".gz"),
+  LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
+  BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, 
".br"),
+  LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
+  ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, 
".zstd");
+
+  /**
+   * Looks up a codec by its constant name, ignoring case.
+   *
+   * @param name a codec name such as {@code "snappy"}, or null
+   * @return {@link #UNCOMPRESSED} when {@code name} is null, otherwise the
+   *         constant whose name equals {@code name} upper-cased
+   * @throws IllegalArgumentException if no constant has that name
+   */
+  public static CompressionCodecName fromConf(String name) {
+     if (name == null) {
+       return UNCOMPRESSED;
+     }
+     // a fixed locale keeps the upper-casing independent of the JVM default
+     return valueOf(name.toUpperCase(Locale.ENGLISH));
+  }
+
+  /**
+   * Looks up a codec by the class that implements it.
+   *
+   * @param clazz a codec implementation class, or null
+   * @return {@link #UNCOMPRESSED} when {@code clazz} is null, otherwise the
+   *         constant whose Hadoop codec class name equals the name of {@code clazz}
+   * @throws CompressionCodecNotSupportedException if no constant matches
+   */
+  public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
+    if (clazz == null) {
+      return UNCOMPRESSED;
+    }
+    String name = clazz.getName();
+    for (CompressionCodecName codec : CompressionCodecName.values()) {
+      // compared by class name only, so the codec class is never loaded here
+      if (name.equals(codec.getHadoopCompressionCodecClassName())) {
+        return codec;
+      }
+    }
+    throw new CompressionCodecNotSupportedException(clazz);
+  }
+
+  /**
+   * Looks up a codec by its Parquet format counterpart.
+   *
+   * @param codec a format-level codec; must not be null
+   * @return the constant that wraps {@code codec}
+   * @throws IllegalArgumentException if no constant wraps {@code codec}
+   */
+  public static CompressionCodecName fromParquet(CompressionCodec codec) {
+    for (CompressionCodecName codecName : CompressionCodecName.values()) {
+      if (codec.equals(codecName.parquetCompressionCodec)) {
+        return codecName;
+      }
+    }
+    throw new IllegalArgumentException("Unknown compression codec " + codec);
+  }
+
+  // fully-qualified Hadoop codec class name; null for UNCOMPRESSED
+  private final String hadoopCompressionCodecClass;
+  private final CompressionCodec parquetCompressionCodec;
+  // file extension associated with the codec; empty for UNCOMPRESSED
+  private final String extension;
+
+  private CompressionCodecName(String hadoopCompressionCodecClass, 
CompressionCodec parquetCompressionCodec, String extension) {
+    this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
+    this.parquetCompressionCodec = parquetCompressionCodec;
+    this.extension = extension;
+  }
+
+  /**
+   * @return the fully-qualified name of the Hadoop codec class, or null for
+   *         {@link #UNCOMPRESSED}
+   */
+  public String getHadoopCompressionCodecClassName() {
+    return hadoopCompressionCodecClass;
+  }
+
+  /**
+   * Loads the Hadoop codec class by name.
+   *
+   * @return the codec class, or null when this codec has no class name or the
+   *         class cannot be found on the classpath
+   */
+  public Class getHadoopCompressionCodecClass() {
+    String codecClassName = getHadoopCompressionCodecClassName();
+    if (codecClassName==null) {
+      return null;
+    }
+    try {
+      return Class.forName(codecClassName);
+    } catch (ClassNotFoundException e) {
+      // a missing codec class is reported as null rather than as an error
+      return null;
+    }
+  }
+
+  /**
+   * @return the Parquet format codec that this constant wraps
+   */
+  public CompressionCodec getParquetCompressionCodec() {
+    return parquetCompressionCodec;
+  }
+
+  /**
+   * @return the file extension for this codec, including the leading dot, or
+   *         the empty string for {@link #UNCOMPRESSED}
+   */
+  public String getExtension() {
+    return extension;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
new file mode 100644
index 0000000..9e52428
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link PositionOutputStream} that forwards {@code close}, {@code flush}
+ * and every {@code write} call to a wrapped {@link OutputStream}.
+ * <p>
+ * Subclasses only have to implement {@link #getPos()}.
+ */
+public abstract class DelegatingPositionOutputStream extends 
PositionOutputStream {
+  // the wrapped stream that receives every call
+  private final OutputStream stream;
+
+  /**
+   * @param stream the stream to delegate to; it is closed by {@link #close()}
+   */
+  public DelegatingPositionOutputStream(OutputStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * @return the wrapped stream
+   */
+  public OutputStream getStream() {
+    return stream;
+  }
+
+  @Override
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  // re-declared abstract to make explicit that position tracking is left to subclasses
+  @Override
+  public abstract long getPos() throws IOException;
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    stream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
new file mode 100644
index 0000000..bc4940c
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
@@ -0,0 +1,171 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Implements read methods required by {@link SeekableInputStream} for generic
+ * input streams.
+ * <p>
+ * Implementations must implement {@link #getPos()} and {@link #seek(long)}
+ * and may optionally implement other read methods to improve performance.
+ * <p>
+ * Reads into a {@link ByteBuffer} without an accessible backing array are
+ * staged through a scratch array that belongs to this instance.
+ */
+public abstract class DelegatingSeekableInputStream extends 
SeekableInputStream {
+
+  // NOTE(review): declared as an instance field although it is used as a
+  // constant; it could be static.
+  private final int COPY_BUFFER_SIZE = 8192;
+  // scratch array used to copy stream bytes into buffers that expose no array
+  // NOTE(review): shared by all reads on this instance, so concurrent
+  // ByteBuffer reads on one stream would presumably interfere - confirm
+  private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+
+  private final InputStream stream;
+
+  /**
+   * @param stream the stream to delegate to; it is closed by {@link #close()}
+   */
+  public DelegatingSeekableInputStream(InputStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * @return the wrapped stream
+   */
+  public InputStream getStream() {
+    return stream;
+  }
+
+  @Override
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  @Override
+  public abstract long getPos() throws IOException;
+
+  @Override
+  public abstract void seek(long newPos) throws IOException;
+
+  @Override
+  public int read() throws IOException {
+    return stream.read();
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return stream.read(b, off, len);
+  }
+
+  @Override
+  public void readFully(byte[] bytes) throws IOException {
+    readFully(stream, bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void readFully(byte[] bytes, int start, int len) throws IOException {
+    readFully(stream, bytes, start, len);
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    // buffers with an accessible backing array are filled in place; all
+    // others are filled by copying through the scratch array
+    if (buf.hasArray()) {
+      return readHeapBuffer(stream, buf);
+    } else {
+      return readDirectBuffer(stream, buf, temp);
+    }
+  }
+
+  @Override
+  public void readFully(ByteBuffer buf) throws IOException {
+    if (buf.hasArray()) {
+      readFullyHeapBuffer(stream, buf);
+    } else {
+      readFullyDirectBuffer(stream, buf, temp);
+    }
+  }
+
+  /**
+   * Reads exactly {@code len} bytes from {@code f} into {@code bytes},
+   * starting at index {@code start}, repeating short reads as needed.
+   * Visible for testing.
+   *
+   * @throws EOFException if the stream ends before {@code len} bytes are read
+   */
+  static void readFully(InputStream f, byte[] bytes, int start, int len) 
throws IOException {
+    int offset = start;
+    int remaining = len;
+    while (remaining > 0) {
+      int bytesRead = f.read(bytes, offset, remaining);
+      if (bytesRead < 0) {
+        throw new EOFException(
+            "Reached the end of stream with " + remaining + " bytes left to 
read");
+      }
+
+      remaining -= bytesRead;
+      offset += bytesRead;
+    }
+  }
+
+  /**
+   * Performs a single read from {@code f} directly into the backing array of
+   * {@code buf} and advances the buffer position by the number of bytes read.
+   * Visible for testing.
+   *
+   * @return the number of bytes read, or a negative value at end of stream,
+   *         in which case the buffer position is unchanged
+   */
+  static int readHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
+    int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), 
buf.remaining());
+    if (bytesRead < 0) {
+      // if this resulted in EOF, don't update position
+      return bytesRead;
+    } else {
+      buf.position(buf.position() + bytesRead);
+      return bytesRead;
+    }
+  }
+
+  /**
+   * Fills the remaining space of {@code buf} from {@code f} by reading into
+   * its backing array, then moves the buffer position to its limit.
+   * Visible for testing.
+   *
+   * @throws EOFException if the stream ends before the buffer is full
+   */
+  static void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws 
IOException {
+    readFully(f, buf.array(), buf.arrayOffset() + buf.position(), 
buf.remaining());
+    buf.position(buf.limit());
+  }
+
+  /**
+   * Copies bytes from {@code f} into {@code buf} through the scratch array
+   * {@code temp}, stopping at the first read that does not fill
+   * {@code temp} completely. Visible for testing.
+   *
+   * @return the total number of bytes copied, or -1 if the end of the stream
+   *         was reached before any byte was copied
+   */
+  static int readDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) 
throws IOException {
+    // copy all the bytes that return immediately, stopping at the first
+    // read that doesn't return a full buffer.
+    int nextReadLength = Math.min(buf.remaining(), temp.length);
+    int totalBytesRead = 0;
+    int bytesRead;
+
+    while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
+      buf.put(temp);
+      totalBytesRead += bytesRead;
+      nextReadLength = Math.min(buf.remaining(), temp.length);
+    }
+
+    if (bytesRead < 0) {
+      // return -1 if nothing was read
+      return totalBytesRead == 0 ? -1 : totalBytesRead;
+    } else {
+      // copy the last partial buffer
+      buf.put(temp, 0, bytesRead);
+      totalBytesRead += bytesRead;
+      return totalBytesRead;
+    }
+  }
+
+  /**
+   * Fills the remaining space of {@code buf} from {@code f}, copying through
+   * the scratch array {@code temp} in chunks of at most {@code temp.length}
+   * bytes. Visible for testing.
+   *
+   * @throws EOFException if the stream ends while the buffer still has space
+   */
+  static void readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] 
temp) throws IOException {
+    int nextReadLength = Math.min(buf.remaining(), temp.length);
+    int bytesRead = 0;
+
+    // loop ends when the buffer is full (nextReadLength == 0) or at EOF
+    while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) 
>= 0) {
+      buf.put(temp, 0, bytesRead);
+      nextReadLength = Math.min(buf.remaining(), temp.length);
+    }
+
+    if (bytesRead < 0 && buf.remaining() > 0) {
+      throw new EOFException(
+          "Reached the end of stream with " + buf.remaining() + " bytes left 
to read");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
index e2c7cc0..f910074 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
@@ -28,15 +28,16 @@ import java.io.IOException;
 public interface InputFile {
 
   /**
-   * Returns the total length of the file, in bytes.
+   * @return the total length of the file, in bytes.
    * @throws IOException if the length cannot be determined
    */
   long getLength() throws IOException;
 
   /**
-   * Opens a new {@link SeekableInputStream} for the underlying
-   * data file.
-   * @throws IOException if the stream cannot be opened.
+   * Open a new {@link SeekableInputStream} for the underlying data file.
+   *
+   * @return a new {@link SeekableInputStream} to read the file
+   * @throws IOException if the stream cannot be opened
    */
   SeekableInputStream newStream() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
new file mode 100644
index 0000000..2d6de44
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+
+/**
+ * A destination that can open a {@link PositionOutputStream} for writing.
+ * <p>
+ * NOTE(review): the contract of each method is not spelled out in this file;
+ * the notes below are assumptions to confirm against the implementations.
+ */
+public interface OutputFile {
+
+  // presumably creates a new file and fails if it already exists - TODO confirm
+  PositionOutputStream create(long blockSizeHint) throws IOException;
+
+  // presumably like create, but replaces an existing file - TODO confirm
+  PositionOutputStream createOrOverwrite(long blockSizeHint) throws 
IOException;
+
+  // presumably reports whether blockSizeHint has any effect - TODO confirm
+  boolean supportsBlockSize();
+
+  // presumably the block size used when the caller has no preference - TODO confirm
+  long defaultBlockSize();
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
new file mode 100644
index 0000000..066c46b
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@code PositionOutputStream} is an abstract {@link OutputStream} with the
+ * methods needed by Parquet to write data to a file or Hadoop data stream:
+ * the standard write methods plus {@link #getPos()}.
+ */
+public abstract class PositionOutputStream extends OutputStream {
+
+  /**
+   * Reports the current position of this output stream.
+   *
+   * @return a long, the current position in bytes starting from 0
+   * @throws IOException when the underlying stream throws IOException
+   */
+  public abstract long getPos() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
new file mode 100644
index 0000000..42e3a8a
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * Test stream over {@link #TEST_ARRAY} whose bulk reads can be forced to
+ * return fewer bytes than requested.
+ * <p>
+ * Each entry of the constructor argument is a byte budget. Reads that fit in
+ * the current budget are served in full and subtracted from it; the first
+ * read that asks for more than the budget gets only the budgeted bytes and
+ * moves on to the next entry. Once all entries are used up, reads are passed
+ * straight to {@link ByteArrayInputStream}.
+ */
+class MockInputStream extends ByteArrayInputStream {
+
+  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 
};
+
+  // remaining byte budget per scripted read; entries are decremented in place
+  private int[] lengths;
+  // index of the budget entry currently in effect
+  private int current = 0;
+  MockInputStream(int... actualReadLengths) {
+    super(TEST_ARRAY);
+    this.lengths = actualReadLengths;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) {
+    if (current < lengths.length) {
+      if (len <= lengths[current]) {
+        // when len == lengths[current], the next read will be 0 bytes
+        int bytesRead = super.read(b, off, len);
+        lengths[current] -= bytesRead;
+        return bytesRead;
+      } else {
+        // request exceeds the budget: return a short read and advance
+        int bytesRead = super.read(b, off, lengths[current]);
+        current += 1;
+        return bytesRead;
+      }
+    } else {
+      return super.read(b, off, len);
+    }
+  }
+
+  /**
+   * @return the index of the next byte to be read from {@link #TEST_ARRAY}
+   */
+  public long getPos() {
+    return this.pos;
+  }
+}

Reply via email to