Repository: spark
Updated Branches:
  refs/heads/master 8aceb961c -> 43e4e851b


[SPARK-24718][SQL] Timestamp support pushdown to parquet data source

## What changes were proposed in this pull request?

Support pushing down filters on `Timestamp` columns to the Parquet data source.
Only timestamps stored as `TIMESTAMP_MICROS` or `TIMESTAMP_MILLIS` are pushed down; `INT96` is not supported.
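
For example, with this change an equality or range predicate on a timestamp column can be converted into a Parquet filter and evaluated against row-group statistics. A minimal sketch of the user-facing effect (hypothetical path and data; assumes a `SparkSession` named `spark`, e.g. in spark-shell):

```scala
import java.sql.Timestamp

import spark.implicits._

// Store timestamps as TIMESTAMP_MICROS so the new push-down path applies
// (INT96, the historical default, is still not pushed down).
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
    Timestamp.valueOf("2018-06-15 08:28:53.123456"))
  .toDF("ts")
  .write.mode("overwrite").parquet("/tmp/ts_pushdown_demo")

// With spark.sql.parquet.filterPushdown (default true) and the new
// spark.sql.parquet.filterPushdown.timestamp flag enabled, this predicate
// reaches the Parquet reader, which can skip non-matching row groups.
spark.read.parquet("/tmp/ts_pushdown_demo")
  .filter($"ts" >= Timestamp.valueOf("2018-06-15 00:00:00"))
  .show()
```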

## How was this patch tested?

Unit tests and benchmark tests.

Author: Yuming Wang <yumw...@ebay.com>

Closes #21741 from wangyum/SPARK-24718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43e4e851
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43e4e851
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43e4e851

Branch: refs/heads/master
Commit: 43e4e851b642bbee535d22e1b9e72ec6b99f6ed4
Parents: 8aceb96
Author: Yuming Wang <yumw...@ebay.com>
Authored: Sun Jul 15 11:13:49 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sun Jul 15 11:13:49 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 ++
 .../FilterPushdownBenchmark-results.txt         | 124 +++++++++++++++++++
 .../datasources/parquet/ParquetFileFormat.scala |   3 +-
 .../datasources/parquet/ParquetFilters.scala    |  59 ++++++++-
 .../benchmark/FilterPushdownBenchmark.scala     |  37 +++++-
 .../parquet/ParquetFilterSuite.scala            |  74 ++++++++++-
 6 files changed, 301 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 699e939..07d33fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -378,6 +378,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.timestamp")
+      .doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
+        "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " +
+        "enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
+    .internal()
+    .booleanConf
+    .createWithDefault(true)
+
   val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
     buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
     .doc("If true, enables Parquet filter push-down optimization for string 
startsWith function. " +
@@ -1494,6 +1503,8 @@ class SQLConf extends Serializable with Logging {
 
  def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
 
+  def parquetFilterPushDownTimestamp: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED)
+
+
   def parquetFilterPushDownStringStartWith: Boolean =
     getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
 
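For reference, the new flag is internal but can still be toggled per session, e.g. to rule timestamp push-down in or out while debugging a query plan (a sketch assuming an active `SparkSession` named `spark`):

```scala
// Disable only the timestamp push-down; spark.sql.parquet.filterPushdown
// and the other type-specific flags are unaffected.
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "false")

// Equivalent SQL form:
spark.sql("SET spark.sql.parquet.filterPushdown.timestamp=false")
```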

http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index c44908b..4f38cc4 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -578,3 +578,127 @@ Native ORC Vectorized                       11622 / 12196          1.4         7
 Native ORC Vectorized (Pushdown)            11377 / 11654          1.4         723.3       1.0X
 
 
+================================================================================================
+Pushdown benchmark for Timestamp
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            4784 / 4956          3.3         304.2       1.0X
+Parquet Vectorized (Pushdown)                 4838 / 4917          3.3         307.6       1.0X
+Native ORC Vectorized                         3923 / 4173          4.0         249.4       1.2X
+Native ORC Vectorized (Pushdown)               894 /  943         17.6          56.8       5.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 10% timestamp stored as INT96 rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            5686 / 5901          2.8         361.5       1.0X
+Parquet Vectorized (Pushdown)                 5555 / 5895          2.8         353.2       1.0X
+Native ORC Vectorized                         4844 / 4957          3.2         308.0       1.2X
+Native ORC Vectorized (Pushdown)              2141 / 2230          7.3         136.1       2.7X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 50% timestamp stored as INT96 rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            9100 / 9421          1.7         578.6       1.0X
+Parquet Vectorized (Pushdown)                 9122 / 9496          1.7         580.0       1.0X
+Native ORC Vectorized                         8365 / 8874          1.9         531.9       1.1X
+Native ORC Vectorized (Pushdown)              7128 / 7376          2.2         453.2       1.3X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 90% timestamp stored as INT96 rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          12764 / 13120          1.2         811.5       1.0X
+Parquet Vectorized (Pushdown)               12656 / 13003          1.2         804.7       1.0X
+Native ORC Vectorized                       13096 / 13233          1.2         832.6       1.0X
+Native ORC Vectorized (Pushdown)            12710 / 15611          1.2         808.1       1.0X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 1 timestamp stored as TIMESTAMP_MICROS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            4381 / 4796          3.6         278.5       1.0X
+Parquet Vectorized (Pushdown)                  122 /  137        129.3           7.7      36.0X
+Native ORC Vectorized                         3913 / 3988          4.0         248.8       1.1X
+Native ORC Vectorized (Pushdown)               905 /  945         17.4          57.6       4.8X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 10% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            5145 / 5184          3.1         327.1       1.0X
+Parquet Vectorized (Pushdown)                 1426 / 1519         11.0          90.7       3.6X
+Native ORC Vectorized                         4827 / 4901          3.3         306.9       1.1X
+Native ORC Vectorized (Pushdown)              2133 / 2210          7.4         135.6       2.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 50% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            9234 / 9516          1.7         587.1       1.0X
+Parquet Vectorized (Pushdown)                 6752 / 7046          2.3         429.3       1.4X
+Native ORC Vectorized                         8418 / 8998          1.9         535.2       1.1X
+Native ORC Vectorized (Pushdown)              7199 / 7314          2.2         457.7       1.3X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 90% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          12414 / 12458          1.3         789.2       1.0X
+Parquet Vectorized (Pushdown)               12094 / 12249          1.3         768.9       1.0X
+Native ORC Vectorized                       12198 / 13755          1.3         775.5       1.0X
+Native ORC Vectorized (Pushdown)            12205 / 12431          1.3         776.0       1.0X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 1 timestamp stored as TIMESTAMP_MILLIS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            4369 / 4515          3.6         277.8       1.0X
+Parquet Vectorized (Pushdown)                  116 /  125        136.2           7.3      37.8X
+Native ORC Vectorized                         3965 / 4703          4.0         252.1       1.1X
+Native ORC Vectorized (Pushdown)               892 / 1162         17.6          56.7       4.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 10% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            5211 / 5409          3.0         331.3       1.0X
+Parquet Vectorized (Pushdown)                 1427 / 1438         11.0          90.7       3.7X
+Native ORC Vectorized                         4719 / 4883          3.3         300.1       1.1X
+Native ORC Vectorized (Pushdown)              2191 / 2228          7.2         139.3       2.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 50% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                            8716 / 8953          1.8         554.2       1.0X
+Parquet Vectorized (Pushdown)                 6632 / 6968          2.4         421.7       1.3X
+Native ORC Vectorized                         8376 / 9118          1.9         532.5       1.0X
+Native ORC Vectorized (Pushdown)              7218 / 7609          2.2         458.9       1.2X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+Select 90% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          12264 / 12452          1.3         779.7       1.0X
+Parquet Vectorized (Pushdown)               11766 / 11927          1.3         748.0       1.0X
+Native ORC Vectorized                       12101 / 12301          1.3         769.3       1.0X
+Native ORC Vectorized (Pushdown)            11983 / 12651          1.3         761.9       1.0X
+

http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index efddf8d..3ec33b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -341,6 +341,7 @@ class ParquetFileFormat
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
 
@@ -366,7 +367,7 @@ class ParquetFileFormat
       val pushed = if (enableParquetFilterPushDown) {
        val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
           .getFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(pushDownDate,
+        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp,
           pushDownStringStartWith, pushDownInFilterThreshold)
         filters
          // Collects all converted Parquet filter predicates. Notice that not all predicates can be

http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index e590c15..0c146f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.sql.Date
+import java.lang.{Long => JLong}
+import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
 import org.apache.parquet.schema.OriginalType._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
@@ -39,6 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 private[parquet] class ParquetFilters(
     pushDownDate: Boolean,
+    pushDownTimestamp: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int) {
 
@@ -57,6 +59,8 @@ private[parquet] class ParquetFilters(
   private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
   private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
   private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, null)
+  private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null)
 
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -89,6 +93,15 @@ private[parquet] class ParquetFilters(
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.eq(
+        longColumn(n),
+        Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
+          .asInstanceOf[JLong]).orNull)
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.eq(
+        longColumn(n),
+        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
   }
 
  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -117,6 +130,15 @@ private[parquet] class ParquetFilters(
       (n: String, v: Any) => FilterApi.notEq(
         intColumn(n),
        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.notEq(
+        longColumn(n),
+        Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
+          .asInstanceOf[JLong]).orNull)
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.notEq(
+        longColumn(n),
+        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
   }
 
  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -139,6 +161,14 @@ private[parquet] class ParquetFilters(
     case ParquetDateType if pushDownDate =>
       (n: String, v: Any) =>
        FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.lt(
+        longColumn(n),
+        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.lt(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
   }
 
  private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -161,6 +191,14 @@ private[parquet] class ParquetFilters(
     case ParquetDateType if pushDownDate =>
       (n: String, v: Any) =>
        FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.ltEq(
+        longColumn(n),
+        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.ltEq(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
   }
 
  private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -183,6 +221,14 @@ private[parquet] class ParquetFilters(
     case ParquetDateType if pushDownDate =>
       (n: String, v: Any) =>
        FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.gt(
+        longColumn(n),
+        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.gt(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
   }
 
  private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -205,6 +251,14 @@ private[parquet] class ParquetFilters(
     case ParquetDateType if pushDownDate =>
       (n: String, v: Any) =>
        FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+    case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.gtEq(
+        longColumn(n),
+        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+    case ParquetTimestampMillisType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.gtEq(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
   }
 
   /**
@@ -241,6 +295,7 @@ private[parquet] class ParquetFilters(
            | ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
            | ParquetBinaryType => true
       case ParquetDateType if pushDownDate => true
+      case ParquetTimestampMicrosType | ParquetTimestampMillisType if pushDownTimestamp => true
       case _ => false
     }
 
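The two sets of cases above differ only in how a `java.sql.Timestamp` literal becomes the INT64 value Parquet stores: microseconds since the epoch for TIMESTAMP_MICROS, milliseconds for TIMESTAMP_MILLIS. A standalone sketch of that conversion (assumes spark-catalyst on the classpath for `DateTimeUtils`):

```scala
import java.sql.Timestamp

import org.apache.spark.sql.catalyst.util.DateTimeUtils

val ts = Timestamp.valueOf("2018-06-14 08:28:53.123456")

// TIMESTAMP_MICROS columns hold microseconds since the epoch, which is
// what DateTimeUtils.fromJavaTimestamp produces (keeps the .123456).
val micros: Long = DateTimeUtils.fromJavaTimestamp(ts)

// TIMESTAMP_MILLIS columns hold milliseconds since the epoch, matching
// java.sql.Timestamp.getTime (sub-millisecond digits are dropped).
val millis: Long = ts.getTime

println(s"micros = $micros, millis = $millis")  // micros == millis * 1000 + 456
```

The values are boxed to `java.lang.Long` (`JLong`) in the filters because Parquet's `FilterApi` takes the boxed type, and `eq`/`notEq` need a nullable comparand to express IS NULL semantics.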

http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index fc716de..567a8eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -28,7 +28,8 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions.monotonically_increasing_id
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType}
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
+import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
 import org.apache.spark.util.{Benchmark, Utils}
 
 /**
@@ -359,6 +360,40 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfterEachTest {
       }
     }
   }
+
+  ignore(s"Pushdown benchmark for Timestamp") {
+    withTempPath { dir =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
+        ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
+          withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
+            val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
+            val df = spark.range(numRows).selectExpr(columns: _*)
+              .withColumn("value", 
monotonically_increasing_id().cast(TimestampType))
+            withTempTable("orcTable", "patquetTable") {
+              saveAsTable(df, dir)
+
+              Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
+                val title = s"Select 1 timestamp stored as $fileType row 
($whereExpr)"
+                  .replace("value AND value", "value")
+                filterPushDownBenchmark(numRows, title, whereExpr)
+              }
+
+              val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
+              Seq(10, 50, 90).foreach { percent =>
+                filterPushDownBenchmark(
+                  numRows,
+                  s"Select $percent% timestamp stored as $fileType rows " +
+                    s"(value < CAST(${numRows * percent / 100} AS timestamp))",
+                  s"value < CAST(${numRows * percent / 100} as timestamp)",
+                  selectExpr
+                )
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>

http://git-wip-us.apache.org/repos/asf/spark/blob/43e4e851/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 00c191f..924f136 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.charset.StandardCharsets
-import java.sql.Date
+import java.sql.{Date, Timestamp}
 
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
@@ -56,8 +57,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
   private lazy val parquetFilters =
-    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith,
-      conf.parquetFilterPushDownInFilterThreshold)
+    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
+      conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -84,6 +85,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withSQLConf(
       SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
       SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true",
       SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
       SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val query = df
@@ -144,6 +146,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
+  private def testTimestampPushdown(data: Seq[Timestamp]): Unit = {
+    assert(data.size === 4)
+    val ts1 = data.head
+    val ts2 = data(1)
+    val ts3 = data(2)
+    val ts4 = data(3)
+
+    withParquetDataFrame(data.map(i => Tuple1(i))) { implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i)))
+
+      checkFilterPredicate('_1 === ts1, classOf[Eq[_]], ts1)
+      checkFilterPredicate('_1 <=> ts1, classOf[Eq[_]], ts1)
+      checkFilterPredicate('_1 =!= ts1, classOf[NotEq[_]],
+        Seq(ts2, ts3, ts4).map(i => Row.apply(i)))
+
+      checkFilterPredicate('_1 < ts2, classOf[Lt[_]], ts1)
+      checkFilterPredicate('_1 > ts1, classOf[Gt[_]], Seq(ts2, ts3, ts4).map(i => Row.apply(i)))
+      checkFilterPredicate('_1 <= ts1, classOf[LtEq[_]], ts1)
+      checkFilterPredicate('_1 >= ts4, classOf[GtEq[_]], ts4)
+
+      checkFilterPredicate(Literal(ts1) === '_1, classOf[Eq[_]], ts1)
+      checkFilterPredicate(Literal(ts1) <=> '_1, classOf[Eq[_]], ts1)
+      checkFilterPredicate(Literal(ts2) > '_1, classOf[Lt[_]], ts1)
+      checkFilterPredicate(Literal(ts3) < '_1, classOf[Gt[_]], ts4)
+      checkFilterPredicate(Literal(ts1) >= '_1, classOf[LtEq[_]], ts1)
+      checkFilterPredicate(Literal(ts4) <= '_1, classOf[GtEq[_]], ts4)
+
+      checkFilterPredicate(!('_1 < ts4), classOf[GtEq[_]], ts4)
+      checkFilterPredicate('_1 < ts2 || '_1 > ts3, classOf[Operators.Or], Seq(Row(ts1), Row(ts4)))
+    }
+  }
+
  // This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
     withTempPath { dir =>
@@ -444,6 +479,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("filter pushdown - timestamp") {
+    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
+      Timestamp.valueOf("2018-06-15 08:28:53.123"),
+      Timestamp.valueOf("2018-06-16 08:28:53.123"),
+      Timestamp.valueOf("2018-06-17 08:28:53.123"))
+    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+      ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+      testTimestampPushdown(millisData)
+    }
+
+    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
+    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
+      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
+      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
+      Timestamp.valueOf("2018-06-17 08:28:53.123456"))
+    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+      ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+      testTimestampPushdown(microsData)
+    }
+
+    // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
+    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+      ParquetOutputTimestampType.INT96.toString) {
+      withParquetDataFrame(millisData.map(i => Tuple1(i))) { implicit df =>
+        assertResult(None) {
+          parquetFilters.createFilter(
+            new SparkToParquetSchemaConverter(conf).convert(df.schema), sources.IsNull("_1"))
+        }
+      }
+    }
+  }
+
   test("SPARK-6554: don't push down predicates which reference partition 
columns") {
     import testImplicits._
 

