Repository: incubator-drill
Updated Branches:
  refs/heads/master cd7aeebe6 -> ab2795173


DRILL-649: Reading impala and avro generated parquet files

Implemented dictionary encoding for the non-varlength types.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28dd76af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28dd76af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28dd76af

Branch: refs/heads/master
Commit: 28dd76af303169a2fd4269d24c7b6cfa505c25eb
Parents: cd7aeeb
Author: Jason Altekruse <[email protected]>
Authored: Fri May 16 23:19:51 2014 -0500
Committer: Jacques Nadeau <[email protected]>
Committed: Thu May 22 19:30:32 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/parquet/ColumnReader.java  |   1 +
 .../store/parquet/NullableColumnReader.java     |   9 +-
 .../NullableFixedByteAlignedReaders.java        | 170 +++++++++++++++++++
 .../exec/store/parquet/PageReadStatus.java      |  39 +++--
 .../ParquetFixedWidthDictionaryReader.java      |  54 ++++++
 .../exec/store/parquet/ParquetRecordReader.java |  20 ++-
 .../exec/store/parquet/VarLenBinaryReader.java  |   2 +-
 .../store/parquet/VarLengthColumnReaders.java   |   2 -
 .../store/parquet/ParquetRecordReaderTest.java  |  31 +++-
 .../store/parquet/ParquetResultListener.java    |   2 +-
 10 files changed, 296 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index b9faafe..43f27a6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -51,6 +51,7 @@ abstract class ColumnReader<V extends ValueVector> {
   final PageReadStatus pageReadStatus;
 
   final SchemaElement schemaElement;
+  boolean usingDictionary;
 
   // quick reference to see if the field is fixed length (as this requires an 
instanceof)
   final boolean isFixedLength;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 6040c67..687b373 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -28,7 +28,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
 
