tgravescs commented on code in PR #34622:
URL: https://github.com/apache/spark/pull/34622#discussion_r855294364


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -202,6 +227,47 @@ class SQLAppStatusListener(
     }
   }
 
+  /* Connects Operators to Stages by doing the following:
+   * 1. Read SparkGraph to get every Node's name and respective AccumulatorIDs.
+   * 2. Gets each stage's AccumulatorIDs.
+   * 3. Maps Operators to stages by checking for non-zero intersection of 1 
and 2's AccumulatorIDs.
+   * 4. Connect SparkGraphNodes to respective StageIDs for rendering in SQL UI.
+   */
+  private def connectOperatorToStage(exec: LiveExecutionData): Map[Long, 
List[Int]] = {
+    // Reads in SparkPlanGraph to get each Node's name and respective 
accumulatorIDs
+    // and saves it inside nodeNameToAccumulatorIds.
+    val planGraph = kvstore.read(classOf[SparkPlanGraphWrapper], 
exec.executionId)
+      .toSparkPlanGraph()
+    val nodeNameToAccumulatorIds = planGraph.allNodes.map { node =>
+      (node.name, node.metrics.map(_.accumulatorId)) }
+    logDebug("each Operator's Metrics represented by AccumulatorIds are: \n 
%s\n"
+      .format(nodeNameToAccumulatorIds.mkString("\n ")))
+
+    // Gets each stage and its list of distinct accumulatorIDs
+    // which is retrived upon completion of a stage.
+    val stageIdToAccumulatorIDs = stageAccumulators.asScala
+    logDebug("each Stage's Metrics represented by AccumulatorIds are: \n %s\n"
+      .format(stageIdToAccumulatorIDs.mkString("\n ")))
+
+    // Maps stages to operators by checking for non-zero intersection
+    // between nodeMetrics and stageAccumulateIDs
+    val operatorToStage = nodeNameToAccumulatorIds.map { case (nodeName, 
accumulatorIds1) =>
+      val mappedStages = stageIdToAccumulatorIDs.flatMap { case (stageId, 
accumulatorIds2) =>
+        if (accumulatorIds1.intersect(accumulatorIds2).nonEmpty) Some(stageId)
+        else None
+      }.toList.sorted
+      (nodeName, accumulatorIds1, mappedStages)
+    }
+    val operatorToStageString = operatorToStage.mkString("\n")
+    logDebug(s"Each Operator's AccumulatorIds and Stages are:\n 
$operatorToStageString")
+
+    // Connect SparkGraphNode IDs to StageIDs for rendering in SQL UI in
+    // SparkPlanGraph's makeDotFile.
+    val operatorStageID = operatorToStage.map(x => x._3).toList

Review Comment:
    if you had it for debugging, that is fine, but remove it now. this is just 
extra memory usage and performance impact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to