This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 08da649f08 [VL] Remove metrics try-catch, fix Generate metrics, and handle missing join/aggregation params (#11861)
08da649f08 is described below

commit 08da649f0880e35151071cdec4020b999eb4c149
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Apr 10 04:55:14 2026 +0530

    [VL] Remove metrics try-catch, fix Generate metrics, and handle missing join/aggregation params (#11861)
    
    This PR removes the silent try/catch in MetricsUtil.genMetricsUpdatingFunction
    that was masking metrics bugs, and fixes metrics issues across several files:
    
    - Adds proper Generate transformer metrics to VeloxMetricsApi.
    - Registers JoinParams for CartesianProductExecTransformer with
      postProjectionNeeded = false.
    - Adds null-safe defaults for missing join/aggregation params in metrics
      updates.
---
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 20 ++++++++
 .../gluten/execution/GenerateExecTransformer.scala | 18 ++-----
 .../apache/gluten/metrics/JoinMetricsUpdater.scala | 15 +++---
 .../org/apache/gluten/metrics/MetricsUtil.scala    | 57 ++++++++++------------
 .../gluten/execution/VeloxMetricsSuite.scala       | 21 ++++++++
 .../org/apache/gluten/backendsapi/MetricsApi.scala |  6 +++
 .../CartesianProductExecTransformer.scala          | 10 +++-
 7 files changed, 94 insertions(+), 53 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index caa2e7de4b..9643f4902a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   override def genExpandTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
     new ExpandMetricsUpdater(metrics)
 
+  override def genGenerateTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
+    Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
+      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
generate"),
+      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time 
count"),
+      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
+      "numMemoryAllocations" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of memory allocations"),
+      "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "time of loading lazy vectors")
+    )
+
+  override def genGenerateTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric]): MetricsUpdater =
+    new GenerateMetricsUpdater(metrics)
+
   override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric] =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index c4b2e84d67..7c99c96733 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate
-import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater}
+import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.ExpressionNode
 import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, 
ExtensionBuilder}
@@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{BooleanType, IntegerType}
 
 import com.google.protobuf.StringValue
@@ -50,19 +49,10 @@ case class GenerateExecTransformer(
 
   @transient
   override lazy val metrics =
-    Map(
-      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
-      "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
-      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of 
generate"),
-      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time 
count"),
-      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak 
memory bytes"),
-      "numMemoryAllocations" -> SQLMetrics.createMetric(
-        sparkContext,
-        "number of memory allocations")
-    )
+    
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext)
 
-  override def metricsUpdater(): MetricsUpdater = new 
GenerateMetricsUpdater(metrics)
+  override def metricsUpdater(): MetricsUpdater =
+    
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetricsUpdater(metrics)
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
GenerateExecTransformer =
     copy(generator, requiredChildOutput, outer, generatorOutput, newChild)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index cf894b9da4..b056cd36a8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -46,13 +46,14 @@ abstract class JoinMetricsUpdaterBase(val metrics: 
Map[String, SQLMetric])
       joinMetrics: util.ArrayList[OperatorMetrics],
       singleMetrics: SingleMetric,
       joinParams: JoinParams): Unit = {
-    assert(joinParams.postProjectionNeeded)
-    val postProjectMetrics = joinMetrics.remove(0)
-    postProjectionCpuCount += postProjectMetrics.cpuCount
-    postProjectionWallNanos += postProjectMetrics.wallNanos
-    numOutputRows += postProjectMetrics.outputRows
-    numOutputVectors += postProjectMetrics.outputVectors
-    numOutputBytes += postProjectMetrics.outputBytes
+    if (joinParams.postProjectionNeeded) {
+      val postProjectMetrics = joinMetrics.remove(0)
+      postProjectionCpuCount += postProjectMetrics.cpuCount
+      postProjectionWallNanos += postProjectMetrics.wallNanos
+      numOutputRows += postProjectMetrics.outputRows
+      numOutputVectors += postProjectMetrics.outputVectors
+      numOutputBytes += postProjectMetrics.outputBytes
+    }
 
     updateJoinMetricsInternal(joinMetrics, joinParams)
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 09430fdd70..0567ccc8ce 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -253,19 +253,19 @@ object MetricsUtil extends Logging {
 
     mutNode.updater match {
       case smj: SortMergeJoinMetricsUpdater =>
-        smj.updateJoinMetrics(
-          operatorMetrics,
-          metrics.getSingleMetrics,
-          joinParamsMap.get(operatorIdx))
+        val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
+          val p = JoinParams(); p.postProjectionNeeded = false; p
+        }
+        smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, 
joinParams)
       case ju: JoinMetricsUpdaterBase =>
         // JoinRel and CrossRel output two suites of metrics respectively for 
build and probe.
         // Therefore, fetch one more suite of metrics here.
         operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
         curMetricsIdx -= 1
-        ju.updateJoinMetrics(
-          operatorMetrics,
-          metrics.getSingleMetrics,
-          joinParamsMap.get(operatorIdx))
+        val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
+          val p = JoinParams(); p.postProjectionNeeded = false; p
+        }
+        ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, 
joinParams)
       case u: UnionMetricsUpdater =>
         // JoinRel outputs two suites of metrics respectively for hash build 
