gengliangwang commented on a change in pull request #28208: URL: https://github.com/apache/spark/pull/28208#discussion_r411871518
########## File path: sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala ########## @@ -84,18 +119,40 @@ private[v1] class SqlResource extends BaseAppResource { } val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime - val planDetails = if (details) exec.physicalPlanDescription else "" - val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty + val planDetails = if (details && planDescription) exec.physicalPlanDescription else "" + val metrics = + if (details) printableMetrics(exec.metrics, exec.metricValues, nodeIdAndWSCGIdMap) Review comment: nit: make it ``` if { } else { } ``` ########## File path: sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala ########## @@ -21,46 +21,81 @@ import java.util.Date import javax.ws.rs._ import javax.ws.rs.core.MediaType +import scala.util.{Failure, Success, Try} + import org.apache.spark.JobExecutionStatus -import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric} +import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric} import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class SqlResource extends BaseAppResource { + val WHOLE_STAGE_CODEGEN = "WholeStageCodegen" + @GET def sqlList( @DefaultValue("false") @QueryParam("details") details: Boolean, + @DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean, @DefaultValue("0") @QueryParam("offset") offset: Int, @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = { withUI { ui => val sqlStore = new SQLAppStatusStore(ui.store.store) - sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details)) + sqlStore.executionsList(offset, length).map(prepareExecutionData(_, + details = details, planDescription = 
planDescription)) } } @GET @Path("{executionId:\\d+}") def sql( @PathParam("executionId") execId: Long, - @DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = { + @DefaultValue("false") @QueryParam("details") details: Boolean, + @DefaultValue("true") @QueryParam("planDescription") + planDescription: Boolean): ExecutionData = { withUI { ui => val sqlStore = new SQLAppStatusStore(ui.store.store) + val graph = sqlStore.planGraph(execId) + val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = getNodeIdAndWSCGIdMap(graph) sqlStore .execution(execId) - .map(prepareExecutionData(_, details)) - .getOrElse(throw new NotFoundException("unknown id: " + execId)) + .map(prepareExecutionData(_, nodeIdAndWSCGIdMap, details, planDescription)) + .getOrElse(throw new NotFoundException("unknown execution id: " + execId)) } } private def printableMetrics( - metrics: Seq[SQLPlanMetric], - metricValues: Map[Long, String]): Seq[Metrics] = { - metrics.map(metric => - Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse(""))) + sqlPlanMetrics: Seq[SQLPlanMetric], + metricValues: Map[Long, String], + nodeIdAndWSCGIdMap: Map[Long, Option[Long]]): Seq[MetricDetails] = { + + def getMetric(metricValues: Map[Long, String], accumulatorId: Long, + metricName: String): Option[Metric] = { + metricValues.get(accumulatorId).map( mv => { + val metricValue = if (mv.startsWith("\n")) mv.substring(1, mv.length) else mv + Metric(metricName, metricValue) + }) + } + + val groupedMap: Map[(Long, String), Seq[SQLPlanMetric]] = + sqlPlanMetrics.groupBy[(Long, String)]( + sqlPlanMetric => (sqlPlanMetric.nodeId.getOrElse(-1), sqlPlanMetric.nodeName.getOrElse(""))) + + val metrics = groupedMap.mapValues[Seq[Metric]](sqlPlanMetrics => + sqlPlanMetrics.flatMap(m => getMetric(metricValues, m.accumulatorId, m.name.trim))) + + val metricDetails = metrics.map { + case ((nodeId: Long, nodeName: String), metrics: Seq[Metric]) => + val wholeStageCodegenId = 
nodeIdAndWSCGIdMap.get(nodeId).flatten + MetricDetails(nodeId = nodeId, nodeName = nodeName.trim, wholeStageCodegenId, metrics) + }.toSeq + + metricDetails.sortBy(_.nodeId).reverse } - private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = { + private def prepareExecutionData(exec: SQLExecutionUIData, + nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map.empty, Review comment: 4 spaces indent ########## File path: sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala ########## @@ -23,11 +23,16 @@ class ExecutionData private[spark] ( val status: String, val description: String, val planDescription: String, - val metrics: Seq[Metrics], val submissionTime: Date, val duration: Long, val runningJobIds: Seq[Int], val successJobIds: Seq[Int], - val failedJobIds: Seq[Int]) + val failedJobIds: Seq[Int], + val metricDetails: Seq[MetricDetails]) -case class Metrics private[spark] (metricName: String, metricValue: String) +case class MetricDetails private[spark] (nodeId: Long, + nodeName: String, Review comment: 4 spaces indent ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org