This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 01a586f3a perf: Improve aggregate expression microbenchmarks (#3021)
01a586f3a is described below
commit 01a586f3a61a421d755da4f119c3e37716f324b7
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 6 08:59:07 2026 -0700
perf: Improve aggregate expression microbenchmarks (#3021)
---
.../sql/benchmark/CometAggregateBenchmark.scala | 298 ---------------------
.../CometAggregateExpressionBenchmark.scala | 137 ++++++++++
2 files changed, 137 insertions(+), 298 deletions(-)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
deleted file mode 100644
index d63b3e710..000000000
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.benchmark
-
-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.comet.CometConf
-
-/**
- * Benchmark to measure Comet execution performance. To run this benchmark:
- * {{{
- * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateBenchmark
- * }}}
- *
- * Results will be written to "spark/benchmarks/CometAggregateBenchmark-**results.txt".
- */
-object CometAggregateBenchmark extends CometBenchmarkBase {
- override def getSparkSession: SparkSession = {
- val session = super.getSparkSession
- session.conf.set("parquet.enable.dictionary", "false")
- session.conf.set("spark.sql.shuffle.partitions", "2")
- session
- }
-
- // Wrapper on SQL aggregation function
- case class BenchAggregateFunction(name: String, distinct: Boolean = false) {
- override def toString: String = if (distinct) s"$name(DISTINCT)" else name
- }
-
- // Aggregation functions to test
- private val benchmarkAggFuncs = Seq(
- BenchAggregateFunction("SUM"),
- BenchAggregateFunction("MIN"),
- BenchAggregateFunction("MAX"),
- BenchAggregateFunction("COUNT"),
- BenchAggregateFunction("COUNT", distinct = true),
- BenchAggregateFunction("AVG"))
-
- def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: String): String = {
- s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT $input" else input})"
- }
-
- def singleGroupAndAggregate(
- values: Int,
- groupingKeyCardinality: Int,
- aggregateFunction: BenchAggregateFunction,
- isAnsiMode: Boolean): Unit = {
- val benchmark =
- new Benchmark(
- s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
- s"single aggregate ${aggregateFunction.toString}, ansi mode enabled : ${isAnsiMode}",
- values,
- output = output)
-
- withTempPath { dir =>
- withTempTable("parquetV1Table") {
- prepareTable(
- dir,
- spark.sql(s"SELECT value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl"))
-
- val functionSQL = aggFunctionSQL(aggregateFunction, "value")
- val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
-
- benchmark.addCase(
- s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
- _ =>
- withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.addCase(
- s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
- _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.run()
- }
- }
- }
-
- def singleGroupAndAggregateDecimal(
- values: Int,
- dataType: DecimalType,
- groupingKeyCardinality: Int,
- aggregateFunction: BenchAggregateFunction,
- isAnsiMode: Boolean): Unit = {
- val benchmark =
- new Benchmark(
- s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
- s"single aggregate ${aggregateFunction.toString} on decimal",
- values,
- output = output)
-
- val df = makeDecimalDataFrame(values, dataType, false);
-
- withTempPath { dir =>
- withTempTable("parquetV1Table") {
- df.createOrReplaceTempView(tbl)
- prepareTable(
- dir,
- spark.sql(
- s"SELECT dec as value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl"))
-
- val functionSQL = aggFunctionSQL(aggregateFunction, "value")
- val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
-
- withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- benchmark.addCase(
- s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
- _ =>
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.addCase(
- s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") {
- _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.run()
- }
- }
- }
-
- def multiGroupKeys(
- values: Int,
- groupingKeyCard: Int,
- aggregateFunction: BenchAggregateFunction,
- isAnsiMode: Boolean): Unit = {
- val benchmark =
- new Benchmark(
- s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
- s"single aggregate ${aggregateFunction.toString}",
- values,
- output = output)
-
- withTempPath { dir =>
- withTempTable("parquetV1Table") {
- prepareTable(
- dir,
- spark.sql(
- s"SELECT value, floor(rand() * $groupingKeyCard) as key1, " +
- s"floor(rand() * $groupingKeyCard) as key2 FROM $tbl"))
-
- val functionSQL = aggFunctionSQL(aggregateFunction, "value")
- val query =
- s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, key2"
-
- benchmark.addCase(
- s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
- _ =>
- withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.addCase(
- s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
- _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G",
- SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.run()
- }
- }
- }
-
- def multiAggregates(
- values: Int,
- groupingKeyCard: Int,
- aggregateFunction: BenchAggregateFunction,
- isAnsiMode: Boolean): Unit = {
- val benchmark =
- new Benchmark(
- s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
- s"multiple aggregates ${aggregateFunction.toString} isANSIMode: ${isAnsiMode.toString}",
- values,
- output = output)
-
- withTempPath { dir =>
- withTempTable("parquetV1Table") {
- prepareTable(
- dir,
- spark.sql(
- s"SELECT value as value1, value as value2, floor(rand() * $groupingKeyCard) as key " +
- s"FROM $tbl"))
-
- val functionSQL1 = aggFunctionSQL(aggregateFunction, "value1")
- val functionSQL2 = aggFunctionSQL(aggregateFunction, "value2")
-
- val query = s"SELECT key, $functionSQL1, $functionSQL2 " +
- "FROM parquetV1Table GROUP BY key"
-
- benchmark.addCase(
- s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
- _ =>
- withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.addCase(
- s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") {
- _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- Try { spark.sql(query).noop() }
- }
- }
-
- benchmark.run()
- }
- }
- }
-
- override def runCometBenchmark(mainArgs: Array[String]): Unit = {
- val total = 1024 * 1024 * 10
- val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
- benchmarkAggFuncs.foreach { aggFunc =>
- Seq(true, false).foreach(k => {
- runBenchmarkWithTable(
- s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
- total) { v =>
- for (card <- combinations) {
- singleGroupAndAggregate(v, card, aggFunc, k)
- }
- }
-
- runBenchmarkWithTable(
- s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
- total) { v =>
- for (card <- combinations) {
- multiGroupKeys(v, card, aggFunc, k)
- }
- }
-
- runBenchmarkWithTable(
- s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
- total) { v =>
- for (card <- combinations) {
- multiAggregates(v, card, aggFunc, k)
- }
- }
-
- runBenchmarkWithTable(
- s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
- total) { v =>
- for (card <- combinations) {
- singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc, k)
- }
- }
- })
- }
- }
-}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala
new file mode 100644
index 000000000..4ea06109c
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+case class AggExprConfig(
+ name: String,
+ query: String,
+ extraCometConfigs: Map[String, String] = Map.empty)
+
+/**
+ * Comprehensive benchmark for Comet aggregate functions. To run this benchmark:
+ * {{{
+ * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateExpressionBenchmark
+ * }}}
+ * Results will be written to "spark/benchmarks/CometAggregateExpressionBenchmark-**results.txt".
+ */
+object CometAggregateExpressionBenchmark extends CometBenchmarkBase {
+
+ private val basicAggregates = List(
+ AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "count_distinct",
+ "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "first_ignore_nulls",
+ "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "last_ignore_nulls",
+ "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp"))
+
+ private val statisticalAggregates = List(
+ AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "covar_samp",
+ "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "covar_pop",
+ "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp"))
+
+ private val bitwiseAggregates = List(
+ AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp"))
+
+ // Additional structural tests (multiple group keys, multiple aggregates)
+ private val multiKeyAggregates = List(
+ AggExprConfig("sum_multi_key", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp, grp2"),
+ AggExprConfig("avg_multi_key", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp, grp2"))
+
+ private val multiAggregates = List(
+ AggExprConfig("sum_sum", "SELECT SUM(c_int), SUM(c_long) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("min_max", "SELECT MIN(c_int), MAX(c_int) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig(
+ "count_sum_avg",
+ "SELECT COUNT(*), SUM(c_int), AVG(c_double) FROM parquetV1Table GROUP BY grp"))
+
+ // Decimal aggregates
+ private val decimalAggregates = List(
+ AggExprConfig("sum_decimal", "SELECT SUM(c_decimal) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("avg_decimal", "SELECT AVG(c_decimal) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("min_decimal", "SELECT MIN(c_decimal) FROM parquetV1Table GROUP BY grp"),
+ AggExprConfig("max_decimal", "SELECT MAX(c_decimal) FROM parquetV1Table GROUP BY grp"))
+
+ // High cardinality tests
+ private val highCardinalityAggregates = List(
+ AggExprConfig(
+ "sum_high_card",
+ "SELECT SUM(c_int) FROM parquetV1Table GROUP BY high_card_grp"),
+ AggExprConfig(
+ "count_distinct_high_card",
+ "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY high_card_grp"))
+
+ override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+ val values = 1024 * 1024
+
+ runBenchmarkWithTable("Aggregate function benchmarks", values) { v =>
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ prepareTable(
+ dir,
+ spark.sql(s"""
+ SELECT
+ CAST(value % 1000 AS INT) AS grp,
+ CAST(value % 100 AS INT) AS grp2,
+ CAST(value % 100000 AS INT) AS high_card_grp,
+ CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int,
+ CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long,
+ CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double,
+ CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2,
+ CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 10000 - 5000) / 100.0 AS DECIMAL(18, 10)) END AS c_decimal
+ FROM $tbl
+ """))
+
+ val allAggregates = basicAggregates ++ statisticalAggregates ++ bitwiseAggregates ++
+ multiKeyAggregates ++ multiAggregates ++ decimalAggregates ++ highCardinalityAggregates
+
+ allAggregates.foreach { config =>
+ runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs)
+ }
+ }
+ }
+ }
+ }
+}
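A note for readers following the new benchmark: the per-query timing is delegated to runExpressionBenchmark, which is not part of this diff and presumably lives in CometBenchmarkBase. The sketch below only illustrates what such a helper could look like, reusing the Benchmark / withSQLConf / noop() pattern from the deleted CometAggregateBenchmark; the signature, the "Spark" and "Comet" case names, and the explicit COMET_ENABLED=false baseline are assumptions rather than the actual implementation.

  // Hypothetical helper, assumed to be a member of CometBenchmarkBase.
  // It times each query once with stock Spark and once with Comet enabled.
  import org.apache.spark.benchmark.Benchmark
  import org.apache.comet.CometConf

  def runExpressionBenchmark(
      name: String,
      values: Long,
      query: String,
      extraCometConfigs: Map[String, String] = Map.empty): Unit = {
    val benchmark = new Benchmark(name, values, output = output)

    // Baseline: run the query with Comet turned off (assumed baseline setting).
    benchmark.addCase(s"$name - Spark") { _ =>
      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
        spark.sql(query).noop()
      }
    }

    // Comet: enable native execution plus any per-query overrides from AggExprConfig.
    benchmark.addCase(s"$name - Comet") { _ =>
      val confs = Seq(
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true") ++ extraCometConfigs.toSeq
      withSQLConf(confs: _*) {
        spark.sql(query).noop()
      }
    }

    benchmark.run()
  }

With a helper of this shape, each AggExprConfig entry (for example "count" with "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp") is executed against the same Parquet table once per engine, and extraCometConfigs leaves room for queries that need additional Comet settings; it is empty for every entry added in this commit.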
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]