This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4274fb8806c [SPARK-38550][SQL][CORE] Use a disk-based store to save 
more debug information for live UI
4274fb8806c is described below

commit 4274fb8806c25519cac829d00b01132fe08c0cac
Author: Linhong Liu <linhong....@databricks.com>
AuthorDate: Thu Apr 14 14:01:03 2022 +0800

    [SPARK-38550][SQL][CORE] Use a disk-based store to save more debug 
information for live UI
    
    ### What changes were proposed in this pull request?
    In Spark, the UI lacks troubleshooting abilities. For example:
    * AQE plan changes are not available
    * plan description of a large plan is truncated
    
    This is because the live UI depends on an in-memory KV store. We should 
always be worried
    about the stability issues when adding more information to the store. 
Therefore, it's better to
    add a disk-based store to save more information
    
    This PR includes:
    * A disk-based KV Store in AppStatusStore that allows adding information 
that does not fit in memory
    * A separate listener that collects diagnostic data and saves it to the 
disk store
    * New Rest API endpoint to expose the diagnostics data (AQE plan changes, 
untruncated plan)
    
    ### Why are the changes needed?
    The troubleshooting ability is highly needed: without it, it's hard to
    debug AQE-related issues. Once we solve the blockers, we can make a
    long-term plan to improve the observability.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new REST API to expose more information of the application.
    Rest API endpoint: 
http://localhost:4040/api/v1/applications/local-1647312132944/diagnostics/sql/0
    Example:
    ```
    $ ./bin/spark-shell --conf spark.appStatusStore.diskStoreDir=/tmp/diskstore
    spark-shell>
    val df = sql(
      """SELECT t1.*, t2.c, t3.d
        |  FROM (SELECT 1 as a, 'b' as b) t1
        |  JOIN (SELECT 1 as a, 'c' as c) t2
        |  ON t1.a = t2.a
        |  JOIN (SELECT 1 as a, 'd' as d) t3
        |  ON t2.a = t3.a
        |""".stripMargin)
    df.show()
    ```
    Output:
    ```json
    {
      "id" : 0,
      "physicalPlan" : "<plan description string>",
      "submissionTime" : "2022-03-15T03:41:42.226GMT",
      "completionTime" : "2022-03-15T03:41:43.387GMT",
      "errorMessage" : "",
      "planChanges" : [ {
        "physicalPlan" : "<plan description string>",
        "updateTime" : "2022-03-15T03:41:42.268GMT"
      }, {
        "physicalPlan" : "<plan description string>",
        "updateTime" : "2022-03-15T03:41:43.262GMT"
      } ]
    }
    ```
    
    ### How was this patch tested?
    Manually tested.
    
    Closes #35856 from linhongliu-db/diagnostic.
    
    Authored-by: Linhong Liu <linhong....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/internal/config/Status.scala  |   8 ++
 .../org/apache/spark/status/AppStatusStore.scala   |  27 ++++-
 docs/monitoring.md                                 |   9 ++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 ++
 .../spark/sql/diagnostic/DiagnosticListener.scala  | 112 +++++++++++++++++++++
 .../spark/sql/diagnostic/DiagnosticStore.scala     |  73 ++++++++++++++
 .../spark/sql/execution/QueryExecution.scala       |   6 +-
 .../apache/spark/sql/execution/SQLExecution.scala  |   6 +-
 .../spark/sql/execution/ui/SQLListener.scala       |   6 +-
 .../apache/spark/sql/internal/SharedState.scala    |   7 ++
 .../status/api/v1/sql/ApiSqlRootResource.scala     |  11 ++
 .../status/api/v1/sql/SQLDiagnosticResource.scala  |  67 ++++++++++++
 .../org/apache/spark/status/api/v1/sql/api.scala   |  10 ++
 13 files changed, 345 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index 669fa07053c..1db7267237f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -70,4 +70,12 @@ private[spark] object Status {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  // Optional base directory for the disk-based store that a live application keeps next to
+  // its in-memory status store. Left unset by default (`createOptional`).
+  val DISK_STORE_DIR_FOR_STATUS =
+    ConfigBuilder("spark.appStatusStore.diskStoreDir")
+      .doc("Local directory in which a live application creates an additional disk-based " +
+        "kv store for status data that is too large to keep in the in-memory kv store, " +
+        "such as SQL diagnostics. If not set, no disk-based store is created.")
+      .version("3.4.0")
+      .stringConf
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 34155e3e330..b455850d609 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,12 +17,17 @@
 
 package org.apache.spark.status
 
+import java.io.File
+import java.nio.file.Files
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
@@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
+    val diskStore: Option[KVStore] = None,
     val listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
@@ -755,18 +761,33 @@ private[spark] class AppStatusStore(
   }
 }
 
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
 
   val CURRENT_VERSION = 2L
 
   /**
-   * Create an in-memory store for a live application.
+   * Create an in-memory store for a live application. If `spark.appStatusStore.diskStoreDir`
+   * is set, also create a disk-based store in a new subdirectory of it named after the
+   * current time in milliseconds. Any non-fatal failure while setting up the disk store is
+   * logged as a warning and leaves `diskStore` empty; the in-memory store is still returned.
    */
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
-    new AppStatusStore(store, listener = Some(listener))
+    // create a disk-based kv store if the directory is set
+    val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
+      try {
+        // Directory creation happens inside the `try` so that an unusable location is
+        // handled like any other failure to open the store instead of propagating.
+        val storePath = Files.createDirectories(
+          new File(storeDir, System.currentTimeMillis().toString).toPath
+        ).toFile
+        val diskKvStore = KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf)
+        Some(new ElementTrackingStore(diskKvStore, conf))
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Failed to create disk-based app status store: ", e)
+          None
+      }
+    }
+    new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
   }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index f2c6e379749..6d1bd2eefcc 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API 