-abstract class NullableColumnReader extends ColumnReader{
+abstract class NullableColumnReader<V extends ValueVector> extends 
ColumnReader<V>{
 
   int nullsFound;
   // used to skip nulls found
@@ -37,7 +37,7 @@ abstract class NullableColumnReader extends ColumnReader{
   int bitsUsed;
 
   NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v, SchemaElement 
schemaElement) throws ExecutionSetupException {
+               boolean fixedLength, V v, SchemaElement schemaElement) throws 
ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
   }
 
@@ -118,7 +118,8 @@ abstract class NullableColumnReader extends ColumnReader{
         valuesReadInCurrentPass += recordsReadInThisIteration;
         totalValuesRead += recordsReadInThisIteration;
         pageReadStatus.valuesRead += recordsReadInThisIteration;
-        if (readStartInBytes + readLength >= pageReadStatus.byteLength && 
bitsUsed == 0) {
+        if ( (readStartInBytes + readLength >= pageReadStatus.byteLength && 
bitsUsed == 0)
+            || pageReadStatus.valuesRead == 
pageReadStatus.currentPage.getValueCount()) {
           if (!pageReadStatus.next()) {
             break;
           }
@@ -133,4 +134,4 @@ abstract class NullableColumnReader extends ColumnReader{
   }
 
   protected abstract void readField(long recordsToRead, ColumnReader 
firstColumnStatus);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
new file mode 100644
index 0000000..6dd96de
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.*;
+
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+class NullableFixedByteAlignedReaders {
+
+  public static NullableColumnReader 
getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+                                                      ColumnDescriptor 
columnDescriptor,
+                                                      ColumnChunkMetaData 
columnChunkMetaData,
+                                                      boolean fixedLength,
+                                                      ValueVector valueVec,
+                                                      SchemaElement 
schemaElement) throws ExecutionSetupException {
+    if (! 
columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+      return new NullableFixedByteAlignedReader(parentReader, allocateSize, 
columnDescriptor, columnChunkMetaData,
+              fixedLength, valueVec, schemaElement);
+    } else {
+      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) 
{
+        return new NullableDictionaryBigIntReader(parentReader, allocateSize, 
columnDescriptor, columnChunkMetaData,
+              fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.INT32) {
+        return new NullableDicationaryIntReader(parentReader, allocateSize, 
columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return new NullableDictionaryFloat4Reader(parentReader, allocateSize, 
columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.DOUBLE) {
+        return new NullableDictionaryFloat8Reader(parentReader, allocateSize, 
columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+      }
+      else{
+        throw new ExecutionSetupException("Unsupported nullable column type " 
+ columnDescriptor.getType().name() );
+      }
+    }
+  }
+
+  private static class NullableFixedByteAlignedReader extends 
NullableColumnReader {
+    private byte[] bytes;
+
+    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, ValueVector v, SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+      this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+      // set up metadata
+      this.readStartInBytes = pageReadStatus.readPosInBytes;
+      this.readLengthInBits = recordsReadInThisIteration * 
dataTypeLengthInBits;
+      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+      this.bytes = pageReadStatus.pageDataByteArray;
+
+      // fill in data.
+      vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+    }
+  }
+
+  private static class NullableDicationaryIntReader extends 
NullableColumnReader<NullableIntVector> {
+
+    private byte[] bytes;
+
+    NullableDicationaryIntReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                                 ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableIntVector v,
+                                 SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReadStatus.valueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  private static class NullableDictionaryBigIntReader extends 
NullableColumnReader<NullableBigIntVector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableBigIntVector v,
+                                   SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReadStatus.valueReader.readLong());
+      }
+    }
+  }
+
+  private static class NullableDictionaryFloat4Reader extends 
NullableColumnReader<NullableFloat4Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableFloat4Vector v,
+                                   SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReadStatus.valueReader.readFloat());
+      }
+    }
+  }
+
+  private static class NullableDictionaryFloat8Reader extends 
NullableColumnReader<NullableFloat8Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                                  ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableFloat8Vector v,
+                                  SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReadStatus.valueReader.readDouble());
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 021b622..20bf3e9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -41,6 +41,7 @@ import parquet.format.PageHeader;
 import parquet.format.PageType;
 import parquet.format.Util;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
 
 // class to keep track of the read position of variable length columns
 final class PageReadStatus {
@@ -65,26 +66,33 @@ final class PageReadStatus {
   ValuesReader definitionLevels;
   ValuesReader valueReader;
   Dictionary dictionary;
+  PageHeader pageHeader = null;
 
   PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, 
ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
 
-    long totalByteLength = columnChunkMetaData.getTotalSize();
+    long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
       FSDataInputStream f = fs.open(path);
+      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
       if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
         f.seek(columnChunkMetaData.getDictionaryPageOffset());
         PageHeader pageHeader = Util.readPageHeader(f);
         assert pageHeader.type == PageType.DICTIONARY_PAGE;
-        DictionaryPage page = new 
DictionaryPage(BytesInput.copy(BytesInput.from(f, 
pageHeader.compressed_page_size)),
+        BytesInput bytesIn = 
parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
             pageHeader.uncompressed_page_size,
             pageHeader.dictionary_page_header.num_values,
             
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
         );
         this.dictionary = 
page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
       }
-      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
     } catch (IOException e) {
       throw new ExecutionSetupException("Error opening or reading metatdata 
for parquet file at location: " + path.getName(), e);
     }
@@ -102,21 +110,29 @@ final class PageReadStatus {
 
     currentPage = null;
 
-    if(!dataReader.hasRemainder()) {
+    // TODO - the metadata for total size appears to be incorrect for impala 
generated files, need to find cause
+    // and submit a bug report
+    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == 
parentColumnReader.columnChunkMetaData.getValueCount()) {
       return false;
     }
 
     // next, we need to decompress the bytes
-    PageHeader pageHeader = null;
     // TODO - figure out if we need multiple dictionary pages, I believe it 
may be limited to one
     // I think we are clobbering parts of the dictionary if there can be 
multiple pages of dictionary
     do {
       pageHeader = dataReader.readPageHeader();
       if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        DictionaryPage page = new 
DictionaryPage(BytesInput.copy(BytesInput.from(dataReader.input, 
pageHeader.compressed_page_size)),
-                pageHeader.uncompressed_page_size,
-                pageHeader.dictionary_page_header.num_values,
-                
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        System.out.println(pageHeader.dictionary_page_header.getEncoding());
+        BytesInput bytesIn = 
parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
+            pageHeader.uncompressed_page_size,
+            pageHeader.dictionary_page_header.num_values,
+            
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
         );
         this.dictionary = 
page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
       }
@@ -157,13 +173,16 @@ final class PageReadStatus {
         valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
         definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, 0);
         readPosInBytes = definitionLevels.getNextOffset();
-        valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, (int) readPosInBytes);
+        if (parentColumnReader.columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+          valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, (int) readPosInBytes);
+        }
       } else {
         definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.DEFINITION_LEVEL);
         definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, 0);
         readPosInBytes = definitionLevels.getNextOffset();
         valueReader = new DictionaryValuesReader(dictionary);
         valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, (int) readPosInBytes);
