Repository: spark
Updated Branches:
  refs/heads/master 348c13898 -> 695f0e919


[SPARK-15107][SQL] Allow varying # iterations by test case in Benchmark

## What changes were proposed in this pull request?
This patch changes our micro-benchmark utility to allow setting a different
number of iterations for each test case. For some of our benchmarks, turning
off whole-stage codegen can make the runtime 20X slower, making it very
difficult to run them a large number of times without substantially reducing
the input cardinality.

With this change, the runBenchmark helper in BenchmarkWholeStageCodegen runs 2
iterations with whole-stage codegen off and 5 iterations with whole-stage
codegen on (the Benchmark-wide default remains 5). I also updated some results.

## How was this patch tested?
N/A - this is a test util.

Author: Reynold Xin <r...@databricks.com>

Closes #12884 from rxin/SPARK-15107.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695f0e91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695f0e91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695f0e91

Branch: refs/heads/master
Commit: 695f0e9195209c75bfc62fc70bfc6d7d9f1047b3
Parents: 348c138
Author: Reynold Xin <r...@databricks.com>
Authored: Tue May 3 22:56:40 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 3 22:56:40 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Benchmark.scala |  21 ++-
 .../collection/unsafe/sort/RadixSortSuite.scala |   2 +-
 .../execution/BenchmarkWholeStageCodegen.scala  | 137 +++++++++++--------
 3 files changed, 93 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/core/src/main/scala/org/apache/spark/util/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala 
