peter-toth commented on code in PR #55628:
URL: https://github.com/apache/spark/pull/55628#discussion_r3173341865
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -6608,6 +6608,21 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED =
+ buildConf(
+ "spark.sql.optimizer.mergeSubplans.filterPropagation.filterPropagationThroughJoin.enabled")
Review Comment:
Good idea, renamed in
https://github.com/apache/spark/pull/55628/commits/4d87515c6de9583e4f6999094cb91a6d4dd70312.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -416,18 +425,37 @@ class PlanMerger(
}
case (np: Join, cp: Join) if np.joinType == cp.joinType && np.hint ==
cp.hint =>
- // Filter propagation across joins is not yet supported.
- tryMergePlans(np.left, cp.left, false).flatMap {
- case TryMergeResult(mergedLeft, leftNPMapping, None, None) =>
- tryMergePlans(np.right, cp.right, false).flatMap {
- case TryMergeResult(mergedRight, rightNPMapping, None, None) =>
+ tryMergePlans(np.left, cp.left, filterPropagationSupported).flatMap {
+ case TryMergeResult(mergedLeft, leftNPMapping, leftNPFilter,
leftCPFilter) =>
+ tryMergePlans(np.right, cp.right,
filterPropagationSupported).flatMap {
+ case TryMergeResult(mergedRight, rightNPMapping,
rightNPFilter, rightCPFilter)
+ // If both children independently propagate filter attributes we would need to
+ // AND them into a new alias above the join, which is not yet supported.
+ if !(leftNPFilter.isDefined && rightNPFilter.isDefined) &&
+ !(leftCPFilter.isDefined && rightCPFilter.isDefined) &&
+ // Gate join-crossing filter propagation behind its own config flag.
+ // When no filter attributes are in play the merge is unconditionally safe.
+ (leftNPFilter.isEmpty && leftCPFilter.isEmpty &&
+ rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
+ filterPropagationThroughJoinEnabled) &&
+ // A filter attribute is only safe to propagate through a join if it comes
+ // from the "preserved" (non-nullable) side. On the nullable side, unmatched
+ // rows are NULL-padded so f=NULL, causing FILTER (WHERE f) to incorrectly
+ // exclude rows that should contribute to the aggregate. Right-side
+ // attributes are also absent from semi/anti join output.
+ (leftNPFilter.isEmpty && leftCPFilter.isEmpty ||
Review Comment:
Fixed in
https://github.com/apache/spark/pull/55628/commits/4d87515c6de9583e4f6999094cb91a6d4dd70312.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]