This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new f45808878 minor: Refactor expression microbenchmarks to remove duplicate code (#2956)
f45808878 is described below
commit f458088788c50152f4942ab3e0cbc5ed64f137f6
Author: Andy Grove <[email protected]>
AuthorDate: Mon Dec 22 14:13:41 2025 -0700
minor: Refactor expression microbenchmarks to remove duplicate code (#2956)
---
.../sql/benchmark/CometArithmeticBenchmark.scala | 53 +++-------------------
.../spark/sql/benchmark/CometBenchmarkBase.scala | 48 ++++++++++++++++++++
.../spark/sql/benchmark/CometCastBenchmark.scala | 42 ++---------------
.../CometConditionalExpressionBenchmark.scala | 49 ++------------------
.../CometDatetimeExpressionBenchmark.scala | 12 ++---
.../benchmark/CometJsonExpressionBenchmark.scala | 31 ++-----------
.../CometPredicateExpressionBenchmark.scala | 27 +----------
.../benchmark/CometStringExpressionBenchmark.scala | 31 ++-----------
8 files changed, 77 insertions(+), 216 deletions(-)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
index c6fe55b56..a513aa1a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
@@ -19,11 +19,8 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.types._
-import org.apache.comet.CometConf
-
/**
 * Benchmark to measure Comet expression evaluation performance. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -35,10 +32,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
  def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
val dataType = IntegerType
- val benchmark = new Benchmark(
- s"Binary op ${dataType.sql}, dictionary = $useDictionary",
- values,
- output = output)
withTempPath { dir =>
withTempTable(table) {
@@ -48,25 +41,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))
- benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
-
- benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
- }
-
- benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true") {
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
- }
+ val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+ val query = s"SELECT c1 ${op.sig} c2 FROM $table"
- benchmark.run()
+ runExpressionBenchmark(name, values, query)
}
}
}
@@ -76,10 +54,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
dataType: DecimalType,
op: BinaryOp,
useDictionary: Boolean): Unit = {
- val benchmark = new Benchmark(
- s"Binary op ${dataType.sql}, dictionary = $useDictionary",
- values,
- output = output)
val df = makeDecimalDataFrame(values, dataType, useDictionary)
withTempPath { dir =>
@@ -87,25 +61,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
df.createOrReplaceTempView(tbl)
prepareTable(dir, spark.sql(s"SELECT dec AS c1, dec AS c2 FROM $tbl"))
- benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
-
- benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
- }
-
- benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true") {
- spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
- }
- }
+ val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+ val query = s"SELECT c1 ${op.sig} c2 FROM $table"
- benchmark.run()
+ runExpressionBenchmark(name, values, query)
}
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5ee787ad9..8d56cefa0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -110,6 +110,54 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
benchmark.run()
}
+ /**
+   * Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
+ * This provides a consistent benchmark structure for expression evaluation.
+ *
+ * @param name
+ * Benchmark name
+ * @param cardinality
+ * Number of rows being processed
+ * @param query
+ * SQL query to benchmark
+ * @param extraCometConfigs
+ * Additional configurations to apply for Comet cases (optional)
+ */
+ final def runExpressionBenchmark(
+ name: String,
+ cardinality: Long,
+ query: String,
+ extraCometConfigs: Map[String, String] = Map.empty): Unit = {
+ val benchmark = new Benchmark(name, cardinality, output = output)
+
+ benchmark.addCase("Spark") { _ =>
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ spark.sql(query).noop()
+ }
+ }
+
+ benchmark.addCase("Comet (Scan)") { _ =>
+ withSQLConf(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "false") {
+ spark.sql(query).noop()
+ }
+ }
+
+ val cometExecConfigs = Map(
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+      "spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
+
+ benchmark.addCase("Comet (Scan + Exec)") { _ =>
+ withSQLConf(cometExecConfigs.toSeq: _*) {
+ spark.sql(query).noop()
+ }
+ }
+
+ benchmark.run()
+ }
+
  protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
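
For illustration, here is a minimal sketch of how a benchmark would call the new helper. The object name and the UPPER query are hypothetical; the sketch assumes the CometBenchmarkBase members exercised elsewhere in this diff (tbl, spark, withTempPath, withTempTable, prepareTable):

    object CometExampleExpressionBenchmark extends CometBenchmarkBase {
      def upperExprBenchmark(values: Int): Unit = {
        withTempPath { dir =>
          withTempTable("parquetV1Table") {
            prepareTable(dir, spark.sql(s"SELECT CAST(value AS STRING) AS c1 FROM $tbl"))
            // A single call produces the three standard cases: Spark,
            // Comet (Scan), and Comet (Scan + Exec). Extra configs, such as
            // ANSI mode in CometCastBenchmark below, can be passed via the
            // optional fourth argument.
            runExpressionBenchmark("Upper Expr", values, "SELECT UPPER(c1) FROM parquetV1Table")
          }
        }
      }
    }
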
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
index b2212dfd0..975abd632 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
@@ -19,14 +19,10 @@
package org.apache.spark.sql.benchmark
-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, LongType}
-import org.apache.comet.CometConf
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}
@@ -81,48 +77,20 @@ object CometCastBenchmark extends CometBenchmarkBase {
toDataType: DataType,
isAnsiMode: Boolean): Unit = {
- val benchmark =
- new Benchmark(
-        s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}",
- values,
- output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value FROM $tbl"))
+
val functionSQL = castExprSQL(toDataType, "value")
val query = s"SELECT $functionSQL FROM parquetV1Table"
+ val name =
+          s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}"
- benchmark.addCase(
-          s"SQL Parquet - Spark Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
- s"ansi mode enabled : ${isAnsiMode}") { _ =>
- withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- if (isAnsiMode) {
- Try { spark.sql(query).noop() }
- } else {
- spark.sql(query).noop()
- }
- }
- }
+ val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)
- benchmark.addCase(
-          s"SQL Parquet - Comet Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
- s"ansi mode enabled : ${isAnsiMode}") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
- if (isAnsiMode) {
- Try { spark.sql(query).noop() }
- } else {
- spark.sql(query).noop()
- }
- }
- }
- benchmark.run()
+ runExpressionBenchmark(name, values, query, extraConfigs)
}
}
-
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
index 0dddfb36a..c5eb9ea39 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
@@ -19,10 +19,6 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
/**
* Benchmark to measure Comet execution performance. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ import org.apache.comet.CometConf
object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
def caseWhenExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Case When Expr", values, output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
@@ -41,56 +35,19 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
val query =
"select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0'
END from parquetV1Table"
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(query).noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
-
- benchmark.run()
+ runExpressionBenchmark("Case When Expr", values, query)
}
}
}
def ifExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("If Expr", values, output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
- val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
-
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(query).noop()
- }
- benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
+ val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
- benchmark.run()
+ runExpressionBenchmark("If Expr", values, query)
}
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 0af1ecade..47eff41bb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -39,9 +39,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
s"select cast(timestamp_micros(cast(value/100000 as integer)) as
date) as dt FROM $tbl"))
Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
- runWithComet(s"Date Truncate $isDictionary - $level", values) {
- spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
- }
+ val name = s"Date Truncate $isDictionary - $level"
+ val query = s"select trunc(dt, '$level') from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
}
}
}
@@ -68,9 +68,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
"WEEK",
"QUARTER").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
- runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
-          spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
- }
+ val name = s"Timestamp Truncate $isDictionary - $level"
+ val query = s"select date_trunc('$level', ts) from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
}
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
index e8bd00bd9..5b4741ba6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
@@ -19,7 +19,6 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
import org.apache.comet.CometConf
@@ -54,8 +53,6 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
   * Generic method to run a JSON expression benchmark with the given configuration.
*/
def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
- val benchmark = new Benchmark(config.name, values, output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
// Generate data with specified JSON patterns
@@ -119,31 +116,11 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
prepareTable(dir, jsonData)
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(config.query).noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(config.query).noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
- val baseConfigs =
- Map(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
- "spark.sql.optimizer.constantFolding.enabled" -> "false")
- val allConfigs = baseConfigs ++ config.extraCometConfigs
-
- withSQLConf(allConfigs.toSeq: _*) {
- spark.sql(config.query).noop()
- }
- }
+ val extraConfigs = Map(
+ CometConf.getExprAllowIncompatConfigKey(
+ classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
- benchmark.run()
+ runExpressionBenchmark(config.name, values, config.query, extraConfigs)
}
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
index 2ca924821..6506c5665 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
@@ -19,10 +19,6 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
/**
* Benchmark to measure Comet execution performance. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ import org.apache.comet.CometConf
object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
def inExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("in Expr", values, output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(
@@ -41,27 +35,10 @@ object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
spark.sql(
"select CASE WHEN value < 0 THEN 'negative'" +
s" WHEN value = 0 THEN 'zero' ELSE 'positive' END c1 from $tbl"))
-        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(query).noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true") {
- spark.sql(query).noop()
- }
- }
+        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"
- benchmark.run()
+ runExpressionBenchmark("in Expr", values, query)
}
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
index d1ed8702a..41eabb851 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
@@ -19,8 +19,6 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.benchmark.Benchmark
-
import org.apache.comet.CometConf
/**
@@ -50,37 +48,14 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
   * Generic method to run a string expression benchmark with the given configuration.
*/
def runStringExprBenchmark(config: StringExprConfig, values: Int): Unit = {
- val benchmark = new Benchmark(config.name, values, output = output)
-
withTempPath { dir =>
withTempTable("parquetV1Table") {
        prepareTable(dir, spark.sql(s"SELECT REPEAT(CAST(value AS STRING), 100) AS c1 FROM $tbl"))
- benchmark.addCase("SQL Parquet - Spark") { _ =>
- spark.sql(config.query).noop()
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
- spark.sql(config.query).noop()
- }
- }
-
- benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
- val baseConfigs =
- Map(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true",
- "spark.sql.optimizer.constantFolding.enabled" -> "false")
- val allConfigs = baseConfigs ++ config.extraCometConfigs
-
- withSQLConf(allConfigs.toSeq: _*) {
- spark.sql(config.query).noop()
- }
- }
+ val extraConfigs =
+            Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") ++ config.extraCometConfigs
- benchmark.run()
+ runExpressionBenchmark(config.name, values, config.query, extraConfigs)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]