This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3757091e1c51 [SPARK-54319][SQL] BHJ LeftAnti update numOutputRows 
wrong when codegen is disabled
3757091e1c51 is described below

commit 3757091e1c513eb2390dc2b939bcf9e1fb70ef53
Author: Angerszhuuuu <[email protected]>
AuthorDate: Mon Nov 17 11:39:11 2025 +0800

    [SPARK-54319][SQL] BHJ LeftAnti update numOutputRows wrong when codegen is 
disabled
    
    ### What changes were proposed in this pull request?
    
    BHJ LeftAnti update numOutputRows missing case for hashed = 
EmptyHashedRelation
    
    <img width="1754" height="1148" alt="image" 
src="https://github.com/user-attachments/assets/a71e4546-578e-4e4d-9434-9287074ebe75";
 />
    
    ### Why are the changes needed?
    Fix missing sql metrics for BHJ
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, BHJ LeftAnti will update numOutputRows when hashed = 
EmptyHashedRelation
    
    ### How was this patch tested?
    Existed UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53014 from AngersZhuuuu/SPARK-54319.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/joins/BroadcastHashJoinExec.scala    |  5 ++++-
 .../sql/execution/metric/SQLMetricsSuite.scala     | 23 ++++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 368534d05b1f..b62d8f0798b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -129,7 +129,10 @@ case class BroadcastHashJoinExec(
         val hashed = broadcastRelation.value.asReadOnlyCopy()
         
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
         if (hashed == EmptyHashedRelation) {
-          streamedIter
+          streamedIter.map { row =>
+            numOutputRows += 1
+            row
+          }
         } else if (hashed == HashedRelationWithAllNullKeys) {
           Iterator.empty
         } else {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4e50457ae47d..402365a59ece 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -915,16 +915,27 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
           withTable("t1", "t2") {
             spark.range(4).write.saveAsTable("t1")
             spark.range(2).write.saveAsTable("t2")
-            val df = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM 
t2)")
-            df.collect()
-            val plan = df.queryExecution.executedPlan
+            val df1 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM 
t2)")
+            df1.collect()
+            val plan1 = df1.queryExecution.executedPlan
 
-            val joins = plan.collect {
+            val joins1 = plan1.collect {
               case s: BroadcastHashJoinExec => s
             }
 
-            assert(joins.size === 1)
-            testMetricsInSparkPlanOperator(joins.head, Map("numOutputRows" -> 
2))
+            assert(joins1.size === 1)
+            testMetricsInSparkPlanOperator(joins1.head, Map("numOutputRows" -> 
2))
+
+            val df2 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2 
WHERE 1 = 2)")
+            df2.collect()
+            val plan2 = df2.queryExecution.executedPlan
+
+            val joins2 = plan2.collect {
+              case s: BroadcastHashJoinExec => s
+            }
+
+            assert(joins2.size === 1)
+            testMetricsInSparkPlanOperator(joins2.head, Map("numOutputRows" -> 
4))
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to