DRILL-419: enable dictionary encoding in the Parquet reader.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8731b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8731b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8731b68

Branch: refs/heads/master
Commit: b8731b68bdc23d5f7766f45c47ca3b7257789df7
Parents: f071aca
Author: Jason Altekruse <[email protected]>
Authored: Mon Mar 31 10:57:48 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon May 5 18:53:25 2014 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ColumnDataReader.java    |  10 +-
 .../drill/exec/store/parquet/ColumnReader.java  |   5 +
 .../store/parquet/NullableColumnReader.java     |   3 +-
 .../exec/store/parquet/PageReadStatus.java      |  68 ++++-
 .../exec/store/parquet/ParquetRecordReader.java |  29 ++-
 .../exec/store/parquet/VarLenBinaryReader.java  | 184 +-------------
 .../store/parquet/VarLengthColumnReaders.java   | 250 +++++++++++++++++++
 .../store/parquet/ParquetRecordReaderTest.java  |  15 +-
 8 files changed, 364 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index a890f1c..8c6f120 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -32,16 +32,20 @@ class ColumnDataReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
   
   private final long endPosition;
-  private final FSDataInputStream input;
+  public final FSDataInputStream input;
   
-  public ColumnDataReader(FileSystem fs, Path path, long start, long length) throws IOException{
-    this.input = fs.open(path, 64 * 1024);
+  public ColumnDataReader(FSDataInputStream input, long start, long length) throws IOException{
+    this.input = input;
     this.input.seek(start);
     this.endPosition = start + length;
   }
   
   public PageHeader readPageHeader() throws IOException{
+    try{
     return Util.readPageHeader(input);
+    }catch (IOException e) {
+      throw e;
+    }
   }
   
   public BytesInput getPageAsBytesInput(int pageLength) throws IOException{

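The constructor change above exists so that the caller can open the column's input stream itself, optionally consume a dictionary page from it, and then hand the already-positioned stream to ColumnDataReader. A minimal usage sketch, assuming the Hadoop FileSystem API and the ColumnChunkMetaData offsets used elsewhere in this patch, and assuming it lives in the same package as ColumnDataReader (the wrapper class and method name are illustrative only):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.metadata.ColumnChunkMetaData;

class ColumnDataReaderUsageSketch {
  static ColumnDataReader open(FileSystem fs, Path path, ColumnChunkMetaData md) throws java.io.IOException {
    FSDataInputStream in = fs.open(path);       // open the stream once
    if (md.getDictionaryPageOffset() > 0) {
      in.seek(md.getDictionaryPageOffset());    // position at the dictionary page
      // ... read and decode the dictionary page here, as PageReadStatus now does ...
    }
    // pass the already-open stream; ColumnDataReader seeks to the first data page
    return new ColumnDataReader(in, md.getFirstDataPageOffset(), md.getTotalSize());
  }
}
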
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 97ecfb8..196e1fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -36,6 +36,11 @@ abstract class ColumnReader<V extends ValueVector> {
   
   // Value Vector for this column
   final V valueVec;
+
+  ColumnDescriptor getColumnDescriptor() {
+    return columnDescriptor;
+  }
+
   // column description from the parquet library
   final ColumnDescriptor columnDescriptor;
   // metadata of the column, from the parquet library

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 8faf756..66d1c5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -73,7 +73,8 @@ abstract class NullableColumnReader extends ColumnReader{
         lastValueWasNull = true;
         nullsFound = 0;
         if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass
-            || currentValueIndexInVector >= valueVec.getValueCapacity()){
+            || currentValueIndexInVector >= valueVec.getValueCapacity()
+            || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){
           break;
         }
         while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index fe83159..021b622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -18,16 +18,28 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.bytes.BytesInput;
+import parquet.column.Dictionary;
+import parquet.column.Encoding;
 import parquet.column.ValuesType;
+import parquet.column.page.DictionaryPage;
 import parquet.column.page.Page;
 import parquet.column.values.ValuesReader;
+import parquet.column.values.dictionary.DictionaryValuesReader;
 import parquet.format.PageHeader;
+import parquet.format.PageType;
+import parquet.format.Util;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 // class to keep track of the read position of variable length columns
@@ -52,15 +64,27 @@ final class PageReadStatus {
   //int rowGroupIndex;
   ValuesReader definitionLevels;
   ValuesReader valueReader;
+  Dictionary dictionary;
 
   PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
 
     long totalByteLength = columnChunkMetaData.getTotalSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
-
-    try{
-      this.dataReader = new ColumnDataReader(fs, path, start, totalByteLength);
+    try {
+      FSDataInputStream f = fs.open(path);
+      if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+        f.seek(columnChunkMetaData.getDictionaryPageOffset());
+        PageHeader pageHeader = Util.readPageHeader(f);
+        assert pageHeader.type == PageType.DICTIONARY_PAGE;
+        DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)),
+            pageHeader.uncompressed_page_size,
+            pageHeader.dictionary_page_header.num_values,
+            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        );
+        this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+      }
+      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
     } catch (IOException e) {
       throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + path.getName(), e);
     }
@@ -78,10 +102,25 @@
 
     currentPage = null;
 
-    if(!dataReader.hasRemainder()) return false;
+    if(!dataReader.hasRemainder()) {
+      return false;
+    }
 
     // next, we need to decompress the bytes
-    PageHeader pageHeader = dataReader.readPageHeader();
+    PageHeader pageHeader = null;
+    // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
+    // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
+    do {
+      pageHeader = dataReader.readPageHeader();
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+        DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(dataReader.input, pageHeader.compressed_page_size)),
+                pageHeader.uncompressed_page_size,
+                pageHeader.dictionary_page_header.num_values,
+                parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        );
+        this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
+      }
+    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
     BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
         .decompress( //
@@ -113,13 +152,20 @@
     readPosInBytes = 0;
     valuesRead = 0;
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
-      definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-      valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-      definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
-      readPosInBytes = definitionLevels.getNextOffset();
-      valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      if (!currentPage.getValueEncoding().usesDictionary()) {
+        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+        valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+        readPosInBytes = definitionLevels.getNextOffset();
+        valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      } else {
+        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+        readPosInBytes = definitionLevels.getNextOffset();
+        valueReader = new DictionaryValuesReader(dictionary);
+        valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      }
     }
-
     return true;
   }
   

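The heart of the change is in PageReadStatus above: a dictionary page is decoded either from the offset recorded in the column-chunk metadata or when it is encountered inline ahead of the data pages, and any data page whose value encoding uses the dictionary is then read through a DictionaryValuesReader. A condensed sketch of that decode path, using the same parquet-mr types the file already imports (the wrapper class and method names are illustrative only):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Dictionary;
import parquet.column.page.DictionaryPage;
import parquet.column.values.ValuesReader;
import parquet.column.values.dictionary.DictionaryValuesReader;
import parquet.format.PageHeader;
import parquet.format.Util;

class DictionaryDecodeSketch {

  // Read one dictionary page from a stream positioned at its header and
  // materialize the in-memory Dictionary for the given column.
  static Dictionary readDictionary(FSDataInputStream in, ColumnDescriptor column) throws IOException {
    PageHeader pageHeader = Util.readPageHeader(in);
    DictionaryPage page = new DictionaryPage(
        BytesInput.copy(BytesInput.from(in, pageHeader.compressed_page_size)),
        pageHeader.uncompressed_page_size,
        pageHeader.dictionary_page_header.num_values,
        parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()));
    return page.getEncoding().initDictionary(column, page);
  }

  // For a data page whose value encoding uses the dictionary, values are read
  // through a DictionaryValuesReader backed by that Dictionary.
  static ValuesReader dictionaryValuesReader(Dictionary dictionary, int valueCount,
                                             byte[] pageData, int offset) throws IOException {
    ValuesReader reader = new DictionaryValuesReader(dictionary);
    reader.initFromPage(valueCount, pageData, offset);
    return reader;
  }
}
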
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 463f3ed..75cd799 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -54,13 +55,18 @@ import parquet.format.SchemaElement;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
+import parquet.column.Encoding;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
 
+import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.*;
+
 import com.google.common.base.Joiner;
 
-class ParquetRecordReader implements RecordReader {
+public class ParquetRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
@@ -241,8 +247,8 @@ class ParquetRecordReader implements RecordReader {
     try {
       ValueVector v;
       ConvertedType convertedType;
-      ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>();
-      ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
+      ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+      ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
       boolean fieldFixedLength = false;
       for (int i = 0; i < columns.size(); ++i) {
@@ -250,6 +256,7 @@ class ParquetRecordReader implements RecordReader {
         columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
         convertedType = convertedTypes.get(column.getPath()[0]);
         MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType);
+//        Preconditions.checkArgument(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY), "Dictionary Encoding not currently supported");
         field = MaterializedField.create(toFieldName(column.getPath()), type);
         // the field was not requested to be read
         if ( ! fieldSelected(field)) continue;
@@ -264,20 +271,20 @@ class ParquetRecordReader implements RecordReader {
           if (column.getMaxDefinitionLevel() == 0){// column is required
             if (convertedType == ConvertedType.UTF8) {
               varLengthColumns.add(
-                new VarLenBinaryReader.VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType));
+                new VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType));
             } else {
               varLengthColumns.add(
-                  new VarLenBinaryReader.VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType));
+                  new VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType));
             }
           }
           else{
             if (convertedType == ConvertedType.UTF8) {
               nullableVarLengthColumns.add(
-                new VarLenBinaryReader.NullableVarCharColumn(this, -1, column, columnChunkMetaData, false,
+                new NullableVarCharColumn(this, -1, column, columnChunkMetaData, false,
                     (NullableVarCharVector) v, convertedType));
             } else {
               nullableVarLengthColumns.add(
-                new VarLenBinaryReader.NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false,
+                new NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false,
                   (NullableVarBinaryVector) v, convertedType));
             }
           }
@@ -314,11 +321,11 @@ class ParquetRecordReader implements RecordReader {
       AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
       column.valuesReadInCurrentPass = 0;
     }
-    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+    for (VarLengthColumn r : varLengthReader.columns){
       AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
-    for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
+    for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
       AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
@@ -535,10 +542,10 @@ class ParquetRecordReader implements RecordReader {
     }
     columnStatuses.clear();
 
-    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+    for (VarLengthColumn r : varLengthReader.columns){
      r.clear();
    }
-    for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
+    for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
      r.clear();
    }
    varLengthReader.columns.clear();

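Beyond making the class public, the reader now builds its variable-length column readers from the new VarLengthColumnReaders container and no longer rejects column chunks that advertise PLAIN_DICTIONARY encoding (the Preconditions check is left commented out). The selection logic reduces to nullability plus converted type; a condensed restatement, assuming it sits in the same package as the (package-private) reader classes, with an illustrative helper class and method name:

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.*;
import org.apache.drill.exec.vector.*;
import parquet.column.ColumnDescriptor;
import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;

class VarLengthReaderSelectionSketch {
  // Pick the concrete reader for one variable-length column.
  static ColumnReader select(ParquetRecordReader reader, ColumnDescriptor column,
                             ColumnChunkMetaData md, ConvertedType convertedType,
                             ValueVector v) throws ExecutionSetupException {
    boolean required = column.getMaxDefinitionLevel() == 0;
    boolean utf8 = convertedType == ConvertedType.UTF8;
    if (required) {
      return utf8
          ? new VarCharColumn(reader, -1, column, md, false, (VarCharVector) v, convertedType)
          : new VarBinaryColumn(reader, -1, column, md, false, (VarBinaryVector) v, convertedType);
    }
    return utf8
        ? new NullableVarCharColumn(reader, -1, column, md, false, (NullableVarCharVector) v, convertedType)
        : new NullableVarBinaryColumn(reader, -1, column, md, false, (NullableVarBinaryVector) v, convertedType);
  }
}
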
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index ae01104..c217e80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -17,19 +17,10 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.*;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.*;
 import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 
 public class VarLenBinaryReader {
@@ -45,164 +36,6 @@ public class VarLenBinaryReader {
     this.columns = columns;
   }
 
-  public static abstract class VarLengthColumn<V extends ValueVector> extends 
ColumnReader {
-
-    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
-                    ConvertedType convertedType) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    public abstract boolean setSafe(int index, byte[] bytes, int start, int 
length);
-
-    public abstract int capacity();
-
-  }
-
-  public static abstract class NullableVarLengthColumn<V extends ValueVector> 
extends ColumnReader {
-
-    int nullsRead;
-    boolean currentValNull = false;
-
-    NullableVarLengthColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
-                            ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
-                            ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-    }
-
-    public abstract boolean setSafe(int index, byte[] value, int start, int 
length);
-
-    public abstract int capacity();
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
-
-    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
-    protected VarCharVector varCharVector;
-
-    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarCharVector v,
-                    ConvertedType convertedType) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      varCharVector = v;
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      return varCharVector.getMutator().setSafe(valuesReadInCurrentPass, bytes,
-          (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
-    }
-
-    @Override
-    public int capacity() {
-      return varCharVector.getData().capacity();
-    }
-  }
-
-  public static class NullableVarCharColumn extends NullableVarLengthColumn 
<NullableVarCharVector> {
-
-    int nullsRead;
-    boolean currentValNull = false;
-    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
-    protected NullableVarCharVector nullableVarCharVector;
-
-    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
-                            ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarCharVector v,
-                            ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      nullableVarCharVector = v;
-    }
-
-    public boolean setSafe(int index, byte[] value, int start, int length) {
-      return nullableVarCharVector.getMutator().setSafe(index, value,
-          start, length);
-    }
-
-    @Override
-    public int capacity() {
-      return nullableVarCharVector.getData().capacity();
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  public static class VarBinaryColumn extends VarLengthColumn 
<VarBinaryVector> {
-
-    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
-    protected VarBinaryVector varBinaryVector;
-
-    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
-                  ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarBinaryVector v,
-                  ConvertedType convertedType) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      varBinaryVector = v;
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      return varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, 
bytes,
-          (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
-    }
-
-    @Override
-    public int capacity() {
-      return varBinaryVector.getData().capacity();
-    }
-  }
-
-  public static class NullableVarBinaryColumn extends NullableVarLengthColumn 
<NullableVarBinaryVector> {
-
-    int nullsRead;
-    boolean currentValNull = false;
-    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
-    protected NullableVarBinaryVector nullableVarBinaryVector;
-
-    NullableVarBinaryColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
-                          ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarBinaryVector v,
-                          ConvertedType convertedType ) throws 
ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
-      nullableVarBinaryVector = v;
-    }
-
-    public boolean setSafe(int index, byte[] value, int start, int length) {
-      return nullableVarBinaryVector.getMutator().setSafe(index, value,
-          start, length);
-    }
-
-    @Override
-    public int capacity() {
-      return nullableVarBinaryVector.getData().capacity();
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   /**
    * Reads as many variable length values as possible.
    *
@@ -278,9 +111,16 @@
          continue;// field is null, no length to add to data vector
        }
 
-        // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
-        columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
-            (int) columnReader.pageReadStatus.readPosInBytes);
+        if (columnReader.usingDictionary) {
+          columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes();
+          // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
+          columnReader.dataTypeLengthInBits = columnReader.currDictVal.length();
+        }
+        else {
+          // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
+          columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
+              (int) columnReader.pageReadStatus.readPosInBytes);
+        }
         lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
 
         if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
@@ -308,7 +148,7 @@
         // again, I am re-purposing the unused field here, it is a length n BYTES, not bits
         if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
           boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
-              (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
+                (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
           assert success;
         }
         columnReader.currentValNull = false;

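With the column-reader classes moved out (see VarLengthColumnReaders below), the substantive change to the read loop above is how each value's length is found: dictionary-encoded columns pull the next decoded Binary from the page's values reader, while plain-encoded columns still read the 4-byte little-endian length prefix from the page bytes. A condensed restatement of that branch, assuming it sits in the same package as the reader classes (the wrapper class and method name are illustrative only):

import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
import parquet.bytes.BytesUtils;

class VarLengthValueSketch {
  // Return the length in bytes of the column's next value; for dictionary-encoded
  // columns the decoded value itself is stashed on the reader for the later copy.
  static int nextValueLength(VarLengthColumn columnReader, byte[] pageBytes) {
    if (columnReader.usingDictionary) {
      // dictionary path: the value comes from the dictionary-backed values reader
      columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes();
      return columnReader.currDictVal.length();
    }
    // plain path: a 4-byte little-endian length prefix precedes the value bytes
    return BytesUtils.readIntLittleEndian(pageBytes, (int) columnReader.pageReadStatus.readPosInBytes);
  }
}
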
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
new file mode 100644
index 0000000..7e9d770
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
@@ -0,0 +1,250 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.store.parquet.ColumnReader;
+import org.apache.drill.exec.store.parquet.ParquetRecordReader;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.column.Encoding;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
+
+public class VarLengthColumnReaders {
+
+  public static abstract class VarLengthColumn<V extends ValueVector> extends 
ColumnReader {
+
+    boolean usingDictionary;
+    Binary currDictVal;
+
+    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
+                    ConvertedType convertedType) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+
+    public abstract boolean setSafe(int index, byte[] bytes, int start, int 
length);
+
+    public abstract int capacity();
+
+  }
+
+  public static abstract class NullableVarLengthColumn<V extends ValueVector> 
extends ColumnReader {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    boolean usingDictionary;
+    Binary currDictVal;
+
+    NullableVarLengthColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, V v,
+                            ConvertedType convertedType ) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+    }
+
+    public abstract boolean setSafe(int index, byte[] value, int start, int 
length);
+
+    public abstract int capacity();
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
+
+    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
+    protected VarCharVector varCharVector;
+
+    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                  ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarCharVector v,
+                  ConvertedType convertedType) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
+      varCharVector = v;
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      boolean success;
+      if (usingDictionary) {
+        success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, 
currDictVal.getBytes(),
+            0, currDictVal.length());
+      }
+      else {
+        success = varCharVector.getMutator().setSafe(index, bytes, start, 
length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return varCharVector.getData().capacity();
+    }
+  }
+
+  public static class NullableVarCharColumn extends NullableVarLengthColumn 
<NullableVarCharVector> {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
+    protected NullableVarCharVector nullableVarCharVector;
+
+    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                          ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarCharVector v,
+                          ConvertedType convertedType ) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+      nullableVarCharVector = v;
+      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
+    }
+
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      boolean success;
+      if (usingDictionary) {
+        success = 
nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, 
currDictVal.getBytes(),
+            0, currDictVal.length());
+      }
+      else {
+        success = nullableVarCharVector.getMutator().setSafe(index, value, 
start, length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarCharVector.getData().capacity();
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VarBinaryColumn extends VarLengthColumn 
<VarBinaryVector> {
+
+    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
+    protected VarBinaryVector varBinaryVector;
+
+    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, VarBinaryVector v,
+                    ConvertedType convertedType) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
+      varBinaryVector = v;
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      boolean success;
+      if (usingDictionary) {
+        success = 
varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, 
currDictVal.getBytes(),
+            0, currDictVal.length());
+      }
+      else {
+        success = varBinaryVector.getMutator().setSafe(index, bytes, start, 
length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return varBinaryVector.getData().capacity();
+    }
+  }
+
+  public static class NullableVarBinaryColumn extends NullableVarLengthColumn 
<NullableVarBinaryVector> {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the 
superclass) to prevent repetitive casting
+    protected org.apache.drill.exec.vector.NullableVarBinaryVector 
nullableVarBinaryVector;
+
+    NullableVarBinaryColumn(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, NullableVarBinaryVector v,
+                            ConvertedType convertedType ) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, convertedType);
+      nullableVarBinaryVector = v;
+      if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
+    }
+
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      boolean success;
+      if (usingDictionary) {
+        success = 
nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, 
currDictVal.getBytes(),
+            0, currDictVal.length());
+      }
+      else {
+        success = nullableVarBinaryVector.getMutator().setSafe(index, value, 
start, length);
+      }
+      return  success;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarBinaryVector.getData().capacity();
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader 
firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

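All four readers in the new file follow the same pattern: the constructor checks the column chunk's encoding list for PLAIN_DICTIONARY, and setSafe() then copies either the dictionary-decoded value or the raw page bytes into the value vector (the patch indexes the copy by its running valuesReadInCurrentPass counter). A stripped-down sketch of that pattern for the VarChar case, with an illustrative class name:

import org.apache.drill.exec.vector.VarCharVector;
import parquet.column.Encoding;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.io.api.Binary;

class DictionaryAwareVarCharSketch {
  private final VarCharVector vector;
  private final boolean usingDictionary;   // decided once from the chunk metadata
  Binary currDictVal;                      // filled by the read loop when dictionary-encoded

  DictionaryAwareVarCharSketch(ColumnChunkMetaData md, VarCharVector vector) {
    this.vector = vector;
    // a column chunk advertises dictionary encoding through its encoding list
    this.usingDictionary = md.getEncodings().contains(Encoding.PLAIN_DICTIONARY);
  }

  // Copy the next value into the vector; returns false if the vector is out of space.
  boolean setSafe(int index, byte[] pageBytes, int start, int length) {
    if (usingDictionary) {
      // dictionary path: the decoded value lives in currDictVal, not in pageBytes
      return vector.getMutator().setSafe(index, currDictVal.getBytes(), 0, currDictVal.length());
    }
    return vector.getMutator().setSafe(index, pageBytes, start, length);
  }
}
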
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8731b68/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 67b5394..5d2c859 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -284,14 +284,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
    */
   public void testNullableColumnsVarLen() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields);
     byte[] val = {'b'};
     byte[] val2 = {'b', '2'};
     byte[] val3 = {'b', '3'};
     byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
     Object[] boolVals = { val, val2, val4};
     props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
-    //
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
     fields.clear();
     // pass strings instead of byte arrays
@@ -301,6 +300,18 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
         "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
   }
 
+  @Ignore
+  @Test
+  public void testDictionaryEncoding() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    Object[] boolVals2 = { "b", "b2", "b3"};
+    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
+    // test dictionary encoding
+    testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/dictionary_pig.parquet/a\"", "unused", 1, props);
+  }
+
   @Test
   public void testMultipleRowGroupsAndReads() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
