HyukjinKwon commented on code in PR #50867:
URL: https://github.com/apache/spark/pull/50867#discussion_r2125394487
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala:
##########
@@ -169,24 +169,64 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with Logging {
e.exists(PythonUDF.isScalarPythonUDF)
}
+ /**
+ * Return true if we should extract the current expression, including all of its current
+ * children (including UDF expression, and all others), to a logical node.
+ * The children of the expression can be udf expressions, this would be `chaining`.
+ * If child udf expressions were already extracted before, then this will just extract
+ * the current udf expression, so they will end up in separate logical nodes. The child
+ * expressions will have been transformed to Attribute expressions referencing the child plan
+ * node's output.
+ *
+ * Return false if there is no single continuous chain of UDFs that can be extracted:
+ * - if there are other expression in-between, e.g., foo(1 + bar(baz())), return false. The
+ * caller will have to extract bar(baz()) separately first.
+ * - if the eval types of the udf expressions in the chain differ, return false.
+ * - if a UDF has more than one child, e.g. foo(bar(), baz()), return false
+ * If we return false here, the expectation is that the recursive calls of
+ * collectEvaluableUDFsFromExpressions will then visit the children and extract them first to
+ * separate nodes.
+ */
@scala.annotation.tailrec
- private def canEvaluateInPython(e: PythonUDF): Boolean = {
+ private def shouldExtractUDFExpressionTree(e: PythonUDF): Boolean = {
e.children match {
- // single PythonUDF child could be chained and evaluated in Python
- case Seq(u: PythonUDF) => correctEvalType(e) == correctEvalType(u) && canEvaluateInPython(u)
+ case Seq(child: PythonUDF) => correctEvalType(e) == correctEvalType(child) &&
+ shouldExtractUDFExpressionTree(child)
// Python UDF can't be evaluated directly in JVM
case children => !children.exists(hasScalarPythonUDF)
}
}
+ /**
+ * We use the following terminology:
+ * - fusing is the act of combining multiple UDFs into a single logical node. This can be
+ * accomplished in different cases:
Review Comment:
I would describe both of the cases below as examples, unless we're absolutely sure that
they are the only two cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]