Repository: hive
Updated Branches:
  refs/heads/master 966d2b303 -> 39d46e8af
HIVE-17931: Implement Parquet vectorization reader for Array type (Colin Ma, reviewed by Vihang and Ferdinand)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39d46e8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39d46e8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39d46e8a

Branch: refs/heads/master
Commit: 39d46e8af5a3794f7395060b890f94ddc84516e7
Parents: 966d2b3
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Mon Nov 20 09:39:38 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Mon Nov 20 10:23:18 2017 +0800

----------------------------------------------------------------------
 .../vector/BaseVectorizedColumnReader.java      | 263 +++++++++++
 .../vector/VectorizedListColumnReader.java      | 445 +++++++++++++++++++
 .../vector/VectorizedParquetRecordReader.java   |  26 ++
 .../vector/VectorizedPrimitiveColumnReader.java | 230 +---------
 .../parquet/TestVectorizedListColumnReader.java | 308 +++++++++++++
 .../parquet/VectorizedColumnReaderTestBase.java |  10 +
 6 files changed, 1056 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
new file mode 100644
index 0000000..d132e07
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * A column-level Parquet reader that reads a batch of records for a column;
+ * parts of the code are adapted from Apache Spark and Apache Parquet.
+ */
+public abstract class BaseVectorizedColumnReader implements VectorizedColumnReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
+
+  protected boolean skipTimestampConversion = false;
+
+  /**
+   * Total number of values read.
+   */
+  protected long valuesRead;
+
+  /**
+   * Value that indicates the end of the current page. That is,
+   * if valuesRead == endOfPageValueCount, we are at the end of the page.
+   */
+  protected long endOfPageValueCount;
+
+  /**
+   * The dictionary, if this column has dictionary encoding.
+   */
+  protected final Dictionary dictionary;
+
+  /**
+   * If true, the current page is dictionary encoded.
+   */
+  protected boolean isCurrentPageDictionaryEncoded;
+
+  /**
+   * Maximum definition level for this column.
+   */
+  protected final int maxDefLevel;
+
+  protected int definitionLevel;
+  protected int repetitionLevel;
+
+  /**
+   * Repetition/Definition/Value readers.
+   */
+  protected IntIterator repetitionLevelColumn;
+  protected IntIterator definitionLevelColumn;
+  protected ValuesReader dataColumn;
+
+  /**
+   * Total values in the current page.
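+   * Each value in the page carries a repetition level and a definition level,
+   * which the level readers declared above consume in lock step with the data.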
+ */ + protected int pageValueCount; + + protected final PageReader pageReader; + protected final ColumnDescriptor descriptor; + protected final Type type; + + public BaseVectorizedColumnReader( + ColumnDescriptor descriptor, + PageReader pageReader, + boolean skipTimestampConversion, + Type type) throws IOException { + this.descriptor = descriptor; + this.type = type; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.skipTimestampConversion = skipTimestampConversion; + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.isCurrentPageDictionaryEncoded = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.isCurrentPageDictionaryEncoded = false; + } + } + + protected void readRepetitionAndDefinitionLevels() { + repetitionLevel = repetitionLevelColumn.nextInt(); + definitionLevel = definitionLevelColumn.nextInt(); + valuesRead++; + } + + protected void readPage() throws IOException { + DataPage page = pageReader.readPage(); + + if (page == null) { + return; + } + // TODO: Why is this a visitor? + page.accept(new DataPage.Visitor<Void>() { + @Override + public Void visit(DataPageV1 dataPageV1) { + readPageV1(dataPageV1); + return null; + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + readPageV2(dataPageV2); + return null; + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { + this.pageValueCount = valueCount; + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); + this.isCurrentPageDictionaryEncoded = true; + } else { + dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); + this.isCurrentPageDictionaryEncoded = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) { + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + try { + byte[] bytes = page.getBytes().toByteArray(); + LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); + LOG.debug("reading repetition levels at 0"); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + LOG.debug("reading definition levels at " + next); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + LOG.debug("reading data at " + next); + initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void 
readPageV2(DataPageV2 page) { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels()); + this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); + try { + LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); + } catch (IOException e) { + throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); + } + } + + private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { + try { + if (maxLevel == 0) { + return new NullIntIterator(); + } + return new RLEIntIterator( + new RunLengthBitPackingHybridDecoder( + BytesUtils.getWidthFromMaxInt(maxLevel), + new ByteArrayInputStream(bytes.toByteArray()))); + } catch (IOException e) { + throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); + } + } + + /** + * Utility classes to abstract over different way to read ints with different encodings. + * TODO: remove this layer of abstraction? + */ + abstract static class IntIterator { + abstract int nextInt(); + } + + protected static final class ValuesReaderIntIterator extends IntIterator { + ValuesReader delegate; + + public ValuesReaderIntIterator(ValuesReader delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + return delegate.readInteger(); + } + } + + protected static final class RLEIntIterator extends IntIterator { + RunLengthBitPackingHybridDecoder delegate; + + public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { + this.delegate = delegate; + } + + @Override + int nextInt() { + try { + return delegate.readInt(); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + } + + protected static final class NullIntIterator extends IntIterator { + @Override + int nextInt() { return 0; } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java new file mode 100644 index 0000000..ea4f2f2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java @@ -0,0 +1,445 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.Type;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A column-level Parquet reader that reads a batch of records for a list column.
+ */
+public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
+
+  // The value read on the previous fetch
+  private Object lastValue;
+
+  // Flag to indicate that there is no more data in the Parquet data page
+  private boolean eof = false;
+
+  // Flag to indicate whether this instance is reading its first row from the Parquet data page
+  boolean isFirstRow = true;
+
+  public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
+      boolean skipTimestampConversion, Type type) throws IOException {
+    super(descriptor, pageReader, skipTimestampConversion, type);
+  }
+
+  @Override
+  public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
+    ListColumnVector lcv = (ListColumnVector) column;
+    // Because the length of ListColumnVector.child can't be known yet,
+    // valueList temporarily holds all of the data for the ListColumnVector.
+    List<Object> valueList = new ArrayList<>();
+
+    PrimitiveObjectInspector.PrimitiveCategory category =
+        ((PrimitiveTypeInfo) ((ListTypeInfo) columnType).getListElementTypeInfo()).getPrimitiveCategory();
+
+    // Read the first row in the Parquet data page; this happens only once per instance.
+    if (isFirstRow) {
+      if (!fetchNextValue(category)) {
+        return;
+      }
+      isFirstRow = false;
+    }
+
+    int index = 0;
+    while (!eof && index < total) {
+      // Add elements to the ListColumnVector one by one.
+      addElement(lcv, valueList, category, index);
+      index++;
+    }
+
+    // Decode the values if the page is dictionary encoded.
+    if (isCurrentPageDictionaryEncoded) {
+      valueList = decodeDictionaryIds(valueList);
+    }
+    // Convert valueList to an array for the ListColumnVector.child.
+    convertValueListToListColumnVector(category, lcv, valueList, index);
+  }
+
+  private int readPageIfNeed() throws IOException {
+    // Compute the number of values we want to read in this page.
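+    // endOfPageValueCount is the running total of values up to the end of the
+    // current page, while valuesRead counts the values consumed so far, so
+    // their difference is the number of values still available in this page.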
+ int leftInPage = (int) (endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + // no data left in current page, load data from new page + readPage(); + leftInPage = (int) (endOfPageValueCount - valuesRead); + } + return leftInPage; + } + + private boolean fetchNextValue(PrimitiveObjectInspector.PrimitiveCategory category) throws IOException { + int left = readPageIfNeed(); + if (left > 0) { + // get the values of repetition and definitionLevel + readRepetitionAndDefinitionLevels(); + // read the data if it isn't null + if (definitionLevel == maxDefLevel) { + if (isCurrentPageDictionaryEncoded) { + lastValue = dataColumn.readValueDictionaryId(); + } else { + lastValue = readPrimitiveTypedRow(category); + } + } + return true; + } else { + eof = true; + return false; + } + } + + /** + * The function will set all data from parquet data page for an element in ListColumnVector + */ + private void addElement(ListColumnVector lcv, List<Object> elements, PrimitiveObjectInspector.PrimitiveCategory category, int index) throws IOException { + lcv.offsets[index] = elements.size(); + + // Return directly if last value is null + if (definitionLevel < maxDefLevel) { + lcv.isNull[index] = true; + lcv.lengths[index] = 0; + // fetch the data from parquet data page for next call + fetchNextValue(category); + return; + } + + do { + // add all data for an element in ListColumnVector, get out the loop if there is no data or the data is for new element + elements.add(lastValue); + } while (fetchNextValue(category) && (repetitionLevel != 0)); + + lcv.isNull[index] = false; + lcv.lengths[index] = elements.size() - lcv.offsets[index]; + } + + private Object readPrimitiveTypedRow(PrimitiveObjectInspector.PrimitiveCategory category) { + switch (category) { + case INT: + case BYTE: + case SHORT: + return dataColumn.readInteger(); + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + return dataColumn.readLong(); + case BOOLEAN: + return dataColumn.readBoolean() ? 
1 : 0; + case DOUBLE: + return dataColumn.readDouble(); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + return dataColumn.readBytes().getBytesUnsafe(); + case FLOAT: + return dataColumn.readFloat(); + case DECIMAL: + return dataColumn.readBytes().getBytesUnsafe(); + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + + private List decodeDictionaryIds(List valueList) { + int total = valueList.size(); + List resultList; + List<Integer> intList = (List<Integer>) valueList; + switch (descriptor.getType()) { + case INT32: + resultList = new ArrayList<Integer>(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToInt(intList.get(i))); + } + break; + case INT64: + resultList = new ArrayList<Long>(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToLong(intList.get(i))); + } + break; + case FLOAT: + resultList = new ArrayList<Float>(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToFloat(intList.get(i))); + } + break; + case DOUBLE: + resultList = new ArrayList<Double>(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToDouble(intList.get(i))); + } + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + resultList = new ArrayList<byte[]>(total); + for (int i = 0; i < total; ++i) { + resultList.add(dictionary.decodeToBinary(intList.get(i)).getBytesUnsafe()); + } + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); + } + return resultList; + } + + /** + * The lengths & offsets will be initialized as default size (1024), + * it should be set to the actual size according to the element number. + */ + private void setChildrenInfo(ListColumnVector lcv, int itemNum, int elementNum) { + lcv.childCount = itemNum; + long[] lcvLength = new long[elementNum]; + long[] lcvOffset = new long[elementNum]; + System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum); + System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum); + lcv.lengths = lcvLength; + lcv.offsets = lcvOffset; + } + + private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv, + List valueList, int elementNum) { + int total = valueList.size(); + setChildrenInfo(lcv, total, elementNum); + switch (category) { + case INT: + case BYTE: + case SHORT: + case BOOLEAN: + lcv.child = new LongColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((LongColumnVector)lcv.child).vector[i] = ((List<Integer>)valueList).get(i); + } + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + lcv.child = new LongColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((LongColumnVector)lcv.child).vector[i] = ((List<Long>)valueList).get(i); + } + break; + case DOUBLE: + lcv.child = new DoubleColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((DoubleColumnVector)lcv.child).vector[i] = ((List<Double>)valueList).get(i); + } + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + lcv.child = new BytesColumnVector(total); + lcv.child.init(); + for (int i = 0; i < valueList.size(); i++) { + ((BytesColumnVector)lcv.child).setVal(i, ((List<byte[]>)valueList).get(i)); + } + break; + case FLOAT: + lcv.child = new DoubleColumnVector(total); + for (int i = 0; i < valueList.size(); i++) { + ((DoubleColumnVector)lcv.child).vector[i] = ((List<Float>)valueList).get(i); + } + break; + case DECIMAL: + int 
precision = type.asPrimitiveType().getDecimalMetadata().getPrecision(); + int scale = type.asPrimitiveType().getDecimalMetadata().getScale(); + lcv.child = new DecimalColumnVector(total, precision, scale); + for (int i = 0; i < valueList.size(); i++) { + ((DecimalColumnVector)lcv.child).vector[i].set(((List<byte[]>)valueList).get(i), scale); + } + break; + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new RuntimeException("Unsupported type in the list: " + type); + } + } + + /** + * Finish the result ListColumnVector with all collected information. + */ + private void convertValueListToListColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, + ListColumnVector lcv, List valueList, int elementNum) { + // Fill the child of ListColumnVector with valueList + fillColumnVector(category, lcv, valueList, elementNum); + setIsRepeating(lcv); + } + + private void setIsRepeating(ListColumnVector lcv) { + ColumnVector child0 = getChildData(lcv, 0); + for (int i = 1; i < lcv.offsets.length; i++) { + ColumnVector currentChild = getChildData(lcv, i); + if (!compareColumnVector(child0, currentChild)) { + lcv.isRepeating = false; + return; + } + } + lcv.isRepeating = true; + } + + /** + * Get the child ColumnVector of ListColumnVector + */ + private ColumnVector getChildData(ListColumnVector lcv, int index) { + if (lcv.offsets[index] > Integer.MAX_VALUE || lcv.lengths[index] > Integer.MAX_VALUE) { + throw new RuntimeException("The element number in list is out of scope."); + } + if (lcv.isNull[index]) { + return null; + } + int start = (int)lcv.offsets[index]; + int length = (int)lcv.lengths[index]; + ColumnVector child = lcv.child; + ColumnVector resultCV = null; + if (child instanceof LongColumnVector) { + resultCV = new LongColumnVector(length); + try { + System.arraycopy(((LongColumnVector) lcv.child).vector, start, + ((LongColumnVector) resultCV).vector, 0, length); + } catch (Exception e) { + throw new RuntimeException("colinmjj:index:" + index + ", start:" + start + ",length:" + length + + ",vec len:" + ((LongColumnVector) lcv.child).vector.length + ", offset len:" + lcv.offsets.length + + ", len len:" + lcv.lengths.length, e); + } + } + if (child instanceof DoubleColumnVector) { + resultCV = new DoubleColumnVector(length); + System.arraycopy(((DoubleColumnVector) lcv.child).vector, start, + ((DoubleColumnVector) resultCV).vector, 0, length); + } + if (child instanceof BytesColumnVector) { + resultCV = new BytesColumnVector(length); + System.arraycopy(((BytesColumnVector) lcv.child).vector, start, + ((BytesColumnVector) resultCV).vector, 0, length); + } + if (child instanceof DecimalColumnVector) { + resultCV = new DecimalColumnVector(length, + ((DecimalColumnVector) child).precision, ((DecimalColumnVector) child).scale); + System.arraycopy(((DecimalColumnVector) lcv.child).vector, start, + ((DecimalColumnVector) resultCV).vector, 0, length); + } + return resultCV; + } + + private boolean compareColumnVector(ColumnVector cv1, ColumnVector cv2) { + if (cv1 == null && cv2 == null) { + return true; + } else { + if (cv1 != null && cv2 != null) { + if (cv1 instanceof LongColumnVector && cv2 instanceof LongColumnVector) { + return compareLongColumnVector((LongColumnVector) cv1, (LongColumnVector) cv2); + } + if (cv1 instanceof DoubleColumnVector && cv2 instanceof DoubleColumnVector) { + return compareDoubleColumnVector((DoubleColumnVector) cv1, (DoubleColumnVector) cv2); + } + if (cv1 instanceof BytesColumnVector && cv2 instanceof BytesColumnVector) { + return 
compareBytesColumnVector((BytesColumnVector) cv1, (BytesColumnVector) cv2); + } + if (cv1 instanceof DecimalColumnVector && cv2 instanceof DecimalColumnVector) { + return compareDecimalColumnVector((DecimalColumnVector) cv1, (DecimalColumnVector) cv2); + } + throw new RuntimeException("Unsupported ColumnVector comparision between " + cv1.getClass().getName() + + " and " + cv2.getClass().getName()); + } else { + return false; + } + } + } + + private boolean compareLongColumnVector(LongColumnVector cv1, LongColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != cv2.vector[i]) { + return false; + } + } + } else { + return false; + } + return true; + } + + private boolean compareDoubleColumnVector(DoubleColumnVector cv1, DoubleColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != cv2.vector[i]) { + return false; + } + } + } else { + return false; + } + return true; + } + + private boolean compareDecimalColumnVector(DecimalColumnVector cv1, DecimalColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2 && cv1.scale == cv2.scale && cv1.precision == cv2.precision) { + for (int i = 0; i < length1; i++) { + if (cv1.vector[i] != null && cv2.vector[i] == null + || cv1.vector[i] == null && cv2.vector[i] != null + || cv1.vector[i] != null && cv2.vector[i] != null && !cv1.vector[i].equals(cv2.vector[i])) { + return false; + } + } + } else { + return false; + } + return true; + } + + private boolean compareBytesColumnVector(BytesColumnVector cv1, BytesColumnVector cv2) { + int length1 = cv1.vector.length; + int length2 = cv2.vector.length; + if (length1 == length2) { + for (int i = 0; i < length1; i++) { + int innerLen1 = cv1.vector[i].length; + int innerLen2 = cv2.vector[i].length; + if (innerLen1 == innerLen2) { + for (int j = 0; j < innerLen1; j++) { + if (cv1.vector[i][j] != cv2.vector[i][j]) { + return false; + } + } + } else { + return false; + } + } + } else { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 1d9dba7..941bd7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; @@ -493,10 +495,34 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase } return new 
VectorizedStructColumnReader(fieldReaders); case LIST: + checkListColumnSupport(((ListTypeInfo) typeInfo).getListElementTypeInfo()); + if (columnDescriptors == null || columnDescriptors.isEmpty()) { + throw new RuntimeException( + "Failed to find related Parquet column descriptor with type " + type); + } + return new VectorizedListColumnReader(descriptors.get(0), + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); case MAP: case UNION: default: throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name()); } } + + /** + * Check if the element type in list is supported by vectorization read. + * Supported type: INT, BYTE, SHORT, DATE, INTERVAL_YEAR_MONTH, LONG, BOOLEAN, DOUBLE, BINARY, STRING, CHAR, VARCHAR, + * FLOAT, DECIMAL + */ + private void checkListColumnSupport(TypeInfo elementType) { + if (elementType instanceof PrimitiveTypeInfo) { + switch (((PrimitiveTypeInfo)elementType).getPrimitiveCategory()) { + case INTERVAL_DAY_TIME: + case TIMESTAMP: + throw new RuntimeException("Unsupported primitive type used in list:: " + elementType); + } + } else { + throw new RuntimeException("Unsupported type used in list:" + elementType); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index e9543c6..5e577d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -23,113 +23,29 @@ import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.page.DataPage; -import org.apache.parquet.column.page.DataPageV1; -import org.apache.parquet.column.page.DataPageV2; -import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReader; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Timestamp; -import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; -import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; -import static org.apache.parquet.column.ValuesType.VALUES; - /** * It's column level Parquet reader which is used to read a batch of records for a column, * part of the code is referred from Apache Spark and Apache Parquet. 
*/ -public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { - - private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); - - private boolean skipTimestampConversion = false; - - /** - * Total number of values read. - */ - private long valuesRead; - - /** - * value that indicates the end of the current page. That is, - * if valuesRead == endOfPageValueCount, we are at the end of the page. - */ - private long endOfPageValueCount; - - /** - * The dictionary, if this column has dictionary encoding. - */ - private final Dictionary dictionary; - - /** - * If true, the current page is dictionary encoded. - */ - private boolean isCurrentPageDictionaryEncoded; - - /** - * Maximum definition level for this column. - */ - private final int maxDefLevel; - - private int definitionLevel; - private int repetitionLevel; - - /** - * Repetition/Definition/Value readers. - */ - private IntIterator repetitionLevelColumn; - private IntIterator definitionLevelColumn; - private ValuesReader dataColumn; - - /** - * Total values in the current page. - */ - private int pageValueCount; - - private final PageReader pageReader; - private final ColumnDescriptor descriptor; - private final Type type; +public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader { public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, Type type) throws IOException { - this.descriptor = descriptor; - this.type = type; - this.pageReader = pageReader; - this.maxDefLevel = descriptor.getMaxDefinitionLevel(); - this.skipTimestampConversion = skipTimestampConversion; - - DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); - if (dictionaryPage != null) { - try { - this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); - this.isCurrentPageDictionaryEncoded = true; - } catch (IOException e) { - throw new IOException("could not decode the dictionary for " + descriptor, e); - } - } else { - this.dictionary = null; - this.isCurrentPageDictionaryEncoded = false; - } + super(descriptor, pageReader, skipTimestampConversion, type); } + @Override public void readBatch( int total, ColumnVector column, @@ -164,6 +80,7 @@ public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { TypeInfo columnType, int rowId) throws IOException { PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + switch (primitiveColumnType.getPrimitiveCategory()) { case INT: case BYTE: @@ -444,143 +361,4 @@ public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType()); } } - - private void readRepetitionAndDefinitionLevels() { - repetitionLevel = repetitionLevelColumn.nextInt(); - definitionLevel = definitionLevelColumn.nextInt(); - valuesRead++; - } - - private void readPage() throws IOException { - DataPage page = pageReader.readPage(); - // TODO: Why is this a visitor? 
- page.accept(new DataPage.Visitor<Void>() { - @Override - public Void visit(DataPageV1 dataPageV1) { - readPageV1(dataPageV1); - return null; - } - - @Override - public Void visit(DataPageV2 dataPageV2) { - readPageV2(dataPageV2); - return null; - } - }); - } - - private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException { - this.pageValueCount = valueCount; - this.endOfPageValueCount = valuesRead + pageValueCount; - if (dataEncoding.usesDictionary()) { - this.dataColumn = null; - if (dictionary == null) { - throw new IOException( - "could not read page in col " + descriptor + - " as the dictionary was missing for encoding " + dataEncoding); - } - dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary); - this.isCurrentPageDictionaryEncoded = true; - } else { - dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); - this.isCurrentPageDictionaryEncoded = false; - } - - try { - dataColumn.initFromPage(pageValueCount, bytes, offset); - } catch (IOException e) { - throw new IOException("could not read page in col " + descriptor, e); - } - } - - private void readPageV1(DataPageV1 page) { - ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); - ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); - this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); - this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); - try { - byte[] bytes = page.getBytes().toByteArray(); - LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records"); - LOG.debug("reading repetition levels at 0"); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - LOG.debug("reading definition levels at " + next); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - LOG.debug("reading data at " + next); - initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private void readPageV2(DataPageV2 page) { - this.pageValueCount = page.getValueCount(); - this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), - page.getRepetitionLevels()); - this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels()); - try { - LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records"); - initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount()); - } catch (IOException e) { - throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e); - } - } - - private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { - try { - if (maxLevel == 0) { - return new NullIntIterator(); - } - return new RLEIntIterator( - new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), - new ByteArrayInputStream(bytes.toByteArray()))); - } catch (IOException e) { - throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e); - } - } - - /** - * Utility classes to abstract over different way to read ints with different encodings. - * TODO: remove this layer of abstraction? 
- */ - abstract static class IntIterator { - abstract int nextInt(); - } - - protected static final class ValuesReaderIntIterator extends IntIterator { - ValuesReader delegate; - - public ValuesReaderIntIterator(ValuesReader delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - return delegate.readInteger(); - } - } - - protected static final class RLEIntIterator extends IntIterator { - RunLengthBitPackingHybridDecoder delegate; - - public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { - this.delegate = delegate; - } - - @Override - int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } - } - } - - protected static final class NullIntIterator extends IntIterator { - @Override - int nextInt() { return 0; } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java new file mode 100644 index 0000000..de19615 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedListColumnReader.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.api.Binary; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestVectorizedListColumnReader extends VectorizedColumnReaderTestBase { + + protected static void writeListData(ParquetWriter<Group> writer, boolean isDictionaryEncoding, + int elementNum) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + int listMaxSize = 4; + int listElementIndex = 0; + for (int i = 0; i < elementNum; i++) { + boolean isNull = isNull(i); + Group group = f.newGroup(); + + int listSize = i % listMaxSize + 1; + if (!isNull) { + for (int j = 0; j < listSize; j++) { + group.append("list_int32_field", getIntValue(isDictionaryEncoding, listElementIndex)); + group.append("list_int64_field", getLongValue(isDictionaryEncoding, listElementIndex)); + group.append("list_double_field", getDoubleValue(isDictionaryEncoding, listElementIndex)); + group.append("list_float_field", getFloatValue(isDictionaryEncoding, listElementIndex)); + group.append("list_boolean_field", getBooleanValue(listElementIndex)); + group.append("list_binary_field", getBinaryValue(isDictionaryEncoding, listElementIndex)); + + HiveDecimal hd = getDecimal(isDictionaryEncoding, listElementIndex).setScale(2); + HiveDecimalWritable hdw = new HiveDecimalWritable(hd); + group.append("list_decimal_field", Binary.fromConstantByteArray(hdw.getInternalStorage())); + listElementIndex++; + } + } + for (int j = 0; j < listMaxSize; j++) { + group.append("list_int32_field_for_repeat_test", getIntValue(isDictionaryEncoding, j)); + } + writer.write(group); + } + writer.close(); + } + + protected static void writeRepeateListData(ParquetWriter<Group> writer, + int elementNum, boolean isNull) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + int listMaxSize = 4; + for (int i = 0; i < elementNum; i++) { + Group group = f.newGroup(); + if (!isNull) { + for (int j = 0; j < listMaxSize; j++) { + group.append("list_int32_field_for_repeat_test", j); + } + } + writer.write(group); + } + writer.close(); + } + + @Test + public void testListReadLessOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeListData(initWriterFromFile(), isDictionaryEncoding, 1023); + testListReadAllType(isDictionaryEncoding, 1023); + removeFile(); + isDictionaryEncoding = 
true; + writeListData(initWriterFromFile(), isDictionaryEncoding, 1023); + testListReadAllType(isDictionaryEncoding, 1023); + removeFile(); + } + + @Test + public void testListReadEqualOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeListData(initWriterFromFile(), isDictionaryEncoding, 1024); + testListReadAllType(isDictionaryEncoding, 1024); + removeFile(); + isDictionaryEncoding = true; + writeListData(initWriterFromFile(), isDictionaryEncoding, 1024); + testListReadAllType(isDictionaryEncoding, 1024); + removeFile(); + } + + @Test + public void testListReadMoreOneBatch() throws Exception { + boolean isDictionaryEncoding = false; + removeFile(); + writeListData(initWriterFromFile(), isDictionaryEncoding, 1025); + testListReadAllType(isDictionaryEncoding, 1025); + removeFile(); + isDictionaryEncoding = true; + writeListData(initWriterFromFile(), isDictionaryEncoding, 1025); + testListReadAllType(isDictionaryEncoding, 1025); + removeFile(); + } + + @Test + public void testRepeateListRead() throws Exception { + removeFile(); + writeRepeateListData(initWriterFromFile(), 1023, false); + testRepeateListRead(1023, false); + removeFile(); + writeRepeateListData(initWriterFromFile(), 1023, true); + testRepeateListRead(1023, true); + removeFile(); + writeRepeateListData(initWriterFromFile(), 1024, false); + testRepeateListRead(1024, false); + removeFile(); + writeRepeateListData(initWriterFromFile(), 1024, true); + testRepeateListRead(1024, true); + removeFile(); + writeRepeateListData(initWriterFromFile(), 1025, false); + testRepeateListRead(1025, false); + removeFile(); + writeRepeateListData(initWriterFromFile(), 1025, true); + testRepeateListRead(1025, true); + removeFile(); + } + + private void testListReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception { + testListRead(isDictionaryEncoding, "int", elementNum); + testListRead(isDictionaryEncoding, "long", elementNum); + testListRead(isDictionaryEncoding, "double", elementNum); + testListRead(isDictionaryEncoding, "float", elementNum); + testListRead(isDictionaryEncoding, "boolean", elementNum); + testListRead(isDictionaryEncoding, "binary", elementNum); + testListRead(isDictionaryEncoding, "decimal", elementNum); + } + + private void setTypeConfiguration(String type, Configuration conf) { + if ("int".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_int32_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<int>"); + } else if ("long".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<bigint>"); + } else if ("double".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<double>"); + } else if ("float".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<float>"); + } else if ("boolean".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<boolean>"); + } else if ("binary".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_binary_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<string>"); + } else if ("decimal".equals(type)) { + conf.set(IOConstants.COLUMNS, "list_decimal_field"); + conf.set(IOConstants.COLUMNS_TYPES, "array<decimal(5,2)>"); + } + } + + private String getSchema(String type) { + if ("int".equals(type)) { + return "message hive_schema {repeated int32 list_int32_field;}"; + } else if ("long".equals(type)) { + 
return "message hive_schema {repeated int64 list_int64_field;}"; + } else if ("double".equals(type)) { + return "message hive_schema {repeated double list_double_field;}"; + } else if ("float".equals(type)) { + return "message hive_schema {repeated float list_float_field;}"; + } else if ("boolean".equals(type)) { + return "message hive_schema {repeated boolean list_boolean_field;}"; + } else if ("binary".equals(type)) { + return "message hive_schema {repeated binary list_binary_field;}"; + } else if ("decimal".equals(type)) { + return "message hive_schema {repeated binary list_decimal_field (DECIMAL(5,2));}"; + } else { + throw new RuntimeException("Unsupported type for TestVectorizedListColumnReader!"); + } + } + + private void assertValue(String type, ColumnVector childVector, + boolean isDictionaryEncoding, int valueIndex, int position) { + if ("int".equals(type)) { + assertEquals(getIntValue(isDictionaryEncoding, valueIndex), ((LongColumnVector)childVector).vector[position]); + } else if ("long".equals(type)) { + assertEquals(getLongValue(isDictionaryEncoding, valueIndex), ((LongColumnVector)childVector).vector[position]); + } else if ("double".equals(type)) { + assertEquals(getDoubleValue(isDictionaryEncoding, valueIndex), ((DoubleColumnVector)childVector).vector[position], 0); + } else if ("float".equals(type)) { + assertEquals(getFloatValue(isDictionaryEncoding, valueIndex), ((DoubleColumnVector)childVector).vector[position], 0); + } else if ("boolean".equals(type)) { + assertEquals((getBooleanValue(valueIndex) ? 1 : 0), ((LongColumnVector)childVector).vector[position]); + } else if ("binary".equals(type)) { + String actual = new String(ArrayUtils + .subarray(((BytesColumnVector)childVector).vector[position], ((BytesColumnVector)childVector).start[position], + ((BytesColumnVector)childVector).start[position] + ((BytesColumnVector)childVector).length[position])); + assertEquals(getStr(isDictionaryEncoding, valueIndex), actual); + } else if ("decimal".equals(type)) { + assertEquals(getDecimal(isDictionaryEncoding, valueIndex), + ((DecimalColumnVector)childVector).vector[position].getHiveDecimal()); + } else { + throw new RuntimeException("Unsupported type for TestVectorizedListColumnReader!"); + } + + } + + private void testListRead(boolean isDictionaryEncoding, String type, int elementNum) throws Exception { + Configuration conf = new Configuration(); + setTypeConfiguration(type, conf); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = createTestParquetReader(getSchema(type), conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + int index = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + for (int i = 0; i < vector.offsets.length; i++) { + if (row == elementNum) { + assertEquals(i, vector.offsets.length - 1); + break; + } + long start = vector.offsets[i]; + long length = vector.lengths[i]; + boolean isNull = isNull(row); + if (isNull) { + assertEquals(vector.isNull[i], true); + } else { + for (long j = 0; j < length; j++) { + assertValue(type, vector.child, isDictionaryEncoding, index, (int) (start + j)); + index++; + } + } + row++; + } + } + assertEquals("It doesn't exit at expected position", elementNum, row); + } finally { + reader.close(); + } + } + + private void testRepeateListRead(int elementNum, boolean isNull) throws Exception { + Configuration conf 
= new Configuration(); + conf.set(IOConstants.COLUMNS, "list_int32_field_for_repeat_test"); + conf.set(IOConstants.COLUMNS_TYPES, "array<int>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = createTestParquetReader( + "message hive_schema {repeated int32 list_int32_field_for_repeat_test;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int row = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + ListColumnVector vector = (ListColumnVector) previous.cols[0]; + + assertTrue(vector.isRepeating); + assertEquals(isNull, vector.isNull[0]); + + for (int i = 0; i < vector.offsets.length; i++) { + if (row == elementNum) { + assertEquals(i, vector.offsets.length - 1); + break; + } + row++; + } + } + assertEquals("It doesn't exit at expected position", elementNum, row); + } finally { + reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/39d46e8a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index 1a5d095..929e991 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -115,6 +116,15 @@ public class VectorizedColumnReaderTestBase { + " optional int32 array_element;\n" + " }\n" + "}\n" + + "repeated int32 list_int32_field;" + + "repeated int64 list_int64_field;" + + "repeated double list_double_field;" + + "repeated float list_float_field;" + + "repeated boolean list_boolean_field;" + + "repeated fixed_len_byte_array(3) list_byte_array_field;" + + "repeated binary list_binary_field;" + + "repeated binary list_decimal_field (DECIMAL(5,2));" + + "repeated int32 list_int32_field_for_repeat_test;" + "} "); protected static void removeFile() throws IOException {
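----------------------------------------------------------------------

A minimal usage sketch, not part of the commit: it shows how a consumer walks
the ListColumnVector produced by the new VectorizedListColumnReader for an
array<int> column. It assumes a Parquet file has already been written with the
single-level schema used in the tests above, and it reuses the test base's
createTestParquetReader helper; only APIs that appear in the diff are used.

    Configuration conf = new Configuration();
    conf.set(IOConstants.COLUMNS, "list_int32_field");
    conf.set(IOConstants.COLUMNS_TYPES, "array<int>");
    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");

    VectorizedParquetRecordReader reader = createTestParquetReader(
        "message hive_schema {repeated int32 list_int32_field;}", conf);
    VectorizedRowBatch batch = reader.createValue();
    try {
      while (reader.next(NullWritable.get(), batch)) {
        ListColumnVector lcv = (ListColumnVector) batch.cols[0];
        LongColumnVector child = (LongColumnVector) lcv.child;
        for (int row = 0; row < batch.size; row++) {
          if (lcv.isNull[row]) {
            continue;                           // this row's list is NULL
          }
          int start = (int) lcv.offsets[row];   // slice of the child vector...
          int length = (int) lcv.lengths[row];  // ...holding this row's elements
          for (int j = 0; j < length; j++) {
            long element = child.vector[start + j];
            // consume element...
          }
        }
      }
    } finally {
      reader.close();
    }

Note that when every row carries an identical list (the case testRepeateListRead
exercises), the reader sets isRepeating on the vector, so consumers that honor
isRepeating only need to inspect row 0.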