This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5dcc342b74 [GLUTEN-9025] Remove the ColumnarPartialProject when its
followers don't support columnar (#9026)
5dcc342b74 is described below
commit 5dcc342b741a266e1761e78b141c4bc069e8a930
Author: Xiuli Wei <[email protected]>
AuthorDate: Tue Apr 29 16:30:12 2025 +0800
[GLUTEN-9025] Remove the ColumnarPartialProject when its followers don't
support columnar (#9026)
---
.../gluten/extension/PartialProjectRule.scala | 38 +++++++++++++++++-----
.../gluten/expression/UDFPartialProjectSuite.scala | 37 +++++++++++++++++++--
2 files changed, 64 insertions(+), 11 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
index f345a9a40c..f60bf11746 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
@@ -28,15 +29,34 @@ case class PartialProjectRule(spark: SparkSession) extends
Rule[SparkPlan] {
if (!GlutenConfig.get.enableColumnarPartialProject) {
return plan
}
- plan.transformUp {
- case plan: ProjectExec =>
- val transformer = ColumnarPartialProjectExec.create(plan)
- if (
- transformer.doValidate().ok() &&
-
transformer.child.asInstanceOf[ColumnarPartialProjectExec].doValidate().ok()
- ) {
- transformer
- } else plan
+
+ val newPlan = plan match {
+ // If the root node of the plan is a ProjectExec and its child is a
gluten columnar op,
+ // we try to add a ColumnarPartialProjectExec
+ case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
+ tryAddColumnarPartialProjectExec(p)
+ case _ => plan
+ }
+
+ newPlan.transformUp {
+ case parent: SparkPlan
+ if parent.children.exists(_.isInstanceOf[ProjectExec]) &&
+ PlanUtil.isGlutenColumnarOp(parent) =>
+ parent.mapChildren {
+ case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
+ tryAddColumnarPartialProjectExec(p)
+ case other => other
+ }
}
}
+
+ private def tryAddColumnarPartialProjectExec(plan: ProjectExec): SparkPlan =
{
+ val transformer = ColumnarPartialProjectExec.create(plan)
+ if (
+ transformer.doValidate().ok() &&
+
transformer.child.asInstanceOf[ColumnarPartialProjectExec].doValidate().ok()
+ ) {
+ transformer
+ } else plan
+ }
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
index ff691906ef..83eb67acae 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
@@ -20,6 +20,7 @@ import
org.apache.gluten.execution.{ColumnarPartialProjectExec, WholeStageTransf
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding,
NullPropagation}
+import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions.udf
import java.io.File
@@ -83,7 +84,7 @@ abstract class UDFPartialProjectSuite extends
WholeStageTransformerSuite {
}
}
- test("test subquery") {
+ ignore("test subquery") {
runQueryAndCompare(
"select plus_one(" +
"(select plus_one(count(*)) from (values (1)) t0(inner_c))) as col " +
@@ -120,7 +121,7 @@ abstract class UDFPartialProjectSuite extends
WholeStageTransformerSuite {
}
}
- test("test function no argument") {
+ ignore("test function no argument") {
runQueryAndCompare("""SELECT no_argument(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ColumnarPartialProjectExec]
@@ -213,4 +214,36 @@ abstract class UDFPartialProjectSuite extends
WholeStageTransformerSuite {
checkGlutenOperatorMatch[ColumnarPartialProjectExec]
}
}
+ // only Spark version >= 3.4 supports columnar native writer
+ testWithSpecifiedSparkVersion(
+ "only the child and parent of the project both support Columnar," +
+ "just add ColumnarPartialProjectExec for the project",
+ "3.4",
+ "3.5") {
+ Seq("false", "true").foreach {
+ enableNativeScanAndWriter =>
+ withSQLConf(
+ "spark.gluten.sql.native.writer.enabled" ->
enableNativeScanAndWriter,
+ "spark.gluten.sql.columnar.batchscan" -> enableNativeScanAndWriter
+ ) {
+ withTable("t1") {
+ spark.sql("""
+ |create table if not exists t1 (revenue double) using
parquet
+ |""".stripMargin)
+ runQueryAndCompare(""" insert overwrite t1
+ | select (plus_one(l_extendedprice) *
l_discount
+ | + hash(l_orderkey) + hash(l_comment))
as revenue
+ | from lineitem
+ |""".stripMargin) {
+
+ if (enableNativeScanAndWriter.toBoolean) {
+ checkGlutenOperatorMatch[ColumnarPartialProjectExec]
+ } else {
+ checkSparkOperatorMatch[ProjectExec]
+ }
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]