This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 65945e6f2b0 [SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row 
transition before write command
65945e6f2b0 is described below

commit 65945e6f2b07732f8aa65f8f94db80b5a058b8fb
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Feb 7 20:13:38 2023 +0800

    [SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row transition 
before write command
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/39277 . With 
planned write, the write command requires neither columnar nor row-based 
execution. It invokes a new API `executeWrite`, which returns commit messages, 
not columnar or row-based data.
    
    This PR updates `ApplyColumnarRulesAndInsertTransitions` to take this case 
into consideration.
    
    ### Why are the changes needed?
    
    If people replaces `WriteFilesExec` with a columnar version, the plan can't 
be executed due to an extra columnar to row transition between `WriteFilesExee` 
and the write command.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    new test
    
    Closes #39922 from cloud-fan/write.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 56dd20f6a6b3efb2676196fa14d5e7fbbafe4949)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 17 ++++++++--
 .../spark/sql/SparkSessionExtensionSuite.scala     | 38 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index c3118d7e78c..684a3f319ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,6 +28,8 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -541,10 +543,19 @@ case class ApplyColumnarRulesAndInsertTransitions(
       // `outputsColumnar` is false but the plan only outputs columnar format, 
so add a
       // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
-    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions(_, 
outputsColumnar = false)))
-    } else {
+    } else if (plan.isInstanceOf[ColumnarToRowTransition]) {
       plan
+    } else {
+      val outputsColumnar = plan match {
+        // With planned write, the write command invokes child plan's 
`executeWrite` which is
+        // neither columnar nor row-based.
+        case write: DataWritingCommandExec
+            if write.cmd.isInstanceOf[V1WriteCommand] && 
conf.plannedWriteEnabled =>
+          write.child.supportsColumnar
+        case _ =>
+          false
+      }
+      plan.withNewChildren(plan.children.map(insertTransitions(_, 
outputsColumnar)))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 66507756e89..48ad10992c5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,6 +26,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, 
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
ParserInterface}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -33,8 +35,10 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Limit, LocalRela
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, 
WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -301,6 +305,11 @@ class SparkSessionExtensionSuite extends SparkFunSuite 
with SQLHelper {
       assert(result(0).getLong(0) == 101L) // Check that broken columnar Add 
was used.
       assert(result(1).getLong(0) == 201L)
       assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
+      }
     }
   }
 
@@ -790,6 +799,27 @@ class ColumnarProjectExec(projectList: 
Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }
 
+class ColumnarWriteExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
+  child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+
+  override def supportsColumnar(): Boolean = true
+
+  override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): 
RDD[WriterCommitMessage] = {
+    assert(child.supportsColumnar)
+    throw new Exception("columnar write")
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
WriteFilesExec =
+    new ColumnarWriteExec(
+      newChild, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
+}
+
 /**
  * A version of add that supports columnar processing for longs.  This version 
is broken
  * on purpose so it adds the numbers plus 1 so that the tests can show that it 
was replaced.
@@ -897,6 +927,14 @@ case class PreRuleReplaceAddWithBrokenVersion() extends 
Rule[SparkPlan] {
           new ColumnarProjectExec(plan.projectList.map((exp) =>
             replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
             replaceWithColumnarPlan(plan.child))
+        case write: WriteFilesExec =>
+          new ColumnarWriteExec(
+            replaceWithColumnarPlan(write.child),
+            write.fileFormat,
+            write.partitionColumns,
+            write.bucketSpec,
+            write.options,
+            write.staticPartitions)
         case p =>
           logWarning(s"Columnar processing for ${p.getClass} is not currently 
supported.")
           p.withNewChildren(p.children.map(replaceWithColumnarPlan))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to