tgravescs commented on code in PR #34622:
URL: https://github.com/apache/spark/pull/34622#discussion_r842897202


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -202,6 +227,47 @@ class SQLAppStatusListener(
     }
   }
 
+  /* Connects Operators to Stages by doing the following:
+   * 1. Read SparkGraph to get every Node's name and respective AccumulatorIDs.
+   * 2. Gets each stage's AccumulatorIDs.
+   * 3. Maps Operators to stages by checking for non-zero intersection of 1 
and 2's AccumulatorIDs.
+   * 4. Connect SparkGraphNodes to respective StageIDs for rendering in SQL UI.
+   */
+  private def connectOperatorToStage(exec: LiveExecutionData): Map[Long, 
List[Int]] = {
+    // Reads in SparkPlanGraph to get each Node's name and respective 
accumulatorIDs
+    // and saves it inside nodeNameToAccumulatorIds.
+    val planGraph = kvstore.read(classOf[SparkPlanGraphWrapper], 
exec.executionId)
+      .toSparkPlanGraph()
+    val nodeNameToAccumulatorIds = planGraph.allNodes.map { node =>
+      (node.name, node.metrics.map(_.accumulatorId)) }
+    logDebug("each Operator's Metrics represented by AccumulatorIds are: \n 
%s\n"
+      .format(nodeNameToAccumulatorIds.mkString("\n ")))
+
+    // Gets each stage and its list of distinct accumulatorIDs
+    // which is retrived upon completion of a stage.
+    val stageIdToAccumulatorIDs = stageAccumulators.asScala
+    logDebug("each Stage's Metrics represented by AccumulatorIds are: \n %s\n"
+      .format(stageIdToAccumulatorIDs.mkString("\n ")))
+
+    // Maps stages to operators by checking for non-zero intersection
+    // between nodeMetrics and stageAccumulateIDs
+    val operatorToStage = nodeNameToAccumulatorIds.map { case (nodeName, 
accumulatorIds1) =>
+      val mappedStages = stageIdToAccumulatorIDs.flatMap { case (stageId, 
accumulatorIds2) =>
+        if (accumulatorIds1.intersect(accumulatorIds2).nonEmpty) Some(stageId)
+        else None
+      }.toList.sorted
+      (nodeName, accumulatorIds1, mappedStages)
+    }
+    val operatorToStageString = operatorToStage.mkString("\n")
+    logDebug(s"Each Operator's AccumulatorIds and Stages are:\n 
$operatorToStageString")
+
+    // Connect SparkGraphNode IDs to StageIDs for rendering in SQL UI in
+    // SparkPlanGraph's makeDotFile.
+    val operatorStageID = operatorToStage.map(x => x._3).toList

Review Comment:
   this all seems extra and uneeded code, instead of passing around the 
nodeName which isn't used above, use the nodeId when creating the tuple. Also 
just drop accumulatorIds1 from tuple if its not used anywhere.  If I'm missing 
something let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to