This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d213ff3dea [SPARK-45092][SQL][UI] Avoid analyzing twice for failed queries
4d213ff3dea is described below

commit 4d213ff3dea4d66e5dec7be3b35c5441d9187c30
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Sep 12 16:35:39 2023 +0800

    [SPARK-45092][SQL][UI] Avoid analyzing twice for failed queries
    
    ### What changes were proposed in this pull request?
    
    Following the discussion starting at
    https://github.com/apache/spark/pull/42481#discussion_r1316776270, for failed
    queries we need to avoid calling `SparkPlanInfo.fromSparkPlan`, which triggers
    another round of analysis.
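
    A Scala lazy val whose initializer throws stays uninitialized, so the next
    access re-runs the initializer; since the plan phases in QueryExecution are
    lazy vals, touching `executedPlan` on an already-failed query analyzes it
    again. A minimal, self-contained sketch of that behavior (toy names, not
    Spark's actual classes):

    ```scala
    object LazyRetrySketch extends App {
      var analyzeCalls = 0

      class ToyQueryExecution {
        // Stand-in for QueryExecution.analyzed: analysis fails with an error.
        lazy val analyzed: String = {
          analyzeCalls += 1
          throw new IllegalStateException("TABLE_OR_VIEW_NOT_FOUND")
        }
        // Stand-in for executedPlan: forces `analyzed` on access.
        def executedPlan: String = analyzed
      }

      def attempt(body: => Any): Unit =
        try { body; () } catch { case scala.util.control.NonFatal(_) => () }

      val qe = new ToyQueryExecution
      attempt(qe.analyzed)      // first analysis: throws
      attempt(qe.executedPlan)  // what SparkPlanInfo.fromSparkPlan would force
      assert(analyzeCalls == 2) // the failing initializer ran twice
    }
    ```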
    
    This patch uses `Either[Throwable, () => T]` to pass the throwable
    conditionally and to bypass the plan-explain functions on error.
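
    A minimal sketch of the resulting dispatch (simplified, hypothetical names;
    the real signatures are in the SQLExecution.scala hunk below):

    ```scala
    object EitherDispatchSketch {
      // Left(e): post a bare start event and rethrow, never touching the
      // explain/plan-info code. Right(f): describe the plan, then run f.
      private def withTracking[T](body: Either[Throwable, () => T]): T =
        body match {
          case Left(e) =>
            println("post start event with empty plan description")
            throw e
          case Right(f) =>
            println("post start event with full plan description")
            f()
        }

      // Mirrors withNewExecutionId: the happy path wraps the body in Right.
      def run[T](body: => T): T = withTracking(Right(() => body))

      // Mirrors withNewExecutionIdOnError: the error path passes Left.
      def runOnError(t: Throwable): Unit = withTracking(Left(t))
    }
    ```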
    
    ### Why are the changes needed?
    
    Follow-up improvements to https://github.com/apache/spark/pull/42481.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #42838 from yaooqinn/SPARK-45092.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/execution/QueryExecution.scala       |  2 +-
 .../apache/spark/sql/execution/SQLExecution.scala  | 72 ++++++++++++++--------
 .../spark/sql/execution/ui/UISeleniumSuite.scala   |  2 +-
 3 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8ddfde8acf8..b3c97a83970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -71,7 +71,7 @@ class QueryExecution(
         // Because we do eager analysis for Dataframe, there will be no execution created after
         // AnalysisException occurs. So we need to explicitly create a new execution to post
         // start/end events to notify the listener and UI components.
-        SQLExecution.withNewExecutionId(this, Some("analyze"))(throw e)
+        SQLExecution.withNewExecutionIdOnError(this, Some("analyze"))(e)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 2a44a016d2d..b96b9c25dda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -66,9 +66,10 @@ object SQLExecution extends Logging {
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
    */
-  def withNewExecutionId[T](
+  private def withNewExecutionId0[T](
       queryExecution: QueryExecution,
-      name: Option[String] = None)(body: => T): T = queryExecution.sparkSession.withActive {
+      name: Option[String] = None)(
+      body: Either[Throwable, () => T]): T = queryExecution.sparkSession.withActive {
     val sparkSession = queryExecution.sparkSession
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
@@ -103,9 +104,6 @@ object SQLExecution extends Logging {
           redactedStr.substring(0, Math.min(truncateLength, redactedStr.length))
         }.getOrElse(callSite.shortForm)
 
-      val planDescriptionMode =
-        ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
-
       val globalConfigs = sparkSession.sharedState.conf.getAll.toMap
       val modifiedConfigs = sparkSession.sessionState.conf.getAllConfs
         .filterNot { case (key, value) =>
@@ -118,28 +116,39 @@ object SQLExecution extends Logging {
       withSQLConfPropagated(sparkSession) {
         var ex: Option[Throwable] = None
         val startTime = System.nanoTime()
+        val startEvent = SparkListenerSQLExecutionStart(
+          executionId = executionId,
+          rootExecutionId = Some(rootExecutionId),
+          description = desc,
+          details = callSite.longForm,
+          physicalPlanDescription = "",
+          sparkPlanInfo = SparkPlanInfo.EMPTY,
+          time = System.currentTimeMillis(),
+          modifiedConfigs = redactedConfigs,
+          jobTags = sc.getJobTags()
+        )
         try {
-          val planInfo = try {
-            SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
-          } catch {
-            case NonFatal(e) =>
-              logDebug("Failed to generate SparkPlanInfo", e)
-              // If the queryExecution already failed before this, we are not able to generate the
-              // the plan info, so we use and empty graphviz node to make the UI happy
-              SparkPlanInfo.EMPTY
+          body match {
+            case Left(e) =>
+              sc.listenerBus.post(startEvent)
+              throw e
+            case Right(f) =>
+              val planDescriptionMode =
+                ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
+              val planDesc = queryExecution.explainString(planDescriptionMode)
+              val planInfo = try {
+                SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
+              } catch {
+                case NonFatal(e) =>
+                  logDebug("Failed to generate SparkPlanInfo", e)
+                  // If the queryExecution already failed before this, we are not able to generate
+                  // the plan info, so we use an empty graphviz node to make the UI happy
+                  SparkPlanInfo.EMPTY
+              }
+              sc.listenerBus.post(
+                startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
+              f()
           }
-          sc.listenerBus.post(SparkListenerSQLExecutionStart(
-            executionId = executionId,
-            rootExecutionId = Some(rootExecutionId),
-            description = desc,
-            details = callSite.longForm,
-            physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
-            sparkPlanInfo = planInfo,
-            time = System.currentTimeMillis(),
-            modifiedConfigs = redactedConfigs,
-            jobTags = sc.getJobTags()
-          ))
-          body
         } catch {
           case e: Throwable =>
             ex = Some(e)
@@ -181,6 +190,19 @@ object SQLExecution extends Logging {
     }
   }
 
+  def withNewExecutionId[T](
+      queryExecution: QueryExecution,
+      name: Option[String] = None)(body: => T): T = {
+    withNewExecutionId0(queryExecution, name)(Right(() => body))
+  }
+
+  def withNewExecutionIdOnError(
+      queryExecution: QueryExecution,
+      name: Option[String] = None)(t: Throwable): Unit = {
+    withNewExecutionId0(queryExecution, name)(Left(t))
+  }
+
+
   /**
    * Wrap an action with a known executionId. When running a different action in a different
    * thread from the original one, this method can be used to connect the Spark jobs in this action
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
index f25c150259f..30124a5988e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/UISeleniumSuite.scala
@@ -110,7 +110,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
       val planDot = findAll(cssSelector(""".dot-file""")).map(_.text).toList
       assert(planDot.head.startsWith("digraph G {"))
       val planDetails = findAll(cssSelector("""#physical-plan-details""")).map(_.text).toList
-      assert(planDetails.head.contains("TABLE_OR_VIEW_NOT_FOUND"))
+      assert(planDetails.head.isEmpty)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
