peter-toth commented on code in PR #51685:
URL: https://github.com/apache/spark/pull/51685#discussion_r2251474112


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala:
##########
@@ -85,8 +84,9 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
           val aliases = broadcastKeyIndices.map(idx =>
             Alias(buildKeys(idx), buildKeys(idx).toString)())
           val aggregate = Aggregate(aliases, aliases, buildPlan)
-          DynamicPruningExpression(expressions.InSubquery(

Review Comment:
   If we use `InSubquery` here, the `PlanSubqueries` rule transforms it into 
a general `InSubqueryExec(..., isDynamicPruning = false)`, which causes the 
collected result of the subquery plan to be broadcast in case the 
`InSubqueryExec` expression is evaluated on an executor.
   But in the DPP case (`InSubqueryExec` nested in a 
`DynamicPruningExpression`) we know that the expression is always evaluated 
on the driver only, so we don't need to [broadcast the subquery 
result](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L140-L142).
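   
   To illustrate the difference, here is a minimal standalone sketch. It is a 
toy model, not Spark code: `ToyContext`, `ToyInSubqueryExec` and 
`collectSubquery` are hypothetical names made up for this example, and only the 
`isDynamicPruning` flag mirrors the parameter mentioned above.

```scala
object DppBroadcastSketch {
  // Hypothetical stand-in for a broadcast facility; it only counts calls.
  final class ToyContext {
    var broadcastCount: Int = 0
    def broadcast[T](value: T): T = { broadcastCount += 1; value }
  }

  // Hypothetical stand-in for InSubqueryExec (assumption: not Spark's real class).
  final class ToyInSubqueryExec(
      ctx: ToyContext,
      collectSubquery: () => Set[Int],
      isDynamicPruning: Boolean) {
    private var result: Option[Set[Int]] = None

    def updateResult(): Unit = {
      val rows = collectSubquery()
      // A DPP filter is evaluated on the driver only, so the broadcast is skipped.
      // A general IN subquery may be evaluated on executors, so it is broadcast.
      result = Some(if (isDynamicPruning) rows else ctx.broadcast(rows))
    }

    def contains(key: Int): Boolean = result.exists(_.contains(key))
  }

  def main(args: Array[String]): Unit = {
    val ctx = new ToyContext
    val general = new ToyInSubqueryExec(ctx, () => Set(1, 2, 3), isDynamicPruning = false)
    val dpp = new ToyInSubqueryExec(ctx, () => Set(1, 2, 3), isDynamicPruning = true)
    general.updateResult()
    dpp.updateResult()
    println(s"broadcasts created: ${ctx.broadcastCount}")
  }
}
```

   In this sketch only the general instance triggers a broadcast, so the 
counter ends at 1; both instances still answer `contains` the same way.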
   
   When AQE is on, `PlanAdaptiveSubqueries` transforms 
`DynamicPruningSubquery`s into `DynamicPruningExpression(InSubqueryExec(...))` 
immediately, so this issue doesn't affect AQE.
   
   Historically, `InSubqueryExec` was added to the code base with the initial 
DPP implementation, and broadcasting the result was there from the start: 
https://github.com/apache/spark/pull/25600/files#diff-9b62cef6bfdeb6c802bb120c7a724a974d5067a69585285bebb64c48603f8d6fR138
 (please note that at that point `InSubqueryExec` was used with/required for 
DPP only). Then https://github.com/apache/spark/pull/34051 introduced skipping 
the broadcast in the DPP case.
   But then 
https://github.com/apache/spark/pull/40569/files#diff-9b62cef6bfdeb6c802bb120c7a724a974d5067a69585285bebb64c48603f8d6fR197
 added broadcasting back to all `InSubquery`s regardless of whether they are 
used for DPP (so by this later point `InSubqueryExec` was used without DPP too).
   And now, finally, this PR fixes the DPP case.
   
   Sidenote 1: The test case added in SPARK-42937 doesn't fail now if I remove 
the broadcasting logic entirely, which means that Spark became smarter and 
uses `InSubqueryExec` in fewer cases without DPP. Maybe never, but for safety 
reasons I don't want to remove the broadcasting logic entirely.
   
   Sidenote 2: `RowLevelOperationRuntimeGroupFiltering` also adds a 
`DynamicPruningExpression(InSubquery(...))`, which also does an unnecessary 
broadcast, but I will try to fix it in a follow-up PR.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

