This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 77899ee1a chore: Add checks to microbenchmarks for plan running natively in Comet (#3045)
77899ee1a is described below
commit 77899ee1aeef087d6ef9e84e607c65c5f16a0f5b
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 6 22:29:05 2026 -0700
chore: Add checks to microbenchmarks for plan running natively in Comet (#3045)
---
.../scala/org/apache/spark/sql/CometTestBase.scala | 33 +----------
.../spark/sql/benchmark/CometBenchmarkBase.scala | 52 +++++++++--------
.../apache/spark/sql/comet/CometPlanChecker.scala | 68 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 53 deletions(-)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 7dba24bff..81ac72247 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.{ExampleParquetWriter, GroupWriteSuppor
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark._
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
-import org.apache.spark.sql.comet._
+import org.apache.spark.sql.comet.CometPlanChecker
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -58,7 +58,8 @@ abstract class CometTestBase
with BeforeAndAfterEach
with AdaptiveSparkPlanHelper
with ShimCometSparkSessionExtensions
- with ShimCometTestBase {
+ with ShimCometTestBase
+ with CometPlanChecker {
import testImplicits._
protected val shuffleManager: String =
@@ -396,26 +397,6 @@ abstract class CometTestBase
checkPlanNotMissingInput(plan)
}
- protected def findFirstNonCometOperator(
- plan: SparkPlan,
- excludedClasses: Class[_]*): Option[SparkPlan] = {
- val wrapped = wrapCometSparkToColumnar(plan)
- wrapped.foreach {
- case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
- _: CometIcebergNativeScanExec =>
- case _: CometSinkPlaceHolder | _: CometScanWrapper =>
- case _: CometColumnarToRowExec =>
- case _: CometSparkToColumnarExec =>
- case _: CometExec | _: CometShuffleExchangeExec =>
- case _: CometBroadcastExchangeExec =>
- case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter =>
- case op if !excludedClasses.exists(c => c.isAssignableFrom(op.getClass)) =>
- return Some(op)
- case _ =>
- }
- None
- }
-
// checks the plan node has no missing inputs
// such nodes represented in plan with exclamation mark !
// example: !CometWindowExec
@@ -449,14 +430,6 @@ abstract class CometTestBase
}
}
- /** Wraps the CometRowToColumn as ScanWrapper, so the child operators will not be checked */
- private def wrapCometSparkToColumnar(plan: SparkPlan): SparkPlan = {
- plan.transformDown {
- // don't care the native operators
- case p: CometSparkToColumnarExec => CometScanWrapper(null, p)
- }
- }
-
private var _spark: SparkSessionType = _
override protected implicit def spark: SparkSessionType = _spark
protected implicit def sqlContext: SQLContext = _spark.sqlContext
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 8d56cefa0..5d1d0c571 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -31,6 +31,8 @@ import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
+import org.apache.spark.sql.comet.CometPlanChecker
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DecimalType
@@ -38,7 +40,10 @@ import org.apache.spark.sql.types.DecimalType
import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions
-trait CometBenchmarkBase extends SqlBasedBenchmark {
+trait CometBenchmarkBase
+ extends SqlBasedBenchmark
+ with AdaptiveSparkPlanHelper
+ with CometPlanChecker {
override def getSparkSession: SparkSession = {
val conf = new SparkConf()
.setAppName("CometReadBenchmark")
@@ -88,28 +93,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
}
}
- /** Runs function `f` with Comet on and off. */
- final def runWithComet(name: String, cardinality: Long)(f: => Unit): Unit = {
- val benchmark = new Benchmark(name, cardinality, output = output)
-
- benchmark.addCase(s"$name - Spark ") { _ =>
- withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
- f
- }
- }
-
- benchmark.addCase(s"$name - Comet") { _ =>
- withSQLConf(
- CometConf.COMET_ENABLED.key -> "true",
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- SQLConf.ANSI_ENABLED.key -> "false") {
- f
- }
- }
-
- benchmark.run()
- }
-
/**
* Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
* This provides a consistent benchmark structure for expression evaluation.
@@ -149,6 +132,29 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
CometConf.COMET_EXEC_ENABLED.key -> "true",
"spark.sql.optimizer.constantFolding.enabled" -> "false") ++
extraCometConfigs
+ // Check that the plan is fully Comet native before running the benchmark
+ withSQLConf(cometExecConfigs.toSeq: _*) {
+ val df = spark.sql(query)
+ df.noop()
+ val plan = stripAQEPlan(df.queryExecution.executedPlan)
+ findFirstNonCometOperator(plan) match {
+ case Some(op) =>
+ // scalastyle:off println
+ println()
+ println("=" * 80)
+ println("WARNING: Benchmark plan is NOT fully Comet native!")
+ println(s"First non-Comet operator: ${op.nodeName}")
+ println("=" * 80)
+ println("Query plan:")
+ println(plan.treeString)
+ println("=" * 80)
+ println()
+ // scalastyle:on println
+ case None =>
+ // All operators are Comet native, no warning needed
+ }
+ }
+
benchmark.addCase("Comet (Scan + Exec)") { _ =>
withSQLConf(cometExecConfigs.toSeq: _*) {
spark.sql(query).noop()
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanChecker.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanChecker.scala
new file mode 100644
index 000000000..7caac7135
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanChecker.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec}
+
+/**
+ * Trait providing utilities to check if a Spark plan is fully running on Comet native operators.
+ * Used by both CometTestBase and CometBenchmarkBase.
+ */
+trait CometPlanChecker {
+
+ /**
+ * Finds the first non-Comet operator in the plan, if any.
+ *
+ * @param plan
+ * The SparkPlan to check
+ * @param excludedClasses
+ * Classes to exclude from the check (these are allowed to be non-Comet)
+ * @return
+ * Some(operator) if a non-Comet operator is found, None otherwise
+ */
+ protected def findFirstNonCometOperator(
+ plan: SparkPlan,
+ excludedClasses: Class[_]*): Option[SparkPlan] = {
+ val wrapped = wrapCometSparkToColumnar(plan)
+ wrapped.foreach {
+ case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
+ _: CometIcebergNativeScanExec =>
+ case _: CometSinkPlaceHolder | _: CometScanWrapper =>
+ case _: CometColumnarToRowExec =>
+ case _: CometSparkToColumnarExec =>
+ case _: CometExec | _: CometShuffleExchangeExec =>
+ case _: CometBroadcastExchangeExec =>
+ case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter =>
+ case op if !excludedClasses.exists(c => c.isAssignableFrom(op.getClass)) =>
+ return Some(op)
+ case _ =>
+ }
+ None
+ }
+
+ /** Wraps the CometSparkToColumnar as ScanWrapper, so the child operators will not be checked */
+ private def wrapCometSparkToColumnar(plan: SparkPlan): SparkPlan = {
+ plan.transformDown {
+ // don't care the native operators
+ case p: CometSparkToColumnarExec => CometScanWrapper(null, p)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]