peter-toth commented on code in PR #51685:
URL: https://github.com/apache/spark/pull/51685#discussion_r2251474112
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala:
##########
@@ -85,8 +84,9 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
val aliases = broadcastKeyIndices.map(idx =>
Alias(buildKeys(idx), buildKeys(idx).toString)())
val aggregate = Aggregate(aliases, aliases, buildPlan)
- DynamicPruningExpression(expressions.InSubquery(
Review Comment:
If we use `InSubquery` here, then the `PlanSubqueries` rule will transform it
into a general `InSubqueryExec(..., isDynamicPruning = false)`, which causes the
collected result of the subquery plan to be broadcast in case the
`InSubqueryExec` expression is evaluated on an executor.
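A minimal toy sketch of that planning step follows. The case classes only borrow Spark's names; they are simplified, hypothetical stand-ins, not the real Catalyst or physical classes. `toyPlanSubqueries` is a hypothetical rule that, like `PlanSubqueries` as described above, turns every `InSubquery` into a general `InSubqueryExec` without looking at the enclosing `DynamicPruningExpression`.

```scala
// Hypothetical toy expression tree (names mirror Spark's, classes do not).
sealed trait Expr
case class Attr(name: String) extends Expr
case class InSubquery(value: Expr, subqueryPlan: String) extends Expr
case class DynamicPruningExpression(child: Expr) extends Expr
case class InSubqueryExec(value: Expr, plan: String, isDynamicPruning: Boolean) extends Expr

// Hypothetical PlanSubqueries-like rule: every InSubquery becomes a general
// InSubqueryExec, regardless of what it is nested in.
def toyPlanSubqueries(e: Expr): Expr = e match {
  case InSubquery(v, p)            => InSubqueryExec(v, p, isDynamicPruning = false)
  case DynamicPruningExpression(c) => DynamicPruningExpression(toyPlanSubqueries(c))
  case other                       => other
}

// A DPP filter built the "old" way: DynamicPruningExpression(InSubquery(...)).
val planned = toyPlanSubqueries(
  DynamicPruningExpression(InSubquery(Attr("store_id"), "Aggregate(buildKeys)")))
```

Because the rewrite never inspects the wrapper, the DPP context is lost at this point and the resulting `InSubqueryExec` is treated as a general one.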
But in the DPP case (`InSubqueryExec` is nested in a
`DynamicPruningExpression`) we know that the expression is always evaluated on
the driver only, so we don't need to [broadcast the subquery
result](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L140-L142).
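The broadcast decision can be modelled roughly as below. `ToyInSubqueryExec` is a hypothetical toy class, not the real `InSubqueryExec` (the linked `subquery.scala` lines hold the actual code), and `broadcastCount` is an assumed stand-in for broadcasting the collected result.

```scala
// Hypothetical toy model: mirrors only the "broadcast or not" decision.
final class ToyInSubqueryExec(val isDynamicPruning: Boolean) {
  var result: Option[Seq[Any]] = None
  var broadcastCount: Int = 0 // stand-in for an actual broadcast

  // `collect` stands in for collecting the subquery plan's rows on the driver.
  def updateResult(collect: () => Seq[Any]): Unit = {
    result = Some(collect())
    // A general InSubqueryExec may be evaluated on executors, so its result is
    // broadcast; a DPP one is only evaluated on the driver, so it is not.
    if (!isDynamicPruning) broadcastCount += 1
  }
}

val general = new ToyInSubqueryExec(isDynamicPruning = false)
general.updateResult(() => Seq(1, 2, 3))

val dpp = new ToyInSubqueryExec(isDynamicPruning = true)
dpp.updateResult(() => Seq(1, 2, 3))
```

Both instances end up with the same collected result; only the general one pays for the broadcast.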
When AQE is on, `PlanAdaptiveSubqueries` transforms `DynamicPruningSubquery`s
into `DynamicPruningExpression(InSubqueryExec(...))` immediately, so this issue
doesn't affect AQE.
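For comparison, a toy sketch of the AQE path, again with hypothetical simplified classes. It assumes the AQE rule marks the resulting `InSubqueryExec` as a DPP subquery; that is consistent with the issue not affecting AQE, but the flag value is an assumption of this sketch, and `toyPlanAdaptiveSubqueries` is a hypothetical name.

```scala
// Hypothetical toy expression tree (names mirror Spark's, classes do not).
sealed trait Expr
case class Attr(name: String) extends Expr
case class DynamicPruningSubquery(pruningKey: Expr, buildPlan: String) extends Expr
case class DynamicPruningExpression(child: Expr) extends Expr
case class InSubqueryExec(value: Expr, plan: String, isDynamicPruning: Boolean) extends Expr

// Hypothetical AQE-style rule: the DPP subquery goes straight to a physical
// InSubqueryExec flagged as DPP, so no general subquery rewrite sees it.
def toyPlanAdaptiveSubqueries(e: Expr): Expr = e match {
  case DynamicPruningSubquery(key, plan) =>
    DynamicPruningExpression(InSubqueryExec(key, plan, isDynamicPruning = true))
  case other => other
}

val adaptive = toyPlanAdaptiveSubqueries(DynamicPruningSubquery(Attr("store_id"), "buildPlan"))
```

Since there is no intermediate logical `InSubquery`, there is nothing left for a general rewrite to turn into a broadcasting `InSubqueryExec`.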
Historically, `InSubqueryExec` was added to the code base with the initial
DPP implementation, and broadcasting the result was there from the start:
https://github.com/apache/spark/pull/25600/files#diff-9b62cef6bfdeb6c802bb120c7a724a974d5067a69585285bebb64c48603f8d6fR138
(note that at that point `InSubqueryExec` was used with, and required for, DPP
only). Then https://github.com/apache/spark/pull/34051 introduced skipping
the broadcast in the DPP case.
But then
https://github.com/apache/spark/pull/40569/files#diff-9b62cef6bfdeb6c802bb120c7a724a974d5067a69585285bebb64c48603f8d6fR197
added broadcasting back to all `InSubquery`s regardless of whether they are
used for DPP or not (so at this later point `InSubqueryExec` was used without
DPP too).
This PR finally fixes the DPP case.
Sidenote 1: The test case added in
https://github.com/apache/spark/pull/40569/files#diff-adc3f627771dab40517626755090a97ce8d6ab3cec413b8fb88627e7049414e0R2699
doesn't fail now if I remove the broadcasting logic entirely, which means
that Spark has become smarter and uses `InSubqueryExec` without DPP in fewer
cases. Maybe never, but for safety reasons I don't want to remove the
broadcasting logic entirely.
Sidenote 2: `RowLevelOperationRuntimeGroupFiltering` also adds a
`DynamicPruningExpression(InSubquery(...))`, which also causes an unnecessary
broadcast, but I will try to fix that in a follow-up PR.
--
This is an automated message from the Apache Git Service.