+        this.parentColumnReader.usingDictionary = true;
       }
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
new file mode 100644
index 0000000..c0720a9
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReader extends ColumnReader{
+
+  ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                                    ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, ValueVector v,
+                                    SchemaElement schemaElement) throws 
ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+  }
+
+  @Override
+  public void readField(long recordsToReadInThisPass, ColumnReader 
firstColumnStatus) {
+
+    recordsReadInThisIteration = 
Math.min(pageReadStatus.currentPage.getValueCount()
+        - pageReadStatus.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+    int defLevel;
+    for (int i = 0; i < recordsReadInThisIteration; i++){
+      defLevel = pageReadStatus.definitionLevels.readInteger();
+      // if the value is defined
+      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+        if (columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.INT64)
+          ((BigIntVector)valueVec).getMutator().set(i + 
valuesReadInCurrentPass,
+              pageReadStatus.valueReader.readLong() );
+      }
+      // otherwise the value is skipped, because the bit vector indicating 
nullability is zero filled
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 9cdd205..0996620 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -28,8 +28,6 @@ import com.google.common.base.Preconditions;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -59,8 +57,6 @@ import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 import parquet.column.Encoding;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
@@ -325,9 +321,15 @@ public class ParquetRecordReader implements RecordReader {
         } else if (length <= 16) {
           columnStatuses.add(new Decimal38Reader(this, allocateSize, 
descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
         }
-      } else {
-        columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, 
descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement));
+      }
+      else{
+        if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, 
allocateSize, descriptor, columnChunkMetaData,
+              fixedLength, v, schemaElement));
+        } else {
+          columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, 
descriptor, columnChunkMetaData,
+              fixedLength, v, schemaElement));
+        }
       }
       return true;
     }
