This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ee9146df22 [CORE][VL] Hide child nodes from implementations of
`OffloadSingleNode` (#9220)
ee9146df22 is described below
commit ee9146df220e2eb04e921ae7bfb54af96c4facd9
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Apr 7 12:08:17 2025 +0100
[CORE][VL] Hide child nodes from implementations of `OffloadSingleNode`
(#9220)
---
.gitignore | 3 +-
.../gluten/component/VeloxDeltaComponent.scala | 1 +
.../gluten/component/VeloxHudiComponent.scala | 2 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +-
.../columnar/offload/OffloadSingleNode.scala | 85 +++++++++++++++++++++-
.../columnar/offload/OffloadSingleNodeRules.scala | 2 +-
6 files changed, 89 insertions(+), 6 deletions(-)
diff --git a/.gitignore b/.gitignore
index f402cb1e83..ab86334c8e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,8 +56,7 @@ cmake_install.cmake
build/
*-build/
Testing/
-cmake-build-debug/
-cmake-build-release/
+cmake-build-*/
ep/_ep/
# Editor temporary/working/backup files #
diff --git
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 4e5ec88c00..d157d7b725 100644
---
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -37,6 +37,7 @@ class VeloxDeltaComponent extends Component {
legacy.injectTransform {
c =>
val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(),
OffloadDeltaFilter())
+ .map(_.toStrcitRule())
HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
}
val offloads: Seq[RasOffload] = Seq(
diff --git
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
index c9eeabcdfe..b11645efbd 100644
---
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
+++
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -36,7 +36,7 @@ class VeloxHudiComponent extends Component {
val ras = injector.gluten.ras
legacy.injectTransform {
c =>
- val offload = Seq(OffloadHudiScan())
+ val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule())
HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
}
ras.injectRasRule {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 82a20b8cc5..04dfbf2558 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -74,7 +74,7 @@ object VeloxRuleApi {
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
// Legacy: The legacy transform rule.
- val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
+ val offloads = Seq(OffloadOthers(), OffloadExchange(),
OffloadJoin()).map(_.toStrcitRule())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validators.newValidator(conf, offloads)
val rewrites =
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
index 232973f53a..4cf3e1d0c9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
@@ -16,8 +16,17 @@
*/
package org.apache.gluten.extension.columnar.offload
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
/**
* Converts a vanilla Spark plan node into Gluten plan node. Gluten plan is
supposed to be executed
@@ -29,3 +38,77 @@ import org.apache.spark.sql.execution.SparkPlan
trait OffloadSingleNode extends Logging {
def offload(plan: SparkPlan): SparkPlan
}
+
+object OffloadSingleNode {
+ implicit class OffloadSingleNodeOps(rule: OffloadSingleNode) {
+
+ /**
+ * Converts the [[OffloadSingleNode]] rule to a strict version.
+ *
+ * In the strict version of the rule, all children of the input query plan
node will be replaced
+ * with 'DummyLeafExec' nodes so they are not accessible from the rule
body.
+ */
+ def toStrcitRule(): OffloadSingleNode = {
+ new StrictRule(rule);
+ }
+ }
+
+ private class StrictRule(delegate: OffloadSingleNode) extends
OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = {
+ val planWithChildrenHidden = hideChildren(plan)
+ val applied = delegate.offload(planWithChildrenHidden)
+ val out = restoreHiddenChildren(applied)
+ out
+ }
+
+ /**
+ * Replaces the children with 'DummyLeafExec' nodes so they become
inaccessible afterward. Used
+ * when the rule has no interest in the children plan nodes, so they can be dropped.
+ */
+ private def hideChildren[T <: SparkPlan](plan: T): T = {
+ plan
+ .withNewChildren(
+ plan.children.map {
+ child =>
+ val dummyLeaf = DummyLeafExec(child)
+ child.logicalLink.foreach(dummyLeaf.setLogicalLink)
+ dummyLeaf
+ }
+ )
+ .asInstanceOf[T]
+ }
+
+ /** Restores hidden children from the replaced 'DummyLeafExec' nodes. */
+ private def restoreHiddenChildren[T <: SparkPlan](plan: T): T = {
+ plan
+ .transformDown {
+ case d: DummyLeafExec =>
+ d.hiddenPlan
+ case other => other
+ }
+ .asInstanceOf[T]
+ }
+ }
+
+ /**
+ * The plan node that hides the real child plan node during #applyOnNode
call. This is used when
+ * the query planner doesn't allow a rule to access the child plan nodes from
the input query plan
+ * node.
+ */
+ private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode
with GlutenPlan {
+ private lazy val conv: Convention = Convention.get(hiddenPlan)
+
+ override def batchType(): Convention.BatchType = conv.batchType
+ override def rowType0(): Convention.RowType = conv.rowType
+ override def output: Seq[Attribute] = hiddenPlan.output
+ override def outputPartitioning: Partitioning =
hiddenPlan.outputPartitioning
+ override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering
+
+ override def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException("Not allowed in #applyOnNode
call")
+ override def doExecuteColumnar(): RDD[ColumnarBatch] =
+ throw new UnsupportedOperationException("Not allowed in #applyOnNode
call")
+ override def doExecuteBroadcast[T](): Broadcast[T] =
+ throw new UnsupportedOperationException("Not allowed in #applyOnNode
call")
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index d356a39b41..611b2a8ff7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -184,7 +184,7 @@ object OffloadOthers {
// Utility to replace single node within transformed Gluten node.
// Children will be preserved as they are as children of the output node.
//
- // Do not look up on children on the input node in this rule. Otherwise
+ // Do not look up on children on the input node in this rule. Otherwise,
// it may break RAS which would group all the possible input nodes to
// search for validate candidates.
private class ReplaceSingleNode extends LogLevelUtil with Logging {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]