GitHub user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    I gave it a shot at extracting the UDFs in one traversal, using the eval type of the first pandas or batch UDF encountered. I think it's much clearer:
    
    ```scala
    object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
    
      // Holds the eval type of the first scalar Python UDF encountered; -1 means unset
      private class FirstEvalType {
        var evalType = -1
        def isEvalTypeSet(): Boolean = evalType >= 0
      }
    
      private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
        if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
          false
        } else {
          firstEvalType.evalType = e.evalType
          e.children match {
            // a single PythonUDF child could be chained and evaluated in Python
            case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
            // Python UDF can't be evaluated directly in JVM
            case children => !children.exists(hasScalarPythonUDF)
          }
        }
      }
    
      private def collectEvaluableUDFs(
          expr: Expression,
          firstEvalType: FirstEvalType): Seq[PythonUDF] = expr match {
        case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, firstEvalType) =>
          Seq(udf)
        case e => e.children.flatMap(collectEvaluableUDFs(_, firstEvalType))
      }
    
      private def extract(plan: SparkPlan): SparkPlan = {
        val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, new FirstEvalType))
        ...
    ```
    
    This does pass around a mutable object; you could do roughly the same thing by returning an Option instead, as in the sketch below, but it might not read as nicely.
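    
    For comparison, a minimal sketch of what that Option-based variant might look like (hypothetical and untested; it threads the first eval type through a fold and, for brevity, omits the UDF-chaining check that `canEvaluateInPython` does):
    
    ```scala
    // Hypothetical Option-based variant: the first eval type seen is threaded
    // through the fold and returned alongside the collected UDFs.
    private def collectEvaluableUDFs(
        expr: Expression,
        firstEvalType: Option[Int]): (Option[Int], Seq[PythonUDF]) = expr match {
      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) &&
          firstEvalType.forall(_ == udf.evalType) =>
        // lock in this UDF's eval type if none has been seen yet
        (Some(udf.evalType), Seq(udf))
      case e =>
        // fold over the children, threading the eval type through each call
        e.children.foldLeft((firstEvalType, Seq.empty[PythonUDF])) {
          case ((evalType, udfs), child) =>
            val (nextEvalType, childUdfs) = collectEvaluableUDFs(child, evalType)
            (nextEvalType, udfs ++ childUdfs)
        }
    }
    
    // The call site would have to thread the state across expressions the same way:
    // val (_, udfs) = plan.expressions.foldLeft(
    //   (Option.empty[Int], Seq.empty[PythonUDF])) { ... }
    ```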

