Repository: spark
Updated Branches:
  refs/heads/master d8741b2b0 -> 673c67046


[SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side

## What changes were proposed in this pull request?

There is a concern that Spark-side code-generated row-by-row filtering might generally be 
faster than Parquet's, because Parquet's filtering involves type boxing and additional 
function calls that Spark's generated code avoids.

So, this PR adds an option to enable or disable record-by-record filtering on the 
Parquet side.

It sets the default to `false` so that the faster Spark-side filtering is used by default.

This was also discussed in https://github.com/apache/spark/pull/14671.
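
For illustration, here is a minimal sketch of how a user could opt back into Parquet's record-level filtering through the new option (assuming an existing `spark` session; the input path and column name are hypothetical):

```scala
// Record-level filtering only applies to filters that are pushed down.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
// New option added in this PR; it defaults to false (Spark-side filtering only).
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")

// Hypothetical data path and column name, for illustration only.
val filtered = spark.read.parquet("/tmp/parquet-data").filter("a = 1")
filtered.count()
```
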
## How was this patch tested?

Benchmarks were performed manually. I generated a billion (1,000,000,000) 
records and tested equality comparisons concatenated with `OR`, with the number 
of filters ranging from 5 to 30.

Spark-side filtering was indeed faster in this test case, and the gap widened 
as the filter tree grew larger.

The details are as below:

**Code**

``` scala
test("Parquet-side filter vs Spark-side filter - record by record") {
  withTempPath { path =>
    val N = 1000 * 1000 * 1000
    val df = spark.range(N).toDF("a")
    df.write.parquet(path.getAbsolutePath)

    val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
    Seq(5, 10, 20, 30).foreach { num =>
      val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")

      benchmark.addCase(s"Parquet-side filter - number of filters [$num]", 3) { 
_ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {

          // We should strip Spark-side filter to compare correctly.
          stripSparkFilter(
            spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
        }
      }

      benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3) { _ 
=>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {

          spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
        }
      }
    }

    benchmark.run()
  }
}
```

**Result**

```
Parquet-side vs Spark-side:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5]        4268 / 4367        234.3           4.3       0.8X
Spark-side filter - number of filters [5]          3709 / 3741        269.6           3.7       0.9X
Parquet-side filter - number of filters [10]       5673 / 5727        176.3           5.7       0.6X
Spark-side filter - number of filters [10]         3588 / 3632        278.7           3.6       0.9X
Parquet-side filter - number of filters [20]       8024 / 8440        124.6           8.0       0.4X
Spark-side filter - number of filters [20]         3912 / 3946        255.6           3.9       0.8X
Parquet-side filter - number of filters [30]     11936 / 12041         83.8          11.9       0.3X
Spark-side filter - number of filters [30]         3929 / 3978        254.5           3.9       0.8X
```
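
As a reference, the following rough sketch reproduces a similar comparison outside the Spark test suite using only public APIs. It assumes an existing `spark` session, uses a much smaller dataset, and, unlike the benchmark above, does not strip the Spark-side filter, so it measures end-to-end scan time under each setting rather than isolating the Parquet-side filter:

```scala
// Standalone timing sketch; numbers will differ from the benchmark above.
def timeMs[T](body: => T): Long = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1000000
}

val path = "/tmp/parquet-record-filter-bench"  // hypothetical output path
spark.range(10L * 1000 * 1000).toDF("a").write.mode("overwrite").parquet(path)

val filterExpr = (0 to 10).map(i => s"a = $i").mkString(" OR ")
Seq("true", "false").foreach { recordFilter =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
  spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", recordFilter)
  val elapsed = timeMs(spark.read.parquet(path).filter(filterExpr).count())
  println(s"recordLevelFilter.enabled=$recordFilter: $elapsed ms")
}
```
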

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #15049 from HyukjinKwon/SPARK-17310.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/673c6704
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/673c6704
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/673c6704

Branch: refs/heads/master
Commit: 673c67046598d33b9ecf864024ca7a937c1998d6
Parents: d8741b2
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Nov 14 12:34:21 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Nov 14 12:34:21 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../datasources/parquet/ParquetFileFormat.scala | 14 +++---
 .../parquet/ParquetFilterSuite.scala            | 51 +++++++++++++++++++-
 3 files changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 831ef62..0cb58fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -327,6 +327,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
+    .doc("If true, enables Parquet's native record-level filtering using the pushed down " +
+      "filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' " +
+      "is enabled.")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
     .doc("The output committer class used by Parquet. The specified class needs to be a " +
       "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
@@ -1173,6 +1180,8 @@ class SQLConf extends Serializable with Logging {
 
   def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
 
+  def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED)
+
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index a48f8d5..044b1a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -335,6 +335,8 @@ class ParquetFileFormat
     val enableVectorizedReader: Boolean =
       sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean =
+      sparkSession.sessionState.conf.parquetRecordFilterEnabled
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
 
@@ -374,13 +376,11 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
-        val reader = pushed match {
-          case Some(filter) =>
-            new ParquetRecordReader[UnsafeRow](
-              new ParquetReadSupport,
-              FilterCompat.get(filter, null))
-          case _ =>
-            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport, parquetFilter)
+        } else {
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
         reader

http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 90f6620..3380195 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -45,8 +45,29 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *
  * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
  *    data type is nullable.
+ *
+ * NOTE:
+ *
+ * This file intentionally enables record-level filtering explicitly. If new test cases
+ * depend on this configuration, make sure to set it explicitly within the test.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    // Note that there are many tests here that require record-level filtering to be set to true.
+    spark.conf.set(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key, "true")
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      spark.conf.unset(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key)
+    } finally {
+      super.afterEach()
+    }
+  }
+
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
@@ -369,7 +390,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
   test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
-    Seq("true", "false").map { vectorized =>
+    Seq("true", "false").foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
@@ -491,7 +512,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("Fiters should be pushed down for vectorized Parquet reader at row 
group level") {
+  test("Filters should be pushed down for vectorized Parquet reader at row 
group level") {
     import testImplicits._
 
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
@@ -555,6 +576,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Filters should be pushed down for Parquet readers at row group level") 
{
+    import testImplicits._
+
+    withSQLConf(
+      // Makes sure disabling 'spark.sql.parquet.recordLevelFilter.enabled' still enables
+      // row group level filtering.
+      SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+      withTempPath { path =>
+        val data = (1 to 1024)
+        data.toDF("a").coalesce(1)
+          .write.option("parquet.block.size", 512)
+          .parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath).filter("a == 500")
+        // Here, we strip the Spark side filter and check the actual results from Parquet.
+        val actual = stripSparkFilter(df).collect().length
+        // Since those are filtered at row group level, the result count should be less
+        // than the total length but should not be a single record.
+        // Note that, if record level filtering is enabled, it should be a single record.
+        // If no filter is pushed down to Parquet, it should be the total length of data.
+        assert(actual > 1 && actual < data.length)
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {

