This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 56dd20f6a6b  [SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row transition before write command
56dd20f6a6b is described below

commit 56dd20f6a6b3efb2676196fa14d5e7fbbafe4949
Author:     Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Feb 7 20:13:38 2023 +0800

    [SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row transition before write command

    ### What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/39277 . With planned write, the write command requires neither columnar nor row-based execution: it invokes a new API, `executeWrite`, which returns commit messages rather than columnar or row-based data. This PR updates `ApplyColumnarRulesAndInsertTransitions` to take this case into account.

    ### Why are the changes needed?

    If people replace `WriteFilesExec` with a columnar version, the plan cannot be executed, because an extra columnar-to-row transition is inserted between `WriteFilesExec` and the write command.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    A new test.

    Closes #39922 from cloud-fan/write.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 17 ++++++++--
 .../spark/sql/SparkSessionExtensionSuite.scala     | 38 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index c3118d7e78c..684a3f319ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -541,10 +543,19 @@ case class ApplyColumnarRulesAndInsertTransitions(
       // `outputsColumnar` is false but the plan only outputs columnar format, so add a
       // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
-    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
-    } else {
+    } else if (plan.isInstanceOf[ColumnarToRowTransition]) {
       plan
+    } else {
+      val outputsColumnar = plan match {
+        // With planned write, the write command invokes child plan's `executeWrite` which is
+        // neither columnar nor row-based.
+        case write: DataWritingCommandExec
+            if write.cmd.isInstanceOf[V1WriteCommand] && conf.plannedWriteEnabled =>
+          write.child.supportsColumnar
+        case _ =>
+          false
+      }
+      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar)))
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 66507756e89..48ad10992c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,6 +26,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -33,8 +35,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Limit, LocalRela
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -301,6 +305,11 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
       assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
       assert(result(1).getLong(0) == 201L)
       assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
+      }
     }
   }
@@ -790,6 +799,27 @@ class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }
 
+class ColumnarWriteExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
+    child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+
+  override def supportsColumnar(): Boolean = true
+
+  override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
+    assert(child.supportsColumnar)
+    throw new Exception("columnar write")
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
+    new ColumnarWriteExec(
+      newChild, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+}
+
 /**
  * A version of add that supports columnar processing for longs. This version is broken
  * on purpose so it adds the numbers plus 1 so that the tests can show that it was replaced.
@@ -897,6 +927,14 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
         new ColumnarProjectExec(plan.projectList.map((exp) =>
           replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
           replaceWithColumnarPlan(plan.child))
+      case write: WriteFilesExec =>
+        new ColumnarWriteExec(
+          replaceWithColumnarPlan(write.child),
+          write.fileFormat,
+          write.partitionColumns,
+          write.bucketSpec,
+          write.options,
+          write.staticPartitions)
       case p =>
         logWarning(s"Columnar processing for ${p.getClass} is not currently supported.")
         p.withNewChildren(p.children.map(replaceWithColumnarPlan))
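For extension authors following along, below is a minimal sketch, not part of this commit, of how such a columnar write node might be injected through SparkSessionExtensions so that the new `DataWritingCommandExec` branch above applies. It reuses the `ColumnarWriteExec` test helper from the diff; `ReplaceWriteWithColumnar` and `MyWriteColumnarRule` are hypothetical names, and the sketch assumes a build containing this fix with planned write enabled (`conf.plannedWriteEnabled`, the default), run e.g. in spark-shell with the suite's helpers on the classpath.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
    import org.apache.spark.sql.execution.datasources.WriteFilesExec

    // Hypothetical rule: swap WriteFilesExec for the ColumnarWriteExec test
    // helper defined in the diff above, preserving the node's write spec.
    object ReplaceWriteWithColumnar extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
        case w: WriteFilesExec =>
          new ColumnarWriteExec(
            w.child, w.fileFormat, w.partitionColumns, w.bucketSpec, w.options, w.staticPartitions)
      }
    }

    object MyWriteColumnarRule extends ColumnarRule {
      // Runs before ApplyColumnarRulesAndInsertTransitions, so the new branch
      // sees a write command whose child reports supportsColumnar = true.
      override def preColumnarTransitions: Rule[SparkPlan] = ReplaceWriteWithColumnar
    }

    val spark = SparkSession.builder()
      .master("local[1]")
      .withExtensions(_.injectColumnar(_ => MyWriteColumnarRule))
      .getOrCreate()

With the fix, `insertTransitions` computes `outputsColumnar = true` for the write command's child, leaving the columnar node directly under `DataWritingCommandExec`; previously an extra `ColumnarToRowExec` was inserted between them and the plan could not be executed. As in the test, this particular `doExecuteWrite` deliberately throws "columnar write" (which the new test intercepts to confirm the columnar path was taken); a real integration would implement an actual columnar write there.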