tgravescs commented on code in PR #34622:
URL: https://github.com/apache/spark/pull/34622#discussion_r842875274


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -202,6 +227,47 @@ class SQLAppStatusListener(
     }
   }
 
+  /* Connects Operators to Stages by doing the following:
+   * 1. Read SparkGraph to get every Node's name and respective AccumulatorIDs.
+   * 2. Gets each stage's AccumulatorIDs.
+   * 3. Maps Operators to stages by checking for non-zero intersection of 1 
and 2's AccumulatorIDs.
+   * 4. Connect SparkGraphNodes to respective StageIDs for rendering in SQL UI.
+   */
+  private def connectOperatorToStage(exec: LiveExecutionData): Map[Long, 
List[Int]] = {
+    // Reads in SparkPlanGraph to get each Node's name and respective 
accumulatorIDs
+    // and saves it inside nodeNameToAccumulatorIds.
+    val planGraph = kvstore.read(classOf[SparkPlanGraphWrapper], 
exec.executionId)
+      .toSparkPlanGraph()
+    val nodeNameToAccumulatorIds = planGraph.allNodes.map { node =>
+      (node.name, node.metrics.map(_.accumulatorId)) }
+    logDebug("each Operator's Metrics represented by AccumulatorIds are: \n 
%s\n"
+      .format(nodeNameToAccumulatorIds.mkString("\n ")))
+
+    // Gets each stage and its list of distinct accumulatorIDs
+    // which is retrived upon completion of a stage.
+    val stageIdToAccumulatorIDs = stageAccumulators.asScala
+    logDebug("each Stage's Metrics represented by AccumulatorIds are: \n %s\n"
+      .format(stageIdToAccumulatorIDs.mkString("\n ")))
+
+    // Maps stages to operators by checking for non-zero intersection
+    // between nodeMetrics and stageAccumulateIDs
+    val operatorToStage = nodeNameToAccumulatorIds.map { case (nodeName, 
accumulatorIds1) =>
+      val mappedStages = stageIdToAccumulatorIDs.flatMap { case (stageId, 
accumulatorIds2) =>
+        if (accumulatorIds1.intersect(accumulatorIds2).nonEmpty) Some(stageId)
+        else None
+      }.toList.sorted
+      (nodeName, accumulatorIds1, mappedStages)
+    }
+    val operatorToStageString = operatorToStage.mkString("\n")

Review Comment:
   just put this into the logDebug itself with ${}



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to