Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205448677
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either a Batched Python UDF or a
    +   * Scalar Pandas UDF. If the lazy eval type is set, this method checks for an expression of
    +   * the specified eval type.
    +   *
    +   * This method also sets the lazy eval type to the type of the first evaluable expression,
    +   * i.e., if the lazy eval type is not set and we find an evaluable Python UDF expression, the
    +   * lazy eval type is set to the eval type of that expression.
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    I applied your new code, but the test I mentioned above still fails.
    
    I think the issue is that when visiting `f2(f1(col('v')))`, firstEvalType is set to Scalar Pandas first and is never set to Batched SQL afterwards, so f1 is not extracted. It's possible that my code is still different from yours somehow.
    
    But similar to https://github.com/apache/spark/pull/21650#issuecomment-407951457, I think the state machine of the eval type holder object in your suggested implementation is fairly complicated (i.e., what is the expected state of the eval type holder, and what is the invariant of the algorithm), and I found myself thinking pretty hard to prove the state machine is correct in all cases. If we want to go with this implementation, we need to think about it carefully and explain it in the code...
    
    The lazyEvalType implementation is better IMHO because the state machine is simpler: lazyEvalType is empty until we find the first evaluable UDF, and the value doesn't change after that.
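
    To make that set-once state machine concrete, here is a minimal, hypothetical sketch in plain Scala (not Spark's actual code): a holder that is empty until first set and rejects any later change.

    ```scala
    // Hypothetical set-once holder mirroring the lazy eval type state machine
    // described above: empty until the first evaluable UDF fixes the value,
    // then immutable. Names are illustrative, not Spark's real API.
    class SetOnce[A] {
      private var value: Option[A] = None

      def isSet: Boolean = value.isDefined

      // Setting twice is a bug in the caller, so fail loudly.
      def set(v: A): Unit = {
        require(!isSet, "value has already been set")
        value = Some(v)
      }

      // Reading before the first set is likewise a caller bug.
      def get: A = value.getOrElse(
        throw new IllegalStateException("value is not set"))
    }
    ```

    Using `Option` internally also avoids the `-1` sentinel in the `LazyEvalType` diff above.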
    
    The first implementation (two-pass, immutable state) is probably the simplest in terms of the mental complexity of the algorithm, but it is less efficient.
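
    For comparison, a hedged sketch of the two-pass idea on a toy expression model (the `Udf`/`Col` classes and eval type codes are made up for illustration, not Spark's real ones): pass one fixes the target eval type with no mutable state, and pass two extracts only UDF chains of that type.

    ```scala
    // Toy expression model; eval type codes are illustrative only.
    sealed trait Expr
    case class Col(name: String) extends Expr
    case class Udf(evalType: Int, child: Expr) extends Expr

    // Pass 1: pick the eval type of the outermost UDF (an immutable result).
    def firstEvalType(e: Expr): Option[Int] = e match {
      case Udf(t, _) => Some(t)
      case _         => None
    }

    // Pass 2: a UDF chain can be evaluated together in Python only if every
    // UDF in the chain has the target eval type chosen in pass 1.
    def canEvaluate(e: Expr, target: Int): Boolean = e match {
      case Udf(t, child: Udf) => t == target && canEvaluate(child, target)
      case Udf(t, _)          => t == target
      case _                  => false
    }
    ```

    On the failing case above, with f2 as Scalar Pandas (say code 200) and f1 as Batched SQL (say code 100), pass 1 picks 200 and pass 2 refuses to treat the mixed chain as a single Python evaluation.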
    
    I am OK with either the immutable state or the lazy state. I think @HyukjinKwon prefers the immutable one. @BryanCutler WDYT?


---
