maropu commented on a change in pull request #25717: [WIP][SPARK-29013][SQL] Structurally equivalent subexpression elimination URL: https://github.com/apache/spark/pull/25717#discussion_r321996616
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala ########## @@ -65,35 +89,102 @@ class EquivalentExpressions { } /** - * Adds the expression to this data structure recursively. Stops if a matching expression - * is found. That is, if `expr` has already been added, its children are not added. + * Adds each expression to structural expression data structure, grouping them with existing + * structurally equivalent expressions. Non-recursive. + */ + def addStructExpr(ctx: CodegenContext, expr: Expression): Unit = { + if (expr.deterministic) { + val refs = expr.collect { + case b: BoundReference => b + } + + // For structural equivalent expressions, we need to pass in int type ordinals into + // split functions. If the number of ordinals is more than JVM function limit, we skip + // this expression. + // We calculate function parameter length by the number of ints plus `INPUT_ROW` plus + // a int type result array index. + val parameterLength = CodeGenerator.calculateParamLength(refs.map(_ => Literal(0))) + 2 + if (CodeGenerator.isValidParamLength(parameterLength)) { + val parameterizedExpr = parameterizedBoundReferences(ctx, expr) + + val e: StructuralExpr = StructuralExpr(parameterizedExpr) + val f = structEquivalenceMap.get(e) + if (f.isDefined) { + addExpr(expr, f.get) + } else { + val exprMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]] + addExpr(expr, exprMap) + structEquivalenceMap.put(e, exprMap) + } + } + } + } + + /** + * Replaces bound references in given expression by parameterized bound references. 
*/ - def addExprTree(expr: Expression): Unit = { - val skip = expr.isInstanceOf[LeafExpression] || + private def parameterizedBoundReferences(ctx: CodegenContext, expr: Expression): Expression = { + expr.transformUp { + case b: BoundReference => + val param = ctx.freshName("boundInput") + ParameterizedBoundReference(param, b.dataType, b.nullable) + } + } + + /** + * Checks if we skip add sub-expressions for given expression. + */ + private def skipExpr(expr: Expression): Boolean = { + expr.isInstanceOf[LeafExpression] || // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. expr.find(_.isInstanceOf[LambdaVariable]).isDefined + } + + + // There are some special expressions that we should not recurse into all of its children. + // 1. CodegenFallback: it's children will not be used to generate code (call eval() instead) + // 2. If: common subexpressions will always be evaluated at the beginning, but the true and + // false expressions in `If` may not get accessed, according to the predicate + // expression. We should only recurse into the predicate expression. + // 3. CaseWhen: like `If`, the children of `CaseWhen` only get accessed in a certain + // condition. We should only recurse into the first condition expression as it + // will always get accessed. + // 4. Coalesce: it's also a conditional expression, we should only recurse into the first + // children, because others may not get accessed. + private def childrenToRecurse(expr: Expression): Seq[Expression] = expr match { + case _: CodegenFallback => Nil + case i: If => i.predicate :: Nil + case c: CaseWhen => c.children.head :: Nil + case c: Coalesce => c.children.head :: Nil + case s: SortPrefix => s.child.child :: Nil + case other => other.children + } + + /** + * Adds the expression to this data structure recursively. Stops if a matching expression + * is found. 
That is, if `expr` has already been added, its children are not added. + */ + def addExprTree( + expr: Expression, + exprMap: EquivalenceMap = this.equivalenceMap): Unit = { + val skip = skipExpr(expr) - // There are some special expressions that we should not recurse into all of its children. - // 1. CodegenFallback: it's children will not be used to generate code (call eval() instead) - // 2. If: common subexpressions will always be evaluated at the beginning, but the true and - // false expressions in `If` may not get accessed, according to the predicate - // expression. We should only recurse into the predicate expression. - // 3. CaseWhen: like `If`, the children of `CaseWhen` only get accessed in a certain - // condition. We should only recurse into the first condition expression as it - // will always get accessed. - // 4. Coalesce: it's also a conditional expression, we should only recurse into the first - // children, because others may not get accessed. - def childrenToRecurse: Seq[Expression] = expr match { - case _: CodegenFallback => Nil - case i: If => i.predicate :: Nil - case c: CaseWhen => c.children.head :: Nil - case c: Coalesce => c.children.head :: Nil - case other => other.children + if (!skip && !addExpr(expr, exprMap)) { + childrenToRecurse(expr).foreach(addExprTree(_, exprMap)) } + } + + /** + * Adds the expression to structural data structure recursively. Stops if a matching expression + * is found. + */ + def addStructuralExprTree(ctx: CodegenContext, expr: Expression): Unit = { + val skip = skipExpr(expr) - if (!skip && !addExpr(expr)) { - childrenToRecurse.foreach(addExprTree) + if (!skip) { Review comment: Can't we write this as `!skip && !addStructExpr(expr, exprMap)`, the same way `addExprTree` does? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org