This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 09d15f39e2cda10049affca472689a4e2cbca45d
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
AuthorDate: Fri Oct 4 14:36:01 2019 +0300

    DRILL-5983: Add missing nullable Parquet readers for INT and UINT logical types
    
    closes #1866
---
 .../parquet/columnreaders/ColumnReaderFactory.java | 36 ++++++++---
 .../NullableFixedByteAlignedReaders.java           | 63 +++++++++++++++++--
 .../ParquetFixedWidthDictionaryReaders.java        | 72 +++++++++++-----------
 3 files changed, 120 insertions(+), 51 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 03d5382..7f8c018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.vector.VarDecimalVector;
-import org.apache.drill.exec.vector.NullableVarDecimalVector;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.DateVector;
@@ -37,8 +35,11 @@ import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableUInt8Vector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
 import org.apache.drill.exec.vector.TimeStampVector;
 import org.apache.drill.exec.vector.TimeVector;
 import org.apache.drill.exec.vector.UInt4Vector;
@@ -46,6 +47,7 @@ import org.apache.drill.exec.vector.UInt8Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -57,21 +59,22 @@ import org.apache.parquet.schema.PrimitiveType;
 public class ColumnReaderFactory {
 
   /**
-   * @param fixedLength
-   * @param descriptor
-   * @param columnChunkMetaData
+   * Creates fixed column reader for the given column based on its metadata.
+   *
+   * @param fixedLength if fixed length reader should be used
+   * @param descriptor column descriptor
+   * @param columnChunkMetaData column metadata
+   *
    * @return ColumnReader object instance
-   * @throws SchemaChangeException
    */
   static ColumnReader<?> createFixedColumnReader(ParquetRecordReader 
recordReader, boolean fixedLength, ColumnDescriptor descriptor,
-                                               ColumnChunkMetaData 
columnChunkMetaData, ValueVector v,
-                                               SchemaElement schemaElement)
-      throws Exception {
+                                                 ColumnChunkMetaData 
columnChunkMetaData, ValueVector v,
+                                                 SchemaElement schemaElement) 
throws Exception {
     ConvertedType convertedType = schemaElement.getConverted_type();
     // if the column is required, or repeated (in which case we just want to 
use this to generate our appropriate
     // ColumnReader for actually transferring data into the data vector inside 
of our repeated vector
     if (descriptor.getMaxDefinitionLevel() == 0 || 
descriptor.getMaxRepetitionLevel() > 0) {
-      if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN){
+      if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
         return new BitReader(recordReader, descriptor, columnChunkMetaData,
             fixedLength, (BitVector) v, schemaElement);
       } else if 
(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
@@ -279,6 +282,16 @@ public class ColumnReaderFactory {
             return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) 
valueVec, schemaElement);
           }
           switch (convertedType) {
+            case INT_8:
+            case INT_16:
+            case INT_32:
+              return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableIntVector) valueVec, schemaElement);
+            case UINT_8:
+            case UINT_16:
+            case UINT_32:
+              return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableUInt4Vector) valueVec, schemaElement);
             case DECIMAL:
               return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
                   columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
@@ -292,6 +305,9 @@ public class ColumnReaderFactory {
             return new 
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableBigIntVector)valueVec, schemaElement);
           }
           switch (convertedType) {
+            case UINT_64:
+              return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, 
columnDescriptor,
+                columnChunkMetaData, fixedLength, (NullableUInt8Vector) 
valueVec, schemaElement);
             case DECIMAL:
               return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
                   columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 886721e..94a1f59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -17,10 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import java.nio.ByteBuffer;
-
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
@@ -32,16 +29,21 @@ import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableUInt8Vector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarDecimalVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.io.api.Binary;
 import org.joda.time.DateTimeConstants;
 
-import io.netty.buffer.DrillBuf;
+import java.nio.ByteBuffer;
+
 import static 
org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
 
 public class NullableFixedByteAlignedReaders {
@@ -159,6 +161,31 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  static class NullableDictionaryUInt4Reader extends 
NullableColumnReader<NullableUInt4Vector> {
+
+    NullableDictionaryUInt4Reader(ParquetRecordReader parentReader, 
ColumnDescriptor descriptor,
+                                  ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableUInt4Vector v,
+                                  SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, 
schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readInteger());
+        }
+        int writerIndex = castedBaseVector.getBuffer().writerIndex();
+        castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) 
readLength);
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readInteger());
+        }
+      }
+    }
+  }
+
   static class NullableDictionaryTimeReader extends 
NullableColumnReader<NullableTimeVector> {
 
     NullableDictionaryTimeReader(ParquetRecordReader parentReader, 
ColumnDescriptor descriptor,
@@ -205,6 +232,31 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  static class NullableDictionaryUInt8Reader extends 
NullableColumnReader<NullableUInt8Vector> {
+
+    NullableDictionaryUInt8Reader(ParquetRecordReader parentReader, 
ColumnDescriptor descriptor,
+                                  ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableUInt8Vector v,
+                                  SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, 
schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readLong());
+        }
+        int writerIndex = castedBaseVector.getBuffer().writerIndex();
+        castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) 
readLength);
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readLong());
+        }
+      }
+    }
+  }
+
   static class NullableDictionaryTimeStampReader extends 
NullableColumnReader<NullableTimeStampVector> {
 
     NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, 
ColumnDescriptor descriptor,
@@ -463,4 +515,3 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 }
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index e25ef1a..9e019ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.Float4Vector;
@@ -30,6 +28,8 @@ import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.UInt8Vector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -51,14 +51,14 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
-          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
       if (usingDictionary) {
-        for (int i = 0; i < recordsReadInThisIteration; i++){
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readInteger());
         }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }
@@ -150,14 +150,14 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
-        - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
       if (usingDictionary) {
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
         for (int i = 0; i < recordsReadInThisIteration; i++){
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readInteger());
         }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }
@@ -297,16 +297,14 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
-        - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
-      for (int i = 0; i < recordsReadInThisIteration; i++){
-        try {
+      if (usingDictionary) {
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readLong());
-        } catch ( Exception ex) {
-          throw ex;
         }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }
@@ -321,17 +319,15 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
-              - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
-      for (int i = 0; i < recordsReadInThisIteration; i++){
-        try {
+      if (usingDictionary) {
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
           Binary binaryTimeStampValue = 
pageReader.dictionaryValueReader.readBytes();
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
getDateTimeValueFromBinary(binaryTimeStampValue, true));
-        } catch ( Exception ex) {
-          throw ex;
         }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }
@@ -346,11 +342,14 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+      if (usingDictionary) {
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
           - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
-      for (int i = 0; i < recordsReadInThisIteration; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readFloat());
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readFloat());
+        }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }
@@ -365,11 +364,14 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+      if (usingDictionary) {
+        recordsReadInThisIteration = Math.min(pageReader.currentPageCount
           - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-
-      for (int i = 0; i < recordsReadInThisIteration; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readDouble());
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readDouble());
+        }
+      } else {
+        super.readField(recordsToReadInThisPass);
       }
     }
   }

Reply via email to