[GitHub] spark pull request #22484: [SPARK-25476][TEST] Refactor AggregateBenchmark t...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r220025726

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -104,23 +107,26 @@ object AggregateBenchmark extends SqlBasedBenchmark {

     def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect()

-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", value = false)
-      f()
+    benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) {
+        f()
+      }
     }

-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", value = true)
-      spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
-      spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
-      f()
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
+        SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> false.toString,
+        "spark.sql.codegen.aggregate.map.vectorized.enable" -> false.toString) {
```
--- End diff --

ditto.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
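The point of the refactor under review is that `withSQLConf` sets the given keys only for the duration of the block and then restores the previous values, whereas bare `spark.conf.set` calls leak settings into later benchmark cases. Below is a minimal self-contained sketch of that set/run/restore pattern over a plain mutable map; it is not Spark's `SQLHelper.withSQLConf` implementation, and the object and method names are hypothetical.

```scala
import scala.collection.mutable

object WithConfSketch {
  // A simplified stand-in for SparkSession's runtime config.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Set the given (key, value) pairs, run `f`, then restore the previous
  // values, so one benchmark case cannot leak settings into the next.
  def withConf[T](pairs: (String, String)*)(f: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try f finally previous.foreach {
      case (k, Some(v)) => conf(k) = v
      case (k, None)    => conf.remove(k)
    }
  }
}
```

With this shape, a case such as `codegen = F` can flip `spark.sql.codegen.wholeStage` to `"false"` inside the block and still leave the session in its prior state for the next `addCase`.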
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r220025625

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -73,23 +73,26 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }

-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
+    benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) {
+        f()
+      }
     }

-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "true")
-      spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
-      spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
-      f()
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
+        SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> false.toString,
+        "spark.sql.codegen.aggregate.map.vectorized.enable" -> false.toString) {
+        f()
+      }
     }

-    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "true")
-      spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
-      spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
-      f()
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
+        SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> true.toString,
+        "spark.sql.codegen.aggregate.map.vectorized.enable" -> true.toString) {
```
--- End diff --

ditto.
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r220025674

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -104,23 +107,26 @@ object AggregateBenchmark extends SqlBasedBenchmark {

     def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect()

-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", value = false)
-      f()
+    benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) {
```
--- End diff --

`"false"` instead of `false.toString`.
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r220025591

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -73,23 +73,26 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }

-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
+    benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) {
+        f()
+      }
     }

-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "true")
-      spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
-      spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
-      f()
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
+        SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> false.toString,
+        "spark.sql.codegen.aggregate.map.vectorized.enable" -> false.toString) {
```
--- End diff --

```scala
withSQLConf(
  SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
  SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
  "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
```
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r220025291

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -73,23 +73,26 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }

-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      spark.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
+    benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) {
```
--- End diff --

`"false"` instead of `false.toString`?
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219936138

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -34,621 +34,508 @@ import org.apache.spark.unsafe.map.BytesToBytesMap

 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends SqlBasedBenchmark {

-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      runBenchmarkWithCodegen("agg w/o group", N) {
+        spark.range(N).selectExpr("sum(id)").collect()
+      }
     }
-    /*
-    agg w/o group:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    --------------------------------------------------------------------------------------
-    agg w/o group wholestage off       30136 / 31885         69.6          14.4       1.0X
-    agg w/o group wholestage on          1851 / 1860       1132.9           0.9      16.3X
-    */
-  }

-  ignore("stat functions") {
-    val N = 100L << 20
+    runBenchmark("stat functions") {
+      val N = 100L << 20

-    runBenchmark("stddev", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
-    }
+      runBenchmarkWithCodegen("stddev", N) {
+        spark.range(N).groupBy().agg("id" -> "stddev").collect()
+      }

-    runBenchmark("kurtosis", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      runBenchmarkWithCodegen("kurtosis", N) {
+        spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      }
     }

-    /*
-    Using ImperativeAggregate (as implemented in Spark 1.6):
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      ----------------------------------------------------------------
-      stddev w/o codegen        2019.04            10.39          1.00 X
-      stddev w codegen          2097.29            10.00          0.96 X
-      kurtosis w/o codegen      2108.99             9.94          0.96 X
-      kurtosis w codegen        2090.69            10.03          0.97 X
-
-    Using DeclarativeAggregate:
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      ---------------------------------------------------------------------------
-      stddev codegen=false      5630 / 5776         18.0          55.6       1.0X
-      stddev codegen=true       1259 / 1314         83.0          12.0       4.5X
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      kurtosis:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      ---------------------------------------------------------------------------
-      kurtosis codegen=false  14847 / 15084          7.0         142.9       1.0X
-      kurtosis codegen=true     1652 / 2124         63.0          15.9       9.0X
-    */
-  }

+    runBenchmark("aggregate with linear keys") {
+      val N = 20 << 22

-  ignore("aggregate with linear keys") {
-    val N = 20 << 22
+      val benchmark = new Benchmark("Aggregate w keys", N, output = output)

-    val benchmark = new Benchmark("Aggregate w keys", N)
-    def f(): Unit = {
-      sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
-
-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
```
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219725643

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala ---
```diff
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together
+ * with other test suites).
+ */
+trait RunBenchmarkWithCodegen extends BenchmarkBase {
+
+  val spark: SparkSession = getSparkSession
+
+  /** Subclass can override this function to build their own SparkSession */
+  def getSparkSession: SparkSession = {
+    SparkSession.builder()
+      .master("local[1]")
+      .appName(this.getClass.getCanonicalName)
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
+      .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
+      .getOrCreate()
+  }
+
+  /** Runs function `f` with whole stage codegen on and off. */
+  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
```
--- End diff --

How about `runBenchmark` -> `runSqlBaseBenchmark`?
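The helper under discussion runs the same body twice, once with whole-stage codegen disabled and once enabled, so the two timings can be compared. A minimal self-contained sketch of that on/off dispatch is below; it is not the trait's real implementation (which sets `spark.sql.codegen.wholeStage` and records timings via `Benchmark`), and the object and method names are hypothetical.

```scala
object CodegenRunnerSketch {
  // Run `f` once per codegen setting and return the generated case names,
  // mirroring the "wholestage off" / "wholestage on" rows in the results.
  def runWithCodegenOnOff(name: String)(f: Boolean => Unit): Seq[String] = {
    Seq(false, true).map { enabled =>
      f(enabled) // the benchmark body runs under this codegen setting
      s"$name wholestage ${if (enabled) "on" else "off"}"
    }
  }
}
```

This also illustrates why the reviewers want distinct names: a wrapper with this signature is easy to confuse with `BenchmarkBase`'s own `runBenchmark(name)` grouping method.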
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219725606

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala ---
```diff
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together
+ * with other test suites).
+ */
+trait RunBenchmarkWithCodegen extends BenchmarkBase {
```
--- End diff --

How about `RunBenchmarkWithCodegen` -> `SqlBaseBenchmarkBase`?
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219706455

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -34,621 +34,508 @@ import org.apache.spark.unsafe.map.BytesToBytesMap

 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends RunBenchmarkWithCodegen {

-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      runBenchmark("agg w/o group", N) {
```
--- End diff --

Well, I don't have a good name in mind. How about making the `runBenchmark` method of `RunBenchmarkWithCodegen` override the one in `BenchmarkBase`?
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219683925

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -34,621 +34,508 @@ import org.apache.spark.unsafe.map.BytesToBytesMap

 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends RunBenchmarkWithCodegen {

-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      runBenchmark("agg w/o group", N) {
```
--- End diff --

Yes. Do you have a suggested name?
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219676161

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
```diff
@@ -34,621 +34,508 @@ import org.apache.spark.unsafe.map.BytesToBytesMap

 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends RunBenchmarkWithCodegen {

-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      runBenchmark("agg w/o group", N) {
```
--- End diff --

The `runBenchmark` here is different from the one on line 48, but they have the same name. We should use a different name.
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22484#discussion_r219104743

--- Diff: sql/core/benchmarks/AggregateBenchmark-results.txt ---
```diff
@@ -0,0 +1,154 @@
+================================================================================================
+aggregate without grouping
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
+
+agg w/o group:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------
+agg w/o group wholestage off             39650 / 46049         52.9          18.9       1.0X
+agg w/o group wholestage on                1224 / 1413       1713.5           0.6      32.4X
+
+
+================================================================================================
+stat functions
+================================================================================================
```
--- End diff --

@davies Do you know how to generate these benchmarks: https://github.com/apache/spark/blob/3c3eebc8734e36e61f4627e2c517fbbe342b3b42/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala#L70-L78
GitHub user wangyum opened a pull request: https://github.com/apache/spark/pull/22484

[SPARK-25476][TEST] Refactor AggregateBenchmark to use main method

## What changes were proposed in this pull request?

Refactor `AggregateBenchmark` to use a main method. To generate the benchmark results:

```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark"
```

## How was this patch tested?

Manual tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangyum/spark SPARK-25476

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22484.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22484

commit 649f2965188efcfa0b1d2b5acb4c0f057ecd3788
Author: Yuming Wang
Date: 2018-09-20T07:23:46Z

    Refactor AggregateBenchmark
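The `SPARK_GENERATE_BENCHMARK_FILES=1` prefix in the run command controls where results land: when the variable is set, output goes to `benchmarks/<Name>-results.txt`; otherwise it goes to the console. The sketch below captures just that dispatch decision. It is a simplified illustration of the behavior described in the thread, not Spark's actual `BenchmarkBase` code, and the object and method names are hypothetical.

```scala
object BenchmarkOutputSketch {
  // Decide the output target from the environment, mirroring the behavior
  // described in the PR: a results file only under
  // SPARK_GENERATE_BENCHMARK_FILES=1, otherwise plain console output.
  def resultTarget(benchmarkName: String, env: Map[String, String]): String =
    if (env.get("SPARK_GENERATE_BENCHMARK_FILES").contains("1")) {
      s"benchmarks/$benchmarkName-results.txt"
    } else {
      "stdout"
    }
}
```

Passing the environment as a parameter rather than reading `sys.env` directly keeps the decision testable.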