cloud-fan commented on code in PR #55482:
URL: https://github.com/apache/spark/pull/55482#discussion_r3132260608
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -244,57 +238,52 @@ class PlanMerger(
(newPlan, cachedPlan) match {
case (np: Project, cp: Project) =>
tryMergePlans(np.child, cp.child, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.projectList, cp.projectList,
npMapping, cpMapping,
- npFilter, cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.projectList, cp.projectList,
npMapping, npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np, cp: Project) =>
tryMergePlans(np, cp.child, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.output, cp.projectList, npMapping,
cpMapping, npFilter,
- cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.output, cp.projectList, npMapping,
npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np: Project, cp) =>
tryMergePlans(np.child, cp, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.projectList, cp.output, npMapping,
cpMapping, npFilter,
- cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.projectList, cp.output, npMapping,
npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np: Aggregate, cp: Aggregate) if supportedAggregateMerge(np, cp)
=>
// Filter propagation into the aggregate is only safe when there is
no grouping.
val childFilterPropagationSupported = filterPropagationEnabled &&
np.groupingExpressions.isEmpty && cp.groupingExpressions.isEmpty
tryMergePlans(np.child, cp.child,
childFilterPropagationSupported).flatMap {
- case TryMergeResult(mergedChild, npMapping, cpMapping, None, None)
=>
+ case TryMergeResult(mergedChild, npMapping, None, None) =>
val mappedNPGroupingExpression =
np.groupingExpressions.map(mapAttributes(_, npMapping))
- val mappedCPGroupingExpression =
- cp.groupingExpressions.map(mapAttributes(_, cpMapping))
+ val mappedCPGroupingExpression = cp.groupingExpressions
// Order of grouping expression does matter as merging different
grouping orders can
// introduce "extra" shuffles/sorts that might not present in
all of the original
// subqueries.
if (mappedNPGroupingExpression.map(_.canonicalized) ==
mappedCPGroupingExpression.map(_.canonicalized)) {
- val (mergedAggregateExpressions, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.aggregateExpressions,
cp.aggregateExpressions, npMapping,
- cpMapping)
+ val (mergedAggregateExpressions, newNPMapping) =
+ mergeNamedExpressions(np.aggregateExpressions,
cp.aggregateExpressions, npMapping)
val mergedPlan =
Aggregate(mappedCPGroupingExpression,
mergedAggregateExpressions, mergedChild)
Review Comment:
After dropping `cpMapping`, the `mapped` prefix no longer reflects what this
local holds — it's just `cp.groupingExpressions`. Same for the reference used
to build the merged `Aggregate`.
```suggestion
val cpGroupingExpressions = cp.groupingExpressions
// Order of grouping expression does matter as merging
different grouping orders can
// introduce "extra" shuffles/sorts that might not be present in
all of the original
// subqueries.
if (mappedNPGroupingExpression.map(_.canonicalized) ==
cpGroupingExpressions.map(_.canonicalized)) {
val (mergedAggregateExpressions, newNPMapping) =
mergeNamedExpressions(np.aggregateExpressions,
cp.aggregateExpressions, npMapping)
val mergedPlan =
Aggregate(cpGroupingExpressions,
mergedAggregateExpressions, mergedChild)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]