This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 600a5eeb9 [VL]: Fix VeloxColumnarWriteFilesExec#withNewChildren doesn't 
replace the dummy child (#5726)
600a5eeb9 is described below

commit 600a5eeb93cf2cbd12aa2c018d28addf12510bd2
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 13 18:19:46 2024 +0800

    [VL]: Fix VeloxColumnarWriteFilesExec#withNewChildren doesn't replace the 
dummy child (#5726)
---
 .../execution/VeloxColumnarWriteFilesExec.scala    | 61 +++++++++++++++-------
 1 file changed, 42 insertions(+), 19 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 23dff990c..1d3d55afb 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -259,8 +259,9 @@ class VeloxColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" 
to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write 
is the left child
 // of this operator.
-case class VeloxColumnarWriteFilesExec(
-    child: SparkPlan,
+case class VeloxColumnarWriteFilesExec private (
+    override val left: SparkPlan,
+    override val right: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -269,7 +270,8 @@ case class VeloxColumnarWriteFilesExec(
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
-  import VeloxColumnarWriteFilesExec._
+
+  val child: SparkPlan = left
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
@@ -320,28 +322,49 @@ case class VeloxColumnarWriteFilesExec(
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
-
-  override def left: SparkPlan = child
-
-  // This is a workaround for FileFormatWriter#write. Vanilla Spark (version 
>= 3.4) requires for
-  // a plan that has at least one node exactly of type `WriteFilesExec` that 
is a Scala case-class,
-  // to decide to choose new `#executeWrite` code path over the legacy 
`#execute` for write
-  // operation.
-  //
-  // So we add a no-op `WriteFilesExec` child to let Spark pick the new code 
path.
-  //
-  // See: FileFormatWriter#write
-  // See: V1Writes#getWriteFilesOpt
-  override val right: SparkPlan =
-    WriteFilesExec(NoopLeaf(), fileFormat, partitionColumns, bucketSpec, 
options, staticPartitions)
-
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
-    copy(newLeft, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
+    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
 }
 
 object VeloxColumnarWriteFilesExec {
+
+  def apply(
+      child: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version 
>= 3.4) requires for
+    // a plan that has at least one node exactly of type `WriteFilesExec` that 
is a Scala
+    // case-class, to decide to choose new `#executeWrite` code path over the 
legacy `#execute`
+    // for write operation.
+    //
+    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code 
path.
+    //
+    // See: FileFormatWriter#write
+    // See: V1Writes#getWriteFilesOpt
+    val right: SparkPlan =
+      WriteFilesExec(
+        NoopLeaf(),
+        fileFormat,
+        partitionColumns,
+        bucketSpec,
+        options,
+        staticPartitions)
+
+    VeloxColumnarWriteFilesExec(
+      child,
+      right,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
+  }
+
   private case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to