This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new fa5910efd chore: Override node name for CometSparkToColumnar (#1577)
fa5910efd is described below
commit fa5910efd927e115d1717b5f0c78fad0ece75c6c
Author: Łukasz <[email protected]>
AuthorDate: Mon Mar 31 19:38:54 2025 +0200
chore: Override node name for CometSparkToColumnar (#1577)
Co-authored-by: Lukasz <[email protected]>
---
.../spark/sql/comet/CometSparkToColumnarExec.scala | 6 +++
.../org/apache/comet/exec/CometExecSuite.scala | 51 ++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 9602a2868..e6ab425d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -52,6 +52,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
override def supportsColumnar: Boolean = true
+ override def nodeName: String = if (child.supportsColumnar) {
+ "CometSparkColumnarToColumnar"
+ } else {
+ "CometSparkRowToColumnar"
+ }
+
override lazy val metrics: Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"),
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 91a8bdf8b..d14191c09 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1905,6 +1905,57 @@ class CometExecSuite extends CometTestBase {
assert(!CometScanExec.isFileFormatSupported(new CustomParquetFileFormat()))
}
+
+ test("SparkToColumnar override node name for row input") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+ val df = spark
+ .range(1000)
+ .selectExpr("id as key", "id % 8 as value")
+ .toDF("key", "value")
+ .groupBy("key")
+ .count()
+ df.collect()
+
+ val planAfter = df.queryExecution.executedPlan
+ assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+ val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
+ c.nodeName
+ }
+ assert(nodeNames.length == 1)
+ assert(nodeNames.head == "CometSparkRowToColumnar")
+ }
+ }
+
+ test("SparkToColumnar override node name for columnar input") {
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> "",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
+ CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+ withTempDir { dir =>
+ var df = spark
+ .range(10000)
+ .selectExpr("id as key", "id % 8 as value")
+ .toDF("key", "value")
+
+ df.write.mode("overwrite").parquet(dir.toString)
+ df = spark.read.parquet(dir.toString)
+ df = df.groupBy("key", "value").count()
+ df.collect()
+
+ val planAfter = df.queryExecution.executedPlan
+ val nodeNames = planAfter.collect { case c: CometSparkToColumnarExec =>
+ c.nodeName
+ }
+ assert(nodeNames.length == 1)
+ assert(nodeNames.head == "CometSparkColumnarToColumnar")
+ }
+ }
+ }
+
}
case class BucketedTableTestSpec(
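
As context for the change above, a minimal standalone sketch of the pattern being applied: a unary SparkPlan node that reports a different nodeName depending on whether its child already produces columnar batches. ExampleToColumnarExec and its names are hypothetical and not part of Comet; only the nodeName/supportsColumnar logic mirrors the diff.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical sketch, not the actual Comet operator: a pass-through unary
// node whose reported name depends on whether the child is columnar.
case class ExampleToColumnarExec(child: SparkPlan) extends UnaryExecNode {

  override def supportsColumnar: Boolean = true

  // nodeName is what appears in explain() output and the Spark UI, so a
  // child-dependent name makes row vs. columnar inputs easy to tell apart.
  override def nodeName: String =
    if (child.supportsColumnar) "ExampleColumnarToColumnar"
    else "ExampleRowToColumnar"

  override def output: Seq[Attribute] = child.output

  // Pass-through row execution; a real operator would convert to batches.
  override protected def doExecute(): RDD[InternalRow] = child.execute()

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}

With such an override, explain() output distinguishes which side of the row/columnar boundary the conversion sits on, which is what the two new tests assert by collecting c.nodeName from the executed plan.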
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]