This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ecbaa7fa6 Add ANSI support to aggregate microbenchmarks (#2901)
ecbaa7fa6 is described below
commit ecbaa7fa61d6399f8fbc9bebd77bdda7b81a5c3f
Author: B Vadlamani <[email protected]>
AuthorDate: Sat Dec 13 12:52:05 2025 -0800
Add ANSI support to aggregate microbenchmarks (#2901)
---
.../sql/benchmark/CometAggregateBenchmark.scala | 167 +++++++++++++--------
1 file changed, 102 insertions(+), 65 deletions(-)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
index 5fb2d63fb..d63b3e710 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
@@ -19,8 +19,11 @@
 package org.apache.spark.sql.benchmark
 
+import scala.util.Try
+
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DecimalType
 
 import org.apache.comet.CometConf
@@ -52,7 +55,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
     BenchAggregateFunction("MIN"),
     BenchAggregateFunction("MAX"),
     BenchAggregateFunction("COUNT"),
-    BenchAggregateFunction("COUNT", distinct = true))
+    BenchAggregateFunction("COUNT", distinct = true),
+    BenchAggregateFunction("AVG"))
 
   def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: String): String = {
     s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT $input" else input})"
@@ -61,11 +65,12 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   def singleGroupAndAggregate(
       values: Int,
       groupingKeyCardinality: Int,
-      aggregateFunction: BenchAggregateFunction): Unit = {
+      aggregateFunction: BenchAggregateFunction,
+      isAnsiMode: Boolean): Unit = {
     val benchmark =
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
-          s"single aggregate ${aggregateFunction.toString}",
+          s"single aggregate ${aggregateFunction.toString}, ANSI mode enabled: $isAnsiMode",
         values,
         output = output)
@@ -78,16 +83,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       val functionSQL = aggFunctionSQL(aggregateFunction, "value")
       val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
 
-      benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
-        spark.sql(query).noop()
+      benchmark.addCase(
+        s"SQL Parquet - Spark (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
-      benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
-        withSQLConf(
-          CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_ENABLED.key -> "true") {
-          spark.sql(query).noop()
-        }
+      benchmark.addCase(
+        s"SQL Parquet - Comet (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
       benchmark.run()
@@ -99,7 +111,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       values: Int,
       dataType: DecimalType,
       groupingKeyCardinality: Int,
-      aggregateFunction: BenchAggregateFunction): Unit = {
+      aggregateFunction: BenchAggregateFunction,
+      isAnsiMode: Boolean): Unit = {
     val benchmark =
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
@@ -120,16 +133,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       val functionSQL = aggFunctionSQL(aggregateFunction, "value")
       val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key"
 
-      benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
-        spark.sql(query).noop()
+      benchmark.addCase(
+        s"SQL Parquet - Spark (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
-      benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
-        withSQLConf(
-          CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_ENABLED.key -> "true") {
-          spark.sql(query).noop()
-        }
+      benchmark.addCase(
+        s"SQL Parquet - Comet (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
       benchmark.run()
@@ -140,7 +160,8 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   def multiGroupKeys(
       values: Int,
       groupingKeyCard: Int,
-      aggregateFunction: BenchAggregateFunction): Unit = {
+      aggregateFunction: BenchAggregateFunction,
+      isAnsiMode: Boolean): Unit = {
     val benchmark =
       new Benchmark(
         s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
@@ -160,17 +181,24 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       val query =
         s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, key2"
 
-      benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
-        spark.sql(query).noop()
+      benchmark.addCase(
+        s"SQL Parquet - Spark (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
-      benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
-        withSQLConf(
-          CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_ENABLED.key -> "true",
-          CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G") {
-          spark.sql(query).noop()
-        }
+      benchmark.addCase(
+        s"SQL Parquet - Comet (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G",
+            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
       benchmark.run()
@@ -181,11 +209,12 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   def multiAggregates(
      values: Int,
      groupingKeyCard: Int,
-      aggregateFunction: BenchAggregateFunction): Unit = {
+      aggregateFunction: BenchAggregateFunction,
+      isAnsiMode: Boolean): Unit = {
     val benchmark =
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
-          s"multiple aggregates ${aggregateFunction.toString}",
+          s"multiple aggregates ${aggregateFunction.toString}, ANSI mode enabled: $isAnsiMode",
         values,
         output = output)
@@ -203,16 +232,23 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       val query = s"SELECT key, $functionSQL1, $functionSQL2 " +
         "FROM parquetV1Table GROUP BY key"
 
-      benchmark.addCase(s"SQL Parquet - Spark (${aggregateFunction.toString})") { _ =>
-        spark.sql(query).noop()
+      benchmark.addCase(
+        s"SQL Parquet - Spark (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
-      benchmark.addCase(s"SQL Parquet - Comet (${aggregateFunction.toString})") { _ =>
-        withSQLConf(
-          CometConf.COMET_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_ENABLED.key -> "true") {
-          spark.sql(query).noop()
-        }
+      benchmark.addCase(
+        s"SQL Parquet - Comet (${aggregateFunction.toString}) ANSI mode enabled: $isAnsiMode") {
+        _ =>
+          withSQLConf(
+            CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
+            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
+            Try { spark.sql(query).noop() }
+          }
       }
 
       benchmark.run()
@@ -223,39 +259,40 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val total = 1024 * 1024 * 10
     val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
-
     benchmarkAggFuncs.foreach { aggFunc =>
-      runBenchmarkWithTable(
-        s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
-        total) { v =>
-        for (card <- combinations) {
-          singleGroupAndAggregate(v, card, aggFunc)
-        }
-      }
+      Seq(true, false).foreach { isAnsiMode =>
+        runBenchmarkWithTable(
+          s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
+          total) { v =>
+          for (card <- combinations) {
+            singleGroupAndAggregate(v, card, aggFunc, isAnsiMode)
+          }
+        }
 
-      runBenchmarkWithTable(
-        s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
-        total) { v =>
-        for (card <- combinations) {
-          multiGroupKeys(v, card, aggFunc)
-        }
-      }
+        runBenchmarkWithTable(
+          s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
+          total) { v =>
+          for (card <- combinations) {
+            multiGroupKeys(v, card, aggFunc, isAnsiMode)
+          }
+        }
 
-      runBenchmarkWithTable(
-        s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
-        total) { v =>
-        for (card <- combinations) {
-          multiAggregates(v, card, aggFunc)
-        }
-      }
+        runBenchmarkWithTable(
+          s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
+          total) { v =>
+          for (card <- combinations) {
+            multiAggregates(v, card, aggFunc, isAnsiMode)
+          }
+        }
 
-      runBenchmarkWithTable(
-        s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
-        total) { v =>
-        for (card <- combinations) {
-          singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc)
-        }
-      }
+        runBenchmarkWithTable(
+          s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
+          total) { v =>
+          for (card <- combinations) {
+            singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc, isAnsiMode)
+          }
+        }
+      }
     }
   }
 }
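
Note for readers skimming the diff: every benchmark case above now follows the same pattern -- set spark.sql.ansi.enabled for the case, then guard the measured query with Try so an ANSI runtime error (for example, overflow in SUM or AVG) is recorded as a failed iteration instead of aborting the remaining cases. The standalone sketch below illustrates why the Try guard matters. It is not part of the commit: the object name, the local session, and the view t are hypothetical, and it uses collect() and the literal config key where the benchmark uses a no-op sink and SQLConf.ANSI_ENABLED.key.

import scala.util.Try

import org.apache.spark.sql.SparkSession

object AnsiAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .appName("AnsiAggregateSketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: four values whose sum overflows Long.
    Seq.fill(4)(Long.MaxValue / 2).toDF("value").createOrReplaceTempView("t")

    for (isAnsiMode <- Seq(true, false)) {
      // Same key as SQLConf.ANSI_ENABLED.key in the diff above.
      spark.conf.set("spark.sql.ansi.enabled", isAnsiMode.toString)
      // Try turns the ANSI ArithmeticException into a Failure instead of
      // letting it propagate and kill the loop.
      val result = Try(spark.sql("SELECT SUM(value) FROM t").collect())
      println(s"ANSI mode enabled: $isAnsiMode -> success = ${result.isSuccess}")
    }

    spark.stop()
  }
}

With ANSI mode on, the SUM throws and Try returns a Failure; with ANSI mode off, the same query succeeds with a silently wrapped value. Discarding the Try result in the benchmark keeps both modes comparable: the case still runs and records a time even when every iteration throws.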