b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 1fc0ad7..0c685b1 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -38,7 +38,7 @@ import org.apache.commons.lang3.SystemUtils
 private[spark] class Benchmark(
     name: String,
     valuesPerIteration: Long,
-    iters: Int = 5,
+    defaultNumIters: Int = 5,
     outputPerIteration: Boolean = false) {
   val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
 
@@ -46,8 +46,8 @@ private[spark] class Benchmark(
    * Adds a case to run when run() is called. The given function will be run 
for several
    * iterations to collect timing statistics.
    */
-  def addCase(name: String)(f: Int => Unit): Unit = {
-    addTimerCase(name) { timer =>
+  def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+    addTimerCase(name, numIters) { timer =>
       timer.startTiming()
       f(timer.iteration)
       timer.stopTiming()
@@ -59,8 +59,8 @@ private[spark] class Benchmark(
    * until timer.startTiming() is called within the given function. The 
corresponding
    * timer.stopTiming() method must be called before the function returns.
    */
-  def addTimerCase(name: String)(f: Benchmark.Timer => Unit): Unit = {
-    benchmarks += Benchmark.Case(name, f)
+  def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => 
Unit): Unit = {
+    benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters 
else numIters)
   }
 
   /**
@@ -75,7 +75,7 @@ private[spark] class Benchmark(
 
     val results = benchmarks.map { c =>
       println("  Running case: " + c.name)
-      Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+      Benchmark.measure(valuesPerIteration, c.numIters, 
outputPerIteration)(c.fn)
     }
     println
 
@@ -83,12 +83,11 @@ private[spark] class Benchmark(
     // The results are going to be processor specific so it is useful to 
include that.
     println(Benchmark.getJVMOSInfo())
     println(Benchmark.getProcessorName())
-    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", 
"Rate(M/s)",
+    printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", 
"Rate(M/s)",
       "Per Row(ns)", "Relative")
-    
println("-----------------------------------------------------------------------------------"
 +
-      "--------")
+    println("-" * 96)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      printf("%-35s %16s %12s %13s %10s\n",
+      printf("%-40s %16s %12s %13s %10s\n",
         benchmark.name,
         "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
         "%10.1f" format result.bestRate,
@@ -128,7 +127,7 @@ private[spark] object Benchmark {
     }
   }
 
-  case class Case(name: String, fn: Timer => Unit)
+  case class Case(name: String, fn: Timer => Unit, numIters: Int)
   case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index 5242863..b03df1a 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -244,7 +244,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
       RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false)
       timer.stopTiming()
     }
-    benchmark.run
+    benchmark.run()
 
     /**
       Running benchmark: radix sort 25000000

http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 841263d..7ca4b75 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -36,6 +36,8 @@ import org.apache.spark.util.Benchmark
  * Benchmark to measure whole stage codegen performance.
  * To run this:
  *  build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ *
+ * Benchmarks in this file are skipped in normal builds.
  */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
@@ -44,31 +46,50 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def runBenchmark(name: String, values: Long)(f: => Unit): Unit = {
-    val benchmark = new Benchmark(name, values)
+  /** Runs function `f` with whole stage codegen on and off. */
+  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality)
 
-    Seq(false, true).foreach { enabled =>
-      benchmark.addCase(s"$name codegen=$enabled") { iter =>
-        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
-        f
-      }
+    benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f
+    }
+
+    benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      f
     }
 
     benchmark.run()
   }
 
-  // These benchmark are skipped in normal build
-  ignore("range/filter/sum") {
-    val N = 500L << 20
-    runBenchmark("rang/filter/sum", N) {
+  ignore("aggregate without grouping") {
+    val N = 500L << 22
+    val benchmark = new Benchmark("agg without grouping", N)
+    runBenchmark("agg w/o group", N) {
+      sqlContext.range(N).selectExpr("sum(id)").collect()
+    }
+    /*
+    agg w/o group:                           Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    agg w/o group wholestage off                30136 / 31885         69.6     
     14.4       1.0X
+    agg w/o group wholestage on                   1851 / 1860       1132.9     
      0.9      16.3X
+     */
+  }
+
+  ignore("filter & aggregate without group") {
+    val N = 500L << 22
+    runBenchmark("range/filter/sum", N) {
       sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    rang/filter/sum codegen=false          14332 / 16646         36.0          
27.8       1.0X
-    rang/filter/sum codegen=true              897 / 1022        584.6          
 1.7      16.4X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    range/filter/sum:                        Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    range/filter/sum codegen=false              30663 / 31216         68.4     
     14.6       1.0X
+    range/filter/sum codegen=true                 2399 / 2409        874.1     
      1.1      12.8X
     */
   }
 
@@ -86,28 +107,32 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     */
   }
 
-  ignore("range/sample/sum") {
-    val N = 500 << 20
-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(true, 0.01).groupBy().sum().collect()
+  ignore("sample") {
+    val N = 500 << 18
+    runBenchmark("sample with replacement", N) {
+      sqlContext.range(N).sample(withReplacement = true, 
0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false         53888 / 56592          9.7         
102.8       1.0X
-    range/sample/sum codegen=true          41614 / 42607         12.6          
79.4       1.3X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    sample with replacement:                 Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    sample with replacement codegen=false         7073 / 7227         18.5     
     54.0       1.0X
+    sample with replacement codegen=true          5199 / 5203         25.2     
     39.7       1.4X
     */
 
-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(false, 0.01).groupBy().sum().collect()
+    runBenchmark("sample without replacement", N) {
+      sqlContext.range(N).sample(withReplacement = false, 
0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false         12982 / 13384         40.4          
24.8       1.0X
-    range/sample/sum codegen=true            7074 / 7383         74.1          
13.5       1.8X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    sample without replacement:              Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    sample without replacement codegen=false      1508 / 1529         86.9     
     11.5       1.0X
+    sample without replacement codegen=true        644 /  662        203.5     
      4.9       2.3X
     */
   }
 
@@ -151,23 +176,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   }
 
   ignore("aggregate with linear keys") {
-    val N = 20 << 20
+    val N = 20 << 22
 
     val benchmark = new Benchmark("Aggregate w keys", N)
     def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as 
k").groupBy("k").sum().collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -176,36 +201,37 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    codegen = F                              2067 / 2166         10.1          
98.6       1.0X
-    codegen = T hashmap = F                  1149 / 1321         18.3          
54.8       1.8X
-    codegen = T hashmap = T                   388 /  475         54.0          
18.5       5.3X
+
+    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    codegen = F                                   6619 / 6780         12.7     
     78.9       1.0X
+    codegen = T hashmap = F                       3935 / 4059         21.3     
     46.9       1.7X
+    codegen = T hashmap = T                        897 /  971         93.5     
     10.7       7.4X
     */
   }
 
   ignore("aggregate with randomized keys") {
-    val N = 20 << 20
+    val N = 20 << 22
 
     val benchmark = new Benchmark("Aggregate w keys", N)
     sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as 
k").registerTempTable("test")
 
     def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, 
k").collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -214,13 +240,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    codegen = F                              2517 / 2608          8.3         
120.0       1.0X
-    codegen = T hashmap = F                  1484 / 1560         14.1          
70.8       1.7X
-    codegen = T hashmap = T                   794 /  908         26.4          
37.9       3.2X
+
+    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+    
------------------------------------------------------------------------------------------------
+    codegen = F                                   7445 / 7517         11.3     
     88.7       1.0X
+    codegen = T hashmap = F                       4672 / 4703         18.0     
     55.7       1.6X
+    codegen = T hashmap = T                       1764 / 1958         47.6     
     21.0       4.2X
     */
   }
 
@@ -231,18 +258,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 1023 as 
string) as k")
       .groupBy("k").count().collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to