This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 08da649f08 [VL] Remove metrics try-catch, fix Generate metrics, and
handle missing join/aggregation params (#11861)
08da649f08 is described below
commit 08da649f0880e35151071cdec4020b999eb4c149
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Apr 10 04:55:14 2026 +0530
[VL] Remove metrics try-catch, fix Generate metrics, and handle missing
join/aggregation params (#11861)
This PR removes the silent try/catch in
MetricsUtil.genMetricsUpdatingFunction that was masking metrics bugs, and fixes
metrics issues across several files:
- Adds proper Generate transformer metrics to VeloxMetricsApi
- Registers JoinParams for CartesianProductExecTransformer with
  postProjectionNeeded = false
- Adds null-safe defaults for missing join/aggregation params in metrics
  updates
---
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 20 ++++++++
.../gluten/execution/GenerateExecTransformer.scala | 18 ++-----
.../apache/gluten/metrics/JoinMetricsUpdater.scala | 15 +++---
.../org/apache/gluten/metrics/MetricsUtil.scala | 57 ++++++++++------------
.../gluten/execution/VeloxMetricsSuite.scala | 21 ++++++++
.../org/apache/gluten/backendsapi/MetricsApi.scala | 6 +++
.../CartesianProductExecTransformer.scala | 10 +++-
7 files changed, 94 insertions(+), 53 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index caa2e7de4b..9643f4902a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging {
override def genExpandTransformerMetricsUpdater(metrics: Map[String,
SQLMetric]): MetricsUpdater =
new ExpandMetricsUpdater(metrics)
+ override def genGenerateTransformerMetrics(sparkContext: SparkContext):
Map[String, SQLMetric] =
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
+ "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
+ "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of
generate"),
+ "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
+ "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak
memory bytes"),
+ "numMemoryAllocations" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of memory allocations"),
+ "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "time of loading lazy vectors")
+ )
+
+ override def genGenerateTransformerMetricsUpdater(
+ metrics: Map[String, SQLMetric]): MetricsUpdater =
+ new GenerateMetricsUpdater(metrics)
+
override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String,
SQLMetric] =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index c4b2e84d67..7c99c96733 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate
-import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater}
+import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.ExpressionNode
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
@@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{BooleanType, IntegerType}
import com.google.protobuf.StringValue
@@ -50,19 +49,10 @@ case class GenerateExecTransformer(
@transient
override lazy val metrics =
- Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of
output vectors"),
- "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
- "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of
generate"),
- "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
- "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak
memory bytes"),
- "numMemoryAllocations" -> SQLMetrics.createMetric(
- sparkContext,
- "number of memory allocations")
- )
+
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext)
- override def metricsUpdater(): MetricsUpdater = new
GenerateMetricsUpdater(metrics)
+ override def metricsUpdater(): MetricsUpdater =
+
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetricsUpdater(metrics)
override protected def withNewChildInternal(newChild: SparkPlan):
GenerateExecTransformer =
copy(generator, requiredChildOutput, outer, generatorOutput, newChild)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index cf894b9da4..b056cd36a8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -46,13 +46,14 @@ abstract class JoinMetricsUpdaterBase(val metrics:
Map[String, SQLMetric])
joinMetrics: util.ArrayList[OperatorMetrics],
singleMetrics: SingleMetric,
joinParams: JoinParams): Unit = {
- assert(joinParams.postProjectionNeeded)
- val postProjectMetrics = joinMetrics.remove(0)
- postProjectionCpuCount += postProjectMetrics.cpuCount
- postProjectionWallNanos += postProjectMetrics.wallNanos
- numOutputRows += postProjectMetrics.outputRows
- numOutputVectors += postProjectMetrics.outputVectors
- numOutputBytes += postProjectMetrics.outputBytes
+ if (joinParams.postProjectionNeeded) {
+ val postProjectMetrics = joinMetrics.remove(0)
+ postProjectionCpuCount += postProjectMetrics.cpuCount
+ postProjectionWallNanos += postProjectMetrics.wallNanos
+ numOutputRows += postProjectMetrics.outputRows
+ numOutputVectors += postProjectMetrics.outputVectors
+ numOutputBytes += postProjectMetrics.outputBytes
+ }
updateJoinMetricsInternal(joinMetrics, joinParams)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 09430fdd70..0567ccc8ce 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -253,19 +253,19 @@ object MetricsUtil extends Logging {
mutNode.updater match {
case smj: SortMergeJoinMetricsUpdater =>
- smj.updateJoinMetrics(
- operatorMetrics,
- metrics.getSingleMetrics,
- joinParamsMap.get(operatorIdx))
+ val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
+ val p = JoinParams(); p.postProjectionNeeded = false; p
+ }
+ smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics,
joinParams)
case ju: JoinMetricsUpdaterBase =>
// JoinRel and CrossRel output two suites of metrics respectively for
build and probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
- ju.updateJoinMetrics(
- operatorMetrics,
- metrics.getSingleMetrics,
- joinParamsMap.get(operatorIdx))
+ val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
+ val p = JoinParams(); p.postProjectionNeeded = false; p
+ }
+ ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics,
joinParams)
case u: UnionMetricsUpdater =>
// JoinRel outputs two suites of metrics respectively for hash build
and hash probe.
// Therefore, fetch one more suite of metrics here.
@@ -273,7 +273,8 @@ object MetricsUtil extends Logging {
curMetricsIdx -= 1
u.updateUnionMetrics(operatorMetrics)
case hau: HashAggregateMetricsUpdater =>
- hau.updateAggregationMetrics(operatorMetrics,
aggParamsMap.get(operatorIdx))
+ val aggParams =
Option(aggParamsMap.get(operatorIdx)).getOrElse(AggregationParams())
+ hau.updateAggregationMetrics(operatorMetrics, aggParams)
case lu: LimitMetricsUpdater =>
// Limit over Sort is converted to TopN node in Velox, so there is
only one suite of metrics
// for the two transformers. We do not update metrics for limit and
leave it for sort.
@@ -342,30 +343,24 @@ object MetricsUtil extends Logging {
aggParamsMap: JMap[JLong, AggregationParams],
taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = {
imetrics =>
- try {
- val metrics = imetrics.asInstanceOf[Metrics]
- val numNativeMetrics = metrics.inputRows.length
- if (numNativeMetrics == 0) {
- ()
- } else {
- updateTransformerMetricsInternal(
- mutNode,
- relMap,
- operatorIdx,
- metrics,
- numNativeMetrics - 1,
- joinParamsMap,
- aggParamsMap)
+ val metrics = imetrics.asInstanceOf[Metrics]
+ val numNativeMetrics = metrics.inputRows.length
+ if (numNativeMetrics == 0) {
+ ()
+ } else {
+ updateTransformerMetricsInternal(
+ mutNode,
+ relMap,
+ operatorIdx,
+ metrics,
+ numNativeMetrics - 1,
+ joinParamsMap,
+ aggParamsMap)
- // Update the task stats accumulator with the metrics.
- if (metrics.taskStats != null) {
- taskStatsAccumulator.add(metrics.taskStats)
- }
+ // Update the task stats accumulator with the metrics.
+ if (metrics.taskStats != null) {
+ taskStatsAccumulator.add(metrics.taskStats)
}
- } catch {
- case e: Exception =>
- logWarning(s"Updating native metrics failed due to ${e.getCause}.")
- ()
}
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 247de220d0..35edc4fa6e 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -152,6 +152,27 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("numOutputVectors").value > 0)
assert(metrics("numOutputBytes").value > 0)
}
+
+ runQueryAndCompare(
+ "SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS
col") {
+ df =>
+ val scan = find(df.queryExecution.executedPlan) {
+ case _: FileSourceScanExecTransformer => true
+ case _ => false
+ }
+ assert(scan.isDefined)
+ val scanMetrics = scan.get.metrics
+ assert(scanMetrics("rawInputRows").value > 0)
+
+ val generate = find(df.queryExecution.executedPlan) {
+ case _: GenerateExecTransformer => true
+ case _ => false
+ }
+ assert(generate.isDefined)
+ val genMetrics = generate.get.metrics
+ assert(genMetrics("numOutputRows").value == 2 *
scanMetrics("rawInputRows").value)
+ assert(genMetrics.contains("loadLazyVectorTime"))
+ }
}
test("Metrics of window") {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index 896979b100..93e7c1fb9d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -82,6 +82,12 @@ trait MetricsApi extends Serializable {
def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater
+ def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String,
SQLMetric] =
+ throw new UnsupportedOperationException()
+
+ def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater =
+ throw new UnsupportedOperationException()
+
def genCustomExpandMetrics(sparkContext: SparkContext): Map[String,
SQLMetric]
def genColumnarShuffleExchangeMetrics(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 868e6f1a9e..8e6df15934 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.{JoinParams, SubstraitContext}
import org.apache.gluten.substrait.rel.RelBuilder
import org.apache.gluten.utils.SubstraitUtil
@@ -96,6 +96,11 @@ case class CartesianProductExecTransformer(
JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput,
validation = false)
val operatorId = context.nextOperatorId(this.nodeName)
+ val joinParams = new JoinParams
+ joinParams.postProjectionNeeded = false
+ if (condition.isDefined) {
+ joinParams.isWithCondition = true
+ }
val currRel = RelBuilder.makeCrossRel(
inputLeftRelNode,
@@ -106,6 +111,9 @@ case class CartesianProductExecTransformer(
context,
operatorId
)
+
+ context.registerJoinParam(operatorId, joinParams)
+
TransformContext(output, currRel)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]