DRILL-6016: Fix for Error reading INT96 created by Apache Spark

closes #1166


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/127e4150
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/127e4150
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/127e4150

Branch: refs/heads/master
Commit: 127e4150b9495c465f8c37a534dfd50512013765
Parents: 67669a0
Author: Rahul Raj <rajra...@gmail.com>
Authored: Wed Mar 14 12:05:45 2018 +0530
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 6 12:01:04 2018 +0300

----------------------------------------------------------------------
 .../columnreaders/ColumnReaderFactory.java      |   7 +++-
 .../ParquetFixedWidthDictionaryReaders.java     |  27 +++++++++++++
 .../physical/impl/writer/TestParquetWriter.java |  40 ++++++++++++-------
 ...ark-generated-int96-timestamp.snappy.parquet | Bin 0 -> 2896 bytes
 .../testInt96DictChange/q1.tsv                  |  12 ------
 5 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 09cdc5d..ba5f1de 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -156,8 +156,13 @@ public class ColumnReaderFactory {
            case DOUBLE:
              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
             case FIXED_LEN_BYTE_ARRAY:
-            case INT96:
               return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+            case INT96:
+              if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
+                return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+              } else {
+                return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+              }
             default:
               throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 5fbac20..5033046 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -34,6 +34,8 @@ import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.io.api.Binary;
 
+import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
+
 public class ParquetFixedWidthDictionaryReaders {
 
   static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
@@ -294,6 +296,31 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
+    DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                              ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
+                              SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+              - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        try {
+          Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes();
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true));
+        } catch ( Exception ex) {
+          throw ex;
+        }
+      }
+    }
+  }
+
+
   static class DictionaryFloat4Reader extends 
FixedByteAlignedReader<Float4Vector> {
     DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, Float4Vector v,

http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index e3fc833..c359e69 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -39,7 +39,6 @@ import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.categories.UnlikelyTest;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
@@ -780,17 +779,31 @@ public class TestParquetWriter extends BaseTestQuery {
   Test the reading of a binary field as drill timestamp where data is in 
dictionary _and_ non-dictionary encoded pages
    */
   @Test
-  @Ignore("relies on particular time zone, works for UTC")
   public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception {
     try {
       testBuilder()
-          .sqlQuery("select int96_ts from dfs.`parquet/int96_dict_change` order by int96_ts")
+          .sqlQuery("select min(int96_ts) date_value from dfs.`parquet/int96_dict_change`")
           .optionSettingQueriesForTestQuery(
               "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
           .ordered()
-          .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv")
-          .baselineTypes(TypeProtos.MinorType.TIMESTAMP)
-          .baselineColumns("int96_ts")
+          .baselineColumns("date_value")
+          .baselineValues(new DateTime(convertToLocalTimestamp("1970-01-01 00:00:01.000")))
+          .build().run();
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
+  }
+
+  @Test
+  public void testSparkParquetBinaryAsTimeStamp_DictChange() throws Exception {
+    try {
+      testBuilder()
+          .sqlQuery("select distinct run_date from cp.`parquet/spark-generated-int96-timestamp.snappy.parquet`")
+          .optionSettingQueriesForTestQuery(
+               "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+          .ordered()
+          .baselineColumns("run_date")
+          .baselineValues(new DateTime(convertToLocalTimestamp("2017-12-06 16:38:43.988")))
           .build().run();
     } finally {
       resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
@@ -842,16 +855,15 @@ public class TestParquetWriter extends BaseTestQuery {
   Test the conversion from int96 to impala timestamp with hive data including 
nulls. Validate against expected values
   */
   @Test
-  @Ignore("relies on particular time zone")
   public void testHiveParquetTimestampAsInt96_basic() throws Exception {
     testBuilder()
-            .unOrdered()
-            .sqlQuery("SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19))  as timestamp_field "
-              + "from cp.`parquet/part1/hive_all_types.parquet` ")
-            .baselineColumns("timestamp_field")
-            .baselineValues("2013-07-05 17:01:00")
-            .baselineValues((Object)null)
-            .go();
+        .unOrdered()
+        .sqlQuery("SELECT convert_from(timestamp_field, 'TIMESTAMP_IMPALA')  as timestamp_field "
+             + "from cp.`parquet/part1/hive_all_types.parquet` ")
+        .baselineColumns("timestamp_field")
+        .baselineValues(new DateTime(convertToLocalTimestamp("2013-07-06 00:01:00")))
+        .baselineValues((Object)null)
+        .go();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet
 
b/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet
new file mode 100644
index 0000000..3075cec
Binary files /dev/null and 
b/exec/java-exec/src/test/resources/parquet/spark-generated-int96-timestamp.snappy.parquet
 differ

http://git-wip-us.apache.org/repos/asf/drill/blob/127e4150/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
 
b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
deleted file mode 100644
index 91b9b01..0000000
--- 
a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
+++ /dev/null
@@ -1,12 +0,0 @@
-1970-01-01 00:00:01.000
-1971-01-01 00:00:01.000
-1972-01-01 00:00:01.000
-1973-01-01 00:00:01.000
-1974-01-01 00:00:01.000
-2010-01-01 00:00:01.000
-2011-01-01 00:00:01.000
-2012-01-01 00:00:01.000
-2013-01-01 00:00:01.000
-2014-01-01 00:00:01.000
-2015-01-01 00:00:01.000
-2016-01-01 00:00:01.000

Reply via email to the mailing list.