Repository: hive
Updated Branches:
  refs/heads/llap a8ac648c8 -> b18db4f40


http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
new file mode 100644
index 0000000..567214f
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -0,0 +1,1924 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import 
org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream.Kind;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.SettableUncompressedStream;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory;
+
+public class EncodedTreeReaderFactory extends TreeReaderFactory {
  /**
   * We choose to use a toy programming language, so we cannot use multiple inheritance.
   * If we could, we could have this inherit TreeReader to contain the common impl, and then
   * have e.g. SettableIntTreeReader inherit both Settable... and Int.. TreeReader-s.
   * Instead, we have a settable interface that the caller will cast to and call setBuffers.
   */
  public interface SettableTreeReader {
    /**
     * Re-points this reader's underlying streams at new data buffers.
     *
     * @param streamBuffers per-stream column data, indexed by the
     *     {@code OrcProto.Stream.Kind} *_VALUE constants (PRESENT, DATA, etc.)
     * @param sameStripe true when the new buffers belong to the same stripe as the
     *     previous ones, so stripe-scoped streams (e.g. dictionary data) need not be reset
     * @throws IOException if the underlying stream state cannot be updated
     */
    void setBuffers(ColumnStreamData[] streamBuffers, boolean sameStripe) throws IOException;
  }
+
  /**
   * Stream reader for TIMESTAMP columns backed by pre-read, uncompressed buffers.
   * Wraps TimestampTreeReader so the seconds (DATA), nanos (SECONDARY) and optional
   * PRESENT streams can be re-pointed at fresh buffers via
   * {@link SettableTreeReader#setBuffers}.
   */
  protected static class TimestampStreamReader extends TimestampTreeReader
      implements SettableTreeReader {
    // If the file declares a compression codec, each stream's row-index entry
    // carries an extra leading position that must be consumed before seeking.
    private boolean isFileCompressed;
    private SettableUncompressedStream _presentStream;
    private SettableUncompressedStream _secondsStream; // DATA stream (seconds)
    private SettableUncompressedStream _nanosStream;   // SECONDARY stream (nanos)

    private TimestampStreamReader(int columnId, SettableUncompressedStream present,
        SettableUncompressedStream data, SettableUncompressedStream nanos, boolean isFileCompressed,
        OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws IOException {
      super(columnId, present, data, nanos, encoding, skipCorrupt);
      this.isFileCompressed = isFileCompressed;
      this._presentStream = present;
      this._secondsStream = data;
      this._nanosStream = nanos;
    }

    /**
     * Positions every non-exhausted stream at the row group described by {@code index}.
     * Index positions are consumed in stream order: PRESENT, DATA, SECONDARY.
     */
    @Override
    public void seek(PositionProvider index) throws IOException {
      if (present != null) {
        if (isFileCompressed) {
          index.getNext();
        }
        present.seek(index);
      }

      // data stream could be empty stream or already reached end of stream before present stream.
      // This can happen if all values in stream are nulls or last row group values are all null.
      if (_secondsStream.available() > 0) {
        if (isFileCompressed) {
          index.getNext();
        }
        data.seek(index);
      }

      if (_nanosStream.available() > 0) {
        if (isFileCompressed) {
          index.getNext();
        }
        nanos.seek(index);
      }
    }

    /**
     * Swaps in new buffers for the present/seconds/nanos streams. Streams that were
     * never created (null) are skipped; {@code sameStripe} is irrelevant here since
     * none of these streams is stripe-scoped.
     */
    @Override
    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
        throws IOException {
      if (_presentStream != null) {
        _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
      }
      if (_secondsStream != null) {
        _secondsStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
      }
      if (_nanosStream != null) {
        _nanosStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.SECONDARY_VALUE]));
      }
    }

    /** Fluent builder for {@link TimestampStreamReader}. */
    public static class StreamReaderBuilder {
      private Long fileId;
      private int columnIndex;
      private ColumnStreamData presentStream;
      private ColumnStreamData dataStream;  // seconds
      private ColumnStreamData nanosStream; // SECONDARY
      private CompressionCodec compressionCodec;
      private OrcProto.ColumnEncoding columnEncoding;
      private boolean skipCorrupt;

      public StreamReaderBuilder setFileId(Long fileId) {
        this.fileId = fileId;
        return this;
      }

      public StreamReaderBuilder setColumnIndex(int columnIndex) {
        this.columnIndex = columnIndex;
        return this;
      }

      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
        this.presentStream = presentStream;
        return this;
      }

      public StreamReaderBuilder setSecondsStream(ColumnStreamData dataStream) {
        this.dataStream = dataStream;
        return this;
      }

      public StreamReaderBuilder setNanosStream(ColumnStreamData secondaryStream) {
        this.nanosStream = secondaryStream;
        return this;
      }

      public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
        this.compressionCodec = compressionCodec;
        return this;
      }

      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
        this.columnEncoding = encoding;
        return this;
      }

      public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) {
        this.skipCorrupt = skipCorrupt;
        return this;
      }

      public TimestampStreamReader build() throws IOException {
        SettableUncompressedStream present = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
                fileId, presentStream);

        SettableUncompressedStream data = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
                dataStream);

        SettableUncompressedStream nanos = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.SECONDARY.name(),
                fileId, nanosStream);

        // A non-null codec means the file is compressed; seek() then consumes
        // the extra per-stream index position.
        boolean isFileCompressed = compressionCodec != null;
        return new TimestampStreamReader(columnIndex, present, data, nanos,
            isFileCompressed, columnEncoding, skipCorrupt);
      }
    }

    public static StreamReaderBuilder builder() {
      return new StreamReaderBuilder();
    }
  }
