This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e63b6111efe [SPARK-54972][SQL] Improve NOT IN subqueries with 
non-nullable columns
8e63b6111efe is described below

commit 8e63b6111efe4b63a3a97b8d10efb6c6574a6277
Author: Peter Toth <[email protected]>
AuthorDate: Fri Jan 9 13:53:31 2026 +0100

    [SPARK-54972][SQL] Improve NOT IN subqueries with non-nullable columns
    
    ### What changes were proposed in this pull request?
    
    Run `NullPropagation` after NOT IN subquery rewrite.
    
    ### Why are the changes needed?
    
    NOT IN subqueries like `SELECT * FROM t1 WHERE c NOT IN (SELECT c FROM t2)` 
are rewritten as a left anti join on `t1.c = t2.c` with an additional `OR 
IsNull(t1.c = t2.c)` condition, which prevents equi-join implementations from 
being used, so those joins end up as `BroadcastNestedLoopJoin`. When we know 
the columns can't be null, we can either drop those additional conditions 
during the subquery rewrite or call `NullPropagation` after the rewrite to 
simplify them to `false`. This PR does the latter.
    
    Please note that https://github.com/apache/spark/pull/29104 already 
optimized single-column NOT IN subqueries from `BroadcastNestedLoopJoin` to 
"null aware" `BroadcastHashJoin` very well, but when the columns are not 
nullable we can optimize multi-column cases as well, and the join doesn't need 
to be "null aware".
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, performance improvement.
    
    ### How was this patch tested?
    
    A new UT was added and some existing tests were adjusted to keep their 
validity.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53733 from peter-toth/SPARK-54972-improve-not-in-with-non-nullables.
    
    Authored-by: Peter Toth <[email protected]>
    Signed-off-by: Peter Toth <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala    |  1 +
 .../test/scala/org/apache/spark/sql/JoinSuite.scala | 21 +++++++++++++++++++--
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a2dced57c715..fe15819bd44a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -264,6 +264,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       CheckCartesianProducts),
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
+      NullPropagation,
       PushPredicateThroughJoin,
       LimitPushDown,
       ColumnPruning,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 885512d4d198..dd3bd0a9e96d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1236,13 +1236,13 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
       // positive not in subquery case
       var joinExec = assertJoin((
-        "select * from testData where key not in (select a from testData2)",
+        "select * from testData where key not in (select b from testData3)",
         classOf[BroadcastHashJoinExec]))
       assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
 
       // negative not in subquery case since multi-column is not supported
       assertJoin((
-        "select * from testData where (key, key + 1) not in (select * from 
testData2)",
+        "select * from testData where (key, key + 1) not in (select b, b + 1 
from testData3)",
         classOf[BroadcastNestedLoopJoinExec]))
 
       // positive hand-written left anti join
@@ -1271,6 +1271,23 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
     }
   }
 
+  test("SPARK-54972: Improve not in subqueries with non-nullable columns") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> 
Long.MaxValue.toString) {
+      // testData.key nullable false
+      // testData2.* nullable false
+
+      val joinExec = assertJoin((
+        "select * from testData where key not in (select a from testData2)",
+        classOf[BroadcastHashJoinExec]))
+      assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+
+      val joinExec2 = assertJoin((
+        "select * from testData where (key, key + 1) not in (select * from 
testData2)",
+        classOf[BroadcastHashJoinExec]))
+      
assert(!joinExec2.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+    }
+  }
+
   test("SPARK-32399: Full outer shuffled hash join") {
     val inputDFs = Seq(
       // Test unique join key


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to