Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205185872
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either 
Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the 
expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the 
first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF 
expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: 
LazyEvalType): Boolean = {
    +    if (!lazyEvalType.isSet) {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python 
if eval type is the same
    +        case Seq(u: PythonUDF) =>
    +          // Need to recheck the eval type because lazy eval type will be 
set if child Python UDF is
    +          // evaluable
    +          canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == 
e.evalType
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => if (!children.exists(hasScalarPythonUDF)) {
    +          // We found the first evaluable expression, set lazy eval type 
to its eval type.
    +          lazyEvalType.set(e.evalType)
    +          true
    +        } else {
    +          false
    +        }
    +      }
    +    } else {
    +      if (e.evalType != lazyEvalType.get) {
    +        false
    +      } else {
    +        e.children match {
    +          case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType)
    --- End diff --
    
    There are 2 paths for recursion here, which is probably not a good idea.  
This method is much more complicated now and a little difficult to follow.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to