+
  /**
   * Stream reader for STRING columns backed by pre-read, uncompressed buffers.
   * Handles both DIRECT (data + length streams) and DICTIONARY (data +
   * stripe-scoped length/dictionary streams) encodings; the encoding is inferred
   * from whether a dictionary stream was supplied.
   */
  protected static class StringStreamReader extends StringTreeReader
      implements SettableTreeReader {
    // If the file declares a compression codec, each stream's row-index entry
    // carries an extra leading position that must be consumed before seeking.
    private boolean _isFileCompressed;
    private boolean _isDictionaryEncoding;
    private SettableUncompressedStream _presentStream;
    private SettableUncompressedStream _dataStream;
    private SettableUncompressedStream _lengthStream;
    private SettableUncompressedStream _dictionaryStream;

    private StringStreamReader(int columnId, SettableUncompressedStream present,
        SettableUncompressedStream data, SettableUncompressedStream length,
        SettableUncompressedStream dictionary,
        boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
      super(columnId, present, data, length, dictionary, encoding);
      // Dictionary encoding is signalled by the presence of a dictionary stream.
      this._isDictionaryEncoding = dictionary != null;
      this._isFileCompressed = isFileCompressed;
      this._presentStream = present;
      this._dataStream = data;
      this._lengthStream = length;
      this._dictionaryStream = dictionary;
    }

    /**
     * Positions the encoding-appropriate streams at the row group described by
     * {@code index}. The inherited {@code reader} delegate is a
     * StringDictionaryTreeReader or StringDirectTreeReader depending on encoding.
     */
    @Override
    public void seek(PositionProvider index) throws IOException {
      if (present != null) {
        if (_isFileCompressed) {
          index.getNext();
        }
        reader.getPresent().seek(index);
      }

      if (_isDictionaryEncoding) {
        // DICTIONARY encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDictionaryTreeReader) reader).getReader().seek(index);
        }
      } else {
        // DIRECT encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getStream().seek(index);
        }

        if (_lengthStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getLengths().seek(index);
        }
      }
    }

    /**
     * Swaps in new buffers. For DIRECT encoding the length stream is per row
     * group and always refreshed; for DICTIONARY encoding the length and
     * dictionary streams are stripe-scoped and only refreshed when the new
     * buffers come from a different stripe.
     */
    @Override
    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
        throws IOException {
      if (_presentStream != null) {
        _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
      }
      if (_dataStream != null) {
        _dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
      }
      if (!_isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
      }

      // set these streams only if the stripe is different
      if (!sameStripe && _isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
        if (_dictionaryStream != null) {
          _dictionaryStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DICTIONARY_DATA_VALUE]));
        }
      }
    }

    /** Fluent builder for {@link StringStreamReader}. */
    public static class StreamReaderBuilder {
      private Long fileId;
      private int columnIndex;
      private ColumnStreamData presentStream;
      private ColumnStreamData dataStream;
      private ColumnStreamData dictionaryStream;
      private ColumnStreamData lengthStream;
      private CompressionCodec compressionCodec;
      private OrcProto.ColumnEncoding columnEncoding;

      public StreamReaderBuilder setFileId(Long fileId) {
        this.fileId = fileId;
        return this;
      }

      public StreamReaderBuilder setColumnIndex(int columnIndex) {
        this.columnIndex = columnIndex;
        return this;
      }

      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
        this.presentStream = presentStream;
        return this;
      }

      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
        this.dataStream = dataStream;
        return this;
      }

      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
        this.lengthStream = lengthStream;
        return this;
      }

      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
        this.dictionaryStream = dictStream;
        return this;
      }

      public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
        this.compressionCodec = compressionCodec;
        return this;
      }

      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
        this.columnEncoding = encoding;
        return this;
      }

      public StringStreamReader build() throws IOException {
        SettableUncompressedStream present = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
                fileId, presentStream);

        SettableUncompressedStream data = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
                dataStream);

        SettableUncompressedStream length = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), fileId,
                lengthStream);

        SettableUncompressedStream dictionary = StreamUtils.createSettableUncompressedStream(
            OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileId, dictionaryStream);

        // A non-null codec means the file is compressed; seek() then consumes
        // the extra per-stream index position.
        boolean isFileCompressed = compressionCodec != null;
        return new StringStreamReader(columnIndex, present, data, length, dictionary,
            isFileCompressed, columnEncoding);
      }
    }

    public static StreamReaderBuilder builder() {
      return new StreamReaderBuilder();
    }

  }