and hash probe.
         // Therefore, fetch one more suite of metrics here.
@@ -273,7 +273,8 @@ object MetricsUtil extends Logging {
         curMetricsIdx -= 1
         u.updateUnionMetrics(operatorMetrics)
       case hau: HashAggregateMetricsUpdater =>
-        hau.updateAggregationMetrics(operatorMetrics, 
aggParamsMap.get(operatorIdx))
+        val aggParams = 
Option(aggParamsMap.get(operatorIdx)).getOrElse(AggregationParams())
+        hau.updateAggregationMetrics(operatorMetrics, aggParams)
       case lu: LimitMetricsUpdater =>
         // Limit over Sort is converted to TopN node in Velox, so there is 
only one suite of metrics
         // for the two transformers. We do not update metrics for limit and 
leave it for sort.
@@ -342,30 +343,24 @@ object MetricsUtil extends Logging {
       aggParamsMap: JMap[JLong, AggregationParams],
       taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = {
     imetrics =>
-      try {
-        val metrics = imetrics.asInstanceOf[Metrics]
-        val numNativeMetrics = metrics.inputRows.length
-        if (numNativeMetrics == 0) {
-          ()
-        } else {
-          updateTransformerMetricsInternal(
-            mutNode,
-            relMap,
-            operatorIdx,
-            metrics,
-            numNativeMetrics - 1,
-            joinParamsMap,
-            aggParamsMap)
+      val metrics = imetrics.asInstanceOf[Metrics]
+      val numNativeMetrics = metrics.inputRows.length
+      if (numNativeMetrics == 0) {
+        ()
+      } else {
+        updateTransformerMetricsInternal(
+          mutNode,
+          relMap,
+          operatorIdx,
+          metrics,
+          numNativeMetrics - 1,
+          joinParamsMap,
+          aggParamsMap)
 
-          // Update the task stats accumulator with the metrics.
-          if (metrics.taskStats != null) {
-            taskStatsAccumulator.add(metrics.taskStats)
-          }
+        // Update the task stats accumulator with the metrics.
+        if (metrics.taskStats != null) {
+          taskStatsAccumulator.add(metrics.taskStats)
         }
-      } catch {
-        case e: Exception =>
-          logWarning(s"Updating native metrics failed due to ${e.getCause}.")
-          ()
       }
   }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 247de220d0..35edc4fa6e 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -152,6 +152,27 @@ class VeloxMetricsSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
         assert(metrics("numOutputVectors").value > 0)
         assert(metrics("numOutputBytes").value > 0)
     }
+
+    runQueryAndCompare(
+      "SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS 
col") {
+      df =>
+        val scan = find(df.queryExecution.executedPlan) {
+          case _: FileSourceScanExecTransformer => true
+          case _ => false
+        }
+        assert(scan.isDefined)
+        val scanMetrics = scan.get.metrics
+        assert(scanMetrics("rawInputRows").value > 0)
+
+        val generate = find(df.queryExecution.executedPlan) {
+          case _: GenerateExecTransformer => true
+          case _ => false
+        }
+        assert(generate.isDefined)
+        val genMetrics = generate.get.metrics
+        assert(genMetrics("numOutputRows").value == 2 * 
scanMetrics("rawInputRows").value)
+        assert(genMetrics.contains("loadLazyVectorTime"))
+    }
   }
 
   test("Metrics of window") {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index 896979b100..93e7c1fb9d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -82,6 +82,12 @@ trait MetricsApi extends Serializable {
 
   def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater
 
+  def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric] =
+    throw new UnsupportedOperationException()
+
+  def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater =
+    throw new UnsupportedOperationException()
+
   def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric]
 
   def genColumnarShuffleExchangeMetrics(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 868e6f1a9e..8e6df15934 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.{JoinParams, SubstraitContext}
 import org.apache.gluten.substrait.rel.RelBuilder
 import org.apache.gluten.utils.SubstraitUtil
 
@@ -96,6 +96,11 @@ case class CartesianProductExecTransformer(
       JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, 
validation = false)
 
     val operatorId = context.nextOperatorId(this.nodeName)
+    val joinParams = new JoinParams
+    joinParams.postProjectionNeeded = false
+    if (condition.isDefined) {
+      joinParams.isWithCondition = true
+    }
 
     val currRel = RelBuilder.makeCrossRel(
       inputLeftRelNode,
@@ -106,6 +111,9 @@ case class CartesianProductExecTransformer(
       context,
       operatorId
     )
+
+    context.registerJoinParam(operatorId, joinParams)
+
     TransformContext(output, currRel)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to