Repository: spark
Updated Branches:
  refs/heads/master f1a99ad58 -> e1de34113


[SPARK-17091][SQL] Add rule to convert IN predicate to equivalent Parquet filter

## What changes were proposed in this pull request?

The original pr is: https://github.com/apache/spark/pull/18424

Add a new optimizer rule to convert an IN predicate to an equivalent Parquet 
filter and add `spark.sql.parquet.pushdown.inFilterThreshold` to control limit 
thresholds. Different data types have different limit thresholds, this is a 
copy of data for reference:

Type | limit threshold
-- | --
string | 370
int | 210
long | 285
double | 270
float | 220
decimal | Won't provide better performance before 
[SPARK-24549](https://issues.apache.org/jira/browse/SPARK-24549)

## How was this patch tested?
unit tests and manual tests

Author: Yuming Wang <yumw...@ebay.com>

Closes #21603 from wangyum/SPARK-17091.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1de3411
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1de3411
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1de3411

Branch: refs/heads/master
Commit: e1de34113e057707dfc5ff54a8109b3ec7c16dfb
Parents: f1a99ad
Author: Yuming Wang <yumw...@ebay.com>
Authored: Sat Jul 14 17:50:54 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sat Jul 14 17:50:54 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 15 +++
 .../FilterPushdownBenchmark-results.txt         | 96 ++++++++++----------
 .../datasources/parquet/ParquetFileFormat.scala | 15 ++-
 .../datasources/parquet/ParquetFilters.scala    | 20 +++-
 .../parquet/ParquetFilterSuite.scala            | 66 +++++++++++++-
 5 files changed, 153 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1de3411/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 14dd528..699e939 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -386,6 +386,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
+    buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
+      .doc("The maximum number of values to filter push-down optimization for 
IN predicate. " +
+        "Large threshold won't necessarily provide much better performance. " +
+        "The experiment argued that 300 is the limit threshold. " +
+        "By setting this value to 0 this feature can be disabled. " +
+        "This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' is enabled.")
+      .internal()
+      .intConf
+      .checkValue(threshold => threshold >= 0, "The threshold must not be 
negative.")
+      .createWithDefault(10)
+
   val PARQUET_WRITE_LEGACY_FORMAT = 
buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by 
Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice 
versa.")
@@ -1485,6 +1497,9 @@ class SQLConf extends Serializable with Logging {
   def parquetFilterPushDownStringStartWith: Boolean =
     getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
 
+  def parquetFilterPushDownInFilterThreshold: Int =
+    getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

http://git-wip-us.apache.org/repos/asf/spark/blob/e1de3411/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt 
b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
index 110669b..c44908b 100644
--- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
+++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -417,120 +417,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 10): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7477 / 7587          2.1         
475.4       1.0X
-Parquet Vectorized (Pushdown)                 7862 / 8346          2.0         
499.9       1.0X
-Native ORC Vectorized                         6447 / 7021          2.4         
409.9       1.2X
-Native ORC Vectorized (Pushdown)               983 / 1003         16.0         
 62.5       7.6X
+Parquet Vectorized                            7993 / 8104          2.0         
508.2       1.0X
+Parquet Vectorized (Pushdown)                  507 /  532         31.0         
 32.2      15.8X
+Native ORC Vectorized                         6922 / 7163          2.3         
440.1       1.2X
+Native ORC Vectorized (Pushdown)              1017 / 1058         15.5         
 64.6       7.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 50): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7107 / 7290          2.2         
451.9       1.0X
-Parquet Vectorized (Pushdown)                 7196 / 7258          2.2         
457.5       1.0X
-Native ORC Vectorized                         6102 / 6222          2.6         
388.0       1.2X
-Native ORC Vectorized (Pushdown)               926 /  958         17.0         
 58.9       7.7X
+Parquet Vectorized                            7855 / 7963          2.0         
499.4       1.0X
+Parquet Vectorized (Pushdown)                  503 /  516         31.3         
 32.0      15.6X
+Native ORC Vectorized                         6825 / 6954          2.3         
433.9       1.2X
+Native ORC Vectorized (Pushdown)              1019 / 1044         15.4         
 64.8       7.7X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 5, distribution: 90): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7374 / 7692          2.1         
