Repository: spark
Updated Branches:
  refs/heads/master 348c13898 -> 695f0e919
[SPARK-15107][SQL] Allow varying # iterations by test case in Benchmark

## What changes were proposed in this pull request?
This patch changes our micro-benchmark util to allow setting different iteration numbers for different test cases. For some of our benchmarks, turning off whole-stage codegen can make the runtime 20X slower, which makes it very difficult to run a large number of iterations without substantially shortening the input cardinality.

With this change, I set the default number of iterations to 2 for whole-stage codegen off, and 5 for whole-stage codegen on. I also updated some results.

## How was this patch tested?
N/A - this is a test util.

Author: Reynold Xin <r...@databricks.com>

Closes #12884 from rxin/SPARK-15107.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695f0e91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695f0e91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695f0e91

Branch: refs/heads/master
Commit: 695f0e9195209c75bfc62fc70bfc6d7d9f1047b3
Parents: 348c138
Author: Reynold Xin <r...@databricks.com>
Authored: Tue May 3 22:56:40 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 3 22:56:40 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Benchmark.scala |  21 ++-
 .../collection/unsafe/sort/RadixSortSuite.scala |   2 +-
 .../execution/BenchmarkWholeStageCodegen.scala  | 137 +++++++++++--------
 3 files changed, 93 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
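For illustration, a minimal sketch of how a caller might use the new per-case
iteration count. This is not part of the commit: the benchmark name and
workloads are hypothetical placeholders, while the addCase(name, numIters)
signature follows the Benchmark.scala diff below (note that Benchmark is
private[spark], so this mirrors how the in-repo test suites use it):

    import org.apache.spark.util.Benchmark

    // Hypothetical example: the slow case gets 2 iterations,
    // the fast case keeps the default of 5.
    val benchmark = new Benchmark("example", valuesPerIteration = 1L << 20)

    benchmark.addCase("wholestage off", numIters = 2) { _ =>
      () // placeholder for the slow (codegen-off) workload
    }

    benchmark.addCase("wholestage on") { _ =>
      () // placeholder for the fast workload; numIters omitted, so 5 applies
    }

    benchmark.run()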
http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/core/src/main/scala/org/apache/spark/util/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 1fc0ad7..0c685b1 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -38,7 +38,7 @@ import org.apache.commons.lang3.SystemUtils
 private[spark] class Benchmark(
     name: String,
     valuesPerIteration: Long,
-    iters: Int = 5,
+    defaultNumIters: Int = 5,
     outputPerIteration: Boolean = false) {
   val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
 
@@ -46,8 +46,8 @@ private[spark] class Benchmark(
    * Adds a case to run when run() is called. The given function will be run for several
    * iterations to collect timing statistics.
    */
-  def addCase(name: String)(f: Int => Unit): Unit = {
-    addTimerCase(name) { timer =>
+  def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+    addTimerCase(name, numIters) { timer =>
       timer.startTiming()
       f(timer.iteration)
       timer.stopTiming()
@@ -59,8 +59,8 @@ private[spark] class Benchmark(
    * until timer.startTiming() is called within the given function. The corresponding
    * timer.stopTiming() method must be called before the function returns.
    */
-  def addTimerCase(name: String)(f: Benchmark.Timer => Unit): Unit = {
-    benchmarks += Benchmark.Case(name, f)
+  def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = {
+    benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters else numIters)
   }
 
   /**
@@ -75,7 +75,7 @@ private[spark] class Benchmark(
     val results = benchmarks.map { c =>
       println("  Running case: " + c.name)
-      Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+      Benchmark.measure(valuesPerIteration, c.numIters, outputPerIteration)(c.fn)
     }
     println
 
@@ -83,12 +83,11 @@ private[spark] class Benchmark(
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getJVMOSInfo())
     println(Benchmark.getProcessorName())
-    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+    printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
       "Per Row(ns)", "Relative")
-    println("-----------------------------------------------------------------------------------" +
-      "--------")
+    println("-" * 96)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      printf("%-35s %16s %12s %13s %10s\n",
+      printf("%-40s %16s %12s %13s %10s\n",
         benchmark.name,
         "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
         "%10.1f" format result.bestRate,
@@ -128,7 +127,7 @@ private[spark] object Benchmark {
     }
   }
 
-  case class Case(name: String, fn: Timer => Unit)
+  case class Case(name: String, fn: Timer => Unit, numIters: Int)
   case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
 
   /**
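As an aside, the numIters sentinel used above (0 means "fall back to the
benchmark-wide default") is small enough to sketch standalone. A minimal,
self-contained Scala illustration, not the committed code:

    object NumItersSketch {
      final case class Case(name: String, numIters: Int)

      class MiniBenchmark(defaultNumIters: Int = 5) {
        private val cases = scala.collection.mutable.ArrayBuffer.empty[Case]

        // Mirrors addTimerCase: resolve the sentinel at registration time,
        // so each Case carries its final iteration count.
        def addCase(name: String, numIters: Int = 0): Unit =
          cases += Case(name, if (numIters == 0) defaultNumIters else numIters)

        def run(): Unit =
          cases.foreach(c => println(s"${c.name}: ${c.numIters} iterations"))
      }

      def main(args: Array[String]): Unit = {
        val b = new MiniBenchmark()
        b.addCase("wholestage off", numIters = 2) // explicit override
        b.addCase("wholestage on")                // sentinel 0 -> default of 5
        b.run()
      }
    }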
http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index 5242863..b03df1a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -244,7 +244,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
         RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false)
         timer.stopTiming()
       }
-    benchmark.run
+    benchmark.run()
 
     /**
       Running benchmark: radix sort 25000000

http://git-wip-us.apache.org/repos/asf/spark/blob/695f0e91/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 841263d..7ca4b75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -36,6 +36,8 @@ import org.apache.spark.util.Benchmark
  * Benchmark to measure whole stage codegen performance.
  * To run this:
  *  build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ *
+ * Benchmarks in this file are skipped in normal builds.
  */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
@@ -44,31 +46,50 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def runBenchmark(name: String, values: Long)(f: => Unit): Unit = {
-    val benchmark = new Benchmark(name, values)
+  /** Runs function `f` with whole stage codegen on and off. */
+  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality)
 
-    Seq(false, true).foreach { enabled =>
-      benchmark.addCase(s"$name codegen=$enabled") { iter =>
-        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
-        f
-      }
+    benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f
+    }
+
+    benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      f
     }
 
     benchmark.run()
   }
 
-  // These benchmark are skipped in normal build
-  ignore("range/filter/sum") {
-    val N = 500L << 20
-    runBenchmark("rang/filter/sum", N) {
+  ignore("aggregate without grouping") {
+    val N = 500L << 22
+    val benchmark = new Benchmark("agg without grouping", N)
+    runBenchmark("agg w/o group", N) {
+      sqlContext.range(N).selectExpr("sum(id)").collect()
+    }
+    /*
+    agg w/o group:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    agg w/o group wholestage off            30136 / 31885         69.6          14.4       1.0X
+    agg w/o group wholestage on              1851 / 1860        1132.9           0.9      16.3X
+    */
+  }
+
+  ignore("filter & aggregate without group") {
+    val N = 500L << 22
+    runBenchmark("range/filter/sum", N) {
       sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
-    rang/filter/sum codegen=true             897 / 1022         584.6           1.7      16.4X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    range/filter/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    range/filter/sum codegen=false         30663 / 31216         68.4          14.6       1.0X
+    range/filter/sum codegen=true           2399 / 2409         874.1           1.1      12.8X
     */
   }
 
@@ -86,28 +107,32 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     */
   }
 
-  ignore("range/sample/sum") {
-    val N = 500 << 20
-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(true, 0.01).groupBy().sum().collect()
+  ignore("sample") {
+    val N = 500 << 18
+    runBenchmark("sample with replacement", N) {
+      sqlContext.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false         53888 / 56592          9.7         102.8       1.0X
-    range/sample/sum codegen=true          41614 / 42607         12.6          79.4       1.3X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    sample with replacement:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    sample with replacement codegen=false       7073 / 7227         18.5          54.0       1.0X
+    sample with replacement codegen=true        5199 / 5203         25.2          39.7       1.4X
     */
 
-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(false, 0.01).groupBy().sum().collect()
+    runBenchmark("sample without replacement", N) {
+      sqlContext.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false         12982 / 13384         40.4          24.8       1.0X
-    range/sample/sum codegen=true           7074 / 7383          74.1          13.5       1.8X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
+    sample without replacement:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    sample without replacement codegen=false    1508 / 1529         86.9          11.5       1.0X
+    sample without replacement codegen=true      644 /  662        203.5           4.9       2.3X
     */
   }
 
@@ -151,23 +176,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   }
 
   ignore("aggregate with linear keys") {
-    val N = 20 << 20
+    val N = 20 << 22
     val benchmark = new Benchmark("Aggregate w keys", N)
     def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -176,36 +201,37 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    codegen = F                             2067 / 2166         10.1          98.6       1.0X
-    codegen = T hashmap = F                 1149 / 1321         18.3          54.8       1.8X
-    codegen = T hashmap = T                  388 /  475         54.0          18.5       5.3X
+
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                             6619 / 6780         12.7          78.9       1.0X
+    codegen = T hashmap = F                 3935 / 4059         21.3          46.9       1.7X
+    codegen = T hashmap = T                  897 /  971         93.5          10.7       7.4X
     */
   }
 
   ignore("aggregate with randomized keys") {
-    val N = 20 << 20
+    val N = 20 << 22
     val benchmark = new Benchmark("Aggregate w keys", N)
     sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
 
     def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -214,13 +240,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    codegen = F                             2517 / 2608          8.3         120.0       1.0X
-    codegen = T hashmap = F                 1484 / 1560         14.1          70.8       1.7X
-    codegen = T hashmap = T                  794 /  908         26.4          37.9       3.2X
+
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                             7445 / 7517         11.3          88.7       1.0X
+    codegen = T hashmap = F                 4672 / 4703         18.0          55.7       1.6X
+    codegen = T hashmap = T                 1764 / 1958         47.6          21.0       4.2X
     */
   }
 
@@ -231,18 +258,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
       .groupBy("k").count().collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
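A quick arithmetic note on the input cardinalities above (plain Scala; the
shift constants come from the diff, the variable names here are illustrative):

    // `<<` is a left bit shift, so n << k == n * 2^k.
    val oldN    = 500L << 20 // 524,288,000   (~0.5 billion rows, the old setting)
    val newN    = 500L << 22 // 2,097,152,000 (~2.1 billion rows, 4x larger)
    val sampleN = 500  << 18 // 131,072,000   (~131 million rows, for "sample")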