DRILL-827: Fix bug in reading dictionary-encoded columns in Parquet.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/10127846 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/10127846 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/10127846 Branch: refs/heads/master Commit: 1012784682a3e9ed27076bfa33b8c4cb2361a542 Parents: 602f817 Author: Jason Altekruse <[email protected]> Authored: Fri May 23 11:14:52 2014 -0500 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 28 15:13:00 2014 -0700 ---------------------------------------------------------------------- .../exec/store/parquet/PageReadStatus.java | 2 +- .../exec/store/parquet/ParquetRecordReader.java | 2 +- .../exec/store/parquet/VarLenBinaryReader.java | 7 ++++- .../store/parquet/ParquetRecordReaderTest.java | 27 +++++++++++++++----- .../store/parquet/ParquetResultListener.java | 16 +++++++----- 5 files changed, 37 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java index e4081d9..ba98f3c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java @@ -167,10 +167,10 @@ final class PageReadStatus { parentColumnReader.currDefLevel = -1; if (!currentPage.getValueEncoding().usesDictionary()) { definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); readPosInBytes = definitionLevels.getNextOffset(); if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); } } else { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 0996620..6754855 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -248,7 +248,7 @@ public class ParquetRecordReader implements RecordReader { boolean fieldFixedLength = false; for (int i = 0; i < columns.size(); ++i) { column = columns.get(i); - columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i); + columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i); schemaElement = schemaElements.get(column.getPath()[0]); convertedType = schemaElement.getConverted_type(); MajorType type = toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java ---------------------------------------------------------------------- diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java index 4efcdaf..2575c4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java @@ -100,6 +100,8 @@ public class VarLenBinaryReader { if (!columnReader.pageReadStatus.next()) { rowGroupFinished = true; break; + } else { + columnReader.currDictVal = null; } } bytes = columnReader.pageReadStatus.pageDataByteArray; @@ -118,7 +120,9 @@ public class VarLenBinaryReader { } if (columnReader.usingDictionary) { - columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes(); + if (columnReader.currDictVal == null) { + columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes(); + } // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division columnReader.dataTypeLengthInBits = columnReader.currDictVal.length(); } @@ -169,6 +173,7 @@ public class VarLenBinaryReader { columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead; columnReader.pageReadStatus.next(); } + columnReader.currDictVal = null; } recordsReadInCurrentPass++; } while (recordsReadInCurrentPass < recordsToReadInThisPass); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 222508c..82436a3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -117,7 +117,17 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ readEntries += ","; } String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries); - testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup); + testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup, true); + } + + @Test + @Ignore + public void testDictionaryError() throws Exception { + String readEntries; + readEntries = "\"/tmp/lineitem_null_dict.parquet\""; + + String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries); + testParquetFullEngineLocalText(planText, fileName, 1, 1, 100000, false); } @Test @@ -135,21 +145,24 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ public void testParquetFullEngineLocalPath(String planFileName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ - testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName), Charsets.UTF_8), filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup); + testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName), Charsets.UTF_8), filename, + numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, true); } //specific tests should call this method, but it is not marked as a test itself intentionally - public void testParquetFullEngineLocalText(String planText, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ - testFull(QueryType.LOGICAL, planText, 
filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup); + public void testParquetFullEngineLocalText(String planText, String filename, int numberOfTimesRead /* specified in json plan */, + int numberOfRowGroups, int recordsPerRowGroup, boolean testValues) throws Exception{ + testFull(QueryType.LOGICAL, planText, filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, testValues); } - private void testFull(QueryType type, String planText, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + private void testFull(QueryType type, String planText, String filename, int numberOfTimesRead /* specified in json plan */, + int numberOfRowGroups, int recordsPerRowGroup, boolean testValues) throws Exception{ // RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); HashMap<String, FieldInfo> fields = new HashMap<>(); ParquetTestProperties props = new ParquetTestProperties(numberRowGroups, recordsPerRowGroup, DEFAULT_BYTES_PER_PAGE, fields); TestFileGenerator.populateFieldInfoMap(props); - ParquetResultListener resultListener = new ParquetResultListener(getAllocator(), props, numberOfTimesRead, true); + ParquetResultListener resultListener = new ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues); Stopwatch watch = new Stopwatch().start(); testWithListener(type, planText, resultListener); resultListener.getResults(); @@ -162,7 +175,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ //use this method to submit physical plan public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ String planText = Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8); - testFull(QueryType.PHYSICAL, planText, filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup); 
+ testFull(QueryType.PHYSICAL, planText, filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, true); } public String pad(String value, int length) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java index a533117..4a0efc9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java @@ -173,15 +173,17 @@ public class ParquetResultListener implements UserResultsListener { int recordsInBatch = -1; if(result.getHeader().getIsLastChunk()){ // ensure the right number of columns was returned, especially important to ensure selective column read is working - //assert valuesChecked.keySet().size() == props.fields.keySet().size() : "Unexpected number of output columns from parquet scan,"; + if (testValues) { + assertEquals( "Unexpected number of output columns from parquet scan.", valuesChecked.keySet().size(), props.fields.keySet().size() ); + } for (String s : valuesChecked.keySet()) { try { - if (recordsInBatch == -1 ){ - recordsInBatch = valuesChecked.get(s); - } else { - assertEquals("Mismatched record counts in vectors.", recordsInBatch, valuesChecked.get(s).intValue()); - } - //assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s)); + if (recordsInBatch == -1 ){ + recordsInBatch = valuesChecked.get(s); + } else { + assertEquals("Mismatched record counts in vectors.", recordsInBatch, valuesChecked.get(s).intValue()); + } + assertEquals("Record count incorrect for column: " + s, totalRecords, (long) 
valuesChecked.get(s)); } catch (AssertionError e) { submissionFailed(new RpcException(e)); } }
