This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new fa5910efd chore: Override node name for CometSparkToColumnar (#1577)
fa5910efd is described below

commit fa5910efd927e115d1717b5f0c78fad0ece75c6c
Author: Łukasz <[email protected]>
AuthorDate: Mon Mar 31 19:38:54 2025 +0200

    chore: Override node name for CometSparkToColumnar (#1577)
    
    Co-authored-by: Lukasz <[email protected]>
---
 .../spark/sql/comet/CometSparkToColumnarExec.scala |  6 +++
 .../org/apache/comet/exec/CometExecSuite.scala     | 51 ++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 9602a2868..e6ab425d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -52,6 +52,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 
   override def supportsColumnar: Boolean = true
 
+  override def nodeName: String = if (child.supportsColumnar) {
+    "CometSparkColumnarToColumnar"
+  } else {
+    "CometSparkRowToColumnar"
+  }
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches"),
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 91a8bdf8b..d14191c09 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1905,6 +1905,57 @@ class CometExecSuite extends CometTestBase {
 
     assert(!CometScanExec.isFileFormatSupported(new CustomParquetFileFormat()))
   }
+
+  test("SparkToColumnar override node name for row input") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      val df = spark
+        .range(1000)
+        .selectExpr("id as key", "id % 8 as value")
+        .toDF("key", "value")
+        .groupBy("key")
+        .count()
+      df.collect()
+
+      val planAfter = df.queryExecution.executedPlan
+      assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+      val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
+        c.nodeName
+      }
+      assert(nodeNames.length == 1)
+      assert(nodeNames.head == "CometSparkRowToColumnar")
+    }
+  }
+
+  test("SparkToColumnar override node name for columnar input") {
+    withSQLConf(
+      SQLConf.USE_V1_SOURCE_LIST.key -> "",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempDir { dir =>
+        var df = spark
+          .range(10000)
+          .selectExpr("id as key", "id % 8 as value")
+          .toDF("key", "value")
+
+        df.write.mode("overwrite").parquet(dir.toString)
+        df = spark.read.parquet(dir.toString)
+        df = df.groupBy("key", "value").count()
+        df.collect()
+
+        val planAfter = df.queryExecution.executedPlan
+        val nodeNames = planAfter.collect { case c: CometSparkToColumnarExec =>
+          c.nodeName
+        }
+        assert(nodeNames.length == 1)
+        assert(nodeNames.head == "CometSparkColumnarToColumnar")
+      }
+    }
+  }
+
 }
 
 case class BucketedTableTestSpec(
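
Usage note (a hedged sketch, not part of the patch): in a Comet-enabled
spark-shell session configured like the tests above (adaptive execution on,
CometConf.COMET_SHUFFLE_MODE set to "jvm"), the new names should show up
directly in the executed plan's string form.

    // Hedged spark-shell sketch; the exact plan shape depends on the query
    // and on which operators Comet takes over.
    val df = spark
      .range(1000)
      .selectExpr("id as key", "id % 8 as value")
      .groupBy("key")
      .count()
    df.collect()
    // The conversion node now prints as CometSparkRowToColumnar (row-based
    // child) or CometSparkColumnarToColumnar (columnar child) instead of the
    // generic CometSparkToColumnar.
    println(df.queryExecution.executedPlan)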


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