468.8       1.0X
-Parquet Vectorized (Pushdown)                 7771 / 7848          2.0         
494.1       0.9X
-Native ORC Vectorized                         6184 / 6356          2.5         
393.2       1.2X
-Native ORC Vectorized (Pushdown)               920 /  963         17.1         
 58.5       8.0X
+Parquet Vectorized                            7858 / 7928          2.0         
499.6       1.0X
+Parquet Vectorized (Pushdown)                  490 /  519         32.1         
 31.1      16.0X
+Native ORC Vectorized                         7079 / 7966          2.2         
450.1       1.1X
+Native ORC Vectorized (Pushdown)              1276 / 1673         12.3         
 81.1       6.2X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 10): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7073 / 7326          2.2         
449.7       1.0X
-Parquet Vectorized (Pushdown)                 7304 / 7647          2.2         
464.4       1.0X
-Native ORC Vectorized                         6222 / 6579          2.5         
395.6       1.1X
-Native ORC Vectorized (Pushdown)               958 /  994         16.4         
 60.9       7.4X
+Parquet Vectorized                           8007 / 11155          2.0         
509.0       1.0X
+Parquet Vectorized (Pushdown)                  519 /  540         30.3         
 33.0      15.4X
+Native ORC Vectorized                         6848 / 7072          2.3         
435.4       1.2X
+Native ORC Vectorized (Pushdown)              1026 / 1050         15.3         
 65.2       7.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 50): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7121 / 7501          2.2         
452.7       1.0X
-Parquet Vectorized (Pushdown)                 7751 / 8334          2.0         
492.8       0.9X
-Native ORC Vectorized                         6225 / 6680          2.5         
395.8       1.1X
-Native ORC Vectorized (Pushdown)               998 / 1020         15.8         
 63.5       7.1X
+Parquet Vectorized                            7876 / 7956          2.0         
500.7       1.0X
+Parquet Vectorized (Pushdown)                  521 /  535         30.2         
 33.1      15.1X
+Native ORC Vectorized                         7051 / 7368          2.2         
448.3       1.1X
+Native ORC Vectorized (Pushdown)              1014 / 1035         15.5         
 64.5       7.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 10, distribution: 90): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7157 / 7399          2.2         
455.1       1.0X
-Parquet Vectorized (Pushdown)                 7806 / 7911          2.0         
496.3       0.9X
-Native ORC Vectorized                         6548 / 6720          2.4         
416.3       1.1X
-Native ORC Vectorized (Pushdown)              1016 / 1050         15.5         
 64.6       7.0X
+Parquet Vectorized                            7897 / 8229          2.0         
502.1       1.0X
+Parquet Vectorized (Pushdown)                  513 /  530         30.7         
 32.6      15.4X
+Native ORC Vectorized                         6730 / 6990          2.3         
427.9       1.2X
+Native ORC Vectorized (Pushdown)              1003 / 1036         15.7         
 63.8       7.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 10): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7662 / 7805          2.1         
487.1       1.0X
-Parquet Vectorized (Pushdown)                 7590 / 7861          2.1         
482.5       1.0X
-Native ORC Vectorized                         6840 / 8073          2.3         
434.9       1.1X
-Native ORC Vectorized (Pushdown)              1041 / 1075         15.1         
 66.2       7.4X
+Parquet Vectorized                            7967 / 8175          2.0         
506.5       1.0X
+Parquet Vectorized (Pushdown)                 8155 / 8434          1.9         
518.5       1.0X
+Native ORC Vectorized                         7002 / 7107          2.2         
445.2       1.1X
+Native ORC Vectorized (Pushdown)              1092 / 1139         14.4         
 69.4       7.3X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 50): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            8230 / 9266          1.9         
523.2       1.0X
-Parquet Vectorized (Pushdown)                 7735 / 7960          2.0         
491.8       1.1X
-Native ORC Vectorized                         6945 / 7109          2.3         
441.6       1.2X
-Native ORC Vectorized (Pushdown)              1123 / 1144         14.0         
 71.4       7.3X
