DRILL-6016: Fix for Error reading INT96 created by Apache Spark

closes #1166
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/127e4150 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/127e4150 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/127e4150 Branch: refs/heads/master Commit: 127e4150b9495c465f8c37a534dfd50512013765 Parents: 67669a0 Author: Rahul Raj <rajra...@gmail.com> Authored: Wed Mar 14 12:05:45 2018 +0530 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Fri Apr 6 12:01:04 2018 +0300 ---------------------------------------------------------------------- .../columnreaders/ColumnReaderFactory.java | 7 +++- .../ParquetFixedWidthDictionaryReaders.java | 27 +++++++++++++ .../physical/impl/writer/TestParquetWriter.java | 40 ++++++++++++------- ...ark-generated-int96-timestamp.snappy.parquet | Bin 0 -> 2896 bytes .../testInt96DictChange/q1.tsv | 12 ------ 5 files changed, 59 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 09cdc5d..ba5f1de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -156,8 +156,13 @@ public class ColumnReaderFactory { case DOUBLE: return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement); case 
FIXED_LEN_BYTE_ARRAY: - case INT96: return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + case INT96: + if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement); + } else { + return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + } default: throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() ); } http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index 5fbac20..5033046 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -34,6 +34,8 @@ import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; + public class ParquetFixedWidthDictionaryReaders { static 
class DictionaryIntReader extends FixedByteAlignedReader<IntVector> { @@ -294,6 +296,31 @@ public class ParquetFixedWidthDictionaryReaders { } } + static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { + DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + try { + Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); + } catch ( Exception ex) { + throw ex; + } + } + } + } + static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> { DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index e3fc833..c359e69 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -39,7 +39,6 @@ import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.categories.UnlikelyTest; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.fn.interp.TestConstantFolding; @@ -780,17 +779,31 @@ public class TestParquetWriter extends BaseTestQuery { Test the reading of a binary field as drill timestamp where data is in dictionary _and_ non-dictionary encoded pages */ @Test - @Ignore("relies on particular time zone, works for UTC") public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception { try { testBuilder() - .sqlQuery("select int96_ts from dfs.`parquet/int96_dict_change` order by int96_ts") + .sqlQuery("select min(int96_ts) date_value from dfs.`parquet/int96_dict_change`") .optionSettingQueriesForTestQuery( "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) .ordered() - .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv") - .baselineTypes(TypeProtos.MinorType.TIMESTAMP) - .baselineColumns("int96_ts") + .baselineColumns("date_value") + .baselineValues(new DateTime(convertToLocalTimestamp("1970-01-01 00:00:01.000"))) + .build().run(); + } finally { + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + } + } + + @Test + public void testSparkParquetBinaryAsTimeStamp_DictChange() throws Exception { + try { + testBuilder() + .sqlQuery("select distinct run_date from cp.`parquet/spark-generated-int96-timestamp.snappy.parquet`") + .optionSettingQueriesForTestQuery( + "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) + 
.ordered() + .baselineColumns("run_date") + .baselineValues(new DateTime(convertToLocalTimestamp("2017-12-06 16:38:43.988"))) .build().run(); } finally { resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); @@ -842,16 +855,15 @@ public class TestParquetWriter extends BaseTestQuery { Test the conversion from int96 to impala timestamp with hive data including nulls. Validate against expected values */ @Test - @Ignore("relies on particular time zone") public void testHiveParquetTimestampAsInt96_basic() throws Exception { testBuilder() - .unOrdered() - .sqlQuery("SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19)) as timestamp_field " - + "from cp.`parquet/part1/hive_all_types.parquet` ") - .baselineColumns("timestamp_field") - .baselineValues("2013-07-05 17:01:00") - .baselineValues((Object)null) - .go(); + .unOrdered() + .sqlQuery("SELECT convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as timestamp_field " + + "from cp.`parquet/part1/hive_all_types.parquet` ") + .baselineColumns("timestamp_field") + .baselineValues(new DateTime(convertToLocalTimestamp("2013-07-06 00:01:00"))) + .baselineValues((Object)null) + .go(); } @Test http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet b/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet new file mode 100644 index 0000000..3075cec Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv deleted file mode 100644 index 91b9b01..0000000 --- a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv +++ /dev/null @@ -1,12 +0,0 @@ -1970-01-01 00:00:01.000 -1971-01-01 00:00:01.000 -1972-01-01 00:00:01.000 -1973-01-01 00:00:01.000 -1974-01-01 00:00:01.000 -2010-01-01 00:00:01.000 -2011-01-01 00:00:01.000 -2012-01-01 00:00:01.000 -2013-01-01 00:00:01.000 -2014-01-01 00:00:01.000 -2015-01-01 00:00:01.000 -2016-01-01 00:00:01.000