wangyum commented on code in PR #42315:
URL: https://github.com/apache/spark/pull/42315#discussion_r1283918802


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2272,9 +2316,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def union(other: Dataset[T]): Dataset[T] = withSetOperator {
-    // This breaks caching, but it's usually ok because it addresses a very specific use case:
-    // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, other.logicalPlan))

Review Comment:
   Could we add a flag to `CombineUnions`?
   ```patch
   Index: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
   ===================================================================
   diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
   --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (revision d9d5325eed228c61d0825160477d0defaabee710)
   +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (date 1691116492482)
   @@ -91,7 +91,7 @@
            CollapseWindow,
            EliminateOffsets,
            EliminateLimits,
   -        CombineUnions,
   +        CombineUnions(),
            // Constant folding and strength reduction
            OptimizeRepartition,
            TransposeWindow,
   @@ -159,7 +159,7 @@
          InlineCTE()) ::
        Batch("Union", Once,
          RemoveNoopOperators,
   -      CombineUnions,
   +      CombineUnions(),
          RemoveNoopUnion) ::
    // Run this once earlier. This might simplify the plan and reduce cost of optimizer.
    // For example, a query such as Filter(LocalRelation) would go through all the heavy
   @@ -1451,7 +1451,7 @@
    /**
     * Combines all adjacent [[Union]] operators into a single [[Union]].
     */
   -object CombineUnions extends Rule[LogicalPlan] {
   +case class CombineUnions(combineWithProject: Boolean = true) extends Rule[LogicalPlan] {
      import CollapseProject.{buildCleanedProjectList, canCollapseExpressions}
      import PushProjectionThroughUnion.pushProjectionThroughUnion
    
   @@ -1480,7 +1480,8 @@
        while (stack.nonEmpty) {
          stack.pop() match {
            case p1 @ Project(_, p2: Project)
   -            if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) &&
   +            if combineWithProject &&
   +              canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) &&
                  !p1.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) &&
                  !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) =>
              val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList)
   @@ -1499,15 +1500,17 @@
            // Push down projection through Union and then push pushed plan to Stack if
            // there is a Project.
            case Project(projectList, Distinct(u @ Union(children, byName, allowMissingCol)))
   -            if projectList.forall(_.deterministic) && children.nonEmpty &&
   +            if combineWithProject && projectList.forall(_.deterministic) && children.nonEmpty &&
                  flattenDistinct && byName == topByName && allowMissingCol == topAllowMissingCol =>
              stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
            case Project(projectList, Deduplicate(keys: Seq[Attribute], u: Union))
   -            if projectList.forall(_.deterministic) && flattenDistinct && u.byName == topByName &&
   +            if combineWithProject &&
   +              projectList.forall(_.deterministic) && flattenDistinct && u.byName == topByName &&
                  u.allowMissingCol == topAllowMissingCol && AttributeSet(keys) == u.outputSet =>
              stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
            case Project(projectList, u @ Union(children, byName, allowMissingCol))
   -            if projectList.forall(_.deterministic) && children.nonEmpty &&
   +            if combineWithProject &&
   +              projectList.forall(_.deterministic) && children.nonEmpty &&
                  byName == topByName && allowMissingCol == topAllowMissingCol =>
              stack.pushAll(pushProjectionThroughUnion(projectList, u).reverse)
            case child =>
   ```
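
   With that flag in place, `Dataset#union` could keep eagerly flattening nested `Union`s while skipping the project-combining behaviour. A minimal sketch of the call site, assuming the case class form from the patch above (the exact wiring in `Dataset.scala` may differ):
   ```scala
   // Sketch only: assumes the CombineUnions case class proposed in the patch above.
   // Nested Unions are still flattened eagerly, but adjacent Projects are left
   // untouched because combineWithProject = false.
   def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very
     // specific use case: using union to union many files or partitions.
     CombineUnions(combineWithProject = false)(Union(logicalPlan, other.logicalPlan))
   }
   ```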



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

