DRILL-419: Enable dictionary encoding in the Parquet reader.
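This change lets the Parquet record reader consume PLAIN_DICTIONARY-encoded column chunks: PageReadStatus now reads the chunk's dictionary page (when columnChunkMetaData.getDictionaryPageOffset() > 0) and initializes a Dictionary from it, data pages whose value encoding uses the dictionary are decoded through a DictionaryValuesReader, and the variable-length column readers (moved out of VarLenBinaryReader into the new VarLengthColumnReaders) gain dictionary-aware setSafe() implementations. A currently @Ignore'd testDictionaryEncoding case is added to ParquetRecordReaderTest.

A minimal sketch, assuming the same pre-1.0 parquet library (parquet.* packages) used by this commit, of the per-chunk check the new readers perform; the helper class name is hypothetical and not part of the commit:

  import parquet.column.Encoding;
  import parquet.hadoop.metadata.ColumnChunkMetaData;

  public class DictionaryEncodingCheck {
    // True when the chunk carries PLAIN_DICTIONARY-encoded pages and must take the dictionary path.
    public static boolean usesDictionary(ColumnChunkMetaData chunk) {
      return chunk.getEncodings().contains(Encoding.PLAIN_DICTIONARY);
    }
  }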
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8731b68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8731b68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8731b68 Branch: refs/heads/master Commit: b8731b68bdc23d5f7766f45c47ca3b7257789df7 Parents: f071aca Author: Jason Altekruse <[email protected]> Authored: Mon Mar 31 10:57:48 2014 -0700 Committer: Steven Phillips <[email protected]> Committed: Mon May 5 18:53:25 2014 -0700 ---------------------------------------------------------------------- .../exec/store/parquet/ColumnDataReader.java | 10 +- .../drill/exec/store/parquet/ColumnReader.java | 5 + .../store/parquet/NullableColumnReader.java | 3 +- .../exec/store/parquet/PageReadStatus.java | 68 ++++- .../exec/store/parquet/ParquetRecordReader.java | 29 ++- .../exec/store/parquet/VarLenBinaryReader.java | 184 +------------- .../store/parquet/VarLengthColumnReaders.java | 250 +++++++++++++++++++ .../store/parquet/ParquetRecordReaderTest.java | 15 +- 8 files changed, 364 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java index a890f1c..8c6f120 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java @@ -32,16 +32,20 @@ class ColumnDataReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class); private final long endPosition; - private final FSDataInputStream input; + public final FSDataInputStream input; - public ColumnDataReader(FileSystem fs, Path path, long start, long length) throws IOException{ - this.input = fs.open(path, 64 * 1024); + public ColumnDataReader(FSDataInputStream input, long start, long length) throws IOException{ + this.input = input; this.input.seek(start); this.endPosition = start + length; } public PageHeader readPageHeader() throws IOException{ + try{ return Util.readPageHeader(input); + }catch (IOException e) { + throw e; + } } public BytesInput getPageAsBytesInput(int pageLength) throws IOException{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java index 97ecfb8..196e1fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java @@ -36,6 +36,11 @@ abstract class ColumnReader<V extends ValueVector> { // Value Vector for this column final V valueVec; + + ColumnDescriptor getColumnDescriptor() { + return columnDescriptor; + } + // column description from the parquet library final ColumnDescriptor columnDescriptor; // metadata 
of the column, from the parquet library http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java index 8faf756..66d1c5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java @@ -73,7 +73,8 @@ abstract class NullableColumnReader extends ColumnReader{ lastValueWasNull = true; nullsFound = 0; if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass - || currentValueIndexInVector >= valueVec.getValueCapacity()){ + || currentValueIndexInVector >= valueVec.getValueCapacity() + || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){ break; } while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java index fe83159..021b622 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java @@ -18,16 +18,28 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; +import java.util.ArrayList; +import com.google.common.base.Preconditions; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.proto.SchemaDefProtos; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import parquet.bytes.BytesInput; +import parquet.column.Dictionary; +import parquet.column.Encoding; import parquet.column.ValuesType; +import parquet.column.page.DictionaryPage; import parquet.column.page.Page; import parquet.column.values.ValuesReader; +import parquet.column.values.dictionary.DictionaryValuesReader; import parquet.format.PageHeader; +import parquet.format.PageType; +import parquet.format.Util; import parquet.hadoop.metadata.ColumnChunkMetaData; // class to keep track of the read position of variable length columns @@ -52,15 +64,27 @@ final class PageReadStatus { //int rowGroupIndex; ValuesReader definitionLevels; ValuesReader valueReader; + Dictionary dictionary; PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ this.parentColumnReader = parentStatus; long totalByteLength = columnChunkMetaData.getTotalSize(); long start = columnChunkMetaData.getFirstDataPageOffset(); - - try{ - this.dataReader = new ColumnDataReader(fs, path, start, totalByteLength); + try { + FSDataInputStream f = fs.open(path); + if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + f.seek(columnChunkMetaData.getDictionaryPageOffset()); + PageHeader 
pageHeader = Util.readPageHeader(f); + assert pageHeader.type == PageType.DICTIONARY_PAGE; + DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)), + pageHeader.uncompressed_page_size, + pageHeader.dictionary_page_header.num_values, + parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) + ); + this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); + } + this.dataReader = new ColumnDataReader(f, start, totalByteLength); } catch (IOException e) { throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + path.getName(), e); } @@ -78,10 +102,25 @@ final class PageReadStatus { currentPage = null; - if(!dataReader.hasRemainder()) return false; + if(!dataReader.hasRemainder()) { + return false; + } // next, we need to decompress the bytes - PageHeader pageHeader = dataReader.readPageHeader(); + PageHeader pageHeader = null; + // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one + // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary + do { + pageHeader = dataReader.readPageHeader(); + if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { + DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(dataReader.input, pageHeader.compressed_page_size)), + pageHeader.uncompressed_page_size, + pageHeader.dictionary_page_header.num_values, + parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) + ); + this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page); + } + } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() .decompress( // @@ -113,13 +152,20 @@ final class PageReadStatus { readPosInBytes = 0; valuesRead = 0; if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ - definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); - readPosInBytes = definitionLevels.getNextOffset(); - valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + if (!currentPage.getValueEncoding().usesDictionary()) { + definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); + readPosInBytes = definitionLevels.getNextOffset(); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + } else { + definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); + readPosInBytes = definitionLevels.getNextOffset(); + valueReader = new DictionaryValuesReader(dictionary); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + } } - return true; } 
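For reference, the dictionary handling added to PageReadStatus above can be sketched in isolation as follows. This is a simplified outline, not the committed code: it assumes an FSDataInputStream already positioned at the dictionary page, skips the decompression the real reader performs through the codec factory, and, as the TODO above notes, handles a single dictionary page per chunk.

  import java.io.IOException;
  import org.apache.hadoop.fs.FSDataInputStream;
  import parquet.bytes.BytesInput;
  import parquet.column.ColumnDescriptor;
  import parquet.column.Dictionary;
  import parquet.column.Encoding;
  import parquet.column.page.DictionaryPage;
  import parquet.column.values.ValuesReader;
  import parquet.column.values.dictionary.DictionaryValuesReader;
  import parquet.format.PageHeader;
  import parquet.format.PageType;
  import parquet.format.Util;

  public class DictionaryPageSketch {

    // Read the dictionary page at the stream's current position and build the
    // chunk-level Dictionary for the column (assumes the page is uncompressed).
    public static Dictionary readDictionary(FSDataInputStream in, ColumnDescriptor column)
        throws IOException {
      PageHeader pageHeader = Util.readPageHeader(in);
      assert pageHeader.type == PageType.DICTIONARY_PAGE;
      DictionaryPage page = new DictionaryPage(
          BytesInput.copy(BytesInput.from(in, pageHeader.compressed_page_size)),
          pageHeader.uncompressed_page_size,
          pageHeader.dictionary_page_header.num_values,
          Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()));
      return page.getEncoding().initDictionary(column, page);
    }

    // Wrap the dictionary in a ValuesReader for a data page whose values are dictionary ids.
    public static ValuesReader dictionaryValuesReader(Dictionary dictionary, int valueCount,
                                                      byte[] pageBytes, int offset) throws IOException {
      ValuesReader reader = new DictionaryValuesReader(dictionary);
      reader.initFromPage(valueCount, pageBytes, offset);
      return reader;
    }
  }

Decoding through DictionaryValuesReader lets the definition levels still be read with the page's own encoding while the values themselves are resolved against the chunk-level dictionary, which is what the next() changes above set up.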
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 463f3ed..75cd799 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ExpressionPosition; @@ -54,13 +55,18 @@ import parquet.format.SchemaElement; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; +import parquet.column.Encoding; +import parquet.hadoop.CodecFactoryExposer; +import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; +import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.*; + import com.google.common.base.Joiner; -class ParquetRecordReader implements RecordReader { +public class ParquetRecordReader implements RecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class); // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors @@ -241,8 +247,8 @@ class ParquetRecordReader implements RecordReader { try { ValueVector v; ConvertedType convertedType; - ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>(); - ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>(); + ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>(); + ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>(); // initialize all of the column read status objects boolean fieldFixedLength = false; for (int i = 0; i < columns.size(); ++i) { @@ -250,6 +256,7 @@ class ParquetRecordReader implements RecordReader { columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i); convertedType = convertedTypes.get(column.getPath()[0]); MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType); +// Preconditions.checkArgument(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY), "Dictionary Encoding not currently supported"); field = MaterializedField.create(toFieldName(column.getPath()), type); // the field was not requested to be read if ( ! 
fieldSelected(field)) continue; @@ -264,20 +271,20 @@ class ParquetRecordReader implements RecordReader { if (column.getMaxDefinitionLevel() == 0){// column is required if (convertedType == ConvertedType.UTF8) { varLengthColumns.add( - new VarLenBinaryReader.VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType)); + new VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType)); } else { varLengthColumns.add( - new VarLenBinaryReader.VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType)); + new VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType)); } } else{ if (convertedType == ConvertedType.UTF8) { nullableVarLengthColumns.add( - new VarLenBinaryReader.NullableVarCharColumn(this, -1, column, columnChunkMetaData, false, + new NullableVarCharColumn(this, -1, column, columnChunkMetaData, false, (NullableVarCharVector) v, convertedType)); } else { nullableVarLengthColumns.add( - new VarLenBinaryReader.NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false, + new NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false, (NullableVarBinaryVector) v, convertedType)); } } @@ -314,11 +321,11 @@ class ParquetRecordReader implements RecordReader { AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5); column.valuesReadInCurrentPass = 0; } - for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){ + for (VarLengthColumn r : varLengthReader.columns){ AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5); r.valuesReadInCurrentPass = 0; } - for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){ + for (NullableVarLengthColumn r : varLengthReader.nullableColumns){ AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5); r.valuesReadInCurrentPass = 0; } @@ -535,10 +542,10 @@ class ParquetRecordReader implements RecordReader { } columnStatuses.clear(); - for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){ + for (VarLengthColumn r : varLengthReader.columns){ r.clear(); } - for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){ + for (NullableVarLengthColumn r : varLengthReader.nullableColumns){ r.clear(); } varLengthReader.columns.clear(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java index ae01104..c217e80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java @@ -17,19 +17,10 @@ */ package org.apache.drill.exec.store.parquet; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.vector.*; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.VarBinaryVector; -import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.*; import parquet.bytes.BytesUtils; -import 
parquet.column.ColumnDescriptor; -import parquet.format.ConvertedType; -import parquet.hadoop.metadata.ColumnChunkMetaData; import java.io.IOException; -import java.util.HashMap; import java.util.List; public class VarLenBinaryReader { @@ -45,164 +36,6 @@ public class VarLenBinaryReader { this.columns = columns; } - public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { - - VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - public abstract boolean setSafe(int index, byte[] bytes, int start, int length); - - public abstract int capacity(); - - } - - public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader { - - int nullsRead; - boolean currentValNull = false; - - NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - } - - public abstract boolean setSafe(int index, byte[] value, int start, int length); - - public abstract int capacity(); - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } - - public static class VarCharColumn extends VarLengthColumn <VarCharVector> { - - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected VarCharVector varCharVector; - - VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - varCharVector = v; - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - return varCharVector.getMutator().setSafe(valuesReadInCurrentPass, bytes, - (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits); - } - - @Override - public int capacity() { - return varCharVector.getData().capacity(); - } - } - - public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> { - - int nullsRead; - boolean currentValNull = false; - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected NullableVarCharVector nullableVarCharVector; - - NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, 
convertedType); - nullableVarCharVector = v; - } - - public boolean setSafe(int index, byte[] value, int start, int length) { - return nullableVarCharVector.getMutator().setSafe(index, value, - start, length); - } - - @Override - public int capacity() { - return nullableVarCharVector.getData().capacity(); - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } - - public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> { - - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected VarBinaryVector varBinaryVector; - - VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, - ConvertedType convertedType) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - varBinaryVector = v; - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { - return varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, bytes, - (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits); - } - - @Override - public int capacity() { - return varBinaryVector.getData().capacity(); - } - } - - public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> { - - int nullsRead; - boolean currentValNull = false; - // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected NullableVarBinaryVector nullableVarBinaryVector; - - NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, - ConvertedType convertedType ) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); - nullableVarBinaryVector = v; - } - - public boolean setSafe(int index, byte[] value, int start, int length) { - return nullableVarBinaryVector.getMutator().setSafe(index, value, - start, length); - } - - @Override - public int capacity() { - return nullableVarBinaryVector.getData().capacity(); - } - - @Override - protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { - throw new UnsupportedOperationException(); - } - } - /** * Reads as many variable length values as possible. 
* @@ -278,9 +111,16 @@ public class VarLenBinaryReader { continue;// field is null, no length to add to data vector } - // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes, - (int) columnReader.pageReadStatus.readPosInBytes); + if (columnReader.usingDictionary) { + columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes(); + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + columnReader.dataTypeLengthInBits = columnReader.currDictVal.length(); + } + else { + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes, + (int) columnReader.pageReadStatus.readPosInBytes); + } lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) { @@ -308,7 +148,7 @@ public class VarLenBinaryReader { // again, I am re-purposing the unused field here, it is a length n BYTES, not bits if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){ boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes, - (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); + (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); assert success; } columnReader.currentValNull = false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java new file mode 100644 index 0000000..7e9d770 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java @@ -0,0 +1,250 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.store.parquet.ColumnReader; +import org.apache.drill.exec.store.parquet.ParquetRecordReader; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; +import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; +import parquet.column.Encoding; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.io.api.Binary; + +public class VarLengthColumnReaders { + + public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { + + boolean usingDictionary; + Binary currDictVal; + + VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + + public abstract boolean setSafe(int index, byte[] bytes, int start, int length); + + public abstract int capacity(); + + } + + public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader { + + int nullsRead; + boolean currentValNull = false; + boolean usingDictionary; + Binary currDictVal; + + NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + } + + public abstract boolean setSafe(int index, byte[] value, int start, int length); + + public abstract int capacity(); + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + } + + public static class VarCharColumn extends VarLengthColumn <VarCharVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarCharVector varCharVector; + + VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + varCharVector = v; + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + boolean success; + if (usingDictionary) { + success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), + 0, 
currDictVal.length()); + } + else { + success = varCharVector.getMutator().setSafe(index, bytes, start, length); + } + return success; + } + + @Override + public int capacity() { + return varCharVector.getData().capacity(); + } + } + + public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> { + + int nullsRead; + boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected NullableVarCharVector nullableVarCharVector; + + NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + nullableVarCharVector = v; + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + } + + public boolean setSafe(int index, byte[] value, int start, int length) { + boolean success; + if (usingDictionary) { + success = nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), + 0, currDictVal.length()); + } + else { + success = nullableVarCharVector.getMutator().setSafe(index, value, start, length); + } + return success; + } + + @Override + public int capacity() { + return nullableVarCharVector.getData().capacity(); + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + } + + public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarBinaryVector varBinaryVector; + + VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + varBinaryVector = v; + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + boolean success; + if (usingDictionary) { + success = varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), + 0, currDictVal.length()); + } + else { + success = varBinaryVector.getMutator().setSafe(index, bytes, start, length); + } + return success; + } + + @Override + public int capacity() { + return varBinaryVector.getData().capacity(); + } + } + + public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> { + + int nullsRead; + boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector; + + NullableVarBinaryColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + nullableVarBinaryVector = v; + if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { + usingDictionary = true; + } + else { + usingDictionary = false; + } + } + + public boolean setSafe(int index, byte[] value, int start, int length) { + boolean success; + if (usingDictionary) { + success = nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(), + 0, currDictVal.length()); + } + else { + success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length); + } + return success; + } + + @Override + public int capacity() { + return nullableVarBinaryVector.getData().capacity(); + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 67b5394..5d2c859 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -284,14 +284,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ */ public void testNullableColumnsVarLen() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); - ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields); + ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields); byte[] val = {'b'}; byte[] val2 = {'b', '2'}; byte[] val3 = {'b', '3'}; byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'}; Object[] boolVals = { val, val2, val4}; props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props)); - // testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props); fields.clear(); // pass strings instead of byte arrays @@ -301,6 +300,18 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ "\"/tmp/varLen.parquet/a\"", "unused", 1, props); } + @Ignore + @Test + public void testDictionaryEncoding() throws Exception { + HashMap<String, FieldInfo> fields = new HashMap<>(); + ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields); + Object[] boolVals2 = { "b", "b2", "b3"}; + props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props)); + // test dictionary encoding + testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json", + "\"/tmp/dictionary_pig.parquet/a\"", "unused", 1, props); + } + @Test public void testMultipleRowGroupsAndReads() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>();
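Putting the pieces together, the dictionary read path for a variable-length value is: VarLenBinaryReader pulls the next Binary from the page's DictionaryValuesReader into currDictVal, records its length in dataTypeLengthInBits, and the column's setSafe() copies the decoded bytes into the value vector. A condensed sketch of that last step, assuming the APIs shown in the diff (VarCharVector shown; the VarBinary and nullable variants are analogous, and the helper class name is hypothetical):

  import org.apache.drill.exec.vector.VarCharVector;
  import parquet.column.values.ValuesReader;
  import parquet.io.api.Binary;

  public class DictionaryValueCopySketch {

    // Decode the next dictionary value and copy its bytes into the vector at the given index.
    // Returns false when the vector has run out of space, mirroring the setSafe() contract above.
    public static boolean copyNextValue(ValuesReader dictionaryValuesReader,
                                        VarCharVector vector, int index) {
      Binary value = dictionaryValuesReader.readBytes();
      return vector.getMutator().setSafe(index, value.getBytes(), 0, value.length());
    }
  }

Note that the new testDictionaryEncoding case is @Ignore'd and reads a pre-generated /tmp/dictionary_pig.parquet file, so it documents the intended usage rather than running automatically.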
