This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4274fb8806c [SPARK-38550][SQL][CORE] Use a disk-based store to save more debug information for live UI
4274fb8806c is described below

commit 4274fb8806c25519cac829d00b01132fe08c0cac
Author: Linhong Liu <linhong....@databricks.com>
AuthorDate: Thu Apr 14 14:01:03 2022 +0800

    [SPARK-38550][SQL][CORE] Use a disk-based store to save more debug information for live UI

    ### What changes were proposed in this pull request?
    In Spark, the UI lacks troubleshooting abilities. For example:
    * AQE plan changes are not available
    * the plan description of a large plan is truncated

    This is because the live UI depends on an in-memory KV store, and we always have
    to worry about stability when adding more information to that store. Therefore,
    it's better to add a disk-based store to save more information.

    This PR includes:
    * a disk-based KV store in AppStatusStore that allows adding information that
      does not fit in memory
    * a separate listener that collects diagnostic data and saves it to the disk store
    * a new REST API endpoint to expose the diagnostic data (AQE plan changes, the
      untruncated plan description)

    ### Why are the changes needed?
    The troubleshooting ability is highly needed; without it, AQE-related issues are
    hard to debug. Once we solve these blockers, we can make a long-term plan to
    improve observability.

    ### Does this PR introduce _any_ user-facing change?
    Yes, a new REST API that exposes more information about the application.

    REST API endpoint:
    http://localhost:4040/api/v1/applications/local-1647312132944/diagnostics/sql/0

    Example:
    ```
    $ ./bin/spark-shell --conf spark.appStatusStore.diskStoreDir=/tmp/diskstore

    spark-shell> val df = sql(
      """SELECT t1.*, t2.c, t3.d
        | FROM (SELECT 1 as a, 'b' as b) t1
        | JOIN (SELECT 1 as a, 'c' as c) t2
        | ON t1.a = t2.a
        | JOIN (SELECT 1 as a, 'd' as d) t3
        | ON t2.a = t3.a
        |""".stripMargin)
    df.show()
    ```

    Output:
    ```json
    {
      "id" : 0,
      "physicalPlan" : "<plan description string>",
      "submissionTime" : "2022-03-15T03:41:42.226GMT",
      "completionTime" : "2022-03-15T03:41:43.387GMT",
      "errorMessage" : "",
      "planChanges" : [ {
        "physicalPlan" : "<plan description string>",
        "updateTime" : "2022-03-15T03:41:42.268GMT"
      }, {
        "physicalPlan" : "<plan description string>",
        "updateTime" : "2022-03-15T03:41:43.262GMT"
      } ]
    }
    ```

    ### How was this patch tested?
    Manually tested.

    Closes #35856 from linhongliu-db/diagnostic.
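With the shell above running, the new endpoint can be queried from the same session. A minimal sketch (not part of the commit), assuming the driver UI is on the default port 4040 and execution id 0 as in the output shown:

```scala
// Fetch the diagnostics JSON for SQL execution 0 of the current application.
// `spark` is the SparkSession that spark-shell provides.
val appId = spark.sparkContext.applicationId
val url = s"http://localhost:4040/api/v1/applications/$appId/diagnostics/sql/0"
println(scala.io.Source.fromURL(url).mkString)
```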
Authored-by: Linhong Liu <linhong....@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/internal/config/Status.scala |   8 ++
 .../org/apache/spark/status/AppStatusStore.scala  |  27 ++++-
 docs/monitoring.md                                |   9 ++
 .../org/apache/spark/sql/internal/SQLConf.scala   |  11 ++
 .../spark/sql/diagnostic/DiagnosticListener.scala | 112 +++++++++++++++++++++
 .../spark/sql/diagnostic/DiagnosticStore.scala    |  73 ++++++++++++++
 .../spark/sql/execution/QueryExecution.scala      |   6 +-
 .../apache/spark/sql/execution/SQLExecution.scala |   6 +-
 .../spark/sql/execution/ui/SQLListener.scala      |   6 +-
 .../apache/spark/sql/internal/SharedState.scala   |   7 ++
 .../status/api/v1/sql/ApiSqlRootResource.scala    |  11 ++
 .../status/api/v1/sql/SQLDiagnosticResource.scala |  67 ++++++++++++
 .../org/apache/spark/status/api/v1/sql/api.scala  |  10 ++
 13 files changed, 345 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index 669fa07053c..1db7267237f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -70,4 +70,12 @@ private[spark] object Status {
     .version("3.0.0")
     .booleanConf
     .createWithDefault(false)
+
+  val DISK_STORE_DIR_FOR_STATUS =
+    ConfigBuilder("spark.appStatusStore.diskStoreDir")
+      .doc("Local directory in which to store app status. " +
+        "It is used in addition to the in-memory kv store.")
+      .version("3.4.0")
+      .stringConf
+      .createOptional
 }

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 34155e3e330..b455850d609 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,12 +17,17 @@
 
 package org.apache.spark.status
 
+import java.io.File
+import java.nio.file.Files
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
@@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
+    val diskStore: Option[KVStore] = None,
     val listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
@@ -755,18 +761,33 @@ private[spark] class AppStatusStore(
   }
 }
 
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
 
   val CURRENT_VERSION = 2L
 
   /**
-   * Create an in-memory store for a live application.
+   * Create an in-memory store for a live application. Also create a disk store if
+   * `spark.appStatusStore.diskStoreDir` is set.
    */
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
-    new AppStatusStore(store, listener = Some(listener))
+    // create a disk-based kv store if the directory is set
+    val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
+      val storePath = Files.createDirectories(
+        new File(storeDir, System.currentTimeMillis().toString).toPath
+      ).toFile
+      try {
+        Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
+          .map(new ElementTrackingStore(_, conf))
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Failed to create disk-based app status store: ", e)
+          None
+      }
+    }
+    new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
  }
 }
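Note the shape of the disk-store creation above: one timestamp-named sub-directory per application run, and any failure merely logs a warning and falls back to in-memory-only behavior, so the feature cannot break application startup. A standalone sketch of that best-effort pattern (the helper name is illustrative, not Spark API):

```scala
import java.io.File
import java.nio.file.Files
import scala.util.control.NonFatal

// Best-effort: return Some(dir) if the store directory could be created,
// None (caller keeps the in-memory-only path) on any non-fatal failure.
def openBestEffort(rootDir: Option[String]): Option[File] =
  rootDir.flatMap { dir =>
    try {
      // one sub-directory per application run, keyed by start time
      val path = new File(dir, System.currentTimeMillis().toString).toPath
      Some(Files.createDirectories(path).toFile)
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Failed to create disk-based app status store: $e")
        None
    }
  }
```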
diff --git a/docs/monitoring.md b/docs/monitoring.md
index f2c6e379749..6d1bd2eefcc 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
       <code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
     </td>
   </tr>
+  <tr>
+    <td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
+    <td>Diagnostics for the given query. It includes:
+      <br>
+      1. the plan change history of adaptive execution
+      <br>
+      2. the physical plan description with unlimited fields
+    </td>
+  </tr>
   <tr>
     <td><code>/applications/[app-id]/environment</code></td>
     <td>Environment details of the given application.</td>

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 69ea91f8d3e..ac2a2e350c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3198,6 +3198,15 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
+  val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
+    buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
+      .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it takes effect when the output " +
+        s"is stored for the diagnostics API. Since that output is stored on disk rather " +
+        s"than in memory, it can be larger than ${MAX_TO_STRING_FIELDS.key}.")
+      .version("3.4.0")
+      .intConf
+      .createWithDefault(10000)
+
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, further output will be truncated. The default setting always generates a full " +
@@ -4451,6 +4460,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
+
   def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
 
   def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
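The two limits are deliberately independent: the UI keeps its small default while the diagnostics output can be near-complete. Both are plain runtime SQL confs, so they can be inspected from any session (the values in the comments are the defaults defined above):

```scala
// Field-count limit used for UI plan descriptions (default "25").
spark.conf.get("spark.sql.debug.maxToStringFields")
// Larger limit used when the plan is stored for the diagnostics API
// (default "10000").
spark.conf.get("spark.sql.debug.maxToStringFieldsForDiagnostic")
```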
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
new file mode 100644
index 00000000000..7ce1093e879
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ExplainMode
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+/**
+ * A Spark listener that writes diagnostic information to a data store. The information can be
+ * accessed by the public REST API.
+ *
+ * @param kvStore used to store the diagnostic information
+ */
+class DiagnosticListener(
+    conf: SparkConf,
+    kvStore: ElementTrackingStore) extends SparkListener {
+
+  kvStore.addTrigger(
+    classOf[ExecutionDiagnosticData],
+    conf.get(UI_RETAINED_EXECUTIONS)) { count =>
+    cleanupExecutions(count)
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+    case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
+    case _ => // Ignore
+  }
+
+  private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
+    val data = new AdaptiveExecutionUpdate(
+      event.executionId,
+      System.currentTimeMillis(),
+      event.physicalPlanDescription
+    )
+    kvStore.write(data)
+  }
+
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    val sqlConf = event.qe.sparkSession.sessionState.conf
+    val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+    val physicalPlan = event.qe.explainString(
+      planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+    val data = new ExecutionDiagnosticData(
+      event.executionId,
+      physicalPlan,
+      event.time,
+      None,
+      None
+    )
+    // Check triggers since it's adding new entries
+    kvStore.write(data, checkTriggers = true)
+  }
+
+  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    try {
+      val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
+      val sqlConf = event.qe.sparkSession.sessionState.conf
+      val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+      val physicalPlan = event.qe.explainString(
+        planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+      val data = new ExecutionDiagnosticData(
+        event.executionId,
+        physicalPlan,
+        existing.submissionTime,
+        Some(event.time),
+        event.executionFailure.map(
+          e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some(""))
+      )
+      kvStore.write(data)
+    } catch {
+      case _: NoSuchElementException =>
+        // This is possibly caused by the query failing before execution starts.
+    }
+  }
+
+  private def cleanupExecutions(count: Long): Unit = {
+    val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
+    if (countToDelete <= 0) {
+      return
+    }
+    val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
+    toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId))
+    kvStore.removeAllByIndexValues(
+      classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
+  }
+}
+
+object DiagnosticListener {
+  val QUEUE_NAME = "diagnostics"
+}
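DiagnosticListener above reacts to exactly three SQL events. For reference, a self-contained listener with the same dispatch shape that can be tried in any spark-shell (the handlers are illustrative prints, not the commit's logic):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

class PrintingSQLListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case s: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${s.executionId} started at ${s.time}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished at ${e.time}")
    case _ => // not a SQL execution event; ignore
  }
}

// Register on the shared listener bus (DiagnosticListener itself is placed on a
// dedicated "diagnostics" queue, as the SharedState change further below shows):
// spark.sparkContext.addSparkListener(new PrintingSQLListener)
```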
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
new file mode 100644
index 00000000000..236ee104f0e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query diagnostic-specific
+ * information. There's no state kept in this class, so it's ok to have multiple instances
+ * of it in an application.
+ */
+class DiagnosticStore(store: KVStore) {
+
+  def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
+    store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq
+  }
+
+  def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
+    try {
+      Some(store.read(classOf[ExecutionDiagnosticData], executionId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
+    store.view(classOf[AdaptiveExecutionUpdate])
+      .index("updateTime")
+      .parent(executionId)
+      .asScala
+      .toSeq
+  }
+}
+
+/* Represents the diagnostic data of a SQL execution */
+class ExecutionDiagnosticData(
+    @KVIndexParam val executionId: Long,
+    val physicalPlan: String,
+    val submissionTime: Long,
+    val completionTime: Option[Long],
+    val errorMessage: Option[String])
+
+/* Represents the plan change of an adaptive execution */
+class AdaptiveExecutionUpdate(
+    @KVIndexParam("id")
+    val executionId: Long,
+    @KVIndexParam(value = "updateTime", parent = "id")
+    val updateTime: Long,
+    val physicalPlan: String) {
+
+  @JsonIgnore @KVIndex
+  private def naturalIndex: Array[Long] = Array(executionId, updateTime)
+}
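DiagnosticStore keeps no state, so it is cheap to instantiate per request, which is exactly how the REST resource later in this diff uses it. A hypothetical helper built only from the classes defined above (`kvStore` would be the disk store handle):

```scala
import org.apache.spark.sql.diagnostic.{DiagnosticStore, ExecutionDiagnosticData}
import org.apache.spark.util.kvstore.KVStore

// Summarize one execution's diagnostic record and its AQE update count.
def summarize(kvStore: KVStore, executionId: Long): String = {
  val store = new DiagnosticStore(kvStore)
  val updateCount = store.adaptiveExecutionUpdates(executionId).size
  store.diagnostic(executionId) match {
    case Some(d: ExecutionDiagnosticData) =>
      s"execution $executionId: $updateCount AQE plan update(s), " +
        s"completed = ${d.completionTime.isDefined}"
    case None =>
      s"execution $executionId: no diagnostic data recorded"
  }
}
```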
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9bf8de5ea6c..4b74a96702c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -223,9 +223,11 @@ class QueryExecution(
     append("\n")
   }
 
-  def explainString(mode: ExplainMode): String = {
+  def explainString(
+      mode: ExplainMode,
+      maxFields: Int = SQLConf.get.maxToStringFields): String = {
     val concat = new PlanStringConcat()
-    explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+    explainString(mode, maxFields, concat.append)
     withRedaction {
       concat.toString
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 748f75b1862..953c370297f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -96,7 +96,7 @@
       var ex: Option[Throwable] = None
       val startTime = System.nanoTime()
       try {
-        sc.listenerBus.post(SparkListenerSQLExecutionStart(
+        val event = SparkListenerSQLExecutionStart(
           executionId = executionId,
           description = desc,
           details = callSite.longForm,
@@ -105,7 +105,9 @@
           // will be caught and reported in the `SparkListenerSQLExecutionEnd`
           sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
           time = System.currentTimeMillis(),
-          redactedConfigs))
+          redactedConfigs)
+        event.qe = queryExecution
+        sc.listenerBus.post(event)
         body
       } catch {
         case e: Throwable =>
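With the QueryExecution change above, the field limit becomes a per-call argument (defaulting to the session's maxToStringFields), which is how the diagnostic listener renders near-untruncated plans. A sketch of calling it directly, where `df` stands for any DataFrame in scope and 10000 mirrors the diagnostic conf's default:

```scala
import org.apache.spark.sql.execution.ExplainMode

// Uses the new optional `maxFields` parameter added in this commit.
val fullPlan = df.queryExecution.explainString(
  ExplainMode.fromString("formatted"), maxFields = 10000)
println(fullPlan)
```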
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 26805e135b7..e3f51cbe3b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+
+  // The `QueryExecution` instance that represents the SQL execution
+  @JsonIgnore private[sql] var qe: QueryExecution = null
+}
 
 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index e894f39d927..f6b748d2424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.diagnostic.DiagnosticListener
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -118,6 +119,12 @@ private[sql] class SharedState(
     statusStore
   }
 
+  sparkContext.statusStore.diskStore.foreach { kvStore =>
+    sparkContext.listenerBus.addToQueue(
+      new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
+      DiagnosticListener.QUEUE_NAME)
+  }
+
   /**
    * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
    * data to show.
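One detail worth noting in the SQLListener change above: listener events may also be JSON-serialized (for example into the event log), and a live QueryExecution is neither serializable nor meaningful offline, so @JsonIgnore keeps it visible to in-process listeners only. The same shape on a toy event (illustrative, not part of the commit):

```scala
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.scheduler.SparkListenerEvent

case class ToyExecutionEvent(executionId: Long, time: Long) extends SparkListenerEvent {
  // Live, in-process-only state; excluded from any JSON serialization.
  @JsonIgnore var liveState: AnyRef = null
}
```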
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 747c05b9b06..6c727c4369d 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
   def sqlList(
       @PathParam("appId") appId: String,
       @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
+
+  @Path("applications/{appId}/diagnostics/sql")
+  def sqlDiagnosticsList(
+      @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
+    classOf[SQLDiagnosticResource]
+
+  @Path("applications/{appId}/{attemptId}/diagnostics/sql")
+  def sqlDiagnosticsList(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
+    classOf[SQLDiagnosticResource]
 }
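The routes registered above delegate to SQLDiagnosticResource (next diff), whose list endpoint pages with offset/length query parameters defaulting to the first 20 records. A hypothetical request against a local driver, mirroring the earlier single-execution example:

```scala
// Sketch only: fetch the first page of SQL diagnostics for the current app.
val appId = spark.sparkContext.applicationId
val listUrl =
  s"http://localhost:4040/api/v1/applications/$appId/diagnostics/sql?offset=0&length=20"
println(scala.io.Source.fromURL(listUrl).mkString)
```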
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
new file mode 100644
index 00000000000..8a6c81ced74
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+import javax.ws.rs._
+
+import org.apache.spark.sql.diagnostic._
+import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
+
+private[v1] class SQLDiagnosticResource extends BaseAppResource {
+
+  @GET
+  def sqlDiagnosticList(
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int): Seq[SQLDiagnosticData] = {
+    withUI { ui =>
+      ui.store.diskStore.map { kvStore =>
+        val store = new DiagnosticStore(kvStore)
+        store.diagnosticsList(offset, length)
+          // Do not display the plan changes in the list
+          .map(d => prepareSqlDiagnosticData(d, Seq.empty))
+      }.getOrElse(Seq.empty)
+    }
+  }
+
+  @GET
+  @Path("{executionId:\\d+}")
+  def sqlDiagnostic(
+      @PathParam("executionId") execId: Long): SQLDiagnosticData = {
+    withUI { ui =>
+      ui.store.diskStore.flatMap { kvStore =>
+        val store = new DiagnosticStore(kvStore)
+        val updates = store.adaptiveExecutionUpdates(execId)
+        store.diagnostic(execId)
+          .map(d => prepareSqlDiagnosticData(d, updates))
+      }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
+    }
+  }
+
+  private def prepareSqlDiagnosticData(
+      diagnostic: ExecutionDiagnosticData,
+      updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = {
+    new SQLDiagnosticData(
+      diagnostic.executionId,
+      diagnostic.physicalPlan,
+      new Date(diagnostic.submissionTime),
+      diagnostic.completionTime.map(t => new Date(t)),
+      diagnostic.errorMessage,
+      updates.map(u => AdaptivePlanChange(new Date(u.updateTime), u.physicalPlan)))
+  }
+}

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 0ddf66718bc..3cafc10352f 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -40,3 +40,13 @@ case class Node private[spark](
     metrics: Seq[Metric])
 
 case class Metric private[spark] (name: String, value: String)
+
+class SQLDiagnosticData private[spark] (
+    val id: Long,
+    val physicalPlan: String,
+    val submissionTime: Date,
+    val completionTime: Option[Date],
+    val errorMessage: Option[String],
+    val planChanges: Seq[AdaptivePlanChange])
+
+case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org