This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8e63b6111efe [SPARK-54972][SQL] Improve NOT IN subqueries with
non-nullable columns
8e63b6111efe is described below
commit 8e63b6111efe4b63a3a97b8d10efb6c6574a6277
Author: Peter Toth <[email protected]>
AuthorDate: Fri Jan 9 13:53:31 2026 +0100
[SPARK-54972][SQL] Improve NOT IN subqueries with non-nullable columns
### What changes were proposed in this pull request?
Run `NullPropagation` after NOT IN subquery rewrite.
### Why are the changes needed?
NOT IN subqueries like `SELECT * FROM t1 WHERE c NOT IN (SELECT c FROM t2)`
are rewritten as a left anti join on `t1.c = t2.c` with additional `OR IsNull(t1.c =
t2.c)` conditions, which prevent equi-join implementations from being used, so those
joins end up as `BroadcastNestedLoopJoin`. When we know the columns can't be
null, we can either drop those additional conditions during subquery rewrite or
call `NullPropagation` after the rewrite to simplify them to `false`. This PR
implements the latter.
Please note that https://github.com/apache/spark/pull/29104 already
optimized single-column NOT IN subqueries from `BroadcastNestedLoopJoin` to
"null aware" `BroadcastHashJoin` very well, but when the columns are not
nullable we can optimize multi-column cases as well, and the join doesn't need to
be "null aware".
### Does this PR introduce _any_ user-facing change?
Yes, performance improvement.
### How was this patch tested?
A new UT was added and some existing tests were adjusted to keep them
valid.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53733 from peter-toth/SPARK-54972-improve-not-in-with-non-nullables.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
.../test/scala/org/apache/spark/sql/JoinSuite.scala | 21 +++++++++++++++++++--
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a2dced57c715..fe15819bd44a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -264,6 +264,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CheckCartesianProducts),
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
+ NullPropagation,
PushPredicateThroughJoin,
LimitPushDown,
ColumnPruning,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 885512d4d198..dd3bd0a9e96d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1236,13 +1236,13 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
// positive not in subquery case
var joinExec = assertJoin((
- "select * from testData where key not in (select a from testData2)",
+ "select * from testData where key not in (select b from testData3)",
classOf[BroadcastHashJoinExec]))
assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
// negative not in subquery case since multi-column is not supported
assertJoin((
- "select * from testData where (key, key + 1) not in (select * from
testData2)",
+ "select * from testData where (key, key + 1) not in (select b, b + 1
from testData3)",
classOf[BroadcastNestedLoopJoinExec]))
// positive hand-written left anti join
@@ -1271,6 +1271,23 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
}
+ test("SPARK-54972: Improve not in subqueries with non-nullable columns") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key ->
Long.MaxValue.toString) {
+ // testData.key nullable false
+ // testData2.* nullable false
+
+ val joinExec = assertJoin((
+ "select * from testData where key not in (select a from testData2)",
+ classOf[BroadcastHashJoinExec]))
+ assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+
+ val joinExec2 = assertJoin((
+ "select * from testData where (key, key + 1) not in (select * from
testData2)",
+ classOf[BroadcastHashJoinExec]))
+
assert(!joinExec2.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+ }
+ }
+
test("SPARK-32399: Full outer shuffled hash join") {
val inputDFs = Seq(
// Test unique join key
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]