This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2796812 [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters 2796812 is described below commit 2796812cea343f700c2009db9ce733b4f048ecd0 Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Thu Apr 1 12:48:26 2021 -0500 [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters ### What changes were proposed in this pull request? Add more flexible parameters for the stages endpoint /applications/{app-id}/stages. It can be: /applications/{app-id}/stages?details=[true|false]&status=[ACTIVE|COMPLETE|FAILED|PENDING|SKIPPED]&withSummaries=[true|false]&quantiles=[comma separated quantiles string]&taskStatus=[RUNNING|SUCCESS|FAILED|PENDING] where ``` query parameter details=true is to show the detailed task information within each stage. The default value is details=false; query parameter status can select those stages with the specified status. When the status parameter is not specified, a list of all stages is generated. query parameter withSummaries=true is to show both task summary information in percentile distribution and executor summary information in percentile distribution. The default value is withSummaries=false. query parameter quantiles supports user-defined quantiles; the default is `0.0,0.25,0.5,0.75,1.0` query parameter taskStatus is to show only those tasks with the specified status within their corresponding stages. This parameter takes effect only when details=true (i.e. this parameter will be ignored when details=false). ``` ### Why are the changes needed? More flexible RESTful API ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? UT Closes #31204 from AngersZhuuuu/SPARK-26399-NEW. 
Lead-authored-by: Angerszhuuuu <angers....@gmail.com> Co-authored-by: AngersZhuuuu <angers....@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../org/apache/spark/status/AppStatusStore.scala | 16 ++++++++--- .../spark/status/api/v1/StagesResource.scala | 31 ++++++++++++++++++++-- docs/monitoring.md | 6 ++++- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 1d34d8a..8d9c7cc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -104,13 +104,23 @@ private[spark] class AppStatusStore( listener.map(_.activeStages()).getOrElse(Nil) } - def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { + def stageList( + statuses: JList[v1.StageStatus], + details: Boolean = false, + withSummaries: Boolean = false, + unsortedQuantiles: Array[Double] = Array.empty, + taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = { + val quantiles = unsortedQuantiles.sorted val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info) - if (statuses != null && !statuses.isEmpty()) { + val ret = if (statuses != null && !statuses.isEmpty()) { it.filter { s => statuses.contains(s.status) }.toSeq } else { it.toSeq } + ret.map { s => + newStageData(s, withDetail = details, taskStatus = taskStatus, + withSummaries = withSummaries, unsortedQuantiles = quantiles) + } } def stageData( @@ -472,7 +482,7 @@ private[spark] class AppStatusStore( def newStageData( stage: v1.StageData, withDetail: Boolean = false, - taskStatus: JList[v1.TaskStatus], + taskStatus: JList[v1.TaskStatus] = List().asJava, withSummaries: Boolean = false, unsortedQuantiles: Array[Double] = Array.empty[Double]): v1.StageData = { if (!withDetail && !withSummaries) { diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index af66a73..26dfa5a 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -20,6 +20,9 @@ import java.util.{HashMap, List => JList, Locale} import javax.ws.rs.{NotFoundException => _, _} import javax.ws.rs.core.{Context, MediaType, MultivaluedMap, UriInfo} +import scala.collection.JavaConverters._ + +import org.apache.spark.status.api.v1.TaskStatus._ import org.apache.spark.ui.UIUtils import org.apache.spark.ui.jobs.ApiHelper._ import org.apache.spark.util.Utils @@ -28,8 +31,32 @@ import org.apache.spark.util.Utils private[v1] class StagesResource extends BaseAppResource { @GET - def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { - withUI(_.store.stageList(statuses)) + def stageList( + @QueryParam("status") statuses: JList[StageStatus], + @QueryParam("details") @DefaultValue("false") details: Boolean, + @QueryParam("withSummaries") @DefaultValue("false") withSummaries: Boolean, + @QueryParam("quantiles") @DefaultValue("0.0,0.25,0.5,0.75,1.0") quantileString: String, + @QueryParam("taskStatus") taskStatus: JList[TaskStatus]): Seq[StageData] = { + withUI { + val quantiles = parseQuantileString(quantileString) + ui => { + ui.store.stageList(statuses, details, withSummaries, quantiles, taskStatus) + .filter { stage => + if (details && taskStatus.asScala.nonEmpty) { + taskStatus.asScala.exists { + case FAILED => stage.numFailedTasks > 0 + case KILLED => stage.numKilledTasks > 0 + case RUNNING => stage.numActiveTasks > 0 + case SUCCESS => stage.numCompleteTasks > 0 + case UNKNOWN => stage.numTasks - stage.numFailedTasks - stage.numKilledTasks - + stage.numActiveTasks - stage.numCompleteTasks > 0 + } + } else { + true + } + } + } + } } @GET diff --git a/docs/monitoring.md 
b/docs/monitoring.md index 6157244..14c8c4f 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -472,7 +472,11 @@ can be identified by their `[attempt-id]`. In the API listed below, when running <td><code>/applications/[app-id]/stages</code></td> <td> A list of all stages for a given application. - <br><code>?status=[active|complete|pending|failed]</code> list only stages in the state. + <br><code>?status=[active|complete|pending|failed]</code> list only stages in the given state. + <br><code>?details=true</code> lists all stages with the task data. + <br><code>?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING]</code> lists stages only those tasks with the specified task status. Query parameter taskStatus takes effect only when <code>details=true</code>. + <br><code>?withSummaries=true</code> lists stages with task metrics distribution and executor metrics distribution. + <br><code>?quantiles=0.0,0.25,0.5,0.75,1.0</code> summarize the metrics with the given quantiles. Query parameter quantiles takes effect only when <code>withSummaries=true</code>. Default value is <code>0.0,0.25,0.5,0.75,1.0</code>. </td> </tr> <tr> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org