peter-toth commented on code in PR #55482:
URL: https://github.com/apache/spark/pull/55482#discussion_r3132395324
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -244,57 +238,52 @@ class PlanMerger(
(newPlan, cachedPlan) match {
case (np: Project, cp: Project) =>
tryMergePlans(np.child, cp.child, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.projectList, cp.projectList,
npMapping, cpMapping,
- npFilter, cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.projectList, cp.projectList,
npMapping, npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np, cp: Project) =>
tryMergePlans(np, cp.child, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.output, cp.projectList, npMapping,
cpMapping, npFilter,
- cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.output, cp.projectList, npMapping,
npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np: Project, cp) =>
tryMergePlans(np.child, cp, filterPropagationSupported).map {
- case TryMergeResult(mergedChild, npMapping, cpMapping, npFilter,
cpFilter) =>
- val (mergedProjectList, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.projectList, cp.output, npMapping,
cpMapping, npFilter,
- cpFilter)
- TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, newCPMapping,
- npFilter, cpFilter)
+ case TryMergeResult(mergedChild, npMapping, npFilter, cpFilter) =>
+ val (mergedProjectList, newNPMapping) =
+ mergeNamedExpressions(np.projectList, cp.output, npMapping,
npFilter, cpFilter)
+ TryMergeResult(Project(mergedProjectList, mergedChild),
newNPMapping, npFilter,
+ cpFilter)
}
case (np: Aggregate, cp: Aggregate) if supportedAggregateMerge(np, cp)
=>
// Filter propagation into the aggregate is only safe when there is
no grouping.
val childFilterPropagationSupported = filterPropagationEnabled &&
np.groupingExpressions.isEmpty && cp.groupingExpressions.isEmpty
tryMergePlans(np.child, cp.child,
childFilterPropagationSupported).flatMap {
- case TryMergeResult(mergedChild, npMapping, cpMapping, None, None)
=>
+ case TryMergeResult(mergedChild, npMapping, None, None) =>
val mappedNPGroupingExpression =
np.groupingExpressions.map(mapAttributes(_, npMapping))
- val mappedCPGroupingExpression =
- cp.groupingExpressions.map(mapAttributes(_, cpMapping))
+ val mappedCPGroupingExpression = cp.groupingExpressions
// Order of grouping expressions does matter as merging different
grouping orders can
// introduce "extra" shuffles/sorts that might not be present in
all of the original
// subqueries.
if (mappedNPGroupingExpression.map(_.canonicalized) ==
mappedCPGroupingExpression.map(_.canonicalized)) {
- val (mergedAggregateExpressions, newNPMapping, newCPMapping) =
- mergeNamedExpressions(np.aggregateExpressions,
cp.aggregateExpressions, npMapping,
- cpMapping)
+ val (mergedAggregateExpressions, newNPMapping) =
+ mergeNamedExpressions(np.aggregateExpressions,
cp.aggregateExpressions, npMapping)
val mergedPlan =
Aggregate(mappedCPGroupingExpression,
mergedAggregateExpressions, mergedChild)
Review Comment:
Thanks, fixed in
https://github.com/apache/spark/pull/55482/commits/79184550e43383a8507206249bca5738c5103b8c.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -523,35 +500,27 @@ class PlanMerger(
ne.toAttribute -> resultAttr
})
- // Wrap unmatched cached expressions with the cached plan's filter so they
are only computed
- // for rows that belong to the cached plan side. Plain attribute
references are not wrapped.
- // Record each attr rewrite in the cached plan map so ancestor nodes can
remap their stale
- // references.
- val newCPMapping = AttributeMap(cachedPlanFilter.toSeq.flatMap { f =>
- mergedExpressions.zipWithIndex.flatMap {
- case (ce, i) if !matchedCachedIndices.contains(i) =>
- val withoutAlias = ce match {
- case Alias(child, _) => child
- case e => e
- }
- // Plain attribute references are not wrapped: no remapping entry
needed.
- Option.when(!withoutAlias.isInstanceOf[Attribute]) {
- val newAlias =
- Alias(If(f, withoutAlias, Literal(null, withoutAlias.dataType)),
ce.name)()
- mergedExpressions(i) = newAlias
- ce.toAttribute -> newAlias.toAttribute
- }
- case _ => None
+ // Wrap unmatched cached expressions with the cached plan's filter so they
are only computed for
+ // rows that belong to the cached plan side. Plain attribute references
are not wrapped.
+ cachedPlanFilter.foreach { f =>
+ for (i <- 0 until cachedPlanExpressions.size if
!matchedCachedIndices.contains(i)) {
+ mergedExpressions(i) match {
+ case ce @ Alias(child, _) if !child.isInstanceOf[Attribute] =>
+ mergedExpressions(i) =
+ Alias(If(f, child, Literal(null, child.dataType)), ce.name)(
+ exprId = ce.toAttribute.exprId)
Review Comment:
Added in
https://github.com/apache/spark/pull/55482/commits/79184550e43383a8507206249bca5738c5103b8c.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]