This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3757091e1c51 [SPARK-54319][SQL] BHJ LeftAnti update numOutputRows
wrong when codegen is disabled
3757091e1c51 is described below
commit 3757091e1c513eb2390dc2b939bcf9e1fb70ef53
Author: Angerszhuuuu <[email protected]>
AuthorDate: Mon Nov 17 11:39:11 2025 +0800
[SPARK-54319][SQL] BHJ LeftAnti update numOutputRows wrong when codegen is
disabled
### What changes were proposed in this pull request?
BHJ LeftAnti's numOutputRows update is missing the case where hashed =
EmptyHashedRelation
<img width="1754" height="1148" alt="image"
src="https://github.com/user-attachments/assets/a71e4546-578e-4e4d-9434-9287074ebe75"
/>
### Why are the changes needed?
Fix missing SQL metrics for BHJ
### Does this PR introduce _any_ user-facing change?
Yes, BHJ LeftAnti will update numOutputRows when hashed =
EmptyHashedRelation
### How was this patch tested?
Existing unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53014 from AngersZhuuuu/SPARK-54319.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/joins/BroadcastHashJoinExec.scala | 5 ++++-
.../sql/execution/metric/SQLMetricsSuite.scala | 23 ++++++++++++++++------
2 files changed, 21 insertions(+), 7 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 368534d05b1f..b62d8f0798b6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -129,7 +129,10 @@ case class BroadcastHashJoinExec(
val hashed = broadcastRelation.value.asReadOnlyCopy()
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
if (hashed == EmptyHashedRelation) {
- streamedIter
+ streamedIter.map { row =>
+ numOutputRows += 1
+ row
+ }
} else if (hashed == HashedRelationWithAllNullKeys) {
Iterator.empty
} else {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4e50457ae47d..402365a59ece 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -915,16 +915,27 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
withTable("t1", "t2") {
spark.range(4).write.saveAsTable("t1")
spark.range(2).write.saveAsTable("t2")
- val df = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM
t2)")
- df.collect()
- val plan = df.queryExecution.executedPlan
+ val df1 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM
t2)")
+ df1.collect()
+ val plan1 = df1.queryExecution.executedPlan
- val joins = plan.collect {
+ val joins1 = plan1.collect {
case s: BroadcastHashJoinExec => s
}
- assert(joins.size === 1)
- testMetricsInSparkPlanOperator(joins.head, Map("numOutputRows" ->
2))
+ assert(joins1.size === 1)
+ testMetricsInSparkPlanOperator(joins1.head, Map("numOutputRows" ->
2))
+
+ val df2 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2
WHERE 1 = 2)")
+ df2.collect()
+ val plan2 = df2.queryExecution.executedPlan
+
+ val joins2 = plan2.collect {
+ case s: BroadcastHashJoinExec => s
+ }
+
+ assert(joins2.size === 1)
+ testMetricsInSparkPlanOperator(joins2.head, Map("numOutputRows" ->
4))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]