cloud-fan commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r637769570



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -20,65 +20,73 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
+import 
org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, 
TRUE_OR_FALSE_LITERAL}
 
 /**
- * Collapse plans consisting empty local relations generated by 
[[PruneFilters]].
- * 1. Binary(or Higher)-node Logical Plans
- *    - Union with all empty children.
- *    - Join with one or two empty children (including Intersect/Except).
- * 2. Unary-node Logical Plans
- *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Join with false condition.
- *    - Aggregate with all empty children and at least one grouping expression.
- *    - Generate(Explode) with all empty children. Others like Hive UDTF may 
return results.
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ *  1. Binary-node Logical Plans
+ *     - Join with one or two empty children (including Intersect/Except).
+ *     - Join is single column NULL-aware anti join (NAAJ)
+ *       Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. 
Eliminate join to an
+ *       empty [[LocalRelation]].
+ *     - Left semi Join
+ *       Right side is non-empty and condition is empty. Eliminate join to its 
left side.
+ *     - Left anti join
+ *       Right side is non-empty and condition is empty. Eliminate join to an 
empty
+ *       [[LocalRelation]].
+ *  2. Unary-node Logical Plans
+ *     - Limit/Repartition with all empty children.
+ *     - Aggregate with all empty children and at least one grouping 
expression.
+ *     - Generate(Explode) with all empty children. Others like Hive UDTF may 
return results.
  */
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper 
with CastSupport {
-  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with 
CastSupport {
+  /**
+   * At AQE side, we use this function to check if a plan has output rows or 
not
+   */
+  protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None
+
+  /**
+   * At AQE side, we use the broadcast query stage to do the check
+   */
+  protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = 
None
+
+  protected def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case p: LocalRelation => p.data.isEmpty
     case _ => false
   }
 
-  private def empty(plan: LogicalPlan) =
+  private def isEmptyLocalRelationWithRowCount(plan: LogicalPlan): Boolean = {
+    val defaultEmptyRelation: Boolean = isEmptyLocalRelation(plan)
+    if (checkRowCount.isDefined) {
+      checkRowCount.get.apply(plan, false) || defaultEmptyRelation
+    } else {
+      defaultEmptyRelation
+    }
+  }
+
+  protected def empty(plan: LogicalPlan): LocalRelation =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = 
plan.isStreaming)
 
   // Construct a project list from plan's output, while the value is always 
NULL.
   private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
     plan.output.map{ a => Alias(cast(Literal(null), a.dataType), 
a.name)(a.exprId) }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
-    _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
-    case p: Union if p.children.exists(isEmptyLocalRelation) =>
-      val newChildren = p.children.filterNot(isEmptyLocalRelation)
-      if (newChildren.isEmpty) {
-        empty(p)
-      } else {
-        val newPlan = if (newChildren.size > 1) Union(newChildren) else 
newChildren.head
-        val outputs = newPlan.output.zip(p.output)
-        // the original Union may produce different output attributes than the 
new one so we alias
-        // them if needed
-        if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == 
oldAttr.exprId }) {
-          newPlan
-        } else {
-          val outputAliases = outputs.map { case (newAttr, oldAttr) =>
-            val newExplicitMetadata =
-              if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) 
else None
-            Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = 
newExplicitMetadata)
-          }
-          Project(outputAliases, newPlan)
-        }
-      }
+  protected def propagateEmptyRelationAdvanced: PartialFunction[LogicalPlan, 
LogicalPlan] = {

Review comment:
       `single column NULL-aware anti join (NAAJ)` is AQE-only, so I think it's
better to match it only in the AQE rule.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to