Repository: spark Updated Branches: refs/heads/master 6540c2f8f -> ccd07b736
[SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExecBenchmark to… ## What changes were proposed in this pull request? Refactor ObjectHashAggregateExecBenchmark to use main method ## How was this patch tested? Manually tested: ``` bin/spark-submit --class org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark --jars sql/catalyst/target/spark-catalyst_2.11-3.0.0-SNAPSHOT-tests.jar,core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar,sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT.jar --packages org.spark-project.hive:hive-exec:1.2.1.spark2 sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT-tests.jar ``` Generated results with: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark" ``` Closes #22804 from peter-toth/SPARK-25665. Lead-authored-by: Peter Toth <peter.t...@gmail.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccd07b73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccd07b73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccd07b73 Branch: refs/heads/master Commit: ccd07b736640c87ac6980a1c7c2d706ef3bab1bf Parents: 6540c2f Author: Peter Toth <peter.t...@gmail.com> Authored: Thu Oct 25 12:42:31 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Thu Oct 25 12:42:31 2018 -0700 ---------------------------------------------------------------------- ...ObjectHashAggregateExecBenchmark-results.txt | 45 ++++ .../ObjectHashAggregateExecBenchmark.scala | 218 +++++++++---------- 2 files changed, 152 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ccd07b73/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt 
---------------------------------------------------------------------- diff --git a/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt b/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt new file mode 100644 index 0000000..f3044da --- /dev/null +++ b/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt @@ -0,0 +1,45 @@ +================================================================================================ +Hive UDAF vs Spark AF +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +hive udaf vs spark af: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +hive udaf w/o group by 6370 / 6400 0.0 97193.6 1.0X +spark af w/o group by 54 / 63 1.2 820.8 118.4X +hive udaf w/ group by 4492 / 4507 0.0 68539.5 1.4X +spark af w/ group by w/o fallback 58 / 64 1.1 881.7 110.2X +spark af w/ group by w/ fallback 136 / 142 0.5 2075.0 46.8X + + +================================================================================================ +ObjectHashAggregateExec vs SortAggregateExec - typed_count +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +object agg v.s. 
sort agg: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +sort agg w/ group by 41500 / 41630 2.5 395.8 1.0X +object agg w/ group by w/o fallback 10075 / 10122 10.4 96.1 4.1X +object agg w/ group by w/ fallback 28131 / 28205 3.7 268.3 1.5X +sort agg w/o group by 6182 / 6221 17.0 59.0 6.7X +object agg w/o group by w/o fallback 5435 / 5468 19.3 51.8 7.6X + + +================================================================================================ +ObjectHashAggregateExec vs SortAggregateExec - percentile_approx +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +object agg v.s. sort agg: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +sort agg w/ group by 970 / 1025 2.2 462.5 1.0X +object agg w/ group by w/o fallback 772 / 798 2.7 368.1 1.3X +object agg w/ group by w/ fallback 1013 / 1044 2.1 483.1 1.0X +sort agg w/o group by 751 / 781 2.8 358.0 1.3X +object agg w/o group by w/o fallback 772 / 814 2.7 368.0 1.3X + + http://git-wip-us.apache.org/repos/asf/spark/blob/ccd07b73/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala index 3b33785..50ee096 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala @@ -21,207 +21,189 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogFunction +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile -import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.hive.execution.TestingTypedCount -import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton { - ignore("Hive UDAF vs Spark AF") { - val N = 2 << 15 +/** + * Benchmark to measure hash based aggregation. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class <this class> + * --jars <spark catalyst test jar>,<spark core test jar>,<spark hive jar> + * --packages org.spark-project.hive:hive-exec:1.2.1.spark2 + * <spark hive test jar> + * 2. build/sbt "hive/test:runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>" + * Results will be written to "benchmarks/ObjectHashAggregateExecBenchmark-results.txt". 
+ * }}} + */ +object ObjectHashAggregateExecBenchmark extends BenchmarkBase with SQLHelper { + + private val spark: SparkSession = TestHive.sparkSession + private val sql = spark.sql _ + import spark.implicits._ + private def hiveUDAFvsSparkAF(N: Int): Unit = { val benchmark = new Benchmark( name = "hive udaf vs spark af", valuesPerIteration = N, minNumIters = 5, warmupTime = 5.seconds, minTime = 10.seconds, - outputPerIteration = true + outputPerIteration = true, + output = output ) - registerHiveFunction("hive_percentile_approx", classOf[GenericUDAFPercentileApprox]) + sql( + s"CREATE TEMPORARY FUNCTION hive_percentile_approx AS '" + + s"${classOf[GenericUDAFPercentileApprox].getName}'" + ) - sparkSession.range(N).createOrReplaceTempView("t") + spark.range(N).createOrReplaceTempView("t") benchmark.addCase("hive udaf w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - sparkSession.sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect() + } } benchmark.addCase("spark af w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.sql("SELECT percentile_approx(id, 0.5) FROM t").collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + sql("SELECT percentile_approx(id, 0.5) FROM t").collect() + } } benchmark.addCase("hive udaf w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - sparkSession.sql( - s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + sql( + s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" + ).collect() + } } benchmark.addCase("spark af w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - 
sparkSession.sql( - s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + sql(s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)") + .collect() + } } benchmark.addCase("spark af w/ group by w/ fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - sparkSession.sql( - s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)" - ).collect() + withSQLConf( + SQLConf.USE_OBJECT_HASH_AGG.key -> "true", + SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") { + sql(s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} AS BIGINT)") + .collect() + } } benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - hive udaf vs spark af: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - hive udaf w/o group by 5326 / 5408 0.0 81264.2 1.0X - spark af w/o group by 93 / 111 0.7 1415.6 57.4X - hive udaf w/ group by 3804 / 3946 0.0 58050.1 1.4X - spark af w/ group by w/o fallback 71 / 90 0.9 1085.7 74.8X - spark af w/ group by w/ fallback 98 / 111 0.7 1501.6 54.1X - */ } - ignore("ObjectHashAggregateExec vs SortAggregateExec - typed_count") { - val N: Long = 1024 * 1024 * 100 - + private def objectHashAggregateExecVsSortAggregateExecUsingTypedCount(N: Int): Unit = { val benchmark = new Benchmark( name = "object agg v.s. 
sort agg", valuesPerIteration = N, minNumIters = 1, warmupTime = 10.seconds, minTime = 45.seconds, - outputPerIteration = true + outputPerIteration = true, + output = output ) - import sparkSession.implicits._ - def typed_count(column: Column): Column = Column(TestingTypedCount(column.expr).toAggregateExpression()) - val df = sparkSession.range(N) + val df = spark.range(N) benchmark.addCase("sort agg w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + } } benchmark.addCase("object agg w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + } } benchmark.addCase("object agg w/ group by w/ fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + withSQLConf( + SQLConf.USE_OBJECT_HASH_AGG.key -> "true", + SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") { + df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect() + } } benchmark.addCase("sort agg w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.select(typed_count($"id")).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + df.select(typed_count($"id")).collect() + } } benchmark.addCase("object agg w/o group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.select(typed_count($"id")).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + 
df.select(typed_count($"id")).collect() + } } benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - object agg v.s. sort agg: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sort agg w/ group by 31251 / 31908 3.4 298.0 1.0X - object agg w/ group by w/o fallback 6903 / 7141 15.2 65.8 4.5X - object agg w/ group by w/ fallback 20945 / 21613 5.0 199.7 1.5X - sort agg w/o group by 4734 / 5463 22.1 45.2 6.6X - object agg w/o group by w/o fallback 4310 / 4529 24.3 41.1 7.3X - */ } - ignore("ObjectHashAggregateExec vs SortAggregateExec - percentile_approx") { - val N = 2 << 20 - + private def objectHashAggregateExecVsSortAggregateExecUsingPercentileApprox(N: Int): Unit = { val benchmark = new Benchmark( name = "object agg v.s. sort agg", valuesPerIteration = N, minNumIters = 5, warmupTime = 15.seconds, minTime = 45.seconds, - outputPerIteration = true + outputPerIteration = true, + output = output ) - import sparkSession.implicits._ - - val df = sparkSession.range(N).coalesce(1) + val df = spark.range(N).coalesce(1) benchmark.addCase("sort agg w/ group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + } } benchmark.addCase("object agg w/ group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + } } benchmark.addCase("object agg w/ group by w/ fallback") 
{ _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2") - df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + withSQLConf( + SQLConf.USE_OBJECT_HASH_AGG.key -> "true", + SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") { + df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect() + } } benchmark.addCase("sort agg w/o group by") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false") - df.select(percentile_approx($"id", 0.5)).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + df.select(percentile_approx($"id", 0.5)).collect() + } } benchmark.addCase("object agg w/o group by w/o fallback") { _ => - sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true") - df.select(percentile_approx($"id", 0.5)).collect() + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + df.select(percentile_approx($"id", 0.5)).collect() + } } benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - object agg v.s. 
sort agg: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sort agg w/ group by 3418 / 3530 0.6 1630.0 1.0X - object agg w/ group by w/o fallback 3210 / 3314 0.7 1530.7 1.1X - object agg w/ group by w/ fallback 3419 / 3511 0.6 1630.1 1.0X - sort agg w/o group by 4336 / 4499 0.5 2067.3 0.8X - object agg w/o group by w/o fallback 4271 / 4372 0.5 2036.7 0.8X - */ - } - - private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = { - val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog] - val functionIdentifier = FunctionIdentifier(functionName, database = None) - val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil) - sessionCatalog.registerFunction(func, overrideIfExists = false) } private def percentile_approx( @@ -229,4 +211,18 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiv val approxPercentile = new ApproximatePercentile(column.expr, Literal(percentage)) Column(approxPercentile.toAggregateExpression(isDistinct)) } + + override def runBenchmarkSuite(): Unit = { + runBenchmark("Hive UDAF vs Spark AF") { + hiveUDAFvsSparkAF(2 << 15) + } + + runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - typed_count") { + objectHashAggregateExecVsSortAggregateExecUsingTypedCount(1024 * 1024 * 100) + } + + runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - percentile_approx") { + objectHashAggregateExecVsSortAggregateExecUsingPercentileApprox(2 << 20) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org