DRILL-982: Return nulls for non-existent columns in parquet reader.
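
With this change, a query that selects columns which do not exist in a Parquet file no longer fails with "Error reading from parquet file. No columns requested were found in the file." Instead, the reader registers an OPTIONAL BIT vector for each missing column and returns it null-filled, one value per record in the row group, whether or not any of the requested columns are actually present in the file. A minimal usage sketch in the style of the tests added below (the method name here is illustrative; testFull and the query form come from ParquetRecordReaderTest):

    @Test
    public void testMissingColumnReturnsNulls() throws Exception {
      // nation.parquet contains no column named non_existent_column; the query now succeeds
      // and returns a single all-null NullableBit column with one value per row in the file.
      testFull(QueryType.SQL, "select non_existent_column from cp.`tpch/nation.parquet`", "", 1, 1, 150000, false);
    }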
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb93576a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb93576a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb93576a

Branch: refs/heads/master
Commit: fb93576a34367f38b9d040b36262bd20d88fbd35
Parents: c3e97fd
Author: Jason Altekruse <[email protected]>
Authored: Fri Aug 15 09:31:19 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Aug 24 12:09:34 2014 -0700

----------------------------------------------------------------------
 .../columnreaders/ParquetRecordReader.java      | 69 +++++++++++++++-----
 .../store/parquet/ParquetRecordReaderTest.java  | 24 +++++++
 2 files changed, 78 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb93576a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 4e9ac81..34e7aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.hadoop.fs.FileSystem;
@@ -85,6 +87,18 @@ public class ParquetRecordReader implements RecordReader {
   private List<SchemaPath> columns;
   private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
+  // This is a parallel list to the columns list above, it is used to determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  // For columns not found in the file, we need to return a schema element with the correct number of values
+  // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+  // that need only have their value count set at the end of each call to next(), as the values default to null.
+  private List<NullableBitVector> nullFilledVectors;
+  // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
+  // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
+  // records specified in the row group metadata
+  long mockRecordsRead;
+
   private final CodecFactoryExposer codecFactoryExposer;

   int rowGroupIndex;
@@ -110,8 +124,10 @@ public class ParquetRecordReader implements RecordReader {
     this.batchSize = batchSize;
     this.footer = footer;
     this.columns = columns;
-    this.fragmentContext=fragmentContext;
-
+    if (this.columns != null) {
+      columnsFound = new boolean[this.columns.size()];
+      nullFilledVectors = new ArrayList();
+    }
   }

   public CodecFactoryExposer getCodecFactoryExposer() {
@@ -169,10 +185,13 @@
     // for now it makes the existing tests pass, simply selecting
     // all available data if no columns are provided
     if (this.columns != null){
+      int i = 0;
       for (SchemaPath expr : this.columns){
         if ( field.matches(expr)){
+          columnsFound[i] = true;
           return true;
         }
+        i++;
       }
       return false;
     }
@@ -189,6 +208,7 @@
     ColumnDescriptor column;
     ColumnChunkMetaData columnChunkMetaData;
     int columnsToScan = 0;
+    mockRecordsRead = 0;
     MaterializedField field;
     ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();

@@ -231,11 +251,7 @@
     }
     rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();

-    // none of the columns in the parquet file matched the request columns from the query
-    if (columnsToScan == 0){
-      throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
-    }
-    if (allFieldsFixedLength) {
+    if (columnsToScan != 0 && allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
           footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
     }
@@ -245,16 +261,14 @@
     try {
       ValueVector v;
-      ConvertedType convertedType;
       SchemaElement schemaElement;
       ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
-      boolean fieldFixedLength = false;
+      boolean fieldFixedLength;
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
         columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
         schemaElement = schemaElements.get(column.getPath()[0]);
-        convertedType = schemaElement.getConverted_type();
         MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
             getDataMode(column), schemaElement);
         field = MaterializedField.create(toFieldName(column.getPath()), type);
         // the field was not requested to be read
@@ -279,6 +293,16 @@
         }
       }
       varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+
+      if (this.columns != null) {
+        for (int i = 0; i < columnsFound.length; i++) {
+          if ( ! columnsFound[i]) {
+            nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i), Types.optional(TypeProtos.MinorType.BIT)),
+                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL)));
+
+          }
+        }
+      }
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     } catch (Exception e) {
@@ -333,11 +357,18 @@
         firstColumnStatus = null;
       }
     }
-    // TODO - replace this with new functionality of returning batches even if no columns are selected
-    // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains
-    // we don't need to read any of the data, we just need to fill batches with a record count and a useless vector with
-    // the right number of values
-    if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+    // No columns found in the file were selected, simply return a full batch of null records for each column requested
+    if (firstColumnStatus == null) {
+      if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
+        return 0;
+      }
+      recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+      for (ValueVector vv : nullFilledVectors ) {
+        vv.getMutator().setValueCount( (int) recordsToRead);
+      }
+      mockRecordsRead += recordsToRead;
+      return (int) recordsToRead;
+    }

     if (allFieldsFixedLength) {
       recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
@@ -353,6 +384,14 @@
         readAllFixedFields(fixedRecordsToRead);
       }

+      // if we have requested columns that were not found in the file fill their vectors with null
+      // (by simply setting the value counts inside of them, as they start null filled)
+      if (nullFilledVectors != null) {
+        for (ValueVector vv : nullFilledVectors ) {
+          vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
+        }
+      }
+
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (IOException e) {
       throw new DrillRuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb93576a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 2193233..cbae6ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -58,8 +59,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestRule;

 import parquet.bytes.BytesInput;
 import parquet.column.page.Page;
 import parquet.column.page.PageReadStore;
@@ -139,6 +142,27 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   }

   @Test
+  public void testNonExistentColumn() throws Exception {
+    testFull(QueryType.SQL, "select non_existent_column from cp.`tpch/nation.parquet`", "", 1, 1, 150000, false);
+  }
+
+
+  @Test
+  public void testNonExistentColumnLargeFile() throws Exception {
+    testFull(QueryType.SQL, "select non_existent_column, non_existent_col_2 from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
+  public void testNonExistentColumnsSomePresentColumnsLargeFile() throws Exception {
+    testFull(QueryType.SQL, "select cust_key, address, non_existent_column, non_existent_col_2 from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
+  public void testTPCHPerformace_SF1() throws Exception {
+    testFull(QueryType.SQL, "select * from dfs.`/tmp/orders_part-m-00001.parquet`", "", 1, 1, 150000, false);
+  }
+
+  @Test
   public void testLocalDistributed() throws Exception {
     String planName = "/parquet/parquet_scan_union_screen_physical.json";
     testParquetFullEngineLocalTextDistributed(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
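
The null-fill bookkeeping above is easiest to follow in isolation. Below is a minimal, self-contained sketch (not Drill code) of the "mock read" loop used when none of the selected columns exist in the file: no pages are read, and next() simply reports batches of null records until the row group's record count has been exhausted. Everything here is hypothetical except the mockRecordsRead / recordsToRead names and the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH constant, whose value is assumed.

    // Standalone illustration of the mock-read branch added to ParquetRecordReader.next().
    public class MockReadSketch {
      static final long DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32 * 1024; // assumed value
      private final long rowGroupRowCount; // would come from the row group metadata in the footer
      private long mockRecordsRead = 0;    // records "read" (really just counted) so far

      public MockReadSketch(long rowGroupRowCount) {
        this.rowGroupRowCount = rowGroupRowCount;
      }

      // Mirrors the firstColumnStatus == null branch: return the size of the next null-filled
      // batch, or 0 once every record in the row group has been accounted for.
      public int next() {
        if (mockRecordsRead == rowGroupRowCount) {
          return 0;
        }
        long recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH,
            rowGroupRowCount - mockRecordsRead);
        // In the real reader, each vector in nullFilledVectors gets setValueCount((int) recordsToRead)
        // here; the values themselves stay null because the vectors start out null-filled.
        mockRecordsRead += recordsToRead;
        return (int) recordsToRead;
      }

      public static void main(String[] args) {
        MockReadSketch reader = new MockReadSketch(150000);
        long total = 0;
        for (int batch = reader.next(); batch > 0; batch = reader.next()) {
          total += batch;
        }
        System.out.println("returned " + total + " null records per missing column"); // 150000
      }
    }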
