Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22500#discussion_r219218036
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
    @@ -17,251 +17,154 @@
     
     package org.apache.spark.sql.execution.benchmark
     
    -import org.apache.spark.util.Benchmark
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.util.{Benchmark, BenchmarkBase => 
FileBenchmarkBase}
     
     /**
      * Benchmark to measure whole stage codegen performance.
    - * To run this:
    - *  build/sbt "sql/test-only *benchmark.MiscBenchmark"
    - *
    - * Benchmarks in this file are skipped in normal builds.
    + * To run this benchmark:
    + * 1. without sbt: bin/spark-submit --class <this class> <spark sql test 
jar>
    + * 2. build/sbt "sql/test:runMain <this class>"
    + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/test:runMain <this class>"
    + *    Results will be written to "benchmarks/MiscBenchmark-results.txt".
      */
    -class MiscBenchmark extends BenchmarkBase {
    -
    -  ignore("filter & aggregate without group") {
    -    val N = 500L << 22
    -    runBenchmark("range/filter/sum", N) {
    -      sparkSession.range(N).filter("(id & 1) = 
1").groupBy().sum().collect()
    +object MiscBenchmark extends FileBenchmarkBase {
    +
    +  lazy val sparkSession = SparkSession.builder
    +    .master("local[1]")
    +    .appName("microbenchmark")
    +    .config("spark.sql.shuffle.partitions", 1)
    +    .config("spark.sql.autoBroadcastJoinThreshold", 1)
    +    .getOrCreate()
    +
    +  /** Runs function `f` with whole stage codegen on and off. */
    +  def runMiscBenchmark(name: String, cardinality: Long)(f: => Unit): Unit 
= {
    +    val benchmark = new Benchmark(name, cardinality, output = output)
    +
    +    benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
    +      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
    +      f
         }
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    -
    -    range/filter/sum:                        Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    range/filter/sum codegen=false              30663 / 31216         68.4 
         14.6       1.0X
    -    range/filter/sum codegen=true                 2399 / 2409        874.1 
          1.1      12.8X
    -    */
    -  }
     
    -  ignore("range/limit/sum") {
    -    val N = 500L << 20
    -    runBenchmark("range/limit/sum", N) {
    -      sparkSession.range(N).limit(1000000).groupBy().sum().collect()
    +    benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
    +      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
    +      f
         }
    -    /*
    -    Westmere E56xx/L56xx/X56xx (Nehalem-C)
    -    range/limit/sum:                    Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
    -    
-------------------------------------------------------------------------------------------
    -    range/limit/sum codegen=false             609 /  672        861.6      
     1.2       1.0X
    -    range/limit/sum codegen=true              561 /  621        935.3      
     1.1       1.1X
    -    */
    -  }
     
    -  ignore("sample") {
    -    val N = 500 << 18
    -    runBenchmark("sample with replacement", N) {
    -      sparkSession.range(N).sample(withReplacement = true, 
0.01).groupBy().sum().collect()
    -    }
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    -
    -    sample with replacement:                 Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    sample with replacement codegen=false         7073 / 7227         18.5 
         54.0       1.0X
    -    sample with replacement codegen=true          5199 / 5203         25.2 
         39.7       1.4X
    -    */
    -
    -    runBenchmark("sample without replacement", N) {
    -      sparkSession.range(N).sample(withReplacement = false, 
0.01).groupBy().sum().collect()
    -    }
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    -
    -    sample without replacement:              Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    sample without replacement codegen=false      1508 / 1529         86.9 
         11.5       1.0X
    -    sample without replacement codegen=true        644 /  662        203.5 
          4.9       2.3X
    -    */
    -  }
    -
    -  ignore("collect") {
    -    val N = 1 << 20
    -
    -    val benchmark = new Benchmark("collect", N)
    -    benchmark.addCase("collect 1 million") { iter =>
    -      sparkSession.range(N).collect()
    -    }
    -    benchmark.addCase("collect 2 millions") { iter =>
    -      sparkSession.range(N * 2).collect()
    -    }
    -    benchmark.addCase("collect 4 millions") { iter =>
    -      sparkSession.range(N * 4).collect()
    -    }
         benchmark.run()
    -
    -    /*
    -    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    -    collect:                            Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
    -    
-------------------------------------------------------------------------------------------
    -    collect 1 million                         439 /  654          2.4      
   418.7       1.0X
    -    collect 2 millions                        961 / 1907          1.1      
   916.4       0.5X
    -    collect 4 millions                       3193 / 3895          0.3      
  3044.7       0.1X
    -     */
       }
     
    -  ignore("collect limit") {
    -    val N = 1 << 20
    -
    -    val benchmark = new Benchmark("collect limit", N)
    -    benchmark.addCase("collect limit 1 million") { iter =>
    -      sparkSession.range(N * 4).limit(N).collect()
    -    }
    -    benchmark.addCase("collect limit 2 millions") { iter =>
    -      sparkSession.range(N * 4).limit(N * 2).collect()
    +  override def benchmark(): Unit = {
    +    runBenchmark("filter & aggregate without group") {
    +      val N = 500L << 22
    +      runMiscBenchmark("range/filter/sum", N) {
    +        sparkSession.range(N).filter("(id & 1) = 
1").groupBy().sum().collect()
    +      }
         }
    -    benchmark.run()
     
    -    /*
    -    model name      : Westmere E56xx/L56xx/X56xx (Nehalem-C)
    -    collect limit:                      Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
    -    
-------------------------------------------------------------------------------------------
    -    collect limit 1 million                   833 / 1284          1.3      
   794.4       1.0X
    -    collect limit 2 millions                 3348 / 4005          0.3      
  3193.3       0.2X
    -     */
    -  }
    -
    -  ignore("generate explode") {
    -    val N = 1 << 24
    -    runBenchmark("generate explode array", N) {
    -      val df = sparkSession.range(N).selectExpr(
    -        "id as key",
    -        "array(rand(), rand(), rand(), rand(), rand()) as values")
    -      df.selectExpr("key", "explode(values) value").count()
    +    runBenchmark("range/limit/sum") {
    +      val N = 500L << 20
    +      runMiscBenchmark("range/limit/sum", N) {
    +        sparkSession.range(N).limit(1000000).groupBy().sum().collect()
    +      }
         }
     
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
    -    Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
    -
    -    generate explode array:                  Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    generate explode array wholestage off         6920 / 7129          2.4 
        412.5       1.0X
    -    generate explode array wholestage on           623 /  646         26.9 
         37.1      11.1X
    -     */
    +    runBenchmark("sample") {
    +      val N = 500 << 18
    +      runMiscBenchmark("sample with replacement", N) {
    +        sparkSession.range(N).sample(withReplacement = true, 
0.01).groupBy().sum().collect()
    +      }
     
    -    runBenchmark("generate explode map", N) {
    -      val df = sparkSession.range(N).selectExpr(
    -        "id as key",
    -        "map('a', rand(), 'b', rand(), 'c', rand(), 'd', rand(), 'e', 
rand()) pairs")
    -      df.selectExpr("key", "explode(pairs) as (k, v)").count()
    +      runMiscBenchmark("sample without replacement", N) {
    +        sparkSession.range(N).sample(withReplacement = false, 
0.01).groupBy().sum().collect()
    +      }
         }
     
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
    -    Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
    -
    -    generate explode map:                    Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    generate explode map wholestage off         11978 / 11993          1.4 
        714.0       1.0X
    -    generate explode map wholestage on             866 /  919         19.4 
         51.6      13.8X
    -     */
    -
    -    runBenchmark("generate posexplode array", N) {
    -      val df = sparkSession.range(N).selectExpr(
    -        "id as key",
    -        "array(rand(), rand(), rand(), rand(), rand()) as values")
    -      df.selectExpr("key", "posexplode(values) as (idx, value)").count()
    +    runBenchmark("collect") {
    +      val N = 1 << 20
    +
    +      val benchmark = new Benchmark("collect", N, output = output)
    +      benchmark.addCase("collect 1 million") { iter =>
    +        sparkSession.range(N).collect()
    +      }
    +      benchmark.addCase("collect 2 millions") { iter =>
    +        sparkSession.range(N * 2).collect()
    +      }
    +      benchmark.addCase("collect 4 millions") { iter =>
    +        sparkSession.range(N * 4).collect()
    +      }
    +      benchmark.run()
         }
     
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
    -    Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
    -
    -    generate posexplode array:               Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    generate posexplode array wholestage off      7502 / 7513          2.2 
        447.1       1.0X
    -    generate posexplode array wholestage on        617 /  623         27.2 
         36.8      12.2X
    -     */
    -
    -    runBenchmark("generate inline array", N) {
    -      val df = sparkSession.range(N).selectExpr(
    -        "id as key",
    -        "array((rand(), rand()), (rand(), rand()), (rand(), 0.0d)) as 
values")
    -      df.selectExpr("key", "inline(values) as (r1, r2)").count()
    +    runBenchmark("collect limit") {
    +      val N = 1 << 20
    +
    +      val benchmark = new Benchmark("collect limit", N, output = output)
    +      benchmark.addCase("collect limit 1 million") { iter =>
    +        sparkSession.range(N * 4).limit(N).collect()
    +      }
    +      benchmark.addCase("collect limit 2 millions") { iter =>
    +        sparkSession.range(N * 4).limit(N * 2).collect()
    +      }
    +      benchmark.run()
         }
     
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
    -    Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
    -
    -    generate inline array:                   Best/Avg Time(ms)    
Rate(M/s)   Per Row(ns)   Relative
    -    
------------------------------------------------------------------------------------------------
    -    generate inline array wholestage off          6901 / 6928          2.4 
        411.3       1.0X
    -    generate inline array wholestage on           1001 / 1010         16.8 
         59.7       6.9X
    -     */
    -
    -    val M = 60000
    -    runBenchmark("generate big struct array", M) {
    -      import sparkSession.implicits._
    -      val df = sparkSession.sparkContext.parallelize(Seq(("1",
    -        Array.fill(M)({
    -          val i = math.random
    -          (i.toString, (i + 1).toString, (i + 2).toString, (i + 
3).toString)
    -        })))).toDF("col", "arr")
    -
    -      df.selectExpr("*", "expode(arr) as arr_col")
    -        .select("col", "arr_col.*").count
    +    runBenchmark("generate explode") {
    +      val N = 1 << 24
    +      runMiscBenchmark("generate explode array", N) {
    +        val df = sparkSession.range(N).selectExpr(
    +          "id as key",
    +          "array(rand(), rand(), rand(), rand(), rand()) as values")
    +        df.selectExpr("key", "explode(values) value").count()
    +      }
    +
    +      runMiscBenchmark("generate explode map", N) {
    +        val df = sparkSession.range(N).selectExpr(
    +          "id as key",
    +          "map('a', rand(), 'b', rand(), 'c', rand(), 'd', rand(), 'e', 
rand()) pairs")
    +        df.selectExpr("key", "explode(pairs) as (k, v)").count()
    +      }
    +
    +      runMiscBenchmark("generate posexplode array", N) {
    +        val df = sparkSession.range(N).selectExpr(
    +          "id as key",
    +          "array(rand(), rand(), rand(), rand(), rand()) as values")
    +        df.selectExpr("key", "posexplode(values) as (idx, value)").count()
    +      }
    +
    +      runMiscBenchmark("generate inline array", N) {
    +        val df = sparkSession.range(N).selectExpr(
    +          "id as key",
    +          "array((rand(), rand()), (rand(), rand()), (rand(), 0.0d)) as 
values")
    +        df.selectExpr("key", "inline(values) as (r1, r2)").count()
    +      }
    +
    +      val M = 60000
    +      runMiscBenchmark("generate big struct array", M) {
    +        import sparkSession.implicits._
    +        val df = sparkSession.sparkContext.parallelize(Seq(("1",
    +          Array.fill(M)({
    +            val i = math.random
    +            (i.toString, (i + 1).toString, (i + 2).toString, (i + 
3).toString)
    +          })))).toDF("col", "arr")
    +
    +        df.selectExpr("*", "explode(arr) as arr_col")
    --- End diff --
    
    The function name should be `explode` — the quoted line above calls `expode(arr)`, which is a typo.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to