This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0c5a01a [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2 0c5a01a is described below commit 0c5a01a78c067a78025878dff9d54843111287cc Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Jun 17 08:55:42 2021 +0000 [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2 ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/32513 It's hard to keep the command execution name for `DataFrameWriter`, as the command logical plan is a bit messy (DS v1, file source and hive and different command logical plans) and sometimes it's hard to distinguish "insert" and "save". However, `DataFrameWriterV2` only produces v2 commands which are pretty clean. It's easy to keep the command execution name for them. ### Why are the changes needed? fewer breaking changes. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A Closes #32919 from cloud-fan/follow. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- docs/sql-migration-guide.md | 2 +- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- .../scala/org/apache/spark/sql/DataFrameWriterV2.scala | 4 +--- .../apache/spark/sql/execution/QueryExecution.scala | 18 ++++++++++++++---- .../connector/WriteDistributionAndOrderingSuite.scala | 5 ++--- 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index ce655af..dd654cd 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -95,7 +95,7 @@ license: | - In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL. 
- - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`. + - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`. ## Upgrading from Spark SQL 3.0 to 3.1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 5b68493a..47eb199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -857,8 +857,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = { val qe = session.sessionState.executePlan(command) - // call `QueryExecution.commandExecuted` to trigger the execution of commands. 
- qe.commandExecuted + qe.assertCommandExecuted() } private def lookupV2Provider(): Option[TableProvider] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 7b13105..3931d1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} -import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.types.IntegerType /** @@ -191,8 +190,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) */ private def runCommand(command: LogicalPlan): Unit = { val qe = sparkSession.sessionState.executePlan(command) - // call `QueryExecution.toRDD` to trigger the execution of commands. 
- SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd) + qe.assertCommandExecuted() } private def internalReplace(orCreate: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index a794a47..aaa87bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ReturnAnswer} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString @@ -81,11 +81,21 @@ class QueryExecution( case CommandExecutionMode.SKIP => analyzed } + private def commandExecutionName(command: Command): String = command match { + case _: CreateTableAsSelect => "create" + case _: ReplaceTableAsSelect => "replace" + case _: AppendData => "append" + case _: OverwriteByExpression => "overwrite" + case _: OverwritePartitionsDynamic => "overwritePartitions" + case _ => "command" + } + private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown { case c: Command => val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT) - val result = - SQLExecution.withNewExecutionId(qe, Some("command"))(qe.executedPlan.executeCollect()) + val result = 
SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) { + qe.executedPlan.executeCollect() + } CommandResult( qe.analyzed.output, qe.commandExecuted, @@ -102,7 +112,7 @@ class QueryExecution( sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone()) } - private def assertCommandExecuted(): Unit = commandExecuted + def assertCommandExecuted(): Unit = commandExecuted lazy val optimizedPlan: LogicalPlan = { // We need to materialize the commandExecuted here because optimizedPlan is also tracked under diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index 945a35a..db4a9c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder} import org.apache.spark.sql.connector.expressions.LogicalExpressions._ -import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SortExec, SparkPlan} +import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike @@ -778,8 +778,7 @@ class WriteDistributionAndOrderingSuite sparkContext.listenerBus.waitUntilEmpty() - assert(executedPlan.isInstanceOf[CommandResultExec]) - executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan match { + executedPlan match { case w: 
V2TableWriteExec => stripAQEPlan(w.query) case _ => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org