cloud-fan commented on a change in pull request #32602: URL: https://github.com/apache/spark/pull/32602#discussion_r637769095
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala ##########
@@ -20,65 +20,73 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL}
 
 /**
- * Collapse plans consisting empty local relations generated by [[PruneFilters]].
- * 1. Binary(or Higher)-node Logical Plans
- *    - Union with all empty children.
- *    - Join with one or two empty children (including Intersect/Except).
- * 2. Unary-node Logical Plans
- *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Join with false condition.
- *    - Aggregate with all empty children and at least one grouping expression.
- *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ * 1. Binary-node Logical Plans
+ *    - Join with one or two empty children (including Intersect/Except).
+ *    - Join is single column NULL-aware anti join (NAAJ)
+ *      Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ *      empty [[LocalRelation]].
+ *    - Left semi Join
+ *      Right side is non-empty and condition is empty. Eliminate join to its left side.
+ *    - Left anti join
+ *      Right side is non-empty and condition is empty. Eliminate join to an empty
+ *      [[LocalRelation]].
+ * 2. Unary-node Logical Plans
+ *    - Limit/Repartition with all empty children.
+ *    - Aggregate with all empty children and at least one grouping expression.
+ *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
-  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport {
+  /**
+   * At AQE side, we use this function to check if a plan has output rows or not
+   */
+  protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None
+
+  /**
+   * At AQE side, we use the broadcast query stage to do the check
+   */
+  protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None
+
+  protected def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case p: LocalRelation => p.data.isEmpty
     case _ => false
   }
 
-  private def empty(plan: LogicalPlan) =
+  private def isEmptyLocalRelationWithRowCount(plan: LogicalPlan): Boolean = {
+    val defaultEmptyRelation: Boolean = isEmptyLocalRelation(plan)
+    if (checkRowCount.isDefined) {
+      checkRowCount.get.apply(plan, false) || defaultEmptyRelation
+    } else {
+      defaultEmptyRelation
+    }
+  }
+
+  protected def empty(plan: LogicalPlan): LocalRelation =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
 
   // Construct a project list from plan's output, while the value is always NULL.
   private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
     plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
-    _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
-    case p: Union if p.children.exists(isEmptyLocalRelation) =>
-      val newChildren = p.children.filterNot(isEmptyLocalRelation)
-      if (newChildren.isEmpty) {
-        empty(p)
-      } else {
-        val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
-        val outputs = newPlan.output.zip(p.output)
-        // the original Union may produce different output attributes than the new one so we alias
-        // them if needed
-        if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
-          newPlan
-        } else {
-          val outputAliases = outputs.map { case (newAttr, oldAttr) =>
-            val newExplicitMetadata =
-              if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
-            Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
-          }
-          Project(outputAliases, newPlan)
-        }
-      }
+  protected def propagateEmptyRelationAdvanced: PartialFunction[LogicalPlan, LogicalPlan] = {

Review comment:
nit: `commonApplyFunc`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org