DRILL-816: Fix bug with reading nullable columns resulting in mismatched number of records in vectors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ab279517
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ab279517
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ab279517

Branch: refs/heads/master
Commit: ab2795173c5acdae7c8c443b842f502f3c482105
Parents: 28dd76a
Author: Jason Altekruse <[email protected]>
Authored: Thu May 22 12:22:01 2014 -0500
Committer: Jacques Nadeau <[email protected]>
Committed: Thu May 22 19:31:03 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/parquet/ColumnReader.java  |  4 ++++
 .../store/parquet/NullableColumnReader.java     |  7 +++----
 .../exec/store/parquet/PageReadStatus.java      |  7 ++-----
 .../exec/store/parquet/VarLenBinaryReader.java  | 10 +++++++++-
 .../store/parquet/ParquetResultListener.java    | 21 ++++++++++++++++++--
 5 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ab279517/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 43f27a6..775fc73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -68,6 +68,10 @@ abstract class ColumnReader<V extends ValueVector> {
   int bytesReadInCurrentPass;
 
   protected ByteBuf vectorData;
+  // when reading definition levels for nullable columns, it is a one-way stream of integers
+  // when reading var length data, where we don't know if all of the records will fit until we've read all of them,
+  // we must store the last definition level and use it at the start of the next batch
+  int currDefLevel;
 
   // variables for a single read pass
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
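
The new currDefLevel field amounts to a one-slot peek/consume cache over
Parquet's forward-only definition-level decoder, with -1 marking "nothing
cached". A minimal, self-contained sketch of that pattern (DefLevelCache and
its Iterator-backed stream are illustrative stand-ins, not Drill's classes):

import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for caching a value pulled from a forward-only stream.
class DefLevelCache {
  private int currDefLevel = -1;              // -1 means nothing is cached
  private final Iterator<Integer> defLevels;  // stands in for the Parquet ValuesReader

  DefLevelCache(List<Integer> levels) {
    this.defLevels = levels.iterator();
  }

  // Read the next definition level only when the cache is empty; repeated
  // calls return the same value until consume() is called.
  int peek() {
    if (currDefLevel == -1) {
      currDefLevel = defLevels.next();
    }
    return currDefLevel;
  }

  // Mark the cached level as used so the next peek() advances the stream.
  void consume() {
    currDefLevel = -1;
  }
}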

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ab279517/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 687b373..88a382a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -73,12 +73,12 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
         definitionLevelsRead = 0;
         lastValueWasNull = true;
         nullsFound = 0;
-        if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass
+        if (currentValueIndexInVector == recordsToReadInThisPass
             || currentValueIndexInVector >= valueVec.getValueCapacity()
             || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){
           break;
         }
-        while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass
+        while(currentValueIndexInVector < recordsToReadInThisPass
             && currentValueIndexInVector < valueVec.getValueCapacity()
             && pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){
           currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger();
@@ -127,8 +127,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
           pageReadStatus.readPosInBytes = readStartInBytes + readLength;
         }
       }
-    }
-    while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
+    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
     valueVec.getMutator().setValueCount(
         valuesReadInCurrentPass);
   }
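
The bounds change above compares the in-vector index directly against the
per-pass target instead of offsetting it by totalValuesRead, and the closing
brace is joined onto the do-while line. Reduced to a self-contained toy (the
names and the flat int[] page model are illustrative, not Drill's structures),
the corrected loop shape is:

// Toy model of the batch-fill loop after the fix: an outer pass over pages
// with an inner loop that stops once the batch target is met, the vector is
// at capacity, or the current page is exhausted. Assumes at least one page.
class BatchFillSketch {
  static int fillBatch(int[][] pages, int recordsToReadInThisPass, int vectorCapacity) {
    int valuesReadInCurrentPass = 0;
    int pageIndex = 0;
    do {
      int[] page = pages[pageIndex];
      int posInPage = 0;
      int indexInVector = valuesReadInCurrentPass;
      while (indexInVector < recordsToReadInThisPass   // batch target, compared directly
          && indexInVector < vectorCapacity            // room left in the vector
          && posInPage < page.length) {                // values left in this page
        posInPage++;       // consume one value from the page
        indexInVector++;   // and advance the position in the vector
      }
      valuesReadInCurrentPass = indexInVector;
      pageIndex++;
    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageIndex < pages.length);
    return valuesReadInCurrentPass;
  }
}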

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ab279517/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 20bf3e9..e4081d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -45,6 +45,7 @@ import parquet.schema.PrimitiveType;
 
 // class to keep track of the read position of variable length columns
 final class PageReadStatus {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReadStatus.class);
 
   private final ColumnReader parentColumnReader;
   private final ColumnDataReader dataReader;
@@ -158,16 +159,12 @@ final class PageReadStatus {
       return false;
     }
 
-    // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
-//    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
-//      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
-//    }
-    // TODO - would like to get this into the mainline, hopefully before alpha
     pageDataByteArray = currentPage.getBytes().toByteArray();
 
     readPosInBytes = 0;
     valuesRead = 0;
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+      parentColumnReader.currDefLevel = -1;
       if (!currentPage.getValueEncoding().usesDictionary()) {
         definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
         valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ab279517/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 91719e7..4efcdaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -103,7 +103,14 @@ public class VarLenBinaryReader {
           }
         }
         bytes = columnReader.pageReadStatus.pageDataByteArray;
-        if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.pageReadStatus.definitionLevels.readInteger()){
+        // we need to read all of the lengths to determine if this value will fit in the current vector;
+        // as we can only read each definition level once, we have to store the last one, as we will need it
+        // at the start of the next read if we decide, after reading all of the varlength values in this record,
+        // that it will not fit in this batch
+        if ( columnReader.currDefLevel == -1 ) {
+          columnReader.currDefLevel = columnReader.pageReadStatus.definitionLevels.readInteger();
+        }
+        if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.currDefLevel){
           columnReader.currentValNull = true;
           columnReader.dataTypeLengthInBits = 0;
           columnReader.nullsRead++;
@@ -151,6 +158,7 @@ public class VarLenBinaryReader {
           assert success;
         }
         columnReader.currentValNull = false;
+        columnReader.currDefLevel = -1;
         if (columnReader.dataTypeLengthInBits > 0){
           columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
           columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
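
Together with the ColumnReader change, the variable-length path now reads a
definition level only when nothing is cached and clears the cache once the
value has actually been committed, so a record pushed into the next batch can
re-check the same level. A compact sketch of that lifecycle, reusing the
DefLevelCache stand-in from the earlier sketch (illustrative names only):

import java.util.Arrays;
import java.util.List;

// Illustrative driver for the peek/consume lifecycle: the level is peeked
// while deciding whether the value fits, and only consumed once the value
// (or null) is written into the current batch.
class VarLenSketch {
  public static void main(String[] args) {
    int maxDefinitionLevel = 1;
    List<Integer> levels = Arrays.asList(1, 0, 1);   // 0 => null when max level is 1
    DefLevelCache cache = new DefLevelCache(levels);

    for (int i = 0; i < levels.size(); i++) {
      boolean isNull = cache.peek() < maxDefinitionLevel;  // safe to call more than once
      // if the value did not fit in this batch, a reader would return here
      // WITHOUT calling consume(), so the next pass sees the same level
      cache.consume();  // value committed: forget the cached level
      System.out.println("record " + i + " null=" + isNull);
    }
  }
}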

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ab279517/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index a4ccbcc..a533117 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -104,6 +104,9 @@ public class ParquetResultListener implements UserResultsListener {
       throw new RuntimeException(e);
     }
 
+    // used to make sure each vector in the batch has the same number of records
+    int valueCount = -1;
+
     int recordCount = 0;
     // print headers.
     if (schemaChanged) {
@@ -136,6 +139,12 @@ public class ParquetResultListener implements UserResultsListener {
         System.out.println("\n" + vv.getAccessor().getValueCount());
      }
      valuesChecked.remove(vv.getField().getAsSchemaPath().getRootSegment().getPath());
+      if (valueCount == -1) {
+        valueCount = columnValCounter;
+      }
+      else {
+        assertEquals("Mismatched value count for vectors in the same batch.", 
valueCount, columnValCounter);
+      }
       
valuesChecked.put(vv.getField().getAsSchemaPath().getRootSegment().getPath(), 
columnValCounter);
     }
 
@@ -161,16 +170,24 @@ public class ParquetResultListener implements UserResultsListener {
       }
     }
     batchCounter++;
+    int recordsInBatch = -1;
     if(result.getHeader().getIsLastChunk()){
       // ensure the right number of columns was returned, especially important to ensure selective column read is working
-      assert valuesChecked.keySet().size() == props.fields.keySet().size() : "Unexpected number of output columns from parquet scan,";
+      //assert valuesChecked.keySet().size() == props.fields.keySet().size() : "Unexpected number of output columns from parquet scan,";
       for (String s : valuesChecked.keySet()) {
         try {
-          assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
+           if (recordsInBatch == -1 ){
+             recordsInBatch = valuesChecked.get(s);
+           } else {
+             assertEquals("Mismatched record counts in vectors.", recordsInBatch, valuesChecked.get(s).intValue());
+           }
+          //assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
         } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
       }
 
       assert valuesChecked.keySet().size() > 0;
+      batchLoader.clear();
+      result.release();
       future.set(null);
     }
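
The test-side change enforces a simple invariant: every vector in a batch must
report the same value count, with the first column seen establishing the
expected count. A standalone sketch of that check using plain JUnit 4
assertEquals over a hypothetical map of per-column counts (not the actual
ParquetResultListener state):

import static org.junit.Assert.assertEquals;

import java.util.Map;

// Illustrative per-batch consistency check: the first column's count becomes
// the expectation, and every other column must match it.
class ValueCountCheck {
  static void assertUniformValueCounts(Map<String, Integer> countsByColumn) {
    int expected = -1;
    for (Map.Entry<String, Integer> e : countsByColumn.entrySet()) {
      if (expected == -1) {
        expected = e.getValue();
      } else {
        assertEquals("Mismatched record counts in vectors.", expected, e.getValue().intValue());
      }
    }
  }
}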
     
