Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22484#discussion_r220959695

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---

@@ -34,621 +34,539 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends SqlBasedBenchmark {
 
-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      runBenchmarkWithCodegen("agg w/o group", N) {
+        spark.range(N).selectExpr("sum(id)").collect()
+      }
     }
-    /*
-    agg w/o group:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    agg w/o group wholestage off        30136 / 31885         69.6          14.4       1.0X
-    agg w/o group wholestage on          1851 /  1860       1132.9           0.9      16.3X
-    */
-  }
-
-  ignore("stat functions") {
-    val N = 100L << 20
+
+    runBenchmark("stat functions") {
+      val N = 100L << 20
 
-    runBenchmark("stddev", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
-    }
+      runBenchmarkWithCodegen("stddev", N) {
+        spark.range(N).groupBy().agg("id" -> "stddev").collect()
+      }
 
-    runBenchmark("kurtosis", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      runBenchmarkWithCodegen("kurtosis", N) {
+        spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      }
     }
 
-    /*
-    Using ImperativeAggregate (as implemented in Spark 1.6):
-
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    stddev:                      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    stddev w/o codegen                2019.04            10.39         1.00 X
-    stddev w codegen                  2097.29            10.00         0.96 X
-    kurtosis w/o codegen              2108.99             9.94         0.96 X
-    kurtosis w codegen                2090.69            10.03         0.97 X
-
-    Using DeclarativeAggregate:
-
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    stddev:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    stddev codegen=false           5630 / 5776         18.0          55.6       1.0X
-    stddev codegen=true            1259 / 1314         83.0          12.0       4.5X
-
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    kurtosis:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    kurtosis codegen=false       14847 / 15084          7.0         142.9       1.0X
-    kurtosis codegen=true         1652 /  2124         63.0          15.9       9.0X
-    */
-  }
-
-  ignore("aggregate with linear keys") {
-    val N = 20 << 22
+    runBenchmark("aggregate with linear keys") {
+      val N = 20 << 22
 
-    val benchmark = new Benchmark("Aggregate w keys", N)
-    def f(): Unit = {
-      sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
+      val benchmark = new Benchmark("Aggregate w keys", N, output = output)
 
-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
+      def f(): Unit = {
+        spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
-      f()
-    }
+      benchmark.addCase(s"codegen = F", numIters = 2) { _ =>

--- End diff --

Thanks, @dongjoon-hyun. I plan to add `EmptyInterpolatedStringChecker` to scalastyle-config.xml to avoid this issue: [SPARK-25553](https://issues.apache.org/jira/browse/SPARK-25553)
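For illustration only (this example is not part of the PR): the pattern such a checker would flag is an `s`-interpolated string literal that contains no `$` splice, where a plain literal is sufficient. A minimal, self-contained Scala sketch:

```scala
object EmptyInterpolationExample {
  def main(args: Array[String]): Unit = {
    val flag = "F"

    // Redundant: the literal has no `$` splice, so the `s` prefix does nothing.
    // This is the shape a rule like `EmptyInterpolatedStringChecker` would flag.
    println(s"codegen = F")

    // Preferred: a plain string literal.
    println("codegen = F")

    // Legitimate interpolation: the literal actually splices a value.
    println(s"codegen = $flag")
  }
}
```

How the rule is wired into scalastyle-config.xml is tracked in SPARK-25553; the sketch above only illustrates the code pattern at issue.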