This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 70a4a1e7ac05 [SPARK-54988][SQL] Simplify the implementation of ObservationManager.tryComplete
70a4a1e7ac05 is described below

commit 70a4a1e7ac0555e6939034d8a2f996c14a58c656
Author: Yihong He <[email protected]>
AuthorDate: Mon Jan 12 10:28:25 2026 +0800

    [SPARK-54988][SQL] Simplify the implementation of ObservationManager.tryComplete
    
    ### What changes were proposed in this pull request?
    
    This PR optimizes the tryComplete method in ObservationManager by:
    1. Making allMetrics a lazy val to defer metric collection until needed
    2. Simplifying the conditional logic using getOrElse(c.name, Row.empty)
    
    ### Why are the changes needed?
    
    - Performance: The current implementation eagerly collects observed metrics
      even when the logical plan doesn't contain CollectMetrics nodes. Making
      allMetrics a lazy val lets tryComplete return sooner when metric collection
      is unnecessary.
    - Code simplification: The original code had separate branches for handling
      missing vs. present metrics. This change consolidates them into a single,
      cleaner code path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests for ObservationManager and the observation functionality
    cover this change. The refactoring maintains identical behavior.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #53752 from heyihong/SPARK-54988.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/classic/ObservationManager.scala     | 27 ++++++++--------------
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
index b5ec18d5ff12..fbe7034cc247 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/ObservationManager.scala
@@ -54,27 +54,18 @@ private[sql] class ObservationManager(session: 
SparkSession) {
     })
 
   private def tryComplete(qe: QueryExecution): Unit = {
-    val allMetrics = qe.observedMetrics
+    // Use lazy val to defer collecting the observed metrics until it is 
needed so that tryComplete
+    // can finish faster (e.g., when the logical plan doesn't contain 
CollectMetrics).
+    lazy val allMetrics = qe.observedMetrics
     qe.logical.foreachWithSubqueriesAndPruning(
       _.containsPattern(TreePattern.COLLECT_METRICS)) {
       case c: CollectMetrics =>
-        val keyExists = observations.containsKey((c.name, c.dataframeId))
-        val metrics = allMetrics.get(c.name)
-        if (keyExists && metrics.isEmpty) {
-          // If the key exists but no metrics were collected, it means for 
some reason the metrics
-          // could not be collected. This can happen e.g., if the 
CollectMetricsExec was optimized
-          // away.
-          val observation = observations.remove((c.name, c.dataframeId))
-          if (observation != null) {
-            observation.setMetricsAndNotify(Row.empty)
-          }
-        } else {
-          metrics.foreach { metrics =>
-            val observation = observations.remove((c.name, c.dataframeId))
-            if (observation != null) {
-              observation.setMetricsAndNotify(metrics)
-            }
-          }
+        val observation = observations.remove((c.name, c.dataframeId))
+        if (observation != null) {
+          // If the key exists but no metrics were collected, it means for 
some reason the
+          // metrics could not be collected. This can happen e.g., if the 
CollectMetricsExec
+          // was optimized away.
+          observation.setMetricsAndNotify(allMetrics.getOrElse(c.name, 
Row.empty))
         }
       case _ =>
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to