Github user HyukjinKwon commented on the pull request:

    https://github.com/apache/spark/pull/10502#issuecomment-167933518
  
    # Benchmark (Removed Spark-side Filter)
    
    ## Motivation
    
    This PR simplifies the query plans for Parquet files by stripping the duplicated Spark-side filtering,
    
    from:
    
    ```
    == Physical Plan ==
    Filter (a#8 = 2)
      +- Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]
    ```
    
    to:
    
    ```
    == Physical Plan ==
    Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]
    ```
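
    For reference, plans like the ones above can be printed with `explain()`. Below is a minimal sketch; the table path, the partition column `part` and the column `a` are assumptions inferred from the `part=1` directory in the paths above, not taken from the PR itself:

    ```scala
    // Assumed layout: a small Parquet table partitioned by `part` with a column `a`.
    val sample = sqlContext.range(0, 10).selectExpr("id AS a", "1 AS part")
    sample.write.partitionBy("part").parquet("/tmp/filter-plan-test")

    // Prints the physical plan; with this PR the extra Spark-side Filter node
    // on top of the Parquet scan is expected to disappear.
    sqlContext.read.parquet("/tmp/filter-plan-test").filter("a = 2").explain()
    ```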
    
    However, it was unknown whether this brings any performance benefit, so this benchmark was performed: several queries were executed both with and without the duplicated Spark-side filtering.
    
    ## Environment
    
    - Machine: MacBook Pro Retina
    - CPU: 4
    - Memory: 8G
    
    ## Method
    
    In this benchmark,
     - Filters that can be pushed down to Parquet were exercised with 9 simple queries, covering the `IS NULL`, `IS NOT NULL`, `=`, `!=`, `<=>`, `<`, `>`, `<=` and `>=` operators. Each query was executed 5 times and the average time was taken (an averaging sketch follows the `time` function below).
     - The [commit right before this PR](https://github.com/apache/spark/commit/73862a1) was compared against this PR; that is, the tests were run both with and without the Spark-side filtering.
     - To keep the test clean, the target Parquet file was uncompressed, and `spark.sql.parquet.enableUnsafeRowRecordReader` was disabled so that Parquet filters records row by row.
    
    
    ## Dataset
    #### Raw Data
    - [TPC-H](http://www.tpc.org/tpch/) Lineitem table generated at scale factor 1 ([generate data](https://github.com/rxin/TPC-H-Hive/tree/master/dbgen))
    - Size: 724.66 MB
    
    #### Create Target Parquet File
    - Case class for the Lineitem table
    ```scala
    case class Lineitem(l_orderkey: Int,
                        l_partkey: Int,
                        l_suppkey: Int,
                        l_linenumber: Int,
                        l_quantity: Float,
                        l_extendedprice: Float,
                        l_discount: Float,
                        l_tax: Float,
                        l_returnflag: String,
                        l_linestatus: String,
                        l_shipdate: String,
                        l_commitdate: String,
                        l_receiptdate: String,
                        l_shipinstruct: String,
                        l_shipmode: String,
                        l_comment: String)
    ```
    
    - Create Parquet file
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
    conf.setAppName("Test").setMaster("local")
    conf.set("spark.sql.parquet.compression.codec", "uncompressed")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    // Parse the raw TPC-H table and write it out as an uncompressed Parquet file.
    val df = sc.textFile("lineitem.tbl").map(_.split('|')).map { v =>
      Lineitem(
        v(0).trim.toInt,
        v(1).trim.toInt,
        v(2).trim.toInt,
        v(3).trim.toInt,
        v(4).trim.toFloat,
        v(5).trim.toFloat,
        v(6).trim.toFloat,
        v(7).trim.toFloat,
        v(8),
        v(9),
        v(10),
        v(11),
        v(12),
        v(13),
        v(14),
        v(15))
    }.toDF()
    df.write.format("parquet").save("lineitem")
    ```
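
    As an optional sanity check (not part of the original steps), the written file can be read back to confirm the schema and row count:

    ```scala
    // Read the written Parquet file back and verify it.
    val written = sqlContext.read.parquet("lineitem")
    written.printSchema()     // should match the Lineitem case class
    println(written.count())  // total number of lineitem rows
    ```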
    
    #### Parquet file
    
    ```
    -rw-r--r--  hyukjinkwon hyukjinkwon 0 B         _SUCCESS
    -rw-r--r--  hyukjinkwon hyukjinkwon 1.54 KB     _common_metadata
    -rw-r--r--  hyukjinkwon hyukjinkwon 13.72 KB    _metadata
    -rw-r--r--  hyukjinkwon hyukjinkwon 52.16 MB    part-r-00000-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    -rw-r--r--  hyukjinkwon hyukjinkwon 51.74 MB    part-r-00001-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    -rw-r--r--  hyukjinkwon hyukjinkwon 51.97 MB    part-r-00002-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    -rw-r--r--  hyukjinkwon hyukjinkwon 51.71 MB    part-r-00003-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    -rw-r--r--  hyukjinkwon hyukjinkwon 51.89 MB    part-r-00004-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    -rw-r--r--  hyukjinkwon hyukjinkwon 34.25 MB    part-r-00005-2aa739b2-6194-47db-9dd5-461905cea976.parquet
    ```
    
    ### Test Codes
    
    - Function to measure time
    ```scala
    def time[A](f: => A) = {
      val s = System.nanoTime
      val ret = f
      println("time: "+(System.nanoTime-s)/1e6+"ms")
      ret
    }
    ```
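
    For the 5-run averages mentioned in the Method section, a variant of `time` that returns the elapsed milliseconds can be averaged as below. This is only a sketch; `timeMs` and `averageMs` are not part of the original test code:

    ```scala
    // Variant of `time` that returns the elapsed time in milliseconds.
    def timeMs[A](f: => A): Double = {
      val s = System.nanoTime
      f
      (System.nanoTime - s) / 1e6
    }

    // Run a query the given number of times and report the average time.
    def averageMs(runs: Int = 5)(f: => Any): Double =
      (1 to runs).map(_ => timeMs(f)).sum / runs

    // e.g. averageMs()(df.collect())
    ```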
    
    - Configuration and `SQLContext`
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
    conf.setAppName("Test").setMaster("local")
    // Disable the UnsafeRow record reader so that Parquet filters records row by row.
    conf.set("spark.sql.parquet.enableUnsafeRowRecordReader", "false")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    ```
    
    - `IS NULL`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey IS NULL").select("l_orderkey")
    time(df.collect())
    ```
    
    - `IS NOT NULL`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey IS NOT NULL").select("l_orderkey")
    time(df.collect())
    ```
    
    - `=`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey = 1").select("l_orderkey")
    time(df.collect())
    ```
    
    - `!=`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey != 1").select("l_orderkey")
    time(df.collect())
    ```
    - `<=>`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey <=> 1").select("l_orderkey")
    time(df.collect())
    ```
    - `<`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey < 3000000").select("l_orderkey")
    time(df.collect())
    ```
    
    - `>`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey > 3000000").select("l_orderkey")
    time(df.collect())
    ```
    
    - `<=`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey <= 3000000").select("l_orderkey")
    time(df.collect())
    ```
    
    - `>=`
    ```scala
    val source = sqlContext.read.parquet("lineitem")
    val df = source.filter("l_orderkey >= 3000000").select("l_orderkey")
    time(df.collect())
    ```
    
    ## Results
    
    |Operator|Without Spark Filtering (ms)|With Spark Filtering (ms)|Time Decreased (%)|
    |--------|----------------------------|-------------------------|------------------|
    |`IS NULL`|645.015|669.038|3.590|
    |`IS NOT NULL`|8040.593|8394.950|4.221|
    |`=`| 885.764|906.658|2.304|
    |`!=`|7844.248|8082.113|2.943|
    |`<=>`|869.402|912.457|4.718|
    |`<`|4510.599|4625.214|2.478|
    |`>`|4732.729|4940.915|4.213|
    |`<=`|4868.453|4918.441|1.016|
    |`>=`|4751.772|4946.939|3.945|
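
    For clarity, the "Time Decreased (%)" column appears to be computed relative to the run with Spark-side filtering, i.e. `(with - without) / with * 100`. A quick check against the `IS NULL` row:

    ```scala
    // Assumed formula behind the "Time Decreased (%)" column.
    def decreasePercent(withFilter: Double, withoutFilter: Double): Double =
      (withFilter - withoutFilter) / withFilter * 100

    decreasePercent(669.038, 645.015)  // ~3.590, matching the `IS NULL` row
    ```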
    
    
    Conceptually, the difference is as follows.
    
    The original code works as below (**With Spark Filtering**):
    ```scala
    data
      // Parquet-side filtering
      .filter(pushedFilter)
      // Spark-side filtering
      .filter(pushedFilter)
    ```
    
    This PR changes it to the below (**Without Spark Filtering**):
    ```scala
    data
      // Parquet-side filtering
      .filter(pushedFilter)
    ```
    
    Although both have the same O(n) time complexity, the former evaluates the filter roughly 2n times while the latter evaluates it n times, so there does appear to be a performance benefit.
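
    As a toy illustration of that point (plain Scala collections, not the actual Spark code path), applying the same predicate twice returns the same rows but re-evaluates the predicate on every row that survived the first pass:

    ```scala
    // Count predicate evaluations for a single vs. a doubled filter.
    var evaluations = 0
    val pred = (i: Int) => { evaluations += 1; i <= 3 }
    val rows = (1 to 10).toList

    val once  = rows.filter(pred)              // 10 evaluations
    val twice = rows.filter(pred).filter(pred) // another 10 + 3 evaluations

    assert(once == twice)                      // same result either way
    ```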