+
+  protected static class ShortStreamReader extends ShortTreeReader implements 
SettableTreeReader {
+    private boolean isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private ShortStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, encoding);
+      this.isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public ShortStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new ShortStreamReader(columnIndex, present, data, 
isFileCompressed,
+            columnEncoding);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class LongStreamReader extends LongTreeReader implements 
SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private LongStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws 
IOException {
+      super(columnId, present, data, encoding, skipCorrupt);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+      private boolean skipCorrupt;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) {
+        this.skipCorrupt = skipCorrupt;
+        return this;
+      }
+
+      public LongStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new LongStreamReader(columnIndex, present, data, 
isFileCompressed,
+            columnEncoding, skipCorrupt);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class IntStreamReader extends IntTreeReader implements 
SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private IntStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, encoding);
+      this._isFileCompressed = isFileCompressed;
+      this._dataStream = data;
+      this._presentStream = present;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public IntStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new IntStreamReader(columnIndex, present, data, 
isFileCompressed,
+            columnEncoding);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+
+  }
+
+  protected static class FloatStreamReader extends FloatTreeReader implements 
SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private FloatStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed) throws 
IOException {
+      super(columnId, present, data);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        stream.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public FloatStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new FloatStreamReader(columnIndex, present, data, 
isFileCompressed);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+
+  }
+
+  protected static class DoubleStreamReader extends DoubleTreeReader 
implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private DoubleStreamReader(int columnId, SettableUncompressedStream 
present,
+        SettableUncompressedStream data, boolean isFileCompressed) throws 
IOException {
+      super(columnId, present, data);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        stream.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public DoubleStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new DoubleStreamReader(columnIndex, present, data, 
isFileCompressed);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class DecimalStreamReader extends DecimalTreeReader 
implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _valueStream;
+    private SettableUncompressedStream _scaleStream;
+
+    private DecimalStreamReader(int columnId, int precision, int scale,
+        SettableUncompressedStream presentStream,
+        SettableUncompressedStream valueStream, SettableUncompressedStream 
scaleStream,
+        boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, precision, scale, presentStream, valueStream, 
scaleStream, encoding);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = presentStream;
+      this._valueStream = valueStream;
+      this._scaleStream = scaleStream;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_valueStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        valueStream.seek(index);
+      }
+
+      if (_scaleStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        scaleReader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_valueStream != null) {
+        
_valueStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+      if (_scaleStream != null) {
+        
_scaleStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.SECONDARY_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData valueStream;
+      private ColumnStreamData scaleStream;
+      private int scale;
+      private int precision;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPrecision(int precision) {
+        this.precision = precision;
+        return this;
+      }
+
+      public StreamReaderBuilder setScale(int scale) {
+        this.scale = scale;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setValueStream(ColumnStreamData valueStream) {
+        this.valueStream = valueStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setScaleStream(ColumnStreamData scaleStream) {
+        this.scaleStream = scaleStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public DecimalStreamReader build() throws IOException {
+        SettableUncompressedStream presentInStream = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.PRESENT.name(), fileId, presentStream);
+
+        SettableUncompressedStream valueInStream = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.DATA.name(), fileId, valueStream);
+
+        SettableUncompressedStream scaleInStream = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.SECONDARY.name(), fileId, scaleStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new DecimalStreamReader(columnIndex, precision, scale, 
presentInStream,
+            valueInStream,
+            scaleInStream, isFileCompressed, columnEncoding);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class DateStreamReader extends DateTreeReader implements 
SettableTreeReader {
+    private boolean isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private DateStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, encoding);
+      this.isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public DateStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new DateStreamReader(columnIndex, present, data, 
isFileCompressed,
+            columnEncoding);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
  /**
   * Tree reader for CHAR columns over pre-decoded (already uncompressed) LLAP cache
   * buffers. Handles both DIRECT (data + length streams) and DICTIONARY (data +
   * length + dictionary streams) encodings; the encoding is inferred from whether a
   * dictionary stream was supplied.
   */
  protected static class CharStreamReader extends CharTreeReader implements SettableTreeReader {
    // True when the file's streams were originally compressed: each row-index entry then
    // carries a leading compressed-block offset that seek() must consume and discard.
    private boolean _isFileCompressed;
    // Derived in the constructor: dictionary stream present => DICTIONARY encoding.
    private boolean _isDictionaryEncoding;
    private SettableUncompressedStream _presentStream;
    private SettableUncompressedStream _dataStream;
    private SettableUncompressedStream _lengthStream;
    private SettableUncompressedStream _dictionaryStream;

    private CharStreamReader(int columnId, int maxLength,
        SettableUncompressedStream present, SettableUncompressedStream data,
        SettableUncompressedStream length, SettableUncompressedStream dictionary,
        boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
      super(columnId, maxLength, present, data, length,
          dictionary, encoding);
      this._isDictionaryEncoding = dictionary != null;
      this._isFileCompressed = isFileCompressed;
      this._presentStream = present;
      this._dataStream = data;
      this._lengthStream = length;
      this._dictionaryStream = dictionary;
    }

    /**
     * Positions this reader's streams at the row group described by {@code index}.
     * Seek order must match the stream-position layout in the ORC row index.
     */
    @Override
    public void seek(PositionProvider index) throws IOException {
      // NOTE(review): the condition checks the outer `present` field but seeks
      // reader.getPresent(); presumably both refer to the same stream — confirm against
      // CharTreeReader's wiring.
      if (present != null) {
        if (_isFileCompressed) {
          index.getNext();
        }
        reader.getPresent().seek(index);
      }

      // NOTE(review): _dataStream/_lengthStream are dereferenced without null checks
      // here, unlike setBuffers() below; confirm build() never yields null streams for
      // CHAR columns.
      if (_isDictionaryEncoding) {
        // DICTIONARY encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDictionaryTreeReader) reader).getReader().seek(index);
        }
      } else {
        // DIRECT encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getStream().seek(index);
        }

        if (_lengthStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getLengths().seek(index);
        }
      }
    }

    /**
     * Repoints this reader's streams at freshly supplied column buffers. For DICTIONARY
     * encoding the length/dictionary streams are stripe-scoped, so they are only
     * replaced when the stripe changes.
     */
    @Override
    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
        throws IOException {
      if (_presentStream != null) {
        _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
      }
      if (_dataStream != null) {
        _dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
      }
      // DIRECT encoding: lengths are per-row-group, always refresh.
      if (!_isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
      }

      // set these streams only if the stripe is different
      if (!sameStripe && _isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
        if (_dictionaryStream != null) {
          _dictionaryStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DICTIONARY_DATA_VALUE]));
        }
      }
    }

    /** Fluent builder for {@link CharStreamReader}. */
    public static class StreamReaderBuilder {
      private Long fileId;
      private int columnIndex;
      private int maxLength;
      private ColumnStreamData presentStream;
      private ColumnStreamData dataStream;
      private ColumnStreamData dictionaryStream;
      private ColumnStreamData lengthStream;
      private CompressionCodec compressionCodec;
      private OrcProto.ColumnEncoding columnEncoding;

      /** Sets the file id used to tag the constructed streams. */
      public StreamReaderBuilder setFileId(Long fileId) {
        this.fileId = fileId;
        return this;
      }

      /** Sets the ORC column index this reader decodes. */
      public StreamReaderBuilder setColumnIndex(int columnIndex) {
        this.columnIndex = columnIndex;
        return this;
      }

      /** Sets the CHAR type's maximum length. */
      public StreamReaderBuilder setMaxLength(int maxLength) {
        this.maxLength = maxLength;
        return this;
      }

      /** Sets the PRESENT (null-bitmask) stream data; may be null. */
      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
        this.presentStream = presentStream;
        return this;
      }

      /** Sets the DATA stream (character bytes or dictionary indices). */
      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
        this.dataStream = dataStream;
        return this;
      }

      /** Sets the LENGTH stream holding per-value byte lengths. */
      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
        this.lengthStream = lengthStream;
        return this;
      }

      /** Sets the DICTIONARY_DATA stream; null implies DIRECT encoding. */
      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
        this.dictionaryStream = dictStream;
        return this;
      }

      /** Sets the codec; only its presence (file was compressed) is used. */
      public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
        this.compressionCodec = compressionCodec;
        return this;
      }

      /** Sets the column encoding from the stripe footer. */
      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
        this.columnEncoding = encoding;
        return this;
      }

      /**
       * Builds a CharStreamReader over settable PRESENT/DATA/LENGTH/DICTIONARY_DATA
       * streams.
       *
       * @throws IOException if stream construction fails
       */
      public CharStreamReader build() throws IOException {
        SettableUncompressedStream present = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
                fileId, presentStream);

        SettableUncompressedStream data = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
                dataStream);

        SettableUncompressedStream length = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), fileId,
                lengthStream);

        SettableUncompressedStream dictionary = StreamUtils.createSettableUncompressedStream(
            OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileId, dictionaryStream);

        boolean isFileCompressed = compressionCodec != null;
        return new CharStreamReader(columnIndex, maxLength, present, data, length,
            dictionary, isFileCompressed, columnEncoding);
      }
    }

    /** Creates a new fluent builder for this reader. */
    public static StreamReaderBuilder builder() {
      return new StreamReaderBuilder();
    }

  }