+Parquet Vectorized                            8032 / 8122          2.0         
510.7       1.0X
+Parquet Vectorized (Pushdown)                 8141 / 8908          1.9         
517.6       1.0X
+Native ORC Vectorized                         7140 / 7387          2.2         
454.0       1.1X
+Native ORC Vectorized (Pushdown)              1156 / 1220         13.6         
 73.5       6.9X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 50, distribution: 90): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7656 / 8058          2.1         
486.7       1.0X
-Parquet Vectorized (Pushdown)                 7860 / 8247          2.0         
499.7       1.0X
-Native ORC Vectorized                         6684 / 7003          2.4         
424.9       1.1X
-Native ORC Vectorized (Pushdown)              1085 / 1172         14.5         
 69.0       7.1X
+Parquet Vectorized                            8088 / 8350          1.9         
514.2       1.0X
+Parquet Vectorized (Pushdown)                 8629 / 8702          1.8         
548.6       0.9X
+Native ORC Vectorized                         7480 / 7886          2.1         
475.6       1.1X
+Native ORC Vectorized (Pushdown)              1106 / 1145         14.2         
 70.3       7.3X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 10): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            7594 / 8128          2.1         
482.8       1.0X
-Parquet Vectorized (Pushdown)                 7845 / 7923          2.0         
498.8       1.0X
-Native ORC Vectorized                         5859 / 6421          2.7         
372.5       1.3X
-Native ORC Vectorized (Pushdown)              1037 / 1054         15.2         
 66.0       7.3X
+Parquet Vectorized                            8028 / 8165          2.0         
510.4       1.0X
+Parquet Vectorized (Pushdown)                 8349 / 8674          1.9         
530.8       1.0X
+Native ORC Vectorized                         7107 / 7354          2.2         
451.8       1.1X
+Native ORC Vectorized (Pushdown)              1175 / 1207         13.4         
 74.7       6.8X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 50): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            6762 / 6775          2.3         
429.9       1.0X
-Parquet Vectorized (Pushdown)                 6911 / 6970          2.3         
439.4       1.0X
-Native ORC Vectorized                         5884 / 5960          2.7         
374.1       1.1X
-Native ORC Vectorized (Pushdown)              1028 / 1052         15.3         
 65.4       6.6X
+Parquet Vectorized                            8041 / 8195          2.0         
511.2       1.0X
+Parquet Vectorized (Pushdown)                 8466 / 8604          1.9         
538.2       0.9X
+Native ORC Vectorized                         7116 / 7286          2.2         
452.4       1.1X
+Native ORC Vectorized (Pushdown)              1197 / 1214         13.1         
 76.1       6.7X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 InSet -> InFilters (values count: 100, distribution: 90): Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------
-Parquet Vectorized                            6718 / 6767          2.3         
427.1       1.0X
-Parquet Vectorized (Pushdown)                 6812 / 6909          2.3         
433.1       1.0X
-Native ORC Vectorized                         5842 / 5883          2.7         
371.4       1.1X
-Native ORC Vectorized (Pushdown)              1040 / 1058         15.1         
 66.1       6.5X
+Parquet Vectorized                            7998 / 8311          2.0         
508.5       1.0X
+Parquet Vectorized (Pushdown)                9366 / 11257          1.7         
595.5       0.9X
+Native ORC Vectorized                         7856 / 9273          2.0         
499.5       1.0X
+Native ORC Vectorized (Pushdown)              1350 / 1747         11.7         
 85.8       5.9X
 
 
 
================================================================================================

http://git-wip-us.apache.org/repos/asf/spark/blob/e1de3411/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b86b97e..efddf8d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -334,17 +334,15 @@ class ParquetFileFormat
     val enableVectorizedReader: Boolean =
       sqlConf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-    val enableRecordFilter: Boolean =
-      sparkSession.sessionState.conf.parquetRecordFilterEnabled
-    val timestampConversion: Boolean =
-      sparkSession.sessionState.conf.isParquetINT96TimestampConversion
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
-    val enableParquetFilterPushDown: Boolean =
-      sparkSession.sessionState.conf.parquetFilterPushDown
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -368,12 +366,13 @@ class ParquetFileFormat
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS)
           .getFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(pushDownDate,
+          pushDownStringStartWith, pushDownInFilterThreshold)
         filters
           // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
           // is used here.
