This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8d76cb81c9 [GLUTEN-11920] Keep columnar children for dual-mode parents with row output (#12004)
8d76cb81c9 is described below
commit 8d76cb81c9d5966509033d29f11490b5df110a9c
Author: Yao-MR <[email protected]>
AuthorDate: Fri May 8 16:27:33 2026 +0800
[GLUTEN-11920] Keep columnar children for dual-mode parents with row output (#12004)
---
.../scala/org/apache/gluten/execution/GlutenPlan.scala | 11 +++++++++--
.../extension/columnar/transition/ConventionFunc.scala | 8 ++++++++
.../columnar/transition/TransitionSuite.scala | 14 ++++++++++++++
.../columnar/transition/TransitionSuiteBase.scala | 18 ++++++++++++++++++
.../apache/gluten/utils/velox/VeloxTestSettings.scala | 2 +-
5 files changed, 50 insertions(+), 3 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
index 91d5ae4e72..010a5963ec 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
@@ -62,8 +62,15 @@ trait GlutenPlan
override def rowType(): Convention.RowType
override def requiredChildConvention(): Seq[ConventionReq] = {
- // In the normal case, children's convention should follow parent node's convention.
- val childReq = Convention.of(rowType(), batchType()).asReq()
+ val childReq =
+ if (supportsColumnar && supportsRowBased) {
+ // A dual-mode parent can keep columnar children and still satisfy a row-based output
+ // requirement itself. Align with Spark's transition insertion behavior.
+ ConventionReq.ofBatch(ConventionReq.BatchType.Is(batchType()))
+ } else {
+ // In the normal case, children's convention should follow parent node's convention.
+ Convention.of(rowType(), batchType()).asReq()
+ }
Seq.tabulate(children.size)(
_ => {
childReq
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index b28fbe4f36..0075d485e9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -185,6 +185,14 @@ object ConventionFunc {
ConventionReq.of(
ConventionReq.RowType.Is(Convention.RowType.VanillaRowType),
ConventionReq.BatchType.Any))
+ case other if other.supportsColumnar &&
SparkPlanUtil.supportsRowBased(other) =>
+ // A dual-mode parent can keep columnar children and still satisfy a row-based output
+ // requirement itself. Align with Spark's transition insertion behavior.
+ val childReq =
ConventionReq.ofBatch(ConventionReq.BatchType.Is(batchTypeOf(other)))
+ Seq.tabulate(other.children.size)(
+ _ => {
+ childReq
+ })
case other =>
// In the normal case, children's convention should follow parent node's convention.
val childReq = conventionOf0(other).asReq()
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index 3c350003ac..eab0f66598 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -65,6 +65,20 @@ class TransitionSuite extends SharedSparkSession with
TransitionSuiteBase with W
BatchUnary(BatchTypeA, RowToBatch(RowTypeA, BatchTypeA,
RowLeaf(RowTypeA)))))
}
+ test("SPARK-51474: Dual-mode parent keeps columnar child for row output") {
+ val in = DualModeUnary(RowTypeA, BatchTypeA, BatchLeaf(BatchTypeA))
+ val out = insertTransitions(in,
ConventionReq.ofRow(ConventionReq.RowType.Is(RowTypeA)))
+ assert(out == in)
+ }
+
+ test("SPARK-51474: Dual-mode parent only needs batch child convention") {
+ val in = DualModeUnary(RowTypeA, BatchTypeA, RowLeaf(RowTypeA))
+ val out = insertTransitions(in,
ConventionReq.ofRow(ConventionReq.RowType.Is(RowTypeA)))
+ assert(
+ out ==
+ DualModeUnary(RowTypeA, BatchTypeA, RowToBatch(RowTypeA, BatchTypeA,
RowLeaf(RowTypeA))))
+ }
+
test("Insert C2R2C") {
val in = BatchUnary(BatchTypeA, BatchLeaf(BatchTypeB))
val out = insertTransitions(in,
ConventionReq.ofRow(ConventionReq.RowType.Is(RowTypeA)))
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index 4fddfcfbd5..4b3b8e42db 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -90,6 +90,24 @@ object TransitionSuiteBase {
override def output: Seq[Attribute] = child.output
}
+ case class DualModeUnary(
+ rowType0: Convention.RowType,
+ batchType0: Convention.BatchType,
+ override val child: SparkPlan)
+ extends UnaryExecNode
+ with GlutenPlan {
+ override def batchType(): Convention.BatchType = batchType0
+
+ override def rowType(): Convention.RowType = rowType0
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+
+ override def output: Seq[Attribute] = child.output
+ }
+
case class RowBinary(
override val rowType: Convention.RowType,
override val left: SparkPlan,
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 616cb9ac7d..3e32d6b559 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -664,7 +664,7 @@ class VeloxTestSettings extends BackendTestSettings {
// Generated suites for org.apache.spark.sql.execution
enableSuite[GlutenAggregatingAccumulatorSuite]
enableSuite[GlutenCoGroupedIteratorSuite]
- // TODO: 4.x enableSuite[GlutenColumnarRulesSuite] // 1 failure
+ enableSuite[GlutenColumnarRulesSuite]
enableSuite[GlutenDataSourceScanExecRedactionSuite]
.exclude("explain is redacted using SQLConf")
.exclude("SPARK-31793: FileSourceScanExec metadata should contain limited
file paths")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]