This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 01a586f3a perf: Improve aggregate expression microbenchmarks (#3021)
01a586f3a is described below

commit 01a586f3a61a421d755da4f119c3e37716f324b7
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 6 08:59:07 2026 -0700

    perf: Improve aggregate expression microbenchmarks (#3021)
---
 .../sql/benchmark/CometAggregateBenchmark.scala    | 298 ---------------------
 .../CometAggregateExpressionBenchmark.scala        | 137 ++++++++++
 2 files changed, 137 insertions(+), 298 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
deleted file mode 100644
index d63b3e710..000000000
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.benchmark
-
-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.comet.CometConf
-
-/**
- * Benchmark to measure Comet execution performance. To run this benchmark:
- * {{{
- *   SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateBenchmark
- * }}}
- *
- * Results will be written to "spark/benchmarks/CometAggregateBenchmark-**results.txt".
- */
-object CometAggregateBenchmark extends CometBenchmarkBase {
-  override def getSparkSession: SparkSession = {
-    val session = super.getSparkSession
-    session.conf.set("parquet.enable.dictionary", "false")
-    session.conf.set("spark.sql.shuffle.partitions", "2")
-    session
-  }
-
-  // Wrapper on SQL aggregation function
-  case class BenchAggregateFunction(name: String, distinct: Boolean = false) {
-    override def toString: String = if (distinct) s"$name(DISTINCT)" else name
-  }
-
-  // Aggregation functions to test
-  private val benchmarkAggFuncs = Seq(
-    BenchAggregateFunction("SUM"),
-    BenchAggregateFunction("MIN"),
-    BenchAggregateFunction("MAX"),
-    BenchAggregateFunction("COUNT"),
-    BenchAggregateFunction("COUNT", distinct = true),
-    BenchAggregateFunction("AVG"))
-
-  def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: 
String): String = {
-    s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT 
$input" else input})"
-  }
-
-  def singleGroupAndAggregate(
-      values: Int,
-      groupingKeyCardinality: Int,
-      aggregateFunction: BenchAggregateFunction,
-      isAnsiMode: Boolean): Unit = {
-    val benchmark =
-      new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality 
$groupingKeyCardinality), " +
-          s"single aggregate ${aggregateFunction.toString}, ansi mode enabled 
: ${isAnsiMode}",
-        values,
-        output = output)
-
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        prepareTable(
-          dir,
-          spark.sql(s"SELECT value, floor(rand() * $groupingKeyCardinality) as 
key FROM $tbl"))
-
-        val functionSQL = aggFunctionSQL(aggregateFunction, "value")
-        val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY 
key"
-
-        benchmark.addCase(
-          s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode 
enabled : ${isAnsiMode}") {
-          _ =>
-            withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.addCase(
-          s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode 
enabled : ${isAnsiMode}") {
-          _ =>
-            withSQLConf(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.run()
-      }
-    }
-  }
-
-  def singleGroupAndAggregateDecimal(
-      values: Int,
-      dataType: DecimalType,
-      groupingKeyCardinality: Int,
-      aggregateFunction: BenchAggregateFunction,
-      isAnsiMode: Boolean): Unit = {
-    val benchmark =
-      new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality 
$groupingKeyCardinality), " +
-          s"single aggregate ${aggregateFunction.toString} on decimal",
-        values,
-        output = output)
-
-    val df = makeDecimalDataFrame(values, dataType, false);
-
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        df.createOrReplaceTempView(tbl)
-        prepareTable(
-          dir,
-          spark.sql(
-            s"SELECT dec as value, floor(rand() * $groupingKeyCardinality) as 
key FROM $tbl"))
-
-        val functionSQL = aggFunctionSQL(aggregateFunction, "value")
-        val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY 
key"
-
-        withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-          benchmark.addCase(
-            s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode 
enabled : ${isAnsiMode}") {
-            _ =>
-              Try { spark.sql(query).noop() }
-          }
-        }
-
-        benchmark.addCase(
-          s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode 
enabled : ${isAnsiMode}") {
-          _ =>
-            withSQLConf(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.run()
-      }
-    }
-  }
-
-  def multiGroupKeys(
-      values: Int,
-      groupingKeyCard: Int,
-      aggregateFunction: BenchAggregateFunction,
-      isAnsiMode: Boolean): Unit = {
-    val benchmark =
-      new Benchmark(
-        s"Grouped HashAgg Exec: multiple group keys (cardinality 
$groupingKeyCard), " +
-          s"single aggregate ${aggregateFunction.toString}",
-        values,
-        output = output)
-
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        prepareTable(
-          dir,
-          spark.sql(
-            s"SELECT value, floor(rand() * $groupingKeyCard) as key1, " +
-              s"floor(rand() * $groupingKeyCard) as key2 FROM $tbl"))
-
-        val functionSQL = aggFunctionSQL(aggregateFunction, "value")
-        val query =
-          s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, 
key2"
-
-        benchmark.addCase(
-          s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: 
${isAnsiMode.toString}") {
-          _ =>
-            withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.addCase(
-          s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: 
${isAnsiMode.toString}") {
-          _ =>
-            withSQLConf(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G",
-              SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.run()
-      }
-    }
-  }
-
-  def multiAggregates(
-      values: Int,
-      groupingKeyCard: Int,
-      aggregateFunction: BenchAggregateFunction,
-      isAnsiMode: Boolean): Unit = {
-    val benchmark =
-      new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality 
$groupingKeyCard), " +
-          s"multiple aggregates ${aggregateFunction.toString} isANSIMode: 
${isAnsiMode.toString}",
-        values,
-        output = output)
-
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        prepareTable(
-          dir,
-          spark.sql(
-            s"SELECT value as value1, value as value2, floor(rand() * 
$groupingKeyCard) as key " +
-              s"FROM $tbl"))
-
-        val functionSQL1 = aggFunctionSQL(aggregateFunction, "value1")
-        val functionSQL2 = aggFunctionSQL(aggregateFunction, "value2")
-
-        val query = s"SELECT key, $functionSQL1, $functionSQL2 " +
-          "FROM parquetV1Table GROUP BY key"
-
-        benchmark.addCase(
-          s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: 
${isAnsiMode.toString}") {
-          _ =>
-            withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.addCase(
-          s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: 
${isAnsiMode.toString}") {
-          _ =>
-            withSQLConf(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-              Try { spark.sql(query).noop() }
-            }
-        }
-
-        benchmark.run()
-      }
-    }
-  }
-
-  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val total = 1024 * 1024 * 10
-    val combinations = List(100, 1024, 1024 * 1024) // number of distinct 
groups
-    benchmarkAggFuncs.foreach { aggFunc =>
-      Seq(true, false).foreach(k => {
-        runBenchmarkWithTable(
-          s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
-          total) { v =>
-          for (card <- combinations) {
-            singleGroupAndAggregate(v, card, aggFunc, k)
-          }
-        }
-
-        runBenchmarkWithTable(
-          s"Grouped Aggregate (multiple group keys + single aggregate 
$aggFunc)",
-          total) { v =>
-          for (card <- combinations) {
-            multiGroupKeys(v, card, aggFunc, k)
-          }
-        }
-
-        runBenchmarkWithTable(
-          s"Grouped Aggregate (single group key + multiple aggregates 
$aggFunc)",
-          total) { v =>
-          for (card <- combinations) {
-            multiAggregates(v, card, aggFunc, k)
-          }
-        }
-
-        runBenchmarkWithTable(
-          s"Grouped Aggregate (single group key + single aggregate $aggFunc on 
decimal)",
-          total) { v =>
-          for (card <- combinations) {
-            singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, 
aggFunc, k)
-          }
-        }
-      })
-    }
-  }
-}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala
new file mode 100644
index 000000000..4ea06109c
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+case class AggExprConfig(
+    name: String,
+    query: String,
+    extraCometConfigs: Map[String, String] = Map.empty)
+
+/**
+ * Comprehensive benchmark for Comet aggregate functions. To run this benchmark:
+ * {{{
+ *   SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateExpressionBenchmark
+ * }}}
+ * Results will be written to "spark/benchmarks/CometAggregateExpressionBenchmark-**results.txt".
+ */
+object CometAggregateExpressionBenchmark extends CometBenchmarkBase {
+
+  private val basicAggregates = List(
+    AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"),
+    AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP 
BY grp"),
+    AggExprConfig(
+      "count_distinct",
+      "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"),
+    AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig(
+      "first_ignore_nulls",
+      "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"),
+    AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig(
+      "last_ignore_nulls",
+      "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp"))
+
+  private val statisticalAggregates = List(
+    AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM 
parquetV1Table GROUP BY grp"),
+    AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM 
parquetV1Table GROUP BY grp"),
+    AggExprConfig(
+      "covar_samp",
+      "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig(
+      "covar_pop",
+      "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY 
grp"),
+    AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM 
parquetV1Table GROUP BY grp"))
+
+  private val bitwiseAggregates = List(
+    AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP 
BY grp"),
+    AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP 
BY grp"),
+    AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP 
BY grp"))
+
+  // Additional structural tests (multiple group keys, multiple aggregates)
+  private val multiKeyAggregates = List(
+    AggExprConfig("sum_multi_key", "SELECT SUM(c_int) FROM parquetV1Table 
GROUP BY grp, grp2"),
+    AggExprConfig("avg_multi_key", "SELECT AVG(c_double) FROM parquetV1Table 
GROUP BY grp, grp2"))
+
+  private val multiAggregates = List(
+    AggExprConfig("sum_sum", "SELECT SUM(c_int), SUM(c_long) FROM 
parquetV1Table GROUP BY grp"),
+    AggExprConfig("min_max", "SELECT MIN(c_int), MAX(c_int) FROM 
parquetV1Table GROUP BY grp"),
+    AggExprConfig(
+      "count_sum_avg",
+      "SELECT COUNT(*), SUM(c_int), AVG(c_double) FROM parquetV1Table GROUP BY 
grp"))
+
+  // Decimal aggregates
+  private val decimalAggregates = List(
+    AggExprConfig("sum_decimal", "SELECT SUM(c_decimal) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("avg_decimal", "SELECT AVG(c_decimal) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("min_decimal", "SELECT MIN(c_decimal) FROM parquetV1Table 
GROUP BY grp"),
+    AggExprConfig("max_decimal", "SELECT MAX(c_decimal) FROM parquetV1Table 
GROUP BY grp"))
+
+  // High cardinality tests
+  private val highCardinalityAggregates = List(
+    AggExprConfig(
+      "sum_high_card",
+      "SELECT SUM(c_int) FROM parquetV1Table GROUP BY high_card_grp"),
+    AggExprConfig(
+      "count_distinct_high_card",
+      "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY 
high_card_grp"))
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    val values = 1024 * 1024
+
+    runBenchmarkWithTable("Aggregate function benchmarks", values) { v =>
+      withTempPath { dir =>
+        withTempTable("parquetV1Table") {
+          prepareTable(
+            dir,
+            spark.sql(s"""
+              SELECT
+                CAST(value % 1000 AS INT) AS grp,
+                CAST(value % 100 AS INT) AS grp2,
+                CAST(value % 100000 AS INT) AS high_card_grp,
+                CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) 
- 5000 AS INT) END AS c_int,
+                CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS 
LONG) END AS c_long,
+                CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) 
/ 100.0 AS DOUBLE) END AS c_double,
+                CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 
50.0 AS DOUBLE) END AS c_double2,
+                CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 10000 - 
5000) / 100.0 AS DECIMAL(18, 10)) END AS c_decimal
+              FROM $tbl
+            """))
+
+          val allAggregates = basicAggregates ++ statisticalAggregates ++ 
bitwiseAggregates ++
+            multiKeyAggregates ++ multiAggregates ++ decimalAggregates ++ 
highCardinalityAggregates
+
+          allAggregates.foreach { config =>
+            runExpressionBenchmark(config.name, v, config.query, 
config.extraCometConfigs)
+          }
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to