+
  /**
   * Tree reader for VARCHAR columns over pre-decoded (already uncompressed) LLAP cache
   * buffers. Handles both DIRECT (data + length streams) and DICTIONARY (data +
   * length + dictionary streams) encodings; the encoding is inferred from whether a
   * dictionary stream was supplied.
   */
  protected static class VarcharStreamReader extends VarcharTreeReader implements SettableTreeReader {
    // True when the file's streams were originally compressed: each row-index entry then
    // carries a leading compressed-block offset that seek() must consume and discard.
    private boolean _isFileCompressed;
    // Derived in the constructor: dictionary stream present => DICTIONARY encoding.
    private boolean _isDictionaryEncoding;
    private SettableUncompressedStream _presentStream;
    private SettableUncompressedStream _dataStream;
    private SettableUncompressedStream _lengthStream;
    private SettableUncompressedStream _dictionaryStream;

    private VarcharStreamReader(int columnId, int maxLength,
        SettableUncompressedStream present, SettableUncompressedStream data,
        SettableUncompressedStream length, SettableUncompressedStream dictionary,
        boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
      super(columnId, maxLength, present, data, length,
          dictionary, encoding);
      this._isDictionaryEncoding = dictionary != null;
      this._isFileCompressed = isFileCompressed;
      this._presentStream = present;
      this._dataStream = data;
      this._lengthStream = length;
      this._dictionaryStream = dictionary;
    }

    /**
     * Positions this reader's streams at the row group described by {@code index}.
     * Seek order must match the stream-position layout in the ORC row index.
     */
    @Override
    public void seek(PositionProvider index) throws IOException {
      // NOTE(review): the condition checks the outer `present` field but seeks
      // reader.getPresent(); presumably both refer to the same stream — confirm against
      // VarcharTreeReader's wiring.
      if (present != null) {
        if (_isFileCompressed) {
          index.getNext();
        }
        reader.getPresent().seek(index);
      }

      // NOTE(review): _dataStream/_lengthStream are dereferenced without null checks
      // here, unlike setBuffers() below; confirm build() never yields null streams for
      // VARCHAR columns.
      if (_isDictionaryEncoding) {
        // DICTIONARY encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDictionaryTreeReader) reader).getReader().seek(index);
        }
      } else {
        // DIRECT encoding

        // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
        if (_dataStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getStream().seek(index);
        }

        if (_lengthStream.available() > 0) {
          if (_isFileCompressed) {
            index.getNext();
          }
          ((StringDirectTreeReader) reader).getLengths().seek(index);
        }
      }
    }

    /**
     * Repoints this reader's streams at freshly supplied column buffers. For DICTIONARY
     * encoding the length/dictionary streams are stripe-scoped, so they are only
     * replaced when the stripe changes.
     */
    @Override
    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
        throws IOException {
      if (_presentStream != null) {
        _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
      }
      if (_dataStream != null) {
        _dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
      }
      // DIRECT encoding: lengths are per-row-group, always refresh.
      if (!_isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
      }

      // set these streams only if the stripe is different
      if (!sameStripe && _isDictionaryEncoding) {
        if (_lengthStream != null) {
          _lengthStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
        }
        if (_dictionaryStream != null) {
          _dictionaryStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DICTIONARY_DATA_VALUE]));
        }
      }
    }

    /** Fluent builder for {@link VarcharStreamReader}. */
    public static class StreamReaderBuilder {
      private Long fileId;
      private int columnIndex;
      private int maxLength;
      private ColumnStreamData presentStream;
      private ColumnStreamData dataStream;
      private ColumnStreamData dictionaryStream;
      private ColumnStreamData lengthStream;
      private CompressionCodec compressionCodec;
      private OrcProto.ColumnEncoding columnEncoding;

      /** Sets the file id used to tag the constructed streams. */
      public StreamReaderBuilder setFileId(Long fileId) {
        this.fileId = fileId;
        return this;
      }

      /** Sets the ORC column index this reader decodes. */
      public StreamReaderBuilder setColumnIndex(int columnIndex) {
        this.columnIndex = columnIndex;
        return this;
      }

      /** Sets the VARCHAR type's maximum length. */
      public StreamReaderBuilder setMaxLength(int maxLength) {
        this.maxLength = maxLength;
        return this;
      }

      /** Sets the PRESENT (null-bitmask) stream data; may be null. */
      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
        this.presentStream = presentStream;
        return this;
      }

      /** Sets the DATA stream (character bytes or dictionary indices). */
      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
        this.dataStream = dataStream;
        return this;
      }

      /** Sets the LENGTH stream holding per-value byte lengths. */
      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
        this.lengthStream = lengthStream;
        return this;
      }

      /** Sets the DICTIONARY_DATA stream; null implies DIRECT encoding. */
      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
        this.dictionaryStream = dictStream;
        return this;
      }

      /** Sets the codec; only its presence (file was compressed) is used. */
      public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
        this.compressionCodec = compressionCodec;
        return this;
      }

      /** Sets the column encoding from the stripe footer. */
      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
        this.columnEncoding = encoding;
        return this;
      }

      /**
       * Builds a VarcharStreamReader over settable PRESENT/DATA/LENGTH/DICTIONARY_DATA
       * streams.
       *
       * @throws IOException if stream construction fails
       */
      public VarcharStreamReader build() throws IOException {
        SettableUncompressedStream present = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
                fileId, presentStream);

        SettableUncompressedStream data = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
                dataStream);

        SettableUncompressedStream length = StreamUtils
            .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), fileId,
                lengthStream);

        SettableUncompressedStream dictionary = StreamUtils.createSettableUncompressedStream(
            OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileId, dictionaryStream);

        boolean isFileCompressed = compressionCodec != null;
        return new VarcharStreamReader(columnIndex, maxLength, present, data, length,
            dictionary, isFileCompressed, columnEncoding);
      }
    }

    /** Creates a new fluent builder for this reader. */
    public static StreamReaderBuilder builder() {
      return new StreamReaderBuilder();
    }

  }