@@ -343,8 +345,8 @@ public class ParquetRecordReader implements RecordReader {
           columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, 
descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
         }
       } else {
-        columnStatuses.add(new NullableFixedByteAlignedReader(this, 
allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement));
+        
columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this,
 allocateSize, descriptor,
+            columnChunkMetaData, fixedLength, v, schemaElement));
       }
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index c217e80..91719e7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -97,7 +97,6 @@ public class VarLenBinaryReader {
         }
         if (columnReader.pageReadStatus.currentPage == null
             || columnReader.pageReadStatus.valuesRead == 
columnReader.pageReadStatus.currentPage.getValueCount()) {
-          columnReader.totalValuesRead += 
columnReader.pageReadStatus.valuesRead;
           if (!columnReader.pageReadStatus.next()) {
             rowGroupFinished = true;
             break;
@@ -159,6 +158,7 @@ public class VarLenBinaryReader {
         columnReader.pageReadStatus.valuesRead++;
         columnReader.valuesReadInCurrentPass++;
         if ( columnReader.pageReadStatus.valuesRead == 
columnReader.pageReadStatus.currentPage.getValueCount()) {
+          columnReader.totalValuesRead += 
columnReader.pageReadStatus.valuesRead;
           columnReader.pageReadStatus.next();
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
index bbc669d..fe9e574 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
@@ -40,7 +40,6 @@ public class VarLengthColumnReaders {
 
   public static abstract class VarLengthColumn<V extends ValueVector> extends 
ColumnReader {
 
-    boolean usingDictionary;
     Binary currDictVal;
 
     VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
@@ -70,7 +69,6 @@ public class VarLengthColumnReaders {
 
     int nullsRead;
     boolean currentValNull = false;
-    boolean usingDictionary;
     Binary currDictVal;
 
     NullableVarLengthColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e594441..dec4b15 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import static org.apache.drill.exec.store.parquet.TestFileGenerator.intVals;
 import static 
org.apache.drill.exec.store.parquet.TestFileGenerator.populateFieldInfoMap;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -291,7 +292,8 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     testParquetFullEngineEventBased(false, 
"/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, 
props);
     fields.clear();
     // pass strings instead of byte arrays
-    Object[] boolVals2 = { "b", "b2", "b3"};
+    Object[] boolVals2 = { new org.apache.hadoop.io.Text("b"), new 
org.apache.hadoop.io.Text("b2"),
+        new org.apache.hadoop.io.Text("b3")};
     props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, 
TypeProtos.MinorType.BIT, props));
     testParquetFullEngineEventBased(false, 
"/parquet/parquet_scan_screen_read_entry_replace.json",
         "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
@@ -301,12 +303,27 @@ public class ParquetRecordReaderTest extends 
BaseTestQuery{
   @Test
   public void testDictionaryEncoding() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 300000, 
DEFAULT_BYTES_PER_PAGE, fields);
-    Object[] boolVals2 = { "b", "b2", "b3"};
-    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, 
TypeProtos.MinorType.BIT, props));
-    // test dictionary encoding
-    testParquetFullEngineEventBased(false, 
"/parquet/parquet_scan_screen_read_entry_replace.json",
-        "\"/tmp/dictionary_pig.parquet/a\"", "unused", 1, props);
+    ParquetTestProperties props = new ParquetTestProperties(1, 25, 
DEFAULT_BYTES_PER_PAGE, fields);
+    Object[] boolVals = null;
+    props.fields.put("n_name", null);
+    props.fields.put("n_nationkey", null);
+    props.fields.put("n_regionkey", null);
+    props.fields.put("n_comment", null);
+    testParquetFullEngineEventBased(false, false, 
"/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, true);
+
+    fields = new HashMap<>();
+    props = new ParquetTestProperties(1, 5, DEFAULT_BYTES_PER_PAGE, fields);
+    props.fields.put("employee_id", null);
+    props.fields.put("name", null);
+    props.fields.put("role", null);
+    props.fields.put("phone", null);
+    props.fields.put("password_hash", null);
+    props.fields.put("gender_male", null);
+    props.fields.put("height", null);
+    props.fields.put("hair_thickness", null);
+    testParquetFullEngineEventBased(false, false, 
"/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28dd76af/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 04197bc..a4ccbcc 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -113,7 +113,7 @@ public class ParquetResultListener implements 
UserResultsListener {
       ValueVector vv = vw.getValueVector();
       currentField = 
props.fields.get(vv.getField().getAsSchemaPath().getRootSegment().getPath());
       if (ParquetRecordReaderTest.VERBOSE_DEBUG){
-        System.out.println("\n" + (String) currentField.name);
+        System.out.println("\n" + 
vv.getField().getAsSchemaPath().getRootSegment().getPath());
       }
       if ( ! 
valuesChecked.containsKey(vv.getField().getAsSchemaPath().getRootSegment().getPath())){
         
valuesChecked.put(vv.getField().getAsSchemaPath().getRootSegment().getPath(), 
0);

Reply via email to