GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/20360

    [SPARK-23177][SQL][PySpark] Extract zero-parameter UDFs from aggregate

    ## What changes were proposed in this pull request?
    
    The ExtractPythonUDFFromAggregate rule extracts Python UDFs that depend on an aggregate expression or grouping key out of a logical aggregate. But Python UDFs that don't depend on those expressions should also be extracted, to avoid the issue reported in the JIRA.
    
    A small code snippet reproducing the issue:
    ```python
    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([(1,2), (3,4)])
    f_udf = f.udf(lambda: str("const_str"))
    df2 = df.distinct().withColumn("a", f_udf())
    df2.show()
    ```
    
    The following exception is raised:
    ```
    : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50
            at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
            at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
            at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
            at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
            at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
            at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
            at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
            at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
            at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
            at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
            at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
            at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
            at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
            at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
    ```
    
    This exception is raised because `HashAggregateExec` tries to bind the aliased Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to the grouping key.
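    
    To illustrate the idea of the fix, here is a hypothetical, heavily simplified Python sketch of what "extracting" a UDF from the aggregate's result expressions means (the class names `PythonUDF` and `Attribute` and the helper `extract_udfs` are invented for illustration; the real rule rewrites Catalyst expression trees in Scala). Each UDF call, including a zero-parameter one, is replaced by an attribute reference and moved into a projection evaluated above the aggregate, so the aggregate never has to bind a `pythonUDF*` attribute itself:

```python
# Hypothetical sketch of extracting Python UDF calls out of an
# aggregate's result expressions (illustration only; not Spark's API).

class PythonUDF:
    """Stand-in for a Python UDF call; children are its argument
    expressions and may be empty for a zero-parameter UDF."""
    def __init__(self, name, children):
        self.name = name
        self.children = children

class Attribute:
    """Stand-in for a Catalyst attribute reference."""
    def __init__(self, name):
        self.name = name

def extract_udfs(result_exprs):
    """Replace every PythonUDF in the result expressions with an
    attribute reference, collecting the extracted UDFs so they can be
    evaluated in a projection above the aggregate. Conceptually, the
    fix ensures zero-parameter UDFs are not skipped by this step."""
    extracted = []
    rewritten = []
    for expr in result_exprs:
        if isinstance(expr, PythonUDF):
            ref = Attribute("pythonUDF%d" % len(extracted))
            extracted.append((ref, expr))
            rewritten.append(ref)
        else:
            rewritten.append(expr)
    return rewritten, extracted
```

    With this rewrite, the aggregate only sees plain attribute references, and the UDFs run in a separate projection afterwards.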
    
    ## How was this patch tested?
    
    Added a test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-23177

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20360
    
----
commit b6cb6218e539589f37ff8648dff068bef6e810e5
Author: Liang-Chi Hsieh <viirya@...>
Date:   2018-01-23T05:56:45Z

    Extract parameter-less UDFs from aggregate.

----


---
