Repository: hive
Updated Branches:
  refs/heads/master 966d2b303 -> 39d46e8af


HIVE-17931: Implement Parquet vectorization reader for Array type (Colin Ma, 
reviewed by Vihang and Ferdinand)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39d46e8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39d46e8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39d46e8a

Branch: refs/heads/master
Commit: 39d46e8af5a3794f7395060b890f94ddc84516e7
Parents: 966d2b3
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Mon Nov 20 09:39:38 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Mon Nov 20 10:23:18 2017 +0800

----------------------------------------------------------------------
 .../vector/BaseVectorizedColumnReader.java      | 263 +++++++++++
 .../vector/VectorizedListColumnReader.java      | 445 +++++++++++++++++++
 .../vector/VectorizedParquetRecordReader.java   |  26 ++
 .../vector/VectorizedPrimitiveColumnReader.java | 230 +---------
 .../parquet/TestVectorizedListColumnReader.java | 308 +++++++++++++
 .../parquet/VectorizedColumnReaderTestBase.java |  10 +
 6 files changed, 1056 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
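
For context: the new VectorizedListColumnReader fills Hive's ListColumnVector, where each
row of the batch is an (offset, length) pair pointing into a single flattened child vector.
A minimal, stand-alone sketch of that layout (illustrative only, not part of this patch;
the class name is invented), using the ListColumnVector/LongColumnVector API from
org.apache.hadoop.hive.ql.exec.vector:

    import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class ListVectorLayoutSketch {
      public static void main(String[] args) {
        // Two list rows, [1, 2, 3] and [4], flattened into one child vector of four elements.
        LongColumnVector child = new LongColumnVector(4);
        child.vector[0] = 1; child.vector[1] = 2; child.vector[2] = 3; child.vector[3] = 4;

        ListColumnVector list = new ListColumnVector(2, child);
        list.offsets[0] = 0; list.lengths[0] = 3;   // row 0 -> child[0..2]
        list.offsets[1] = 3; list.lengths[1] = 1;   // row 1 -> child[3]
        list.childCount = 4;

        // Walk the batch the same way a consumer of the reader's output would.
        for (int row = 0; row < 2; row++) {
          StringBuilder sb = new StringBuilder("row " + row + ": [");
          for (int j = 0; j < list.lengths[row]; j++) {
            if (j > 0) {
              sb.append(", ");
            }
            sb.append(child.vector[(int) list.offsets[row] + j]);
          }
          System.out.println(sb.append("]"));
        }
      }
    }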


http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
new file mode 100644
index 0000000..d132e07
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * A column-level Parquet reader used to read a batch of records for a column;
+ * part of the code is adapted from Apache Spark and Apache Parquet.
+ */
+public abstract class BaseVectorizedColumnReader implements VectorizedColumnReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
+
+  protected boolean skipTimestampConversion = false;
+
+  /**
+   * Total number of values read.
+   */
+  protected long valuesRead;
+
+  /**
+   * value that indicates the end of the current page. That is,
+   * if valuesRead == endOfPageValueCount, we are at the end of the page.
+   */
+  protected long endOfPageValueCount;
+
+  /**
+   * The dictionary, if this column has dictionary encoding.
+   */
+  protected final Dictionary dictionary;
+
+  /**
+   * If true, the current page is dictionary encoded.
+   */
+  protected boolean isCurrentPageDictionaryEncoded;
+
+  /**
+   * Maximum definition level for this column.
+   */
+  protected final int maxDefLevel;
+
+  protected int definitionLevel;
+  protected int repetitionLevel;
+
+  /**
+   * Repetition/Definition/Value readers.
+   */
+  protected IntIterator repetitionLevelColumn;
+  protected IntIterator definitionLevelColumn;
+  protected ValuesReader dataColumn;
+
+  /**
+   * Total values in the current page.
+   */
+  protected int pageValueCount;
+
+  protected final PageReader pageReader;
+  protected final ColumnDescriptor descriptor;
+  protected final Type type;
+
+  public BaseVectorizedColumnReader(
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean skipTimestampConversion,
+      Type type) throws IOException {
+    this.descriptor = descriptor;
+    this.type = type;
+    this.pageReader = pageReader;
+    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+    this.skipTimestampConversion = skipTimestampConversion;
+
+    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+        this.isCurrentPageDictionaryEncoded = true;
+      } catch (IOException e) {
+        throw new IOException("could not decode the dictionary for " + descriptor, e);
+      }
+    } else {
+      this.dictionary = null;
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+  }
+
+  protected void readRepetitionAndDefinitionLevels() {
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
+    valuesRead++;
+  }
+
+  protected void readPage() throws IOException {
+    DataPage page = pageReader.readPage();
+
+    if (page == null) {
+      return;
+    }
+    // TODO: Why is this a visitor?
+    page.accept(new DataPage.Visitor<Void>() {
+      @Override
+      public Void visit(DataPageV1 dataPageV1) {
+        readPageV1(dataPageV1);
+        return null;
+      }
+
+      @Override
+      public Void visit(DataPageV2 dataPageV2) {
+        readPageV2(dataPageV2);
+        return null;
+      }
+    });
+  }
+
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = valuesRead + pageValueCount;
+    if (dataEncoding.usesDictionary()) {
+      this.dataColumn = null;
+      if (dictionary == null) {
+        throw new IOException(
+            "could not read page in col " + descriptor +
+                " as the dictionary was missing for encoding " + dataEncoding);
+      }
+      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+      this.isCurrentPageDictionaryEncoded = true;
+    } else {
+      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+
+    try {
+      dataColumn.initFromPage(pageValueCount, bytes, offset);
+    } catch (IOException e) {
+      throw new IOException("could not read page in col " + descriptor, e);
+    }
+  }
+
+  private void readPageV1(DataPageV1 page) {
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    try {
+      byte[] bytes = page.getBytes().toByteArray();
+      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+      LOG.debug("reading repetition levels at 0");
+      rlReader.initFromPage(pageValueCount, bytes, 0);
+      int next = rlReader.getNextOffset();
+      LOG.debug("reading definition levels at " + next);
+      dlReader.initFromPage(pageValueCount, bytes, next);
+      next = dlReader.getNextOffset();
+      LOG.debug("reading data at " + next);
+      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private void readPageV2(DataPageV2 page) {
+    this.pageValueCount = page.getValueCount();
+    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
+        page.getRepetitionLevels());
+    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+          new RunLengthBitPackingHybridDecoder(
+              BytesUtils.getWidthFromMaxInt(maxLevel),
+              new ByteArrayInputStream(bytes.toByteArray())));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Utility classes to abstract over different ways to read ints with different encodings.
+   * TODO: remove this layer of abstraction?
+   */
+  abstract static class IntIterator {
+    abstract int nextInt();
+  }
+
+  protected static final class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  protected static final class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+    }
+  }
+
+  protected static final class NullIntIterator extends IntIterator {
+    @Override
+    int nextInt() { return 0; }
+  }
+}
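
The base reader above exposes repetition/definition level iterators that the list reader
(next file) consumes: for a repeated leaf, repetitionLevel == 0 marks the start of a new
row, and definitionLevel < maxDefLevel marks a row that contributes no value. A
self-contained sketch of how such level streams reassemble into lists (illustrative only,
not part of the patch; the names are invented):

    import java.util.ArrayList;
    import java.util.List;

    public class LevelDecodingSketch {
      // For a schema like "repeated int32 f" (maxDefLevel = 1, maxRepLevel = 1):
      // repetitionLevel == 0 starts a new row; definitionLevel < maxDefLevel means no value for that entry.
      static List<List<Integer>> assemble(int[] repLevels, int[] defLevels, int[] values, int maxDefLevel) {
        List<List<Integer>> rows = new ArrayList<>();
        int v = 0;  // index into the stream of non-null values
        for (int i = 0; i < repLevels.length; i++) {
          if (repLevels[i] == 0) {
            rows.add(new ArrayList<>());
          }
          if (defLevels[i] == maxDefLevel) {
            rows.get(rows.size() - 1).add(values[v++]);
          }
        }
        return rows;
      }

      public static void main(String[] args) {
        // Level streams encoding the rows [1, 2, 3], [] and [4].
        int[] rep = {0, 1, 1, 0, 0};
        int[] def = {1, 1, 1, 0, 1};
        int[] val = {1, 2, 3, 4};
        System.out.println(assemble(rep, def, val, 1));  // prints [[1, 2, 3], [], [4]]
      }
    }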

http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
new file mode 100644
index 0000000..ea4f2f2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.Type;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A column-level Parquet reader used to read a batch of records for a list column.
+ */
+public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
+
+  // The value most recently read from the data page.
+  private Object lastValue;
+
+  // Flag to indicate that there is no more data in the Parquet data page.
+  private boolean eof = false;
+
+  // Flag to indicate whether this instance is reading the Parquet data page for the first time.
+  boolean isFirstRow = true;
+
+  public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
+    boolean skipTimestampConversion, Type type) throws IOException {
+    super(descriptor, pageReader, skipTimestampConversion, type);
+  }
+
+  @Override
+  public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
+    ListColumnVector lcv = (ListColumnVector) column;
+    // Because the length of ListColumnVector.child can't be known yet,
+    // valueList temporarily holds all the data for the ListColumnVector.
+    List<Object> valueList = new ArrayList<>();
+
+    PrimitiveObjectInspector.PrimitiveCategory category =
+        ((PrimitiveTypeInfo) ((ListTypeInfo) columnType).getListElementTypeInfo()).getPrimitiveCategory();
+
+    // Read the first row in the Parquet data page; this happens only once per instance.
+    if (isFirstRow) {
+      if (!fetchNextValue(category)) {
+        return;
+      }
+      isFirstRow = false;
+    }
+
+    int index = 0;
+    while (!eof && index < total) {
+      // add element to ListColumnVector one by one
+      addElement(lcv, valueList, category, index);
+      index++;
+    }
+
+    // Decode the value if necessary
+    if (isCurrentPageDictionaryEncoded) {
+      valueList = decodeDictionaryIds(valueList);
+    }
+    // Convert valueList to array for the ListColumnVector.child
+    convertValueListToListColumnVector(category, lcv, valueList, index);
+  }
+
+  private int readPageIfNeed() throws IOException {
+    // Compute the number of values we want to read in this page.
+    int leftInPage = (int) (endOfPageValueCount - valuesRead);
+    if (leftInPage == 0) {
+      // no data left in current page, load data from new page
+      readPage();
+      leftInPage = (int) (endOfPageValueCount - valuesRead);
+    }
+    return leftInPage;
+  }
+
+  private boolean fetchNextValue(PrimitiveObjectInspector.PrimitiveCategory category) throws IOException {
+    int left = readPageIfNeed();
+    if (left > 0) {
+      // Get the repetition and definition levels.
+      readRepetitionAndDefinitionLevels();
+      // read the data if it isn't null
+      if (definitionLevel == maxDefLevel) {
+        if (isCurrentPageDictionaryEncoded) {
+          lastValue = dataColumn.readValueDictionaryId();
+        } else {
+          lastValue = readPrimitiveTypedRow(category);
+        }
+      }
+      return true;
+    } else {
+      eof = true;
+      return false;
+    }
+  }
+
+  /**
+   * Reads all the data from the Parquet data page for one element of the ListColumnVector.
+   */
+  private void addElement(ListColumnVector lcv, List<Object> elements,
+      PrimitiveObjectInspector.PrimitiveCategory category, int index) throws IOException {
+    lcv.offsets[index] = elements.size();
+
+    // Return directly if last value is null
+    if (definitionLevel < maxDefLevel) {
+      lcv.isNull[index] = true;
+      lcv.lengths[index] = 0;
+      // fetch the data from parquet data page for next call
+      fetchNextValue(category);
+      return;
+    }
+
+    do {
+      // Add all data for one element to the ListColumnVector; exit the loop when
+      // there is no more data or the next value belongs to a new element.
+      elements.add(lastValue);
+    } while (fetchNextValue(category) && (repetitionLevel != 0));
+
+    lcv.isNull[index] = false;
+    lcv.lengths[index] = elements.size() - lcv.offsets[index];
+  }
+
+  private Object readPrimitiveTypedRow(PrimitiveObjectInspector.PrimitiveCategory category) {
+    switch (category) {
+      case INT:
+      case BYTE:
+      case SHORT:
+        return dataColumn.readInteger();
+      case DATE:
+      case INTERVAL_YEAR_MONTH:
+      case LONG:
+        return dataColumn.readLong();
+      case BOOLEAN:
+        return dataColumn.readBoolean() ? 1 : 0;
+      case DOUBLE:
+        return dataColumn.readDouble();
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return dataColumn.readBytes().getBytesUnsafe();
+      case FLOAT:
+        return dataColumn.readFloat();
+      case DECIMAL:
+        return dataColumn.readBytes().getBytesUnsafe();
+      case INTERVAL_DAY_TIME:
+      case TIMESTAMP:
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  private List decodeDictionaryIds(List valueList) {
+    int total = valueList.size();
+    List resultList;
+    List<Integer> intList = (List<Integer>) valueList;
+    switch (descriptor.getType()) {
+      case INT32:
+        resultList = new ArrayList<Integer>(total);
+        for (int i = 0; i < total; ++i) {
+          resultList.add(dictionary.decodeToInt(intList.get(i)));
+        }
+        break;
+      case INT64:
+        resultList = new ArrayList<Long>(total);
+        for (int i = 0; i < total; ++i) {
+          resultList.add(dictionary.decodeToLong(intList.get(i)));
+        }
+        break;
+      case FLOAT:
+        resultList = new ArrayList<Float>(total);
+        for (int i = 0; i < total; ++i) {
+          resultList.add(dictionary.decodeToFloat(intList.get(i)));
+        }
+        break;
+      case DOUBLE:
+        resultList = new ArrayList<Double>(total);
+        for (int i = 0; i < total; ++i) {
+          resultList.add(dictionary.decodeToDouble(intList.get(i)));
+        }
+        break;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        resultList = new ArrayList<byte[]>(total);
+        for (int i = 0; i < total; ++i) {
+          resultList.add(dictionary.decodeToBinary(intList.get(i)).getBytesUnsafe());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+    }
+    return resultList;
+  }
+
+  /**
+   * The lengths and offsets are initialized with the default size (1024);
+   * they should be set to the actual size according to the element number.
+   */
+  private void setChildrenInfo(ListColumnVector lcv, int itemNum, int elementNum) {
+    lcv.childCount = itemNum;
+    long[] lcvLength = new long[elementNum];
+    long[] lcvOffset = new long[elementNum];
+    System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum);
+    System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum);
+    lcv.lengths = lcvLength;
+    lcv.offsets = lcvOffset;
+  }
+
+  private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv,
+      List valueList, int elementNum) {
+    int total = valueList.size();
+    setChildrenInfo(lcv, total, elementNum);
+    switch (category) {
+      case INT:
+      case BYTE:
+      case SHORT:
+      case BOOLEAN:
+        lcv.child = new LongColumnVector(total);
+        for (int i = 0; i < valueList.size(); i++) {
+          ((LongColumnVector)lcv.child).vector[i] = ((List<Integer>)valueList).get(i);
+        }
+        break;
+      case DATE:
+      case INTERVAL_YEAR_MONTH:
+      case LONG:
+        lcv.child = new LongColumnVector(total);
+        for (int i = 0; i < valueList.size(); i++) {
+          ((LongColumnVector)lcv.child).vector[i] = ((List<Long>)valueList).get(i);
+        }
+        break;
+      case DOUBLE:
+        lcv.child = new DoubleColumnVector(total);
+        for (int i = 0; i < valueList.size(); i++) {
+          ((DoubleColumnVector)lcv.child).vector[i] = ((List<Double>)valueList).get(i);
+        }
+        break;
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        lcv.child = new BytesColumnVector(total);
+        lcv.child.init();
+        for (int i = 0; i < valueList.size(); i++) {
+          ((BytesColumnVector)lcv.child).setVal(i, ((List<byte[]>)valueList).get(i));
+        }
+        break;
+      case FLOAT:
+        lcv.child = new DoubleColumnVector(total);
+        for (int i = 0; i < valueList.size(); i++) {
+          ((DoubleColumnVector)lcv.child).vector[i] = ((List<Float>)valueList).get(i);
+        }
+        break;
+      case DECIMAL:
+        int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
+        int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
+        lcv.child = new DecimalColumnVector(total, precision, scale);
+        for (int i = 0; i < valueList.size(); i++) {
+          ((DecimalColumnVector)lcv.child).vector[i].set(((List<byte[]>)valueList).get(i), scale);
+        }
+        break;
+      case INTERVAL_DAY_TIME:
+      case TIMESTAMP:
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /**
+   * Finish the result ListColumnVector with all collected information.
+   */
+  private void convertValueListToListColumnVector(PrimitiveObjectInspector.PrimitiveCategory category,
+      ListColumnVector lcv, List valueList, int elementNum) {
+    // Fill the child of ListColumnVector with valueList
+    fillColumnVector(category, lcv, valueList, elementNum);
+    setIsRepeating(lcv);
+  }
+
+  private void setIsRepeating(ListColumnVector lcv) {
+    ColumnVector child0 = getChildData(lcv, 0);
+    for (int i = 1; i < lcv.offsets.length; i++) {
+      ColumnVector currentChild = getChildData(lcv, i);
+      if (!compareColumnVector(child0, currentChild)) {
+        lcv.isRepeating = false;
+        return;
+      }
+    }
+    lcv.isRepeating = true;
+  }
+
+  /**
+   * Get the child ColumnVector of ListColumnVector
+   */
+  private ColumnVector getChildData(ListColumnVector lcv, int index) {
+    if (lcv.offsets[index] > Integer.MAX_VALUE || lcv.lengths[index] > Integer.MAX_VALUE) {
+      throw new RuntimeException("The element number in the list is out of range.");
+    }
+    if (lcv.isNull[index]) {
+      return null;
+    }
+    int start = (int)lcv.offsets[index];
+    int length = (int)lcv.lengths[index];
+    ColumnVector child = lcv.child;
+    ColumnVector resultCV = null;
+    if (child instanceof LongColumnVector) {
+      resultCV = new LongColumnVector(length);
+      try {
+        System.arraycopy(((LongColumnVector) lcv.child).vector, start,
+            ((LongColumnVector) resultCV).vector, 0, length);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to copy list child data: index:" + index + ", start:" + start
+            + ", length:" + length + ", vector length:" + ((LongColumnVector) lcv.child).vector.length
+            + ", offsets length:" + lcv.offsets.length + ", lengths length:" + lcv.lengths.length, e);
+      }
+    }
+    if (child instanceof DoubleColumnVector) {
+      resultCV = new DoubleColumnVector(length);
+      System.arraycopy(((DoubleColumnVector) lcv.child).vector, start,
+          ((DoubleColumnVector) resultCV).vector, 0, length);
+    }
+    if (child instanceof BytesColumnVector) {
+      resultCV = new BytesColumnVector(length);
+      System.arraycopy(((BytesColumnVector) lcv.child).vector, start,
+          ((BytesColumnVector) resultCV).vector, 0, length);
+    }
+    if (child instanceof DecimalColumnVector) {
+      resultCV = new DecimalColumnVector(length,
+          ((DecimalColumnVector) child).precision, ((DecimalColumnVector) child).scale);
+      System.arraycopy(((DecimalColumnVector) lcv.child).vector, start,
+          ((DecimalColumnVector) resultCV).vector, 0, length);
+    }
+    return resultCV;
+  }
+
+  private boolean compareColumnVector(ColumnVector cv1, ColumnVector cv2) {
+    if (cv1 == null && cv2 == null) {
+      return true;
+    } else {
+      if (cv1 != null && cv2 != null) {
+        if (cv1 instanceof LongColumnVector && cv2 instanceof LongColumnVector) {
+          return compareLongColumnVector((LongColumnVector) cv1, (LongColumnVector) cv2);
+        }
+        if (cv1 instanceof DoubleColumnVector && cv2 instanceof DoubleColumnVector) {
+          return compareDoubleColumnVector((DoubleColumnVector) cv1, (DoubleColumnVector) cv2);
+        }
+        if (cv1 instanceof BytesColumnVector && cv2 instanceof BytesColumnVector) {
+          return compareBytesColumnVector((BytesColumnVector) cv1, (BytesColumnVector) cv2);
+        }
+        if (cv1 instanceof DecimalColumnVector && cv2 instanceof DecimalColumnVector) {
+          return compareDecimalColumnVector((DecimalColumnVector) cv1, (DecimalColumnVector) cv2);
+        }
+        throw new RuntimeException("Unsupported ColumnVector comparison between " + cv1.getClass().getName()
+            + " and " + cv2.getClass().getName());
+      } else {
+        return false;
+      }
+    }
+  }
+
+  private boolean compareLongColumnVector(LongColumnVector cv1, LongColumnVector cv2) {
+    int length1 = cv1.vector.length;
+    int length2 = cv2.vector.length;
+    if (length1 == length2) {
+      for (int i = 0; i < length1; i++) {
+        if (cv1.vector[i] != cv2.vector[i]) {
+          return false;
+        }
+      }
+    } else {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean compareDoubleColumnVector(DoubleColumnVector cv1, DoubleColumnVector cv2) {
+    int length1 = cv1.vector.length;
+    int length2 = cv2.vector.length;
+    if (length1 == length2) {
+      for (int i = 0; i < length1; i++) {
+        if (cv1.vector[i] != cv2.vector[i]) {
+          return false;
+        }
+      }
+    } else {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean compareDecimalColumnVector(DecimalColumnVector cv1, DecimalColumnVector cv2) {
+    int length1 = cv1.vector.length;
+    int length2 = cv2.vector.length;
+    if (length1 == length2 && cv1.scale == cv2.scale && cv1.precision == cv2.precision) {
+      for (int i = 0; i < length1; i++) {
+        if (cv1.vector[i] != null && cv2.vector[i] == null
+            || cv1.vector[i] == null && cv2.vector[i] != null
+            || cv1.vector[i] != null && cv2.vector[i] != null && !cv1.vector[i].equals(cv2.vector[i])) {
+          return false;
+        }
+      }
+    } else {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean compareBytesColumnVector(BytesColumnVector cv1, BytesColumnVector cv2) {
+    int length1 = cv1.vector.length;
+    int length2 = cv2.vector.length;
+    if (length1 == length2) {
+      for (int i = 0; i < length1; i++) {
+        int innerLen1 = cv1.vector[i].length;
+        int innerLen2 = cv2.vector[i].length;
+        if (innerLen1 == innerLen2) {
+          for (int j = 0; j < innerLen1; j++) {
+            if (cv1.vector[i][j] != cv2.vector[i][j]) {
+              return false;
+            }
+          }
+        } else {
+          return false;
+        }
+      }
+    } else {
+      return false;
+    }
+    return true;
+  }
+}
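
When a page is dictionary encoded, readBatch above only collects the raw dictionary ids
(readValueDictionaryId) and expands them to real values afterwards in decodeDictionaryIds.
A minimal stand-alone sketch of that two-pass idea, with a plain array standing in for the
Parquet dictionary (illustrative only, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class DictionaryDecodeSketch {
      public static void main(String[] args) {
        // Stand-in for the dictionary page: distinct values indexed by dictionary id.
        String[] dictionary = {"apple", "banana", "cherry"};

        // Pass 1: the read loop collects only the small integer ids.
        int[] ids = {2, 0, 0, 1, 2};

        // Pass 2: materialize the values, analogous to decodeDictionaryIds(valueList).
        List<String> values = new ArrayList<>(ids.length);
        for (int id : ids) {
          values.add(dictionary[id]);
        }
        System.out.println(values);  // [cherry, apple, apple, banana, cherry]
      }
    }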

http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 1d9dba7..941bd7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
@@ -493,10 +495,34 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       }
       return new VectorizedStructColumnReader(fieldReaders);
     case LIST:
+      checkListColumnSupport(((ListTypeInfo) typeInfo).getListElementTypeInfo());
+      if (columnDescriptors == null || columnDescriptors.isEmpty()) {
+        throw new RuntimeException(
+            "Failed to find related Parquet column descriptor with type " + type);
+      }
+      return new VectorizedListColumnReader(descriptors.get(0),
+          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
     case MAP:
     case UNION:
     default:
      throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());
     }
   }
+
+  /**
+   * Check whether the element type of the list is supported by the vectorized read.
+   * Supported types: INT, BYTE, SHORT, DATE, INTERVAL_YEAR_MONTH, LONG, BOOLEAN, DOUBLE, BINARY,
+   *                  STRING, CHAR, VARCHAR, FLOAT, DECIMAL
+   */
+  private void checkListColumnSupport(TypeInfo elementType) {
+    if (elementType instanceof PrimitiveTypeInfo) {
+      switch (((PrimitiveTypeInfo)elementType).getPrimitiveCategory()) {
+        case INTERVAL_DAY_TIME:
+        case TIMESTAMP:
+          throw new RuntimeException("Unsupported primitive type used in list: " + elementType);
+      }
+    } else {
+      throw new RuntimeException("Unsupported type used in list: " + elementType);
+    }
+  }
 }
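
checkListColumnSupport above rejects TIMESTAMP and INTERVAL_DAY_TIME elements as well as
non-primitive element types; everything else takes the new vectorized list path. The sketch
below applies the same guard to array<int> versus array<timestamp> (illustrative only; the
helper and class names are invented, while TypeInfoFactory and the TypeInfo classes are
Hive's own):

    import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class ListSupportCheckSketch {

      // Same rule as checkListColumnSupport: non-primitive elements and
      // TIMESTAMP/INTERVAL_DAY_TIME elements do not take the vectorized list path.
      static boolean isVectorizableListElement(TypeInfo elementType) {
        if (!(elementType instanceof PrimitiveTypeInfo)) {
          return false;
        }
        switch (((PrimitiveTypeInfo) elementType).getPrimitiveCategory()) {
          case INTERVAL_DAY_TIME:
          case TIMESTAMP:
            return false;
          default:
            return true;
        }
      }

      public static void main(String[] args) {
        ListTypeInfo intList = (ListTypeInfo) TypeInfoFactory.getListTypeInfo(TypeInfoFactory.intTypeInfo);
        ListTypeInfo tsList = (ListTypeInfo) TypeInfoFactory.getListTypeInfo(TypeInfoFactory.timestampTypeInfo);
        System.out.println(isVectorizableListElement(intList.getListElementTypeInfo()));  // true
        System.out.println(isVectorizableListElement(tsList.getListElementTypeInfo()));   // false
      }
    }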

http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index e9543c6..5e577d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -23,113 +23,29 @@ import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Timestamp;
 
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
 /**
  * It's column level Parquet reader which is used to read a batch of records for a column,
  * part of the code is referred from Apache Spark and Apache Parquet.
  */
-public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class);
-
-  private boolean skipTimestampConversion = false;
-
-  /**
-   * Total number of values read.
-   */
-  private long valuesRead;
-
-  /**
-   * value that indicates the end of the current page. That is,
-   * if valuesRead == endOfPageValueCount, we are at the end of the page.
-   */
-  private long endOfPageValueCount;
-
-  /**
-   * The dictionary, if this column has dictionary encoding.
-   */
-  private final Dictionary dictionary;
-
-  /**
-   * If true, the current page is dictionary encoded.
-   */
-  private boolean isCurrentPageDictionaryEncoded;
-
-  /**
-   * Maximum definition level for this column.
-   */
-  private final int maxDefLevel;
-
-  private int definitionLevel;
-  private int repetitionLevel;
-
-  /**
-   * Repetition/Definition/Value readers.
-   */
-  private IntIterator repetitionLevelColumn;
-  private IntIterator definitionLevelColumn;
-  private ValuesReader dataColumn;
-
-  /**
-   * Total values in the current page.
-   */
-  private int pageValueCount;
-
-  private final PageReader pageReader;
-  private final ColumnDescriptor descriptor;
-  private final Type type;
+public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader {
 
   public VectorizedPrimitiveColumnReader(
     ColumnDescriptor descriptor,
     PageReader pageReader,
     boolean skipTimestampConversion,
     Type type) throws IOException {
-    this.descriptor = descriptor;
-    this.type = type;
-    this.pageReader = pageReader;
-    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-    this.skipTimestampConversion = skipTimestampConversion;
-
-    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-    if (dictionaryPage != null) {
-      try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
-        this.isCurrentPageDictionaryEncoded = true;
-      } catch (IOException e) {
-        throw new IOException("could not decode the dictionary for " + descriptor, e);
-      }
-    } else {
-      this.dictionary = null;
-      this.isCurrentPageDictionaryEncoded = false;
-    }
+    super(descriptor, pageReader, skipTimestampConversion, type);
   }
 
+  @Override
   public void readBatch(
     int total,
     ColumnVector column,
@@ -164,6 +80,7 @@ public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
     TypeInfo columnType,
     int rowId) throws IOException {
     PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+
     switch (primitiveColumnType.getPrimitiveCategory()) {
     case INT:
     case BYTE:
@@ -444,143 +361,4 @@ public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
       throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
     }
   }
-
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    valuesRead++;
-  }
-
-  private void readPage() throws IOException {
-    DataPage page = pageReader.readPage();
-    // TODO: Why is this a visitor?
-    page.accept(new DataPage.Visitor<Void>() {
-      @Override
-      public Void visit(DataPageV1 dataPageV1) {
-        readPageV1(dataPageV1);
-        return null;
-      }
-
-      @Override
-      public Void visit(DataPageV2 dataPageV2) {
-        readPageV2(dataPageV2);
-        return null;
-      }
-    });
-  }
-
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
-    this.pageValueCount = valueCount;
-    this.endOfPageValueCount = valuesRead + pageValueCount;
-    if (dataEncoding.usesDictionary()) {
-      this.dataColumn = null;
-      if (dictionary == null) {
-        throw new IOException(
-          "could not read page in col " + descriptor +
-            " as the dictionary was missing for encoding " + dataEncoding);
-      }
-      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
-      this.isCurrentPageDictionaryEncoded = true;
-    } else {
-      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
-      this.isCurrentPageDictionaryEncoded = false;
-    }
-
-    try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
-    } catch (IOException e) {
-      throw new IOException("could not read page in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV1(DataPageV1 page) {
-    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
-    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-    try {
-      byte[] bytes = page.getBytes().toByteArray();
-      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
-      LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at " + next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at " + next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV2(DataPageV2 page) {
-    this.pageValueCount = page.getValueCount();
-    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
-      page.getRepetitionLevels());
-    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    try {
-      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-    try {
-      if (maxLevel == 0) {
-        return new NullIntIterator();
-      }
-      return new RLEIntIterator(
-        new RunLengthBitPackingHybridDecoder(
-          BytesUtils.getWidthFromMaxInt(maxLevel),
-          new ByteArrayInputStream(bytes.toByteArray())));
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
-    }
-  }
-
-  /**
-   * Utility classes to abstract over different way to read ints with different encodings.
-   * TODO: remove this layer of abstraction?
-   */
-  abstract static class IntIterator {
-    abstract int nextInt();
-  }
-
-  protected static final class ValuesReaderIntIterator extends IntIterator {
-    ValuesReader delegate;
-
-    public ValuesReaderIntIterator(ValuesReader delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      return delegate.readInteger();
-    }
-  }
-
-  protected static final class RLEIntIterator extends IntIterator {
-    RunLengthBitPackingHybridDecoder delegate;
-
-    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      try {
-        return delegate.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
-  }
-
-  protected static final class NullIntIterator extends IntIterator {
-    @Override
-    int nextInt() { return 0; }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java
new file mode 100644
index 0000000..de19615
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestVectorizedListColumnReader extends VectorizedColumnReaderTestBase {
+
+  protected static void writeListData(ParquetWriter<Group> writer, boolean isDictionaryEncoding,
+    int elementNum) throws IOException {
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    int listMaxSize = 4;
+    int listElementIndex = 0;
+    for (int i = 0; i < elementNum; i++) {
+      boolean isNull = isNull(i);
+      Group group = f.newGroup();
+
+      int listSize = i % listMaxSize + 1;
+      if (!isNull) {
+        for (int j = 0; j < listSize; j++) {
+          group.append("list_int32_field", getIntValue(isDictionaryEncoding, listElementIndex));
+          group.append("list_int64_field", getLongValue(isDictionaryEncoding, listElementIndex));
+          group.append("list_double_field", getDoubleValue(isDictionaryEncoding, listElementIndex));
+          group.append("list_float_field", getFloatValue(isDictionaryEncoding, listElementIndex));
+          group.append("list_boolean_field", getBooleanValue(listElementIndex));
+          group.append("list_binary_field", getBinaryValue(isDictionaryEncoding, listElementIndex));
+
+          HiveDecimal hd = getDecimal(isDictionaryEncoding, listElementIndex).setScale(2);
+          HiveDecimalWritable hdw = new HiveDecimalWritable(hd);
+          group.append("list_decimal_field", Binary.fromConstantByteArray(hdw.getInternalStorage()));
+          listElementIndex++;
+        }
+      }
+      for (int j = 0; j < listMaxSize; j++) {
+        group.append("list_int32_field_for_repeat_test", getIntValue(isDictionaryEncoding, j));
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  protected static void writeRepeateListData(ParquetWriter<Group> writer,
+    int elementNum, boolean isNull) throws IOException {
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    int listMaxSize = 4;
+    for (int i = 0; i < elementNum; i++) {
+      Group group = f.newGroup();
+      if (!isNull) {
+        for (int j = 0; j < listMaxSize; j++) {
+          group.append("list_int32_field_for_repeat_test", j);
+        }
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  @Test
+  public void testListReadLessOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1023);
+    testListReadAllType(isDictionaryEncoding, 1023);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1023);
+    testListReadAllType(isDictionaryEncoding, 1023);
+    removeFile();
+  }
+
+  @Test
+  public void testListReadEqualOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1024);
+    testListReadAllType(isDictionaryEncoding, 1024);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1024);
+    testListReadAllType(isDictionaryEncoding, 1024);
+    removeFile();
+  }
+
+  @Test
+  public void testListReadMoreOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1025);
+    testListReadAllType(isDictionaryEncoding, 1025);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeListData(initWriterFromFile(), isDictionaryEncoding, 1025);
+    testListReadAllType(isDictionaryEncoding, 1025);
+    removeFile();
+  }
+
+  @Test
+  public void testRepeateListRead() throws Exception {
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1023, false);
+    testRepeateListRead(1023, false);
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1023, true);
+    testRepeateListRead(1023, true);
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1024, false);
+    testRepeateListRead(1024, false);
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1024, true);
+    testRepeateListRead(1024, true);
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1025, false);
+    testRepeateListRead(1025, false);
+    removeFile();
+    writeRepeateListData(initWriterFromFile(), 1025, true);
+    testRepeateListRead(1025, true);
+    removeFile();
+  }
+
+  private void testListReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception {
+    testListRead(isDictionaryEncoding, "int", elementNum);
+    testListRead(isDictionaryEncoding, "long", elementNum);
+    testListRead(isDictionaryEncoding, "double", elementNum);
+    testListRead(isDictionaryEncoding, "float", elementNum);
+    testListRead(isDictionaryEncoding, "boolean", elementNum);
+    testListRead(isDictionaryEncoding, "binary", elementNum);
+    testListRead(isDictionaryEncoding, "decimal", elementNum);
+  }
+
+  private void setTypeConfiguration(String type, Configuration conf) {
+    if ("int".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_int32_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<int>");
+    } else if ("long".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_int64_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<bigint>");
+    } else if ("double".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_double_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<double>");
+    } else if ("float".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_float_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<float>");
+    } else if ("boolean".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_boolean_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<boolean>");
+    } else if ("binary".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_binary_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<string>");
+    } else if ("decimal".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "list_decimal_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "array<decimal(5,2)>");
+    }
+  }
+
+  private String getSchema(String type) {
+    if ("int".equals(type)) {
+      return "message hive_schema {repeated int32 list_int32_field;}";
+    } else if ("long".equals(type)) {
+      return "message hive_schema {repeated int64 list_int64_field;}";
+    } else if ("double".equals(type)) {
+      return "message hive_schema {repeated double list_double_field;}";
+    } else if ("float".equals(type)) {
+      return "message hive_schema {repeated float list_float_field;}";
+    } else if ("boolean".equals(type)) {
+      return "message hive_schema {repeated boolean list_boolean_field;}";
+    } else if ("binary".equals(type)) {
+      return "message hive_schema {repeated binary list_binary_field;}";
+    } else if ("decimal".equals(type)) {
+      return "message hive_schema {repeated binary list_decimal_field (DECIMAL(5,2));}";
+    } else {
+      throw new RuntimeException("Unsupported type for TestVectorizedListColumnReader!");
+    }
+  }
+
+  private void assertValue(String type, ColumnVector childVector,
+    boolean isDictionaryEncoding, int valueIndex, int position) {
+    if ("int".equals(type)) {
+      assertEquals(getIntValue(isDictionaryEncoding, valueIndex), ((LongColumnVector)childVector).vector[position]);
+    } else if ("long".equals(type)) {
+      assertEquals(getLongValue(isDictionaryEncoding, valueIndex), ((LongColumnVector)childVector).vector[position]);
+    } else if ("double".equals(type)) {
+      assertEquals(getDoubleValue(isDictionaryEncoding, valueIndex), ((DoubleColumnVector)childVector).vector[position], 0);
+    } else if ("float".equals(type)) {
+      assertEquals(getFloatValue(isDictionaryEncoding, valueIndex), ((DoubleColumnVector)childVector).vector[position], 0);
+    } else if ("boolean".equals(type)) {
+      assertEquals((getBooleanValue(valueIndex) ? 1 : 0), ((LongColumnVector)childVector).vector[position]);
+    } else if ("binary".equals(type)) {
+      String actual = new String(ArrayUtils.subarray(((BytesColumnVector)childVector).vector[position],
+          ((BytesColumnVector)childVector).start[position],
+          ((BytesColumnVector)childVector).start[position] + ((BytesColumnVector)childVector).length[position]));
+      assertEquals(getStr(isDictionaryEncoding, valueIndex), actual);
+    } else if ("decimal".equals(type)) {
+      assertEquals(getDecimal(isDictionaryEncoding, valueIndex),
+          ((DecimalColumnVector)childVector).vector[position].getHiveDecimal());
+    } else {
+      throw new RuntimeException("Unsupported type for TestVectorizedListColumnReader!");
+    }
+
+  }
+
+  private void testListRead(boolean isDictionaryEncoding, String type, int elementNum) throws Exception {
+    Configuration conf = new Configuration();
+    setTypeConfiguration(type, conf);
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader = createTestParquetReader(getSchema(type), conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int row = 0;
+    int index = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        ListColumnVector vector = (ListColumnVector) previous.cols[0];
+        for (int i = 0; i < vector.offsets.length; i++) {
+          if (row == elementNum) {
+            assertEquals(i, vector.offsets.length - 1);
+            break;
+          }
+          long start = vector.offsets[i];
+          long length = vector.lengths[i];
+          boolean isNull = isNull(row);
+          if (isNull) {
+            assertEquals(vector.isNull[i], true);
+          } else {
+            for (long j = 0; j < length; j++) {
+              assertValue(type, vector.child, isDictionaryEncoding, index, (int) (start + j));
+              index++;
+            }
+          }
+          row++;
+        }
+      }
+      assertEquals("It doesn't exit at expected position", elementNum, row);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private void testRepeateListRead(int elementNum, boolean isNull) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "list_int32_field_for_repeat_test");
+    conf.set(IOConstants.COLUMNS_TYPES, "array<int>");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader = createTestParquetReader(
+        "message hive_schema {repeated int32 list_int32_field_for_repeat_test;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int row = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        ListColumnVector vector = (ListColumnVector) previous.cols[0];
+
+        assertTrue(vector.isRepeating);
+        assertEquals(isNull, vector.isNull[0]);
+
+        for (int i = 0; i < vector.offsets.length; i++) {
+          if (row == elementNum) {
+            assertEquals(i, vector.offsets.length - 1);
+            break;
+          }
+          row++;
+        }
+      }
+      assertEquals("It doesn't exit at expected position", elementNum, row);
+    } finally {
+      reader.close();
+    }
+  }
+}
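
The tests above build their input with parquet-mr's example API: every record is a Group,
and each Group.append on a repeated field adds one element to that record's list, which the
vectorized reader then returns as one array row. A small stand-alone sketch of that write
path (illustrative only, not the test code):

    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class RepeatedFieldGroupSketch {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message hive_schema { repeated int32 list_int32_field; }");
        SimpleGroupFactory factory = new SimpleGroupFactory(schema);

        // One record whose list_int32_field holds [10, 20, 30]; Hive reads it back as array<int>.
        Group group = factory.newGroup();
        group.append("list_int32_field", 10);
        group.append("list_int32_field", 20);
        group.append("list_int32_field", 30);
        System.out.println(group);
      }
    }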

http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index 1a5d095..929e991 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -115,6 +116,15 @@ public class VectorizedColumnReaderTestBase {
       + "    optional int32 array_element;\n"
       + "  }\n"
       + "}\n"
+      + "repeated int32 list_int32_field;"
+      + "repeated int64 list_int64_field;"
+      + "repeated double list_double_field;"
+      + "repeated float list_float_field;"
+      + "repeated boolean list_boolean_field;"
+      + "repeated fixed_len_byte_array(3) list_byte_array_field;"
+      + "repeated binary list_binary_field;"
+      + "repeated binary list_decimal_field (DECIMAL(5,2));"
+      + "repeated int32 list_int32_field_for_repeat_test;"
       + "} ");
 
   protected static void removeFile() throws IOException {
