maropu commented on a change in pull request #25717: [WIP][SPARK-29013][SQL] 
Structurally equivalent subexpression elimination
URL: https://github.com/apache/spark/pull/25717#discussion_r321996616
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
 ##########
 @@ -65,35 +89,102 @@ class EquivalentExpressions {
   }
 
   /**
-   * Adds the expression to this data structure recursively. Stops if a 
matching expression
-   * is found. That is, if `expr` has already been added, its children are not 
added.
+   * Adds each expression to structural expression data structure, grouping 
them with existing
+   * structurally equivalent expressions. Non-recursive.
+   */
+  def addStructExpr(ctx: CodegenContext, expr: Expression): Unit = {
+    if (expr.deterministic) {
+      val refs = expr.collect {
+        case b: BoundReference => b
+      }
+
+      // For structural equivalent expressions, we need to pass in int type 
ordinals into
+      // split functions. If the number of ordinals is more than JVM function 
limit, we skip
+      // this expression.
+      // We calculate function parameter length by the number of ints plus 
`INPUT_ROW` plus
+      // a int type result array index.
+      val parameterLength = CodeGenerator.calculateParamLength(refs.map(_ => 
Literal(0))) + 2
+      if (CodeGenerator.isValidParamLength(parameterLength)) {
+        val parameterizedExpr = parameterizedBoundReferences(ctx, expr)
+
+        val e: StructuralExpr = StructuralExpr(parameterizedExpr)
+        val f = structEquivalenceMap.get(e)
+        if (f.isDefined) {
+          addExpr(expr, f.get)
+        } else {
+          val exprMap = mutable.HashMap.empty[Expr, 
mutable.ArrayBuffer[Expression]]
+          addExpr(expr, exprMap)
+          structEquivalenceMap.put(e, exprMap)
+        }
+      }
+    }
+  }
+
+  /**
+   * Replaces bound references in given expression by parameterized bound 
references.
    */
-  def addExprTree(expr: Expression): Unit = {
-    val skip = expr.isInstanceOf[LeafExpression] ||
+  private def parameterizedBoundReferences(ctx: CodegenContext, expr: 
Expression): Expression = {
+    expr.transformUp {
+      case b: BoundReference =>
+        val param = ctx.freshName("boundInput")
+        ParameterizedBoundReference(param, b.dataType, b.nullable)
+    }
+  }
+
+  /**
+   * Checks if we skip add sub-expressions for given expression.
+   */
+  private def skipExpr(expr: Expression): Boolean = {
+    expr.isInstanceOf[LeafExpression] ||
       // `LambdaVariable` is usually used as a loop variable, which can't be 
evaluated ahead of the
       // loop. So we can't evaluate sub-expressions containing 
`LambdaVariable` at the beginning.
       expr.find(_.isInstanceOf[LambdaVariable]).isDefined
+  }
+
+
+  // There are some special expressions that we should not recurse into all of 
its children.
+  //   1. CodegenFallback: it's children will not be used to generate code 
(call eval() instead)
+  //   2. If: common subexpressions will always be evaluated at the beginning, 
but the true and
+  //          false expressions in `If` may not get accessed, according to the 
predicate
+  //          expression. We should only recurse into the predicate expression.
+  //   3. CaseWhen: like `If`, the children of `CaseWhen` only get accessed in 
a certain
+  //                condition. We should only recurse into the first condition 
expression as it
+  //                will always get accessed.
+  //   4. Coalesce: it's also a conditional expression, we should only recurse 
into the first
+  //                children, because others may not get accessed.
+  private def childrenToRecurse(expr: Expression): Seq[Expression] = expr 
match {
+    case _: CodegenFallback => Nil
+    case i: If => i.predicate :: Nil
+    case c: CaseWhen => c.children.head :: Nil
+    case c: Coalesce => c.children.head :: Nil
+    case s: SortPrefix => s.child.child :: Nil
+    case other => other.children
+  }
+
+  /**
+   * Adds the expression to this data structure recursively. Stops if a 
matching expression
+   * is found. That is, if `expr` has already been added, its children are not 
added.
+   */
+  def addExprTree(
+      expr: Expression,
+      exprMap: EquivalenceMap = this.equivalenceMap): Unit = {
+    val skip = skipExpr(expr)
 
-    // There are some special expressions that we should not recurse into all 
of its children.
-    //   1. CodegenFallback: it's children will not be used to generate code 
(call eval() instead)
-    //   2. If: common subexpressions will always be evaluated at the 
beginning, but the true and
-    //          false expressions in `If` may not get accessed, according to 
the predicate
-    //          expression. We should only recurse into the predicate 
expression.
-    //   3. CaseWhen: like `If`, the children of `CaseWhen` only get accessed 
in a certain
-    //                condition. We should only recurse into the first 
condition expression as it
-    //                will always get accessed.
-    //   4. Coalesce: it's also a conditional expression, we should only 
recurse into the first
-    //                children, because others may not get accessed.
-    def childrenToRecurse: Seq[Expression] = expr match {
-      case _: CodegenFallback => Nil
-      case i: If => i.predicate :: Nil
-      case c: CaseWhen => c.children.head :: Nil
-      case c: Coalesce => c.children.head :: Nil
-      case other => other.children
+    if (!skip && !addExpr(expr, exprMap)) {
+      childrenToRecurse(expr).foreach(addExprTree(_, exprMap))
     }
+  }
+
+  /**
+   * Adds the expression to structural data structure recursively.  Stops if a 
matching expression
+   * is found.
+   */
+  def addStructuralExprTree(ctx: CodegenContext, expr: Expression): Unit = {
+    val skip = skipExpr(expr)
 
-    if (!skip && !addExpr(expr)) {
-      childrenToRecurse.foreach(addExprTree)
+    if (!skip) {
 
 Review comment:
   We cannot do it like `!skip && !addStructExpr(expr, exprMap)` in the same 
way with `addExprTree`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to