This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f08523bd969e [SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec 
interpreted iterator
f08523bd969e is described below

commit f08523bd969e5f1b20db09fdcb94a53e4408c0d8
Author: Szehon Ho <[email protected]>
AuthorDate: Thu May 21 10:29:01 2026 +0800

    [SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator
    
    ### What changes were proposed in this pull request?
    
    Cache `SQLMetric` references in `MergeRowIterator` and update them directly 
in the hot loop. Previously, each row called `longMetric("…")`, which performs 
a `metrics(name)` map lookup on every increment (up to 2–3 lookups per 
delete/update row). Metrics are `lazy val` fields so a partition only resolves 
metrics it actually increments.
    
    This matches the pattern used elsewhere (e.g. `FilterEvaluatorFactory` 
passes a `SQLMetric` into the partition evaluator). The whole-stage codegen 
path is unchanged; it already resolves metrics once via `metricTerm`.
    
    `codegenBenchmark` in `SqlBasedBenchmark` now accepts optional 
`warmupTime`, `minTime`, and per-case `numIters`. `MergeRowsExecBenchmark` uses 
7s warmup and a 7s timed window for all whole-stage on/off cases.
    
    ### Why are the changes needed?
    
    `MergeRowsExec` updates multiple MERGE metrics per output row on the 
interpreted path (`doExecute` / `MergeRowIterator`). For delete-heavy workloads 
with little projection work, repeated map lookups were a noticeable fraction of 
per-row cost. Production MERGE typically runs with whole-stage codegen enabled, 
but the interpreted path is still used when codegen is disabled or unsupported.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing `MergeRowsExec` / MERGE tests (CI).
    
    **Local benchmark** (`MergeRowsExecBenchmark`, 20M rows, Apple M4 Max, JDK 
21). **Both sides used the same benchmark harness** (7s `warmupTime`, 7s 
`minTime`, `wholestageOffNumIters = 0` / `wholestageOnNumIters = 0` via 
extended `codegenBenchmark`). Compared `MergeRowsExec` **without** the cache 
(`1ad4fa420cd`, parent of the cache commit) vs **with** the cache (this PR), 
checking out only that file between runs.
    
    ```bash
    SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
      -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
      "sql/Test/runMain 
org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"
    ```
    
    **Whole-stage off (interpreted path)** — best time (ms):
    
    | Case | Without cache | With cache (this PR) | Change |
    |------|--------------:|---------------------:|-------:|
    | matched update only | 5475 | 5238 | −4% |
    | not matched insert only | 7612 | 7337 | −4% |
    | matched update + not matched insert | 5795 | 4315 | −26% |
    | matched delete | 2914 | 546 | −81% |
    | conditional clauses | 3872 | 1251 | −68% |
    | matched + not matched + not matched by source | 3813 | 1119 | −71% |
    | split update (delete + insert) | 1844 | 1400 | −24% |
    
    Matched-update-only and insert-only are roughly unchanged on the 
interpreted path in this run; the largest wins are on delete-heavy and 
multi-metric cases.
    
    **Whole-stage on (codegen)** — unchanged within noise (e.g. matched delete 
best ~13 ms; matched update only ~333–338 ms).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #55967 from szehon-ho/spark-cache-merge-rows-metrics.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/datasources/v2/MergeRowsExec.scala   | 46 ++++++++++++++--------
 .../benchmark/MergeRowsExecBenchmark.scala         | 28 +++++++++----
 .../execution/benchmark/SqlBasedBenchmark.scala    | 37 ++++++++++++++---
 3 files changed, 82 insertions(+), 29 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
index 67212de165e9..887f6d832c82 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -518,6 +518,19 @@ case class MergeRowsExec(
       private val notMatchedBySourceInstructions: Seq[InstructionExec])
     extends Iterator[InternalRow] {
 
+    // Resolve each metric at most once per partition, on first use; 
longMetric(name) is a map
+    // lookup. See SPARK-56933.
+    private lazy val numTargetRowsCopied = longMetric("numTargetRowsCopied")
+    private lazy val numTargetRowsInserted = 
longMetric("numTargetRowsInserted")
+    private lazy val numTargetRowsDeleted = longMetric("numTargetRowsDeleted")
+    private lazy val numTargetRowsUpdated = longMetric("numTargetRowsUpdated")
+    private lazy val numTargetRowsMatchedUpdated = 
longMetric("numTargetRowsMatchedUpdated")
+    private lazy val numTargetRowsMatchedDeleted = 
longMetric("numTargetRowsMatchedDeleted")
+    private lazy val numTargetRowsNotMatchedBySourceUpdated =
+      longMetric("numTargetRowsNotMatchedBySourceUpdated")
+    private lazy val numTargetRowsNotMatchedBySourceDeleted =
+      longMetric("numTargetRowsNotMatchedBySourceDeleted")
+
     var cachedExtraRow: InternalRow = _
 
     override def hasNext: Boolean = cachedExtraRow != null || 
rowIterator.hasNext
@@ -579,28 +592,27 @@ case class MergeRowsExec(
 
       null
     }
-  }
 
-  // For group based merge, copy is inserted if row matches no other case
-  private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") 
+= 1
+    private def incrementCopyMetric(): Unit = numTargetRowsCopied += 1
 
-  private def incrementInsertMetric(): Unit = 
longMetric("numTargetRowsInserted") += 1
+    private def incrementInsertMetric(): Unit = numTargetRowsInserted += 1
 
-  private def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
-    longMetric("numTargetRowsDeleted") += 1
-    if (sourcePresent) {
-      longMetric("numTargetRowsMatchedDeleted") += 1
-    } else {
-      longMetric("numTargetRowsNotMatchedBySourceDeleted") += 1
+    private def incrementDeleteMetric(sourcePresent: Boolean): Unit = {
+      numTargetRowsDeleted += 1
+      if (sourcePresent) {
+        numTargetRowsMatchedDeleted += 1
+      } else {
+        numTargetRowsNotMatchedBySourceDeleted += 1
+      }
     }
-  }
 
-  private def incrementUpdateMetric(sourcePresent: Boolean): Unit = {
-    longMetric("numTargetRowsUpdated") += 1
-    if (sourcePresent) {
-      longMetric("numTargetRowsMatchedUpdated") += 1
-    } else {
-      longMetric("numTargetRowsNotMatchedBySourceUpdated") += 1
+    private def incrementUpdateMetric(sourcePresent: Boolean): Unit = {
+      numTargetRowsUpdated += 1
+      if (sourcePresent) {
+        numTargetRowsMatchedUpdated += 1
+      } else {
+        numTargetRowsNotMatchedBySourceUpdated += 1
+      }
     }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
index 8ddbca46b739..0fcac326d923 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import scala.concurrent.duration._
+
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GreaterThan, IsNotNull, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
@@ -43,6 +45,18 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with 
ClassicConversions
 
   private val N = 20 << 20
 
+  /** Longer warm-up and timed window for stable interpreted (whole-stage off) 
results. */
+  private def mergeRowsBenchmark(name: String, cardinality: Long)(f: => Unit): 
Unit = {
+    codegenBenchmark(
+      name,
+      cardinality,
+      warmupTime = 7.seconds,
+      minTime = 7.seconds,
+      minNumIters = 3,
+      wholestageOffNumIters = 0,
+      wholestageOnNumIters = 0)(f)
+  }
+
   /**
    * Creates a DataFrame simulating the join output from a MERGE operation.
    *
@@ -110,7 +124,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
       a(0), a(5), a(6), a(3)
     )))
 
-    codegenBenchmark("merge - matched update only", N) {
+    mergeRowsBenchmark("merge - matched update only", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -126,7 +140,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
       a(4), a(5), a(6), a(7)
     )))
 
-    codegenBenchmark("merge - not matched insert only", N) {
+    mergeRowsBenchmark("merge - not matched insert only", N) {
       val df = buildMergeRowsDF(inputDF, Seq.empty, notMatchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -144,7 +158,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
       a(4), a(5), a(6), a(7)
     )))
 
-    codegenBenchmark("merge - matched update + not matched insert", N) {
+    mergeRowsBenchmark("merge - matched update + not matched insert", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -156,7 +170,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
 
     val matchedInstr = Seq(Discard(TrueLiteral))
 
-    codegenBenchmark("merge - matched delete", N) {
+    mergeRowsBenchmark("merge - matched delete", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -177,7 +191,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
       Keep(Insert, GreaterThan(a(5), Literal(500)), Seq(a(4), a(5), a(6), 
a(7)))
     )
 
-    codegenBenchmark("merge - conditional clauses", N) {
+    mergeRowsBenchmark("merge - conditional clauses", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -199,7 +213,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
     )))
     val notMatchedBySourceInstr = Seq(Discard(TrueLiteral))
 
-    codegenBenchmark("merge - matched + not matched + not matched by source", 
N) {
+    mergeRowsBenchmark("merge - matched + not matched + not matched by 
source", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr, 
notMatchedBySourceInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
@@ -216,7 +230,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions
       Seq(a(0), a(5), a(6), a(3))
     ))
 
-    codegenBenchmark("merge - split update (delete + insert)", N) {
+    mergeRowsBenchmark("merge - split update (delete + insert)", N) {
       val df = buildMergeRowsDF(inputDF, matchedInstr)
       assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec]))
       df.noop()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
index 78d6b0158035..6c60721599bb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import scala.concurrent.duration._
+
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -46,17 +48,42 @@ trait SqlBasedBenchmark extends BenchmarkBase with 
SQLHelper {
       .getOrCreate()
   }
 
-  /** Runs function `f` with whole stage codegen on and off. */
-  final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): 
Unit = {
-    val benchmark = new Benchmark(name, cardinality, output = output)
+  /**
+   * Runs function `f` with whole stage codegen on and off.
+   *
+   * @param minNumIters minimum timed iterations per case when the 
corresponding
+   *        `wholestageOffNumIters` or `wholestageOnNumIters` is zero.
+   * @param warmupTime JIT warm-up duration per case before timed iterations.
+   * @param minTime minimum total timed duration per case when the 
corresponding
+   *        `wholestageOffNumIters` or `wholestageOnNumIters` is zero.
+   * @param wholestageOffNumIters if non-zero, run exactly this many timed 
iterations
+   *        for the wholestage-off case; otherwise use `minNumIters` and 
`minTime`.
+   * @param wholestageOnNumIters if non-zero, run exactly this many timed 
iterations
+   *        for the wholestage-on case; otherwise use `minNumIters` and 
`minTime`.
+   */
+  final def codegenBenchmark(
+      name: String,
+      cardinality: Long,
+      minNumIters: Int = 2,
+      warmupTime: FiniteDuration = 2.seconds,
+      minTime: FiniteDuration = 2.seconds,
+      wholestageOffNumIters: Int = 2,
+      wholestageOnNumIters: Int = 5)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(
+      name,
+      cardinality,
+      minNumIters = minNumIters,
+      warmupTime = warmupTime,
+      minTime = minTime,
+      output = output)
 
-    benchmark.addCase(s"$name wholestage off", numIters = 2) { _ =>
+    benchmark.addCase(s"$name wholestage off", numIters = 
wholestageOffNumIters) { _ =>
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
         f
       }
     }
 
-    benchmark.addCase(s"$name wholestage on", numIters = 5) { _ =>
+    benchmark.addCase(s"$name wholestage on", numIters = wholestageOnNumIters) 
{ _ =>
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
         f
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to