This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 1146ba7e944 [SPARK-44591][CONNECT][WEBUI] Use jobTags in SparkListenerSQLExecutionStart to link SQL Execution IDs for Spark UI Connect page
1146ba7e944 is described below

commit 1146ba7e944254c0ebbbd082a186fce818fd033c
Author: Jason Li <jason...@databricks.com>
AuthorDate: Tue Aug 1 00:15:09 2023 -0700

    [SPARK-44591][CONNECT][WEBUI] Use jobTags in SparkListenerSQLExecutionStart to link SQL Execution IDs for Spark UI Connect page
    
    ### What changes were proposed in this pull request?
    Use jobTags in SparkListenerSQLExecutionStart to get the corresponding SQL Execution for a Spark Connect request, rather than SparkListenerJobStart.props.getProperty(SQLExecution.EXECUTION_ID_KEY), which won't work when the SQL Execution does not trigger a job.
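
    A condensed sketch of the new lookup (simplified from onSQLExecutionStart in
    the diff below; happy path only, without the event re-ordering fallback):

        def onSQLExecutionStart(e: SparkListenerSQLExecutionStart): Unit = {
          // Find the Spark Connect job tag among the job tags of this SQL execution.
          val executeJobTagOpt = e.jobTags.find {
            case ExecuteJobTag(_) => true
            case _ => false
          }
          executeJobTagOpt.foreach { tag =>
            // Record the SQL execution ID directly on the live execution entry.
            executionList.get(tag).foreach { exec =>
              exec.sqlExecId += e.executionId.toString
              updateLiveStore(exec)
            }
          }
        }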
    
    ### Why are the changes needed?
    This change handles cases where a SQL Execution doesn't trigger a job, so the SQL Execution ID can't be retrieved from SparkListenerJobStart. One such case is sketched below.
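
    One hypothetical repro (my assumption, not from the patch: a driver-only
    command such as SET runs inside a SQL execution, so
    SparkListenerSQLExecutionStart fires, but it schedules no Spark jobs, so no
    SparkListenerJobStart ever carries its execution ID):

        import org.apache.spark.sql.SparkSession

        object NoJobExecutionRepro {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .master("local[1]")
              .appName("no-job-sql-execution")
              .getOrCreate()
            // Driver-only command: posts SparkListenerSQLExecutionStart,
            // but launches no Spark jobs (assumption for illustration).
            spark.sql("SET spark.sql.shuffle.partitions=1")
            spark.stop()
          }
        }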
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    Update unit test + local manual testing
    
    Closes #42244 from jasonli-db/spark-connect-ui-sql-start.
    
    Authored-by: Jason Li <jason...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit a40e46fe6dc35226b27335bb1431583f455f1e58)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../connect/ui/SparkConnectServerListener.scala    | 49 ++++++++++++--
 .../ui/SparkConnectServerListenerSuite.scala       | 74 ++++++++++++----------
 2 files changed, 83 insertions(+), 40 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
index b40e847f404..90f9afebcb6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
 import org.apache.spark.sql.connect.service._
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 
 private[connect] class SparkConnectServerListener(
@@ -80,12 +80,9 @@ private[connect] class SparkConnectServerListener(
     }
     val executeJobTag = executeJobTagOpt.get
     val exec = executionList.get(executeJobTag)
-    val executionIdOpt: Option[String] = Option(jobStart.properties)
-      .flatMap { p => Option(p.getProperty(SQLExecution.EXECUTION_ID_KEY)) }
     if (exec.nonEmpty) {
       exec.foreach { exec =>
         exec.jobId += jobStart.jobId.toString
-        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
         updateLiveStore(exec)
       }
     } else {
@@ -105,8 +102,8 @@ private[connect] class SparkConnectServerListener(
           exec.userId,
           exec.operationId,
           exec.sparkSessionTags)
+        liveExec.sqlExecId = exec.sqlExecId
         liveExec.jobId += jobStart.jobId.toString
-        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
         updateStoreWithTriggerEnabled(liveExec)
         executionList.remove(liveExec.jobTag)
       }
@@ -115,6 +112,7 @@ private[connect] class SparkConnectServerListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
+      case e: SparkListenerSQLExecutionStart => onSQLExecutionStart(e)
       case e: SparkListenerConnectOperationStarted => onOperationStarted(e)
       case e: SparkListenerConnectOperationAnalyzed => onOperationAnalyzed(e)
       case e: SparkListenerConnectOperationReadyForExecution => onOperationReadyForExecution(e)
@@ -128,6 +126,45 @@ private[connect] class SparkConnectServerListener(
     }
   }
 
+  def onSQLExecutionStart(e: SparkListenerSQLExecutionStart): Unit = {
+    val executeJobTagOpt = e.jobTags.find {
+      case ExecuteJobTag(_) => true
+      case _ => false
+    }
+    if (executeJobTagOpt.isEmpty) {
+      return
+    }
+    val executeJobTag = executeJobTagOpt.get
+    val exec = executionList.get(executeJobTag)
+    if (exec.nonEmpty) {
+      exec.foreach { exec =>
+        exec.sqlExecId += e.executionId.toString
+        updateLiveStore(exec)
+      }
+    } else {
+      // This block guards against potential event re-ordering where a SQLExecutionStart
+      // event is processed after a ConnectOperationClosed event, in which case the Execution
+      // has already been evicted from the executionList.
+      val storeExecInfo =
+        KVUtils.viewToSeq(kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(exec =>
+          exec.jobTag == executeJobTag)
+      storeExecInfo.foreach { exec =>
+        val liveExec = getOrCreateExecution(
+          exec.jobTag,
+          exec.statement,
+          exec.sessionId,
+          exec.startTimestamp,
+          exec.userId,
+          exec.operationId,
+          exec.sparkSessionTags)
+        liveExec.jobId = exec.jobId
+        liveExec.sqlExecId += e.executionId.toString
+        updateStoreWithTriggerEnabled(liveExec)
+        executionList.remove(liveExec.jobTag)
+      }
+    }
+  }
+
   private def onOperationStarted(e: SparkListenerConnectOperationStarted) = synchronized {
     val executionData = getOrCreateExecution(
       e.jobTag,
@@ -326,7 +363,7 @@ private[connect] class LiveExecutionData(
   var closeTimestamp: Long = 0L
   var detail: String = ""
   var state: ExecutionState.Value = ExecutionState.STARTED
-  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+  var jobId: ArrayBuffer[String] = ArrayBuffer[String]()
   var sqlExecId: mutable.Set[String] = mutable.Set[String]()
 
   override protected def doUpdate(): Any = {
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
index 9292e44f177..7cdc0135201 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENT
 import org.apache.spark.scheduler.SparkListenerJobStart
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
 import org.apache.spark.sql.connect.service._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.kvstore.InMemoryStore
 
@@ -36,6 +37,8 @@ class SparkConnectServerListenerSuite
 
   private var kvstore: ElementTrackingStore = _
 
+  private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
+
   after {
     if (kvstore != null) {
       kvstore.close()
@@ -47,12 +50,11 @@ class SparkConnectServerListenerSuite
     test(s"listener events should store successfully (live = $live)") {
       val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
         createAppStatusStore(live)
-
       listener.onOtherEvent(
         SparkListenerConnectSessionStarted("sessionId", "user", 
System.currentTimeMillis()))
       listener.onOtherEvent(
         SparkListenerConnectOperationStarted(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
+          jobTag,
           "operationId",
           System.currentTimeMillis(),
           "sessionId",
@@ -62,22 +64,24 @@ class SparkConnectServerListenerSuite
           None,
           Set()))
       listener.onOtherEvent(
-        SparkListenerConnectOperationAnalyzed(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "operationId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
+      listener.onOtherEvent(
+        SparkListenerSQLExecutionStart(
+          0,
+          None,
+          null,
+          null,
+          null,
+          null,
+          System.currentTimeMillis(),
+          null,
+          Set(jobTag)))
       listener.onJobStart(
        SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
       listener.onOtherEvent(
-        SparkListenerConnectOperationFinished(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "sessionId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationFinished(jobTag, "sessionId", System.currentTimeMillis()))
       listener.onOtherEvent(
-        SparkListenerConnectOperationClosed(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "sessionId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationClosed(jobTag, "sessionId", System.currentTimeMillis()))
 
       if (live) {
         assert(statusStore.getOnlineSessionNum === 1)
@@ -95,10 +99,11 @@ class SparkConnectServerListenerSuite
 
       val storeExecData = statusStore.getExecutionList.head
 
-      assert(storeExecData.jobTag === ExecuteJobTag("sessionId", "userId", "operationId"))
+      assert(storeExecData.jobTag === jobTag)
       assert(storeExecData.sessionId === "sessionId")
       assert(storeExecData.statement === "dummy query")
       assert(storeExecData.jobId === Seq("0"))
+      assert(storeExecData.sqlExecId === Set("0"))
       assert(listener.noLiveData())
     }
   }
@@ -132,15 +137,16 @@ class SparkConnectServerListenerSuite
     }
   }
 
-  test("update execution info when jobstart event come after execution end 
event") {
+  test(
+    "update execution info when event reordering causes job and sql" +
+      " start to come after operation closed") {
    val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
       createAppStatusStore(true)
-
     listener.onOtherEvent(
       SparkListenerConnectSessionStarted("sessionId", "userId", 
System.currentTimeMillis()))
     listener.onOtherEvent(
       SparkListenerConnectOperationStarted(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
+        jobTag,
         "operationId",
         System.currentTimeMillis(),
         "sessionId",
@@ -150,21 +156,22 @@ class SparkConnectServerListenerSuite
         None,
         Set()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationAnalyzed(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
+      SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationFinished(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
+      SparkListenerConnectOperationFinished(jobTag, "operationId", System.currentTimeMillis()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationClosed(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
-
+      SparkListenerConnectOperationClosed(jobTag, "operationId", System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerSQLExecutionStart(
+        0,
+        None,
+        null,
+        null,
+        null,
+        null,
+        System.currentTimeMillis(),
+        null,
+        Set(jobTag)))
     listener.onJobStart(
      SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
     listener.onOtherEvent(
@@ -172,6 +179,7 @@ class SparkConnectServerListenerSuite
    val exec = statusStore.getExecution(ExecuteJobTag("sessionId", "userId", "operationId"))
     assert(exec.isDefined)
     assert(exec.get.jobId === Seq("0"))
+    assert(exec.get.sqlExecId === Set("0"))
     assert(listener.noLiveData())
   }
 
@@ -207,9 +215,7 @@ class SparkConnectServerListenerSuite
 
   private def createProperties: Properties = {
     val properties = new Properties()
-    properties.setProperty(
-      SparkContext.SPARK_JOB_TAGS,
-      ExecuteJobTag("sessionId", "userId", "operationId"))
+    properties.setProperty(SparkContext.SPARK_JOB_TAGS, jobTag)
     properties
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org