This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f45808878 minor: Refactor expression microbenchmarks to remove duplicate code (#2956)
f45808878 is described below

commit f458088788c50152f4942ab3e0cbc5ed64f137f6
Author: Andy Grove <[email protected]>
AuthorDate: Mon Dec 22 14:13:41 2025 -0700

    minor: Refactor expression microbenchmarks to remove duplicate code (#2956)
---
 .../sql/benchmark/CometArithmeticBenchmark.scala   | 53 +++-------------------
 .../spark/sql/benchmark/CometBenchmarkBase.scala   | 48 ++++++++++++++++++++
 .../spark/sql/benchmark/CometCastBenchmark.scala   | 42 ++---------------
 .../CometConditionalExpressionBenchmark.scala      | 49 ++------------------
 .../CometDatetimeExpressionBenchmark.scala         | 12 ++---
 .../benchmark/CometJsonExpressionBenchmark.scala   | 31 ++-----------
 .../CometPredicateExpressionBenchmark.scala        | 27 +----------
 .../benchmark/CometStringExpressionBenchmark.scala | 31 ++-----------
 8 files changed, 77 insertions(+), 216 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
index c6fe55b56..a513aa1a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
@@ -19,11 +19,8 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.types._
 
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet expression evaluation performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -35,10 +32,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
 
   def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
     val dataType = IntegerType
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)
 
     withTempPath { dir =>
       withTempTable(table) {
@@ -48,25 +41,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
             s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
               s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))
 
-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"
 
-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
       }
     }
   }
@@ -76,10 +54,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
       dataType: DecimalType,
       op: BinaryOp,
       useDictionary: Boolean): Unit = {
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)
     val df = makeDecimalDataFrame(values, dataType, useDictionary)
 
     withTempPath { dir =>
@@ -87,25 +61,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
         df.createOrReplaceTempView(tbl)
         prepareTable(dir, spark.sql(s"SELECT dec AS c1, dec AS c2 FROM $tbl"))
 
-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"
 
-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5ee787ad9..8d56cefa0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -110,6 +110,54 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
     benchmark.run()
   }
 
+  /**
+   * Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
+   * This provides a consistent benchmark structure for expression evaluation.
+   *
+   * @param name
+   *   Benchmark name
+   * @param cardinality
+   *   Number of rows being processed
+   * @param query
+   *   SQL query to benchmark
+   * @param extraCometConfigs
+   *   Additional configurations to apply for Comet cases (optional)
+   */
+  final def runExpressionBenchmark(
+      name: String,
+      cardinality: Long,
+      query: String,
+      extraCometConfigs: Map[String, String] = Map.empty): Unit = {
+    val benchmark = new Benchmark(name, cardinality, output = output)
+
+    benchmark.addCase("Spark") { _ =>
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.addCase("Comet (Scan)") { _ =>
+      withSQLConf(
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    val cometExecConfigs = Map(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      "spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
+
+    benchmark.addCase("Comet (Scan + Exec)") { _ =>
+      withSQLConf(cometExecConfigs.toSeq: _*) {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.run()
+  }
+
   protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
     val testDf = if (partition.isDefined) {
       df.write.partitionBy(partition.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
index b2212dfd0..975abd632 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
@@ -19,14 +19,10 @@
 
 package org.apache.spark.sql.benchmark
 
-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, LongType}
 
-import org.apache.comet.CometConf
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}
 
@@ -81,48 +77,20 @@ object CometCastBenchmark extends CometBenchmarkBase {
       toDataType: DataType,
       isAnsiMode: Boolean): Unit = {
 
-    val benchmark =
-      new Benchmark(
-        s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}",
-        values,
-        output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value FROM $tbl"))
+
         val functionSQL = castExprSQL(toDataType, "value")
         val query = s"SELECT $functionSQL FROM parquetV1Table"
+        val name =
+          s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}"
 
-        benchmark.addCase(
-          s"SQL Parquet - Spark Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
+        val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)
 
-        benchmark.addCase(
-          s"SQL Parquet - Comet Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
-        benchmark.run()
+        runExpressionBenchmark(name, values, query, extraConfigs)
       }
     }
-
   }
 
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
index 0dddfb36a..c5eb9ea39 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
@@ -19,10 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ import org.apache.comet.CometConf
 object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
 
   def caseWhenExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("Case When Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
@@ -41,56 +35,19 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
         val query =
           "select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END from parquetV1Table"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.run()
+        runExpressionBenchmark("Case When Expr", values, query)
       }
     }
   }
 
   def ifExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("If Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
-        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
-
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
+        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
 
-        benchmark.run()
+        runExpressionBenchmark("If Expr", values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 0af1ecade..47eff41bb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -39,9 +39,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
             s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
         Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Date Truncate $isDictionary - $level", values) {
-            spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
-          }
+          val name = s"Date Truncate $isDictionary - $level"
+          val query = s"select trunc(dt, '$level') from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }
@@ -68,9 +68,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
           "WEEK",
           "QUARTER").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
-            spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
-          }
+          val name = s"Timestamp Truncate $isDictionary - $level"
+          val query = s"select date_trunc('$level', ts) from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
index e8bd00bd9..5b4741ba6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.catalyst.expressions.JsonToStructs
 
 import org.apache.comet.CometConf
@@ -54,8 +53,6 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
    * Generic method to run a JSON expression benchmark with the given configuration.
    */
   def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
-    val benchmark = new Benchmark(config.name, values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         // Generate data with specified JSON patterns
@@ -119,31 +116,11 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
 
         prepareTable(dir, jsonData)
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(config.query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(config.query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          val baseConfigs =
-            Map(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
-              "spark.sql.optimizer.constantFolding.enabled" -> "false")
-          val allConfigs = baseConfigs ++ config.extraCometConfigs
-
-          withSQLConf(allConfigs.toSeq: _*) {
-            spark.sql(config.query).noop()
-          }
-        }
+        val extraConfigs = Map(
+          CometConf.getExprAllowIncompatConfigKey(
+            classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
 
-        benchmark.run()
+        runExpressionBenchmark(config.name, values, config.query, extraConfigs)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
index 2ca924821..6506c5665 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
@@ -19,10 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ import org.apache.comet.CometConf
 object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
 
   def inExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("in Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(
@@ -41,27 +35,10 @@ object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
           spark.sql(
             "select CASE WHEN value < 0 THEN 'negative'" +
               s" WHEN value = 0 THEN 'zero' ELSE 'positive' END c1 from $tbl"))
-        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
+        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"
 
-        benchmark.run()
+        runExpressionBenchmark("in Expr", values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
index d1ed8702a..41eabb851 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
-
 import org.apache.comet.CometConf
 
 /**
@@ -50,37 +48,14 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
    * Generic method to run a string expression benchmark with the given configuration.
    */
   def runStringExprBenchmark(config: StringExprConfig, values: Int): Unit = {
-    val benchmark = new Benchmark(config.name, values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT REPEAT(CAST(value AS STRING), 100) AS c1 FROM $tbl"))
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(config.query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(config.query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          val baseConfigs =
-            Map(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true",
-              "spark.sql.optimizer.constantFolding.enabled" -> "false")
-          val allConfigs = baseConfigs ++ config.extraCometConfigs
-
-          withSQLConf(allConfigs.toSeq: _*) {
-            spark.sql(config.query).noop()
-          }
-        }
+        val extraConfigs =
+          Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") ++ config.extraCometConfigs
 
-        benchmark.run()
+        runExpressionBenchmark(config.name, values, config.query, extraConfigs)
       }
     }
   }
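
For reference, a minimal sketch of the calling pattern this refactor establishes. The method name, query, and table contents below are hypothetical (not taken from the commit), and the body is assumed to sit inside an object extending CometBenchmarkBase, like the benchmarks above.

  // Hypothetical benchmark method (not part of this commit) showing the
  // consolidated pattern: prepare the input table, then delegate the three
  // standard cases -- Spark, Comet (Scan), Comet (Scan + Exec) -- to the
  // shared runExpressionBenchmark helper instead of hand-writing addCase
  // blocks per case.
  def upperExprBenchmark(values: Int): Unit = {
    withTempPath { dir =>
      withTempTable("parquetV1Table") {
        prepareTable(dir, spark.sql(s"SELECT CAST(value AS STRING) AS c1 FROM $tbl"))

        val query = "SELECT upper(c1) FROM parquetV1Table"

        runExpressionBenchmark("Upper Expr", values, query)
      }
    }
  }

Per-benchmark settings are forwarded through the optional extraCometConfigs parameter (for example, CometCastBenchmark passes its ANSI flag this way); the helper merges them into the Comet (Scan + Exec) case only.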


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