+
+  protected static class ByteStreamReader extends ByteTreeReader implements 
SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private ByteStreamReader(int columnId, SettableUncompressedStream present,
+        SettableUncompressedStream data, boolean isFileCompressed) throws 
IOException {
+      super(columnId, present, data);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public ByteStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new ByteStreamReader(columnIndex, present, data, 
isFileCompressed);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class BinaryStreamReader extends BinaryTreeReader 
implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+    private SettableUncompressedStream _lengthsStream;
+
+    private BinaryStreamReader(int columnId, SettableUncompressedStream 
present,
+        SettableUncompressedStream data, SettableUncompressedStream length,
+        boolean isFileCompressed,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, encoding);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+      this._lengthsStream = length;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        stream.seek(index);
+      }
+
+      if (lengths != null && _lengthsStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        lengths.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+      if (_lengthsStream != null) {
+        
_lengthsStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.LENGTH_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData lengthStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setLengthStream(ColumnStreamData 
secondaryStream) {
+        this.lengthStream = secondaryStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding 
encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public BinaryStreamReader build() throws IOException {
+        SettableUncompressedStream present = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.PRESENT.name(), fileId, presentStream);
+
+        SettableUncompressedStream data = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.DATA.name(), fileId, dataStream);
+
+        SettableUncompressedStream length = 
StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.LENGTH.name(), fileId, lengthStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new BinaryStreamReader(columnIndex, present, data, length, 
isFileCompressed,
+            columnEncoding);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
+  protected static class BooleanStreamReader extends BooleanTreeReader 
implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    private BooleanStreamReader(int columnId, SettableUncompressedStream 
present,
+        SettableUncompressedStream data, boolean isFileCompressed) throws 
IOException {
+      super(columnId, present, data);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = data;
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      if (present != null) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        present.seek(index);
+      }
+
+      // data stream could be empty stream or already reached end of stream 
before present stream.
+      // This can happen if all values in stream are nulls or last row group 
values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          index.getNext();
+        }
+        reader.seek(index);
+      }
+    }
+
+    @Override
+    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+        throws IOException {
+      if (_presentStream != null) {
+        
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        
_dataStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[Kind.DATA_VALUE]));
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private Long fileId;
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+
+      public StreamReaderBuilder setFileId(Long fileId) {
+        this.fileId = fileId;
+        return this;
+      }
+
+      public StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StreamReaderBuilder setPresentStream(ColumnStreamData 
presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public StreamReaderBuilder setCompressionCodec(CompressionCodec 
compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public BooleanStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                fileId, presentStream);
+
+        SettableUncompressedStream data = StreamUtils
+            
.createSettableUncompressedStream(OrcProto.Stream.Kind.DATA.name(), fileId,
+                dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new BooleanStreamReader(columnIndex, present, data, 
isFileCompressed);
+      }
+    }
+
+    public static StreamReaderBuilder builder() {
+      return new StreamReaderBuilder();
+    }
+  }
+
  /**
   * Creates one stream-backed {@link TreeReader} per included column of an encoded
   * (already decompressed) column batch.
   *
   * @param numCols number of included columns; also the length of the returned array
   * @param types file-level type descriptions, looked up by column index
   * @param encodings column encodings, looked up by column index
   * @param batch encoded column batch whose stream buffers back the readers
   * @param codec the file's compression codec; only its presence (non-null) matters here,
   *        to signal whether the original file data was compressed
   * @param skipCorrupt whether to skip corrupt data (forwarded to LONG and TIMESTAMP readers)
   * @return one tree reader per included column, in batch order
   * @throws IOException if a reader cannot be constructed from the stream buffers
   * @throws UnsupportedOperationException for column types with no encoded reader yet
   */
  public static TreeReader[] createEncodedTreeReader(int numCols,
      List<OrcProto.Type> types,
      List<OrcProto.ColumnEncoding> encodings,
      EncodedColumnBatch<OrcBatchKey> batch,
      CompressionCodec codec, boolean skipCorrupt) throws IOException {
    long file = batch.getBatchKey().file;
    TreeReader[] treeReaders = new TreeReader[numCols];
    for (int i = 0; i < numCols; i++) {
      int columnIndex = batch.getColumnIxs()[i];
      ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
      OrcProto.Type columnType = types.get(columnIndex);

      // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
      // But we need to know if the original data is compressed or not. This is used to skip
      // positions in row index properly. If the file is originally compressed,
      // then 1st position (compressed offset) in row index should be skipped to get
      // uncompressed offset, else 1st position should not be skipped.
      // TODO: there should be a better way to do this, code just needs to be modified
      OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);

      // stream buffers are arranged in enum order of stream kind; missing streams are null
      ColumnStreamData present = streamBuffers[Kind.PRESENT_VALUE],
        data = streamBuffers[Kind.DATA_VALUE],
        dictionary = streamBuffers[Kind.DICTIONARY_DATA_VALUE],
        lengths = streamBuffers[Kind.LENGTH_VALUE],
        secondary = streamBuffers[Kind.SECONDARY_VALUE];

      // Each case wires the streams relevant to that type into the matching reader builder.
      switch (columnType.getKind()) {
        case BINARY:
          treeReaders[i] = BinaryStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setLengthStream(lengths)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case BOOLEAN:
          treeReaders[i] = BooleanStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .build();
          break;
        case BYTE:
          treeReaders[i] = ByteStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .build();
          break;
        case SHORT:
          treeReaders[i] = ShortStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case INT:
          treeReaders[i] = IntStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case LONG:
          treeReaders[i] = LongStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .skipCorrupt(skipCorrupt)
              .build();
          break;
        case FLOAT:
          treeReaders[i] = FloatStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .build();
          break;
        case DOUBLE:
          treeReaders[i] = DoubleStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .build();
          break;
        case CHAR:
          treeReaders[i] = CharStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setMaxLength(columnType.getMaximumLength())
              .setPresentStream(present)
              .setDataStream(data)
              .setLengthStream(lengths)
              .setDictionaryStream(dictionary)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case VARCHAR:
          treeReaders[i] = VarcharStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setMaxLength(columnType.getMaximumLength())
              .setPresentStream(present)
              .setDataStream(data)
              .setLengthStream(lengths)
              .setDictionaryStream(dictionary)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case STRING:
          treeReaders[i] = StringStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setLengthStream(lengths)
              .setDictionaryStream(dictionary)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case DECIMAL:
          treeReaders[i] = DecimalStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPrecision(columnType.getPrecision())
              .setScale(columnType.getScale())
              .setPresentStream(present)
              .setValueStream(data)
              .setScaleStream(secondary)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        case TIMESTAMP:
          treeReaders[i] = TimestampStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setSecondsStream(data)
              .setNanosStream(secondary)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .skipCorrupt(skipCorrupt)
              .build();
          break;
        case DATE:
          treeReaders[i] = DateStreamReader.builder()
              .setFileId(file)
              .setColumnIndex(columnIndex)
              .setPresentStream(present)
              .setDataStream(data)
              .setCompressionCodec(codec)
              .setColumnEncoding(columnEncoding)
              .build();
          break;
        default:
          // Any other kind (presumably complex types) has no encoded reader yet.
          throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
      }
    }

    return treeReaders;
  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 8e7c62e..698632b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import 