-          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
-          .createFilter(parquetSchema, _))
+          .flatMap(parquetFilters.createFilter(parquetSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/e1de3411/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 4c9b940..e590c15 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -37,7 +37,10 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+    pushDownDate: Boolean,
+    pushDownStartWith: Boolean,
+    pushDownInFilterThreshold: Int) {
 
   private case class ParquetSchemaType(
       originalType: OriginalType,
@@ -232,6 +235,15 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
     // See SPARK-20364.
     def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && 
!name.contains(".")
 
+    // All DataTypes that support `makeEq` can provide better performance.
+    def shouldConvertInPredicate(name: String): Boolean = nameToType(name) 
match {
+      case ParquetBooleanType | ParquetByteType | ParquetShortType | 
ParquetIntegerType
+           | ParquetLongType | ParquetFloatType | ParquetDoubleType | 
ParquetStringType
+           | ParquetBinaryType => true
+      case ParquetDateType if pushDownDate => true
+      case _ => false
+    }
+
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` 
evaluate to `NULL`,
@@ -295,6 +307,12 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
       case sources.Not(pred) =>
         createFilter(schema, pred).map(FilterApi.not)
 
+      case sources.In(name, values) if canMakeFilterOn(name) && 
shouldConvertInPredicate(name)
+        && values.distinct.length <= pushDownInFilterThreshold =>
+        values.distinct.flatMap { v =>
+          makeEq.lift(nameToType(name)).map(_(name, v))
+        }.reduceLeftOption(FilterApi.or)
+
       case sources.StringStartsWith(name, prefix) if pushDownStartWith && 
canMakeFilterOn(name) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(name),

http://git-wip-us.apache.org/repos/asf/spark/blob/e1de3411/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 067d2fe..00c191f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.nio.charset.StandardCharsets
 import java.sql.Date
 
-import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, 
Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 
@@ -56,7 +56,8 @@ import org.apache.spark.util.{AccumulatorContext, 
AccumulatorV2}
 class ParquetFilterSuite extends QueryTest with ParquetTest with 
SharedSQLContext {
 
   private lazy val parquetFilters =
-    new ParquetFilters(conf.parquetFilterPushDownDate, 
conf.parquetFilterPushDownStringStartWith)
+    new ParquetFilters(conf.parquetFilterPushDownDate, 
conf.parquetFilterPushDownStringStartWith,
+      conf.parquetFilterPushDownInFilterThreshold)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -803,6 +804,67 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
     // Test inverseCanDrop() has taken effect
     testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not 
like '10%'")
   }
+
+  test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+    assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(null)))
+    }
+
+    assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10)))
+    }
+
+    // Remove duplicates
+    assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 
10)))
+    }
+
+    assertResult(Some(or(or(
+      FilterApi.eq(intColumn("a"), 10: Integer),
+      FilterApi.eq(intColumn("a"), 20: Integer)),
+      FilterApi.eq(intColumn("a"), 30: Integer)))
+    ) {
+      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20, 
30)))
+    }
+
+    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+      Range(0, 
conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
+    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+      Range(0, conf.parquetFilterPushDownInFilterThreshold + 
1).toArray)).isEmpty)
+
+    import testImplicits._
+    withTempPath { path =>
+      val data = 0 to 1024
+      data.toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 
to null
+        .coalesce(1).write.option("parquet.block.size", 512)
+        .parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      Seq(true, false).foreach { pushEnabled =>
+        withSQLConf(
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) 
{
+          Seq(1, 5, 10, 11).foreach { count =>
+            val filter = s"a in(${Range(0, count).mkString(",")})"
+            assert(df.where(filter).count() === count)
+            val actual = stripSparkFilter(df.where(filter)).collect().length
+            if (pushEnabled && count <= 
conf.parquetFilterPushDownInFilterThreshold) {
+              assert(actual > 1 && actual < data.length)
+            } else {
+              assert(actual === data.length)
+            }
+          }
+          assert(df.where("a in(null)").count() === 0)
+          assert(df.where("a = null").count() === 0)
+          assert(df.where("a is null").count() === 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to