wangyum commented on code in PR #42315: URL: https://github.com/apache/spark/pull/42315#discussion_r1283918802
########## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ########## @@ -2272,9 +2316,7 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def union(other: Dataset[T]): Dataset[T] = withSetOperator { - // This breaks caching, but it's usually ok because it addresses a very specific use case: - // using union to union many files or partitions. - CombineUnions(Union(logicalPlan, other.logicalPlan)) Review Comment: Could we add a flag to `CombineUnions`? ```patch Index: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (revision d9d5325eed228c61d0825160477d0defaabee710) +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (date 1691116492482) @@ -91,7 +91,7 @@ CollapseWindow, EliminateOffsets, EliminateLimits, - CombineUnions, + CombineUnions(), // Constant folding and strength reduction OptimizeRepartition, TransposeWindow, @@ -159,7 +159,7 @@ InlineCTE()) :: Batch("Union", Once, RemoveNoopOperators, - CombineUnions, + CombineUnions(), RemoveNoopUnion) :: // Run this once earlier. This might simplify the plan and reduce cost of optimizer. // For example, a query such as Filter(LocalRelation) would go through all the heavy @@ -1451,7 +1451,7 @@ /** * Combines all adjacent [[Union]] operators into a single [[Union]]. */ -object CombineUnions extends Rule[LogicalPlan] { +case class CombineUnions(combineWithProject: Boolean = true) extends Rule[LogicalPlan] { import CollapseProject.{buildCleanedProjectList, canCollapseExpressions} import PushProjectionThroughUnion.pushProjectionThroughUnion @@ -1480,7 +1480,8 @@ while (stack.nonEmpty) { stack.pop() match { case p1 @ Project(_, p2: Project) - if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) && + if combineWithProject && + canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) && !p1.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) && !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) => val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList) @@ -1499,15 +1500,17 @@ // Push down projection through Union and then push pushed plan to Stack if // there is a Project. case Project(projectList, Distinct(u @ Union(children, byName, allowMissingCol))) - if projectList.forall(_.deterministic) && children.nonEmpty && + if combineWithProject && projectList.forall(_.deterministic) && children.nonEmpty && flattenDistinct && byName == topByName && allowMissingCol == topAllowMissingCol => stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse) case Project(projectList, Deduplicate(keys: Seq[Attribute], u: Union)) - if projectList.forall(_.deterministic) && flattenDistinct && u.byName == topByName && + if combineWithProject && + projectList.forall(_.deterministic) && flattenDistinct && u.byName == topByName && u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) == u.outputSet => stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse) case Project(projectList, u @ Union(children, byName, allowMissingCol)) - if projectList.forall(_.deterministic) && children.nonEmpty && + if combineWithProject && + projectList.forall(_.deterministic) && children.nonEmpty && byName == topByName && allowMissingCol == topAllowMissingCol => stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse) case child => ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org