DRILL-982: Return nulls for non-existent columns in parquet reader.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb93576a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb93576a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb93576a

Branch: refs/heads/master
Commit: fb93576a34367f38b9d040b36262bd20d88fbd35
Parents: c3e97fd
Author: Jason Altekruse <[email protected]>
Authored: Fri Aug 15 09:31:19 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Aug 24 12:09:34 2014 -0700

----------------------------------------------------------------------
 .../columnreaders/ParquetRecordReader.java      | 69 +++++++++++++++-----
 .../store/parquet/ParquetRecordReaderTest.java  | 24 +++++++
 2 files changed, 78 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
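
The gist of the change: instead of failing when a projected column is absent from the Parquet file, the reader now records which projected columns it actually found and, for the rest, registers nullable vectors whose values are left null; each batch only sets their value counts. Below is a minimal, self-contained sketch of that bookkeeping -- the class and method names are illustrative, not the Drill types touched by this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ProjectionNullFillSketch {

  /**
   * Walks the projected columns, marks which ones exist in the file
   * (the patch keeps the same flags in a boolean[] parallel to its column list),
   * and returns the ones that must be null-filled.
   */
  static List<String> missingColumns(List<String> projected, Set<String> fileColumns) {
    boolean[] columnsFound = new boolean[projected.size()];
    for (int i = 0; i < projected.size(); i++) {
      columnsFound[i] = fileColumns.contains(projected.get(i));
    }
    List<String> missing = new ArrayList<>();
    for (int i = 0; i < columnsFound.length; i++) {
      if (!columnsFound[i]) {
        // In the reader this column gets an OPTIONAL BIT vector whose values stay null;
        // only its value count is set at the end of each call to next().
        missing.add(projected.get(i));
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<String> projected = Arrays.asList("cust_key", "address", "non_existent_column");
    Set<String> inFile = new HashSet<>(Arrays.asList("cust_key", "address", "nation_key"));
    System.out.println(missingColumns(projected, inFile)); // prints [non_existent_column]
  }
}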


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb93576a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 4e9ac81..34e7aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.hadoop.fs.FileSystem;
@@ -85,6 +87,18 @@ public class ParquetRecordReader implements RecordReader {
   private List<SchemaPath> columns;
   private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
+  // This is a parallel list to the columns list above, it is used to determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  // For columns not found in the file, we need to return a schema element with the correct number of values
+  // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+  // that need only have their value count set at the end of each call to next(), as the values default to null.
+  private List<NullableBitVector> nullFilledVectors;
+  // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
+  // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
+  // records specified in the row group metadata
+  long mockRecordsRead;
+
   private final CodecFactoryExposer codecFactoryExposer;
   int rowGroupIndex;
 
@@ -110,8 +124,10 @@ public class ParquetRecordReader implements RecordReader {
     this.batchSize = batchSize;
     this.footer = footer;
     this.columns = columns;
-    this.fragmentContext=fragmentContext;
-
+    if (this.columns != null) {
+      columnsFound = new boolean[this.columns.size()];
+      nullFilledVectors = new ArrayList();
+    }
   }
 
   public CodecFactoryExposer getCodecFactoryExposer() {
@@ -169,10 +185,13 @@ public class ParquetRecordReader implements RecordReader {
     // for now it makes the existing tests pass, simply selecting
     // all available data if no columns are provided
     if (this.columns != null){
+      int i = 0;
       for (SchemaPath expr : this.columns){
         if ( field.matches(expr)){
+          columnsFound[i] = true;
           return true;
         }
+        i++;
       }
       return false;
     }
@@ -189,6 +208,7 @@ public class ParquetRecordReader implements RecordReader {
     ColumnDescriptor column;
     ColumnChunkMetaData columnChunkMetaData;
     int columnsToScan = 0;
+    mockRecordsRead = 0;
 
     MaterializedField field;
     ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
@@ -231,11 +251,7 @@ public class ParquetRecordReader implements RecordReader {
     }
     rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
 
-    // none of the columns in the parquet file matched the request columns from the query
-    if (columnsToScan == 0){
-      throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
-    }
-    if (allFieldsFixedLength) {
+    if (columnsToScan != 0  && allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
           footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
     }
@@ -245,16 +261,14 @@ public class ParquetRecordReader implements RecordReader {
 
     try {
       ValueVector v;
-      ConvertedType convertedType;
       SchemaElement schemaElement;
       ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
-      boolean fieldFixedLength = false;
+      boolean fieldFixedLength;
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
        columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
         schemaElement = schemaElements.get(column.getPath()[0]);
-        convertedType = schemaElement.getConverted_type();
        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
         field = MaterializedField.create(toFieldName(column.getPath()), type);
         // the field was not requested to be read
@@ -279,6 +293,16 @@ public class ParquetRecordReader implements RecordReader {
         }
       }
       varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+
+      if (this.columns != null) {
+        for (int i = 0; i < columnsFound.length; i++) {
+          if ( ! columnsFound[i]) {
+            nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i), Types.optional(TypeProtos.MinorType.BIT)),
+                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL)));
+
+          }
+        }
+      }
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     } catch (Exception e) {
@@ -333,11 +357,18 @@ public class ParquetRecordReader implements RecordReader {
           firstColumnStatus = null;
         }
       }
-      // TODO - replace this with new functionality of returning batches even if no columns are selected
-      // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains
-      // we don't need to read any of the data, we just need to fill batches with a record count and a useless vector with
-      // the right number of values
-      if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+      // No columns found in the file were selected, simply return a full batch of null records for each column requested
+      if (firstColumnStatus == null) {
+        if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
+          return 0;
+        }
+        recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+        for (ValueVector vv : nullFilledVectors ) {
+          vv.getMutator().setValueCount( (int) recordsToRead);
+        }
+        mockRecordsRead += recordsToRead;
+        return (int) recordsToRead;
+      }
 
       if (allFieldsFixedLength) {
        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
@@ -353,6 +384,14 @@ public class ParquetRecordReader implements RecordReader {
         readAllFixedFields(fixedRecordsToRead);
       }
 
+      // if we have requested columns that were not found in the file fill their vectors with null
+      // (by simply setting the value counts inside of them, as they start null filled)
+      if (nullFilledVectors != null) {
+        for (ValueVector vv : nullFilledVectors ) {
+          vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
+        }
+      }
+
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (IOException e) {
       throw new DrillRuntimeException(e);
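
When none of the projected columns exist in the file, the new next() path above reads no data at all: it returns batches whose record counts sum to the row group's row count, with every requested column null-filled. Below is a standalone sketch of that counting logic; the batch-size constant and names are illustrative stand-ins for the patch's DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH and mockRecordsRead.

public class MockBatchCounterSketch {

  // Illustrative cap per batch; the reader uses its own DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH.
  static final long RECORDS_PER_BATCH = 32 * 1024;

  private final long rowGroupRowCount; // from the Parquet row group metadata
  private long mockRecordsRead;        // rows "read" so far without touching column data

  MockBatchCounterSketch(long rowGroupRowCount) {
    this.rowGroupRowCount = rowGroupRowCount;
  }

  /** Record count for the next batch, or 0 once the row group has been fully "read". */
  int nextBatch() {
    if (mockRecordsRead == rowGroupRowCount) {
      return 0;
    }
    long recordsToRead = Math.min(RECORDS_PER_BATCH, rowGroupRowCount - mockRecordsRead);
    // In the reader, each null-filled vector would get setValueCount((int) recordsToRead) here.
    mockRecordsRead += recordsToRead;
    return (int) recordsToRead;
  }

  public static void main(String[] args) {
    MockBatchCounterSketch counter = new MockBatchCounterSketch(70000);
    int n;
    while ((n = counter.nextBatch()) > 0) {
      System.out.println("null-filled batch of " + n + " records");
    }
    // prints batches of 32768, 32768 and 4464 records: 70000 rows total, no file data read
  }
}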

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb93576a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 2193233..cbae6ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -58,8 +59,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.junit.rules.TestRule;
 import parquet.bytes.BytesInput;
 import parquet.column.page.Page;
 import parquet.column.page.PageReadStore;
@@ -139,6 +142,27 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   }
 
   @Test
+  public void testNonExistentColumn() throws Exception {
+    testFull(QueryType.SQL, "select non_existent_column from 
cp.`tpch/nation.parquet`", "", 1, 1, 150000, false);
+  }
+
+
+  @Test
+  public void testNonExistentColumnLargeFile() throws Exception {
+    testFull(QueryType.SQL, "select non_existent_column, non_existent_col_2 
from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
+  public void testNonExistentColumnsSomePresentColumnsLargeFile() throws Exception {
+    testFull(QueryType.SQL, "select cust_key, address,  non_existent_column, non_existent_col_2 from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
+  public void testTPCHPerformace_SF1() throws Exception {
+    testFull(QueryType.SQL, "select * from 
dfs.`/tmp/orders_part-m-00001.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
   public void testLocalDistributed() throws Exception {
     String planName = "/parquet/parquet_scan_union_screen_physical.json";
     testParquetFullEngineLocalTextDistributed(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
