Github user HyukjinKwon commented on the pull request:

https://github.com/apache/spark/pull/10502#issuecomment-167933518

# Benchmark (Removed Spark-side Filter)

## Motivation

This PR simplifies the query plans for Parquet files by stripping the duplicated Spark-side filtering, from:

```
== Physical Plan ==
Filter (a#8 = 2)
+- Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]
```

to:

```
== Physical Plan ==
Scan ParquetRelation[a#8] InputPaths: file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-ef271ec6-95e1-43ae-9b3e-1d4dae6f69c3/part=1, PushedFilters: [EqualTo(a,2)]
```

However, it is unknown whether this brings any performance benefit, so this benchmark was performed. Simply, several queries were executed with the duplicated Spark-side filtering and without it.

## Environment

- Machine: MacBook Pro Retina
- CPU: 4 cores
- Memory: 8 GB

## Method

In this benchmark,

- Nine simple queries were composed from the filters that can be pushed down to Parquet. The `IS NULL`, `IS NOT NULL`, `=`, `!=`, `<=>`, `<`, `>`, `<=` and `>=` operators were tested here. Each query was executed 5 times and the average time was calculated (see the averaging sketch at the end of this comment).
- The [commit right before this PR](https://github.com/apache/spark/commit/73862a1) was compared against this PR. Namely, the tests were performed with Spark-side filtering and without Spark-side filtering.
- In order to test cleanly, the target Parquet file was uncompressed, and in order to make Parquet filter records row by row, `spark.sql.parquet.enableUnsafeRowRecordReader` was disabled.

## Dataset

#### Raw Data

- [TPC-H](http://www.tpc.org/tpch/) Lineitem table created with scale factor 1 ([generate data](https://github.com/rxin/TPC-H-Hive/tree/master/dbgen))
- Size: 724.66 MB

#### Create Target Parquet File

- `case` class for the Lineitem table

```scala
case class Lineitem(
    l_orderkey: Int,
    l_partkey: Int,
    l_suppkey: Int,
    l_linenumber: Int,
    l_quantity: Float,
    l_extendedprice: Float,
    l_discount: Float,
    l_tax: Float,
    l_returnflag: String,
    l_linestatus: String,
    l_shipdate: String,
    l_commitdate: String,
    l_receiptdate: String,
    l_shipinstruct: String,
    l_shipmode: String,
    l_comment: String)
```

- Create the Parquet file

```scala
val conf = new SparkConf()
conf.setAppName("Test").setMaster("local")
conf.set("spark.sql.parquet.compression.codec", "uncompressed")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.textFile("lineitem.tbl").map(_.split('|')).map { v =>
  Lineitem(
    v(0).trim.toInt, v(1).trim.toInt, v(2).trim.toInt, v(3).trim.toInt,
    v(4).trim.toFloat, v(5).trim.toFloat, v(6).trim.toFloat, v(7).trim.toFloat,
    v(8), v(9), v(10), v(11), v(12), v(13), v(14), v(15))
}.toDF()
df.save("lineitem", "parquet")
```

#### Parquet file

```
-rw-r--r--  hyukjinkwon  hyukjinkwon      0 B   _SUCCESS
-rw-r--r--  hyukjinkwon  hyukjinkwon   1.54 KB  _common_metadata
-rw-r--r--  hyukjinkwon  hyukjinkwon  13.72 KB  _metadata
-rw-r--r--  hyukjinkwon  hyukjinkwon  52.16 MB  part-r-00000-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon  hyukjinkwon  51.74 MB  part-r-00001-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon  hyukjinkwon  51.97 MB  part-r-00002-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon  hyukjinkwon  51.71 MB  part-r-00003-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon  hyukjinkwon  51.89 MB  part-r-00004-2aa739b2-6194-47db-9dd5-461905cea976.parquet
-rw-r--r--  hyukjinkwon  hyukjinkwon  34.25 MB  part-r-00005-2aa739b2-6194-47db-9dd5-461905cea976.parquet
```
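As a quick sanity check (not part of the timed runs, and not shown in the original write-up), one could print the physical plan for one of the benchmark queries and confirm that the predicate appears under `PushedFilters`, as in the plans in the Motivation section. A minimal sketch, assuming the `lineitem` Parquet directory created above and a `SQLContext` configured as in the test code below:

```scala
// Sanity-check sketch (not one of the measured runs): print the physical plan
// and confirm the predicate shows up in PushedFilters.
val source = sqlContext.read.parquet("lineitem")
source.filter("l_orderkey = 1").select("l_orderkey").explain()
```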
## Test Codes

- Function to measure time

```scala
def time[A](f: => A) = {
  val s = System.nanoTime
  val ret = f
  println("time: " + (System.nanoTime - s) / 1e6 + "ms")
  ret
}
```

- Configuration and `SQLContext`

```scala
val conf = new SparkConf()
conf.setAppName("Test").setMaster("local")
conf.set("spark.sql.parquet.enableUnsafeRowRecordReader", "false")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
```

- `IS NULL`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey IS NULL").select("l_orderkey")
time(df.collect())
```

- `IS NOT NULL`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey IS NOT NULL").select("l_orderkey")
time(df.collect())
```

- `=`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey = 1").select("l_orderkey")
time(df.collect())
```

- `!=`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey != 1").select("l_orderkey")
time(df.collect())
```

- `<=>`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey <=> 1").select("l_orderkey")
time(df.collect())
```

- `<`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey < 3000000").select("l_orderkey")
time(df.collect())
```

- `>`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey > 3000000").select("l_orderkey")
time(df.collect())
```

- `<=`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey <= 3000000").select("l_orderkey")
time(df.collect())
```

- `>=`

```scala
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey >= 3000000").select("l_orderkey")
time(df.collect())
```

## Results

| Operator | Without Spark Filtering (ms) | With Spark Filtering (ms) | Time Decreased (%) |
|----------|------------------------------|---------------------------|--------------------|
| `IS NULL` | 645.015 | 669.038 | 3.590 |
| `IS NOT NULL` | 8040.593 | 8394.950 | 4.221 |
| `=` | 885.764 | 906.658 | 2.304 |
| `!=` | 7844.248 | 8082.113 | 2.943 |
| `<=>` | 869.402 | 912.457 | 4.718 |
| `<` | 4510.599 | 4625.214 | 2.478 |
| `>` | 4732.729 | 4940.915 | 4.213 |
| `<=` | 4868.453 | 4918.441 | 1.016 |
| `>=` | 4751.772 | 4946.939 | 3.945 |

Here, *Time Decreased* is the percentage reduction relative to the run with Spark-side filtering.

In a simplified view, the difference is as follows. The original code works like this (**With Spark Filtering**):

```scala
data
  // Parquet-side filtering
  .filter(pushedFilter)
  // Spark-side filtering
  .filter(pushedFilter)
```

This PR changes it to this (**Without Spark Filtering**):

```scala
data
  // Parquet-side filtering
  .filter(pushedFilter)
```

Although both have the same O(n) time complexity, the former evaluates the filter roughly 2n times and the latter n times. So, it seems there is a performance benefit.
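The test snippets above time a single `collect()` per query, while the Method section says each query was executed 5 times and the averages are reported. The exact driver loop is not shown in the original comment; below is a minimal sketch of how such an averaged measurement could look. The `avgTime` helper and the hard-coded run count of 5 are illustrative, not part of the original test code.

```scala
// Hypothetical helper (not from the original test code): run a block `runs`
// times and return the average wall-clock time in milliseconds.
def avgTime[A](runs: Int)(f: => A): Double = {
  val times = (1 to runs).map { _ =>
    val s = System.nanoTime
    f
    (System.nanoTime - s) / 1e6
  }
  times.sum / runs
}

// Example: average over 5 runs of the `=` query from the test code above.
val source = sqlContext.read.parquet("lineitem")
val df = source.filter("l_orderkey = 1").select("l_orderkey")
println("average time: " + avgTime(5)(df.collect()) + "ms")
```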