This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5a01a  [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2
0c5a01a is described below

commit 0c5a01a78c067a78025878dff9d54843111287cc
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Jun 17 08:55:42 2021 +0000

    [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/32513
    
    It's hard to keep the command execution names for `DataFrameWriter`, as the command logical plans are a bit messy (DS v1, file source, and Hive each use different command logical plans) and it is sometimes hard to distinguish "insert" from "save".
    
    However, `DataFrameWriterV2` only produces v2 commands, which are fairly clean, so it is easy to keep the command execution name for them.
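
    As an illustration, here is a minimal sketch of what a registered `QueryExecutionListener` would observe after this change. The catalog name `testcat`, the namespace `ns`, and the use of the test-only `InMemoryTableCatalog` are assumptions for the example, not part of this patch:

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    // Hypothetical local session with a v2 catalog named `testcat`; any
    // TableCatalog implementation on the classpath would do.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog")
      .getOrCreate()

    // Print the execution name the listener receives for each successful command.
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"listener saw: $funcName")
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    })

    import spark.implicits._
    val df = Seq((1, "a"), (2, "b")).toDF("id", "data")

    // With this followup, DataFrameWriterV2 reports its specific command names
    // again (listener events are delivered asynchronously).
    df.writeTo("testcat.ns.tbl").create()              // listener saw: create
    df.writeTo("testcat.ns.tbl").append()              // listener saw: append
    df.writeTo("testcat.ns.tbl").overwritePartitions() // listener saw: overwritePartitions
    ```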
    
    ### Why are the changes needed?
    
    Fewer breaking changes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #32919 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-migration-guide.md                            |  2 +-
 .../scala/org/apache/spark/sql/DataFrameWriter.scala   |  3 +--
 .../scala/org/apache/spark/sql/DataFrameWriterV2.scala |  4 +---
 .../apache/spark/sql/execution/QueryExecution.scala    | 18 ++++++++++++++----
 .../connector/WriteDistributionAndOrderingSuite.scala  |  5 ++---
 5 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index ce655af..dd654cd 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -95,7 +95,7 @@ license: |
   
   - In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.
 
-  - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.
+  - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.
 
 ## Upgrading from Spark SQL 3.0 to 3.1
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5b68493a..47eb199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -857,8 +857,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
     val qe = session.sessionState.executePlan(command)
-    // call `QueryExecution.commandExecuted` to trigger the execution of commands.
-    qe.commandExecuted
+    qe.assertCommandExecuted()
   }
 
   private def lookupV2Provider(): Option[TableProvider] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 7b13105..3931d1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -191,8 +190,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
    */
   private def runCommand(command: LogicalPlan): Unit = {
     val qe = sparkSession.sessionState.executePlan(command)
-    // call `QueryExecution.toRDD` to trigger the execution of commands.
-    SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd)
+    qe.assertCommandExecuted()
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a794a47..aaa87bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -81,11 +81,21 @@ class QueryExecution(
     case CommandExecutionMode.SKIP => analyzed
   }
 
+  private def commandExecutionName(command: Command): String = command match {
+    case _: CreateTableAsSelect => "create"
+    case _: ReplaceTableAsSelect => "replace"
+    case _: AppendData => "append"
+    case _: OverwriteByExpression => "overwrite"
+    case _: OverwritePartitionsDynamic => "overwritePartitions"
+    case _ => "command"
+  }
+
   private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
     case c: Command =>
       val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
-      val result =
-        SQLExecution.withNewExecutionId(qe, Some("command"))(qe.executedPlan.executeCollect())
+      val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
+        qe.executedPlan.executeCollect()
+      }
       CommandResult(
         qe.analyzed.output,
         qe.commandExecuted,
@@ -102,7 +112,7 @@ class QueryExecution(
     sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone())
   }
 
-  private def assertCommandExecuted(): Unit = commandExecuted
+  def assertCommandExecuted(): Unit = commandExecuted
 
   lazy val optimizedPlan: LogicalPlan = {
     // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index 945a35a..db4a9c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions._
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -778,8 +778,7 @@ class WriteDistributionAndOrderingSuite
 
     sparkContext.listenerBus.waitUntilEmpty()
 
-    assert(executedPlan.isInstanceOf[CommandResultExec])
-    executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan match {
+    executedPlan match {
       case w: V2TableWriteExec =>
         stripAQEPlan(w.query)
       case _ =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
