gengliangwang commented on a change in pull request #28208:
URL: https://github.com/apache/spark/pull/28208#discussion_r419917593



##########
File path: sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
##########
@@ -21,46 +21,99 @@ import java.util.Date
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
 import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class SqlResource extends BaseAppResource {
 
+  val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"
+
   @GET
   def sqlList(
       @DefaultValue("false") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription") planDescription: 
Boolean,
       @DefaultValue("0") @QueryParam("offset") offset: Int,
       @DefaultValue("20") @QueryParam("length") length: Int): 
Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
+      sqlStore.executionsList(offset, length).map { exec =>
+        val (edges, nodeIdAndWSCGIdMap) = computeDetailsIfTrue(sqlStore, exec.executionId, details)
+        prepareExecutionData(exec, edges, nodeIdAndWSCGIdMap,
+          details = details, planDescription = planDescription)
+      }
     }
   }
 
   @GET
   @Path("{executionId:\\d+}")
   def sql(
       @PathParam("executionId") execId: Long,
-      @DefaultValue("false") @QueryParam("details") details: Boolean): 
ExecutionData = {
+      @DefaultValue("false") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription")
+      planDescription: Boolean): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
+      val (edges, nodeIdAndWSCGIdMap) = computeDetailsIfTrue(sqlStore, execId, details)
       sqlStore
         .execution(execId)
-        .map(prepareExecutionData(_, details))
-        .getOrElse(throw new NotFoundException("unknown id: " + execId))
+        .map(prepareExecutionData(_, edges, nodeIdAndWSCGIdMap, details, planDescription))
+        .getOrElse(throw new NotFoundException("unknown execution id: " + execId))
+    }
+  }
+
+  private def computeDetailsIfTrue(sqlStore: SQLAppStatusStore,

Review comment:
       4-space indent, e.g.:
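
For reference, a minimal sketch of the requested layout (the return type here is only a guess, inferred from the destructuring at the call sites above):

```scala
// continuation parameters indented 4 spaces, one per line
private def computeDetailsIfTrue(
    sqlStore: SQLAppStatusStore,
    executionId: Long,
    details: Boolean): (Seq[SparkPlanGraphEdge], Map[Long, Long]) = {
  // body elided in this sketch; only the signature layout matters for the nit
  (Seq.empty, Map.empty)
}
```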

##########
File path: sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SQLExecutionUIData, SQLPlanMetric}
+
+object SqlResourceSuite {
+
+  val SCAN_TEXT = "Scan text"
+  val FILTER = "Filter"
+  val WHOLE_STAGE_CODEGEN_1 = "WholeStageCodegen (1)"
+  val DURATION = "duration"
+  val NUMBER_OF_OUTPUT_ROWS = "number of output rows"
+  val METADATA_TIME = "metadata time"
+  val NUMBER_OF_FILES_READ = "number of files read"
+  val SIZE_OF_FILES_READ = "size of files read"
+  val PLAN_DESCRIPTION = "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text..."
+  val DESCRIPTION = "csv at MyDataFrames.scala:57"
+
+  val edges: Seq[SparkPlanGraphEdge] =
+    Seq(SparkPlanGraphEdge(3, 2), SparkPlanGraphEdge(2, 1), SparkPlanGraphEdge(1, 0))
+
+  private def getSQLExecutionUIData(): SQLExecutionUIData = {
+    def getMetrics(): Seq[SQLPlanMetric] = {
+      Seq(SQLPlanMetric(DURATION, 0, "", Some(1), Some(WHOLE_STAGE_CODEGEN_1)),
+        SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "", Some(2), Some(FILTER)),
+        SQLPlanMetric(METADATA_TIME, 2, "", Some(3), Some(SCAN_TEXT)),
+        SQLPlanMetric(NUMBER_OF_FILES_READ, 3, "", Some(3), Some(SCAN_TEXT)),
+        SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, "", Some(3), Some(SCAN_TEXT)),
+        SQLPlanMetric(SIZE_OF_FILES_READ, 5, "", Some(3), Some(SCAN_TEXT)))
+    }
+
+    def getMetricValues() = {
+      Map[Long, String](
+        0L -> "0 ms",
+        1L -> "1",
+        2L -> "2 ms",
+        3L -> "1",
+        4L -> "1",
+        5L -> "330.0 B"
+      )
+    }
+
+    new SQLExecutionUIData(
+      executionId = 0,
+      description = DESCRIPTION,
+      details = "",
+      physicalPlanDescription = PLAN_DESCRIPTION,
+      metrics = getMetrics(),
+      submissionTime = 1586768888233L,
+      completionTime = Some(new Date(1586768888999L)),
+      jobs = Map[Int, JobExecutionStatus](
+        0 -> JobExecutionStatus.SUCCEEDED,
+        1 -> JobExecutionStatus.SUCCEEDED),
+      stages = Set[Int](),
+      metricValues = getMetricValues()
+    )
+  }
+
+  private def getNodes(): Seq[Node] = {
+    val node = Node(1, WHOLE_STAGE_CODEGEN_1,
+      wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
+    val node2 = Node(2, FILTER,
+      wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node3 = Node(3, SCAN_TEXT, wholeStageCodegenId = Some(1),
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // nodes are listed in reverse order to match the execution order shown in the Spark UI
+    Seq(node3, node2, node)
+  }
+
+  private def getExpectedNodesWhenWholeStageCodegenIsOff(): Seq[Node] = {
+    val node = Node(1, WHOLE_STAGE_CODEGEN_1, metrics = Seq(Metric(DURATION, "0 ms")))
+    val node2 = Node(2, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node3 = Node(3, SCAN_TEXT,
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // nodes are listed in reverse order to match the execution order shown in the Spark UI
+    Seq(node3, node2, node)
+  }
+
+  private def verifyExpectedExecutionData(executionData: ExecutionData,
+                                          nodes: Seq[Node],

Review comment:
       4-space indent, e.g.:
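
Same nit; a minimal sketch showing the 4-space continuation indent, with only the two parameters visible in the diff (and assuming `ExecutionData` exposes the new `nodes` field added in this PR):

```scala
// parameters indented 4 spaces rather than aligned with the opening parenthesis
private def verifyExpectedExecutionData(
    executionData: ExecutionData,
    nodes: Seq[Node]): Unit = {
  // illustrative body; the real helper checks many more fields
  assert(executionData.nodes == nodes)
}
```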

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
##########
@@ -146,4 +146,6 @@ class SparkPlanGraphNodeWrapper(
 case class SQLPlanMetric(
     name: String,
     accumulatorId: Long,
-    metricType: String)
+    metricType: String,
+    nodeId: Option[Long] = None,

Review comment:
       As long as we can access the `sqlStore` in `SqlResource.scala`, we can get the corresponding `SparkPlanGraphNode` from it.
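
For illustration, a rough sketch of that lookup inside `SqlResource` (this assumes the existing `SQLAppStatusStore.planGraph` API and that `SparkPlanGraphNode` is imported; `nodeByAccumId` is a hypothetical name):

```scala
// look the nodes up from the stored plan graph instead of persisting
// a nodeId on every SQLPlanMetric
val graph = sqlStore.planGraph(executionId)
val nodeByAccumId: Map[Long, SparkPlanGraphNode] =
  graph.allNodes.flatMap { node =>
    node.metrics.map(metric => metric.accumulatorId -> node)
  }.toMap
```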



