Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22804#discussion_r227582826 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala --- @@ -21,207 +21,212 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogFunction +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile -import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.hive.execution.TestingTypedCount -import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton { - ignore("Hive UDAF vs Spark AF") { - val N = 2 << 15 - - val benchmark = new Benchmark( - name = "hive udaf vs spark af", - valuesPerIteration = N, - minNumIters = 5, - warmupTime = 5.seconds, - minTime = 10.seconds, - outputPerIteration = true - ) - - registerHiveFunction("hive_percentile_approx", classOf[GenericUDAFPercentileApprox]) - - sparkSession.range(N).createOrReplaceTempView("t") - - benchmark.addCase("hive udaf w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - sparkSession.sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect() - } - - benchmark.addCase("spark af w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, 
"true") - sparkSession.sql("SELECT percentile_approx(id, 0.5) FROM t").collect() - } - - benchmark.addCase("hive udaf w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - sparkSession.sql( - s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() - } - - benchmark.addCase("spark af w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.sql( - s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() - } - - benchmark.addCase("spark af w/ group by w/ fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - sparkSession.sql( - s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() - } - - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - hive udaf vs spark af: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - hive udaf w/o group by 5326 / 5408 0.0 81264.2 1.0X - spark af w/o group by 93 / 111 0.7 1415.6 57.4X - hive udaf w/ group by 3804 / 3946 0.0 58050.1 1.4X - spark af w/ group by w/o fallback 71 / 90 0.9 1085.7 74.8X - spark af w/ group by w/ fallback 98 / 111 0.7 1501.6 54.1X - */ - } - - ignore("ObjectHashAggregateExec vs SortAggregateExec - typed_count") { - val N: Long = 1024 * 1024 * 100 - - val benchmark = new Benchmark( - name = "object agg v.s. 
sort agg", - valuesPerIteration = N, - minNumIters = 1, - warmupTime = 10.seconds, - minTime = 45.seconds, - outputPerIteration = true - ) - - import sparkSession.implicits._ - - def typed_count(column: Column): Column = - Column(TestingTypedCount(column.expr).toAggregateExpression()) - - val df = sparkSession.range(N) - - benchmark.addCase("sort agg w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() - } - - benchmark.addCase("object agg w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() - } - - benchmark.addCase("object agg w/ group by w/ fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() - } - - benchmark.addCase("sort agg w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.select(typed_count($"id")).collect() - } - - benchmark.addCase("object agg w/o group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.select(typed_count($"id")).collect() - } - - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - object agg v.s. 
sort agg: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sort agg w/ group by 31251 / 31908 3.4 298.0 1.0X - object agg w/ group by w/o fallback 6903 / 7141 15.2 65.8 4.5X - object agg w/ group by w/ fallback 20945 / 21613 5.0 199.7 1.5X - sort agg w/o group by 4734 / 5463 22.1 45.2 6.6X - object agg w/o group by w/o fallback 4310 / 4529 24.3 41.1 7.3X - */ - } - - ignore("ObjectHashAggregateExec vs SortAggregateExec - percentile_approx") { - val N = 2 << 20 - - val benchmark = new Benchmark( - name = "object agg v.s. sort agg", - valuesPerIteration = N, - minNumIters = 5, - warmupTime = 15.seconds, - minTime = 45.seconds, - outputPerIteration = true - ) - - import sparkSession.implicits._ - - val df = sparkSession.range(N).coalesce(1) - - benchmark.addCase("sort agg w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() - } - - benchmark.addCase("object agg w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() - } - - benchmark.addCase("object agg w/ group by w/ fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() +/** + * Benchmark to measure hash based aggregation. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class <this class> + * --jars <spark catalyst test jar>,<spark core test jar>,<spark hive jar> + * --packages org.spark-project.hive:hive-exec:1.2.1.spark2 + * <spark hive test jar> + * 2. build/sbt "hive/test:runMain <this class>" + * 3. 
generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>" + * Results will be written to "benchmarks/ObjectHashAggregateExecBenchmark-results.txt". + * }}} + */ +object ObjectHashAggregateExecBenchmark extends BenchmarkBase with SQLHelper { + + val spark: SparkSession = TestHive.sparkSession + + override def runBenchmarkSuite(): Unit = { + runBenchmark("Hive UDAF vs Spark AF") { --- End diff -- Hi, @peter-toth. Thank you for making this PR. Currently, `runBenchmarkSuite` is too long. Could you make a separate function for each test case? For example, the body of the former `ignore("Hive UDAF vs Spark AF")` test can become a single function, and `runBenchmarkSuite` can then simply call a series of those functions.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org