listed below, when running
     <code>?planDescription=[true (default) | false]</code> enables/disables 
Physical <code>planDescription</code> on demand for the given query when 
Physical Plan size is high.
     </td>
   </tr>
+  <tr>
+    <td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
+    <td>Diagnostics for the given query, including:
+    <br>
+    1. plan change history of adaptive execution
+    <br>
+    2. physical plan description with unlimited fields
+    </td>
+  </tr>
   <tr>
     <td><code>/applications/[app-id]/environment</code></td>
     <td>Environment details of the given application.</td>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 69ea91f8d3e..ac2a2e350c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3198,6 +3198,15 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
+  // Counterpart of MAX_TO_STRING_FIELDS that is used when a plan string is produced for the
+  // diagnostics API rather than for the regular UI.
+  val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
+    buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
+      .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it takes effect when the output " +
+        "is stored for the diagnostics API. Because that output is stored on disk instead " +
+        s"of in memory, this limit can be larger than ${MAX_TO_STRING_FIELDS.key}.")
+      .version("3.4.0")
+      .intConf
+      .createWithDefault(10000)
+
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string.  If the 
plan is " +
       "longer, further output will be truncated.  The default setting always 
generates a full " +
@@ -4451,6 +4460,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  // Field limit applied to plan strings produced for the diagnostics API; reads
+  // SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC.
+  def maxToStringFieldsForDiagnostic: Int = 
getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
+
   def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
 
   def maxMetadataStringLength: Int = 
getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
new file mode 100644
index 00000000000..7ce1093e879
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ExplainMode
+import 
org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, 
SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+/**
+ * A Spark listener that writes diagnostic information to a data store. The information can be
+ * accessed by the public REST API.
+ *
+ * For every SQL execution it stores one [[ExecutionDiagnosticData]] entry (written when the
+ * execution starts and rewritten under the same id when it ends) and one
+ * [[AdaptiveExecutionUpdate]] entry per adaptive execution update event.
+ *
+ * @param conf used to look up the number of executions to retain
+ * @param kvStore used to store the diagnostic information
+ */
+class DiagnosticListener(
+    conf: SparkConf,
+    kvStore: ElementTrackingStore) extends SparkListener {
+
+  // Register `cleanupExecutions` as the trigger action for execution entries, passing the
+  // retained-executions setting to `addTrigger`.
+  kvStore.addTrigger(
+    classOf[ExecutionDiagnosticData],
+    conf.get(UI_RETAINED_EXECUTIONS)) { count =>
+    cleanupExecutions(count)
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+    case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
+    case _ => // Ignore
+  }
+
+  /** Renders the plan of `qe` with the UI explain mode and the diagnostic field limit. */
+  private def describePlan(qe: org.apache.spark.sql.execution.QueryExecution): String = {
+    val sqlConf = qe.sparkSession.sessionState.conf
+    qe.explainString(
+      ExplainMode.fromString(sqlConf.uiExplainMode), sqlConf.maxToStringFieldsForDiagnostic)
+  }
+
+  /**
+   * Stores the plan description carried by an adaptive execution update. The entry is stamped
+   * with the wall-clock time at which this listener handles the event.
+   */
+  private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
+    val data = new AdaptiveExecutionUpdate(
+      event.executionId,
+      System.currentTimeMillis(),
+      event.physicalPlanDescription
+    )
+    kvStore.write(data)
+  }
+
+  /** Stores the initial diagnostic entry of an execution, without completion data. */
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    // `qe` is a mutable field that stays null unless the code posting the event assigned it,
+    // so skip the event in that case instead of failing with a NullPointerException.
+    Option(event.qe).foreach { qe =>
+      val data = new ExecutionDiagnosticData(
+        event.executionId,
+        describePlan(qe),
+        event.time,
+        None,
+        None
+      )
+      // Check triggers since it's adding new entries
+      kvStore.write(data, checkTriggers = true)
+    }
+  }
+
+  /**
+   * Rewrites the entry of a finished execution with its completion time, its error message
+   * (empty string when the execution did not fail) and the plan at the end of the execution.
+   */
+  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    try {
+      val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
+      // Prefer the plan at the end of the execution; keep the one stored at start when the
+      // event carries no query execution.
+      val physicalPlan = Option(event.qe).map(describePlan).getOrElse(existing.physicalPlan)
+      val errorMessage = event.executionFailure
+        .map(e => s"${e.getClass.getCanonicalName}: ${e.getMessage}")
+        .getOrElse("")
+      val data = new ExecutionDiagnosticData(
+        event.executionId,
+        physicalPlan,
+        existing.submissionTime,
+        Some(event.time),
+        Some(errorMessage)
+      )
+      kvStore.write(data)
+    } catch {
+      case _: NoSuchElementException =>
+        // No entry was stored at start; this is possibly caused by the query failing
+        // before execution.
+    }
+  }
+
+  /**
+   * Deletes completed executions, together with their adaptive execution updates, once more
+   * than the configured number of executions are stored.
+   *
+   * @param count current number of stored [[ExecutionDiagnosticData]] entries
+   */
+  private def cleanupExecutions(count: Long): Unit = {
+    val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
+    if (countToDelete > 0) {
+      // NOTE(review): this view requires ExecutionDiagnosticData to declare a
+      // "completionTime" KV index -- confirm that it does.
+      val view = kvStore.view(classOf[ExecutionDiagnosticData])
+        .index("completionTime")
+        .first(0L)
+      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
+      toDelete.foreach { e =>
+        kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId)
+      }
+      kvStore.removeAllByIndexValues(
+        classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
+    }
+  }
+}
+
+object DiagnosticListener {
+  /** Name of the listener bus queue to which the diagnostic listener is added. */
+  val QUEUE_NAME: String = "diagnostics"
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
new file mode 100644
index 00000000000..236ee104f0e
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+/**
+ * A thin query layer over a KVStore that holds SQL diagnostic data. It keeps no state of its
+ * own, so any number of instances may wrap the same store within an application.
+ */
+class DiagnosticStore(store: KVStore) {
+
+  /** Returns at most `length` executions after skipping the first `offset` ones. */
+  def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
+    val page = store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length)
+    page.asScala.toSeq
+  }
+
+  /** Looks up one execution by id; `None` when the store has no entry for it. */
+  def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
+    try {
+      val found = store.read(classOf[ExecutionDiagnosticData], executionId)
+      Some(found)
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  /** Returns the plan changes stored for an execution, read through the "updateTime" index. */
+  def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
+    val updates = store.view(classOf[AdaptiveExecutionUpdate])
+      .index("updateTime")
+      .parent(executionId)
+    updates.asScala.toSeq
+  }
+}
+
+/**
+ * Represents the diagnostic data of a SQL execution.
+ *
+ * @param executionId id of the SQL execution; the natural key of this type in the store
+ * @param physicalPlan plan description string
+ * @param submissionTime submission time, in milliseconds since the epoch
+ * @param completionTime completion time in milliseconds since the epoch, `None` until the
+ *                       execution has ended
+ * @param errorMessage failure description, empty when the execution ended without a failure,
+ *                     `None` until the execution has ended
+ */
+class ExecutionDiagnosticData(
+    @KVIndexParam val executionId: Long,
+    val physicalPlan: String,
+    val submissionTime: Long,
+    val completionTime: Option[Long],
+    val errorMessage: Option[String]) {
+
+  // `DiagnosticListener.cleanupExecutions` views this type through a "completionTime" index
+  // starting at 0, so that index has to be declared here. Executions that have not completed
+  // yet are indexed as -1 and therefore sort before that starting point.
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTimeIndex: Long = completionTime.getOrElse(-1L)
+}
+
+/**
+ * Represents the plan change of an adaptive execution.
+ *
+ * `executionId` is indexed as "id", and `updateTime` is indexed as "updateTime" with "id" as
+ * its parent index, which is what `DiagnosticStore.adaptiveExecutionUpdates` queries.
+ */
+class AdaptiveExecutionUpdate(
+    @KVIndexParam("id")
+    val executionId: Long,
+    @KVIndexParam(value = "updateTime", parent = "id")
+    val updateTime: Long,
+    val physicalPlan: String) {
+
+  // Natural key of an entry: the (executionId, updateTime) pair. Excluded from JSON output.
+  @JsonIgnore @KVIndex
+  private def naturalIndex: Array[Long] = Array(executionId, updateTime)
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9bf8de5ea6c..4b74a96702c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -223,9 +223,11 @@ class QueryExecution(
     append("\n")
   }
 
-  def explainString(mode: ExplainMode): String = {
+  def explainString(
+      mode: ExplainMode,
+      maxFields: Int = SQLConf.get.maxToStringFields): String = {
     val concat = new PlanStringConcat()
-    explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+    explainString(mode, maxFields, concat.append)
     withRedaction {
       concat.toString
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 748f75b1862..953c370297f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -96,7 +96,7 @@ object SQLExecution {
         var ex: Option[Throwable] = None
         val startTime = System.nanoTime()
         try {
-          sc.listenerBus.post(SparkListenerSQLExecutionStart(
+          val event = SparkListenerSQLExecutionStart(
             executionId = executionId,
             description = desc,
             details = callSite.longForm,
@@ -105,7 +105,9 @@ object SQLExecution {
             // will be caught and reported in the 
`SparkListenerSQLExecutionEnd`
             sparkPlanInfo = 
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
             time = System.currentTimeMillis(),
-            redactedConfigs))
+            redactedConfigs)
+          event.qe = queryExecution
+          sc.listenerBus.post(event)
           body
         } catch {
           case e: Throwable =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 26805e135b7..e3f51cbe3b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+
+  // The `QueryExecution` instance that represents the SQL execution
+  @JsonIgnore private[sql] var qe: QueryExecution = null
+}
 
 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index e894f39d927..f6b748d2424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.diagnostic.DiagnosticListener
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -118,6 +119,12 @@ private[sql] class SharedState(
     statusStore
   }
 
+  // When the status store has a disk store, attach a DiagnosticListener to its own queue.
+  for (kvStore <- sparkContext.statusStore.diskStore) {
+    val trackingStore = kvStore.asInstanceOf[ElementTrackingStore]
+    val diagnosticListener = new DiagnosticListener(conf, trackingStore)
+    sparkContext.listenerBus.addToQueue(diagnosticListener, DiagnosticListener.QUEUE_NAME)
+  }
+
   /**
    * A [[StreamingQueryListener]] for structured streaming ui, it contains all 
streaming query ui
    * data to show.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 747c05b9b06..6c727c4369d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends 
ApiRequestContext {
   def sqlList(
       @PathParam("appId") appId: String,
       @PathParam("attemptId") attemptId: String): Class[SqlResource] = 
classOf[SqlResource]
+
+  // Routes diagnostics requests that carry no attempt id to SQLDiagnosticResource.
+  @Path("applications/{appId}/diagnostics/sql")
+  def sqlDiagnosticsList(
+      @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
+    classOf[SQLDiagnosticResource]
+
+  // Same routing as the overload without `attemptId`, for paths that include an attempt id.
+  @Path("applications/{appId}/{attemptId}/diagnostics/sql")
+  def sqlDiagnosticsList(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] 
=
+    classOf[SQLDiagnosticResource]
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
new file mode 100644
index 00000000000..8a6c81ced74
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+import javax.ws.rs._
+
+import org.apache.spark.sql.diagnostic._
+import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
+
+/**
+ * REST resource that serves SQL diagnostics from the disk store of the application's
+ * status store.
+ */
+private[v1] class SQLDiagnosticResource extends BaseAppResource {
+
+  /**
+   * Lists executions without their plan changes. Returns an empty list when the application
+   * has no disk store.
+   */
+  @GET
+  def sqlDiagnosticList(
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int): Seq[SQLDiagnosticData] = {
+    withUI { ui =>
+      ui.store.diskStore.fold(Seq.empty[SQLDiagnosticData]) { kvStore =>
+        val executions = new DiagnosticStore(kvStore).diagnosticsList(offset, length)
+        // Do not display the plan changes in the list
+        executions.map(prepareSqlDiagnosticData(_, Seq.empty))
+      }
+    }
+  }
+
+  /**
+   * Returns one execution together with its plan changes. Throws `NotFoundException` when
+   * there is no disk store or no entry for the given execution id.
+   */
+  @GET
+  @Path("{executionId:\\d+}")
+  def sqlDiagnostic(
+      @PathParam("executionId") execId: Long): SQLDiagnosticData = {
+    withUI { ui =>
+      val result = for {
+        kvStore <- ui.store.diskStore
+        store = new DiagnosticStore(kvStore)
+        updates = store.adaptiveExecutionUpdates(execId)
+        diagnostic <- store.diagnostic(execId)
+      } yield prepareSqlDiagnosticData(diagnostic, updates)
+      result.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
+    }
+  }
+
+  /** Converts the stored entries into the REST representation, turning times into dates. */
+  private def prepareSqlDiagnosticData(
+      diagnostic: ExecutionDiagnosticData,
+      updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = {
+    val planChanges = updates.map { update =>
+      AdaptivePlanChange(new Date(update.updateTime), update.physicalPlan)
+    }
+    new SQLDiagnosticData(
+      diagnostic.executionId,
+      diagnostic.physicalPlan,
+      new Date(diagnostic.submissionTime),
+      diagnostic.completionTime.map(new Date(_)),
+      diagnostic.errorMessage,
+      planChanges)
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala 
b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 0ddf66718bc..3cafc10352f 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -40,3 +40,13 @@ case class Node private[spark](
     metrics: Seq[Metric])
 
 case class Metric private[spark] (name: String, value: String)
+
+// REST representation of the diagnostics of one SQL execution, as built by
+// SQLDiagnosticResource. `planChanges` is left empty by the list endpoint.
+class SQLDiagnosticData private[spark] (
+    val id: Long,
+    val physicalPlan: String,
+    val submissionTime: Date,
+    val completionTime: Option[Date],
+    val errorMessage: Option[String],
+    val planChanges: Seq[AdaptivePlanChange])
+
+// One adaptive execution plan change: the time of the update and the plan description.
+case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to