org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.ql.io.orc.DataReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
 
 /**
  * The interface for reading encoded data from ORC files.
@@ -43,6 +44,11 @@ public interface Reader extends 
org.apache.hadoop.hive.ql.io.orc.Reader {
   public static final class OrcEncodedColumnBatch extends 
EncodedColumnBatch<OrcBatchKey> {
     /** RG index indicating the data applies for all RGs (e.g. a string 
dictionary). */
     public static final int ALL_RGS = -1;
+    /**
+     * All the previous streams are data streams, this and the next ones are 
index streams.
+     * We assume this ordering will stay the same for backward compatibility.
+     */
+    public static final int MAX_DATA_STREAMS = 
Stream.Kind.ROW_INDEX.getNumber();
     public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
       if (batchKey == null) {
         batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
new file mode 100644
index 0000000..814697d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.common.DiskRangeInfo;
+import 
org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.SettableUncompressedStream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+
+/**
+ * Stream utility.
+ */
/**
 * Utilities for turning cached encoded-column stream buffers into readable ORC streams.
 */
public class StreamUtils {

  /**
   * Create SettableUncompressedStream from stream buffer.
   *
   * @param streamName - stream name
   * @param fileId - file id
   * @param streamBuffer - stream buffer; may be null, in which case null is returned
   * @return - SettableUncompressedStream over the buffer's disk ranges, or null
   * @throws IOException
   */
  public static SettableUncompressedStream createSettableUncompressedStream(String streamName,
      Long fileId, ColumnStreamData streamBuffer) throws IOException {
    if (streamBuffer == null) {
      return null;
    }

    DiskRangeInfo diskRangeInfo = createDiskRangeInfo(streamBuffer);
    return new SettableUncompressedStream(fileId, streamName, diskRangeInfo.getDiskRanges(),
        diskRangeInfo.getTotalLength());
  }

  /**
   * Converts stream buffers to disk ranges.
   * @param streamBuffer - stream buffer
   * @return - DiskRangeInfo holding the converted ranges and their total length
   */
  public static DiskRangeInfo createDiskRangeInfo(ColumnStreamData streamBuffer) {
    DiskRangeInfo diskRangeInfo = new DiskRangeInfo(streamBuffer.getIndexBaseOffset());
    long offset = streamBuffer.getIndexBaseOffset(); // See ctor comment.
    // TODO: we should get rid of this
    // Lay the cached buffers out as consecutive ranges starting at the base offset.
    for (MemoryBuffer memoryBuffer : streamBuffer.getCacheBuffers()) {
      ByteBuffer buffer = memoryBuffer.getByteBufferDup();
      diskRangeInfo.addDiskRange(new RecordReaderImpl.BufferChunk(buffer, offset));
      offset += buffer.remaining();
    }
    return diskRangeInfo;
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
index 72355e2..3ef7abe 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.common.io.encoded;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 /**
  * A block of data for a given section of a file, similar to VRB but in 
encoded form.
  * Stores a set of buffers for each encoded stream that is a part of each 
column.
@@ -36,21 +35,15 @@ public class EncodedColumnBatch<BatchKey> {
     private List<MemoryBuffer> cacheBuffers;
     /** Base offset from the beginning of the indexable unit; for example, for 
ORC,
      * offset from the CB in a compressed file, from the stream in 
uncompressed file. */
-    private int indexBaseOffset;
-    /** Stream type; format-specific. */
-    private int streamKind;
+    private int indexBaseOffset = 0;
 
     /** Reference count. */
     private AtomicInteger refCount = new AtomicInteger(0);
 
-    public void init(int kind) {
-      streamKind = kind;
-      indexBaseOffset = 0;
-    }
-
     public void reset() {
       cacheBuffers.clear();
       refCount.set(0);
+      indexBaseOffset = 0;
     }
 
     public void incRef() {
@@ -78,15 +71,14 @@ public class EncodedColumnBatch<BatchKey> {
     public void setIndexBaseOffset(int indexBaseOffset) {
       this.indexBaseOffset = indexBaseOffset;
     }
-
-    public int getStreamKind() {
-      return streamKind;
-    }
   }
 
   /** The key that is used to map this batch to source location. */
   protected BatchKey batchKey;
-  /** Stream data for each stream, for each included column. */
+  /**
+   * Stream data for each stream, for each included column.
+   * For each column, streams are indexed by kind, with missing elements being 
null.
+   */
   protected ColumnStreamData[][] columnData;
   /** Column indexes included in the batch. Correspond to columnData elements. 
*/
   protected int[] columnIxs;
@@ -99,20 +91,24 @@ public class EncodedColumnBatch<BatchKey> {
   public int version = Integer.MIN_VALUE;
 
   public void reset() {
-    if (columnData != null) {
-      for (int i = 0; i < columnData.length; ++i) {
-        columnData[i] = null;
+    if (columnData == null) return;
+    for (int i = 0; i < columnData.length; ++i) {
+      if (columnData[i] == null) continue;
+      for (int j = 0; j < columnData[i].length; ++j) {
+        columnData[i][j] = null;
       }
     }
   }
 
   public void initColumn(int colIxMod, int colIx, int streamCount) {
     columnIxs[colIxMod] = colIx;
-    columnData[colIxMod] = new ColumnStreamData[streamCount];
+    if (columnData[colIxMod] == null || columnData[colIxMod].length != 
streamCount) {
+      columnData[colIxMod] = new ColumnStreamData[streamCount];
+    }
   }
 
-  public void setStreamData(int colIxMod, int streamIx, ColumnStreamData sb) {
-    columnData[colIxMod][streamIx] = sb;
+  public void setStreamData(int colIxMod, int streamKind, ColumnStreamData 
csd) {
+    columnData[colIxMod][streamKind] = csd;
   }
 
   public void setAllStreamsData(int colIxMod, int colIx, ColumnStreamData[] 
sbs) {
@@ -135,6 +131,12 @@ public class EncodedColumnBatch<BatchKey> {
   protected void resetColumnArrays(int columnCount) {
     if (columnIxs != null && columnCount == columnIxs.length) return;
     columnIxs = new int[columnCount];
-    columnData = new ColumnStreamData[columnCount][];
+    ColumnStreamData[][] columnData = new ColumnStreamData[columnCount][];
+    if (this.columnData != null) {
+      for (int i = 0; i < Math.min(columnData.length, this.columnData.length); 
++i) {
+        columnData[i] = this.columnData[i];
+      }
+    }
+    this.columnData = columnData;
   }
 }
\ No newline at end of file

Reply via email to