This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 09d15f39e2cda10049affca472689a4e2cbca45d Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Fri Oct 4 14:36:01 2019 +0300 DRILL-5983: Add missing nullable Parquet readers for INT and UINT logical types closes #1866 --- .../parquet/columnreaders/ColumnReaderFactory.java | 36 ++++++++--- .../NullableFixedByteAlignedReaders.java | 63 +++++++++++++++++-- .../ParquetFixedWidthDictionaryReaders.java | 72 +++++++++++----------- 3 files changed, 120 insertions(+), 51 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 03d5382..7f8c018 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.vector.VarDecimalVector; -import org.apache.drill.exec.vector.NullableVarDecimalVector; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.BitVector; import org.apache.drill.exec.vector.DateVector; @@ -37,8 +35,11 @@ import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.NullableIntervalVector; import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.NullableTimeVector; +import org.apache.drill.exec.vector.NullableUInt4Vector; +import org.apache.drill.exec.vector.NullableUInt8Vector; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.NullableVarDecimalVector; import org.apache.drill.exec.vector.TimeStampVector; import org.apache.drill.exec.vector.TimeVector; import org.apache.drill.exec.vector.UInt4Vector; @@ -46,6 +47,7 @@ import org.apache.drill.exec.vector.UInt8Vector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.exec.vector.VarDecimalVector; import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -57,21 +59,22 @@ import org.apache.parquet.schema.PrimitiveType; public class ColumnReaderFactory { /** - * @param fixedLength - * @param descriptor - * @param columnChunkMetaData + * Creates fixed column reader for the given column based on its metadata. + * + * @param fixedLength if fixed length reader should be used + * @param descriptor column descriptor + * @param columnChunkMetaData column metadata + * * @return ColumnReader object instance - * @throws SchemaChangeException */ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, ValueVector v, - SchemaElement schemaElement) - throws Exception { + ColumnChunkMetaData columnChunkMetaData, ValueVector v, + SchemaElement schemaElement) throws Exception { ConvertedType convertedType = schemaElement.getConverted_type(); // if the column is required, or repeated (in which case we just want to use this to generate our appropriate // ColumnReader for actually transferring data into the data vector inside of our repeated vector if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) { - if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ + if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { return new BitReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BitVector) v, schemaElement); } else if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && ( @@ -279,6 +282,16 @@ public class ColumnReaderFactory { return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement); } switch (convertedType) { + case INT_8: + case INT_16: + case INT_32: + return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, + columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement); + case UINT_8: + case UINT_16: + case UINT_32: + return new NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader, + columnDescriptor, columnChunkMetaData, fixedLength, (NullableUInt4Vector) valueVec, schemaElement); case DECIMAL: return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement); @@ -292,6 +305,9 @@ public class ColumnReaderFactory { return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement); } switch (convertedType) { + case UINT_64: + return new NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, columnDescriptor, + columnChunkMetaData, fixedLength, (NullableUInt8Vector) valueVec, schemaElement); case DECIMAL: return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index 886721e..94a1f59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -17,10 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import java.nio.ByteBuffer; - -import org.apache.drill.shaded.guava.com.google.common.primitives.Ints; -import org.apache.drill.shaded.guava.com.google.common.primitives.Longs; +import io.netty.buffer.DrillBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.NullableTimeStampHolder; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; @@ -32,16 +29,21 @@ import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.NullableIntervalVector; import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.NullableTimeVector; +import org.apache.drill.exec.vector.NullableUInt4Vector; +import org.apache.drill.exec.vector.NullableUInt8Vector; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.NullableVarDecimalVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.shaded.guava.com.google.common.primitives.Ints; +import org.apache.drill.shaded.guava.com.google.common.primitives.Longs; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; import org.joda.time.DateTimeConstants; -import io.netty.buffer.DrillBuf; +import java.nio.ByteBuffer; + import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; public class NullableFixedByteAlignedReaders { @@ -159,6 +161,31 @@ public class NullableFixedByteAlignedReaders { } } + static class NullableDictionaryUInt4Reader extends NullableColumnReader<NullableUInt4Vector> { + + NullableDictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableUInt4Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + } + int writerIndex = castedBaseVector.getBuffer().writerIndex(); + castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) readLength); + } else { + for (int i = 0; i < recordsToReadInThisPass; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); + } + } + } + } + static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> { NullableDictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, @@ -205,6 +232,31 @@ public class NullableFixedByteAlignedReaders { } } + static class NullableDictionaryUInt8Reader extends NullableColumnReader<NullableUInt8Vector> { + + NullableDictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableUInt8Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } + int writerIndex = castedBaseVector.getBuffer().writerIndex(); + castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) readLength); + } else { + for (int i = 0; i < recordsToReadInThisPass; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); + } + } + } + } + static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> { NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, @@ -463,4 +515,3 @@ public class NullableFixedByteAlignedReaders { } } } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index e25ef1a..9e019ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import org.apache.drill.shaded.guava.com.google.common.primitives.Ints; -import org.apache.drill.shaded.guava.com.google.common.primitives.Longs; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.Float4Vector; @@ -30,6 +28,8 @@ import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.UInt8Vector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarDecimalVector; +import org.apache.drill.shaded.guava.com.google.common.primitives.Ints; +import org.apache.drill.shaded.guava.com.google.common.primitives.Longs; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -51,14 +51,14 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPageCount - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - if (usingDictionary) { - for (int i = 0; i < recordsReadInThisIteration; i++){ + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + for (int i = 0; i < recordsReadInThisIteration; i++) { valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } + } else { + super.readField(recordsToReadInThisPass); } } } @@ -150,14 +150,14 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPageCount - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - if (usingDictionary) { + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); for (int i = 0; i < recordsReadInThisIteration; i++){ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } + } else { + super.readField(recordsToReadInThisPass); } } } @@ -297,16 +297,14 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPageCount - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - - for (int i = 0; i < recordsReadInThisIteration; i++){ - try { + if (usingDictionary) { + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + for (int i = 0; i < recordsReadInThisIteration; i++) { valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); - } catch ( Exception ex) { - throw ex; } + } else { + super.readField(recordsToReadInThisPass); } } } @@ -321,17 +319,15 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPageCount - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - - for (int i = 0; i < recordsReadInThisIteration; i++){ - try { + if (usingDictionary) { + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + for (int i = 0; i < recordsReadInThisIteration; i++) { Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); - } catch ( Exception ex) { - throw ex; } + } else { + super.readField(recordsToReadInThisPass); } } } @@ -346,11 +342,14 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - recordsReadInThisIteration = Math.min(pageReader.currentPageCount + if (usingDictionary) { + recordsReadInThisIteration = Math.min(pageReader.currentPageCount - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - - for (int i = 0; i < recordsReadInThisIteration; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + for (int i = 0; i < recordsReadInThisIteration; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } else { + super.readField(recordsToReadInThisPass); } } } @@ -365,11 +364,14 @@ public class ParquetFixedWidthDictionaryReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - recordsReadInThisIteration = Math.min(pageReader.currentPageCount + if (usingDictionary) { + recordsReadInThisIteration = Math.min(pageReader.currentPageCount - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - - for (int i = 0; i < recordsReadInThisIteration; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + for (int i = 0; i < recordsReadInThisIteration; i++) { + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } else { + super.readField(recordsToReadInThisPass); } } }