[ 
https://issues.apache.org/jira/browse/SPARK-26147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700274#comment-16700274
 ] 

Apache Spark commented on SPARK-26147:
--------------------------------------

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/23153

> Python UDFs in join condition fail even when using columns from only one side of join
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-26147
>                 URL: https://issues.apache.org/jira/browse/SPARK-26147
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Ala Luszczak
>            Priority: Major
>
> The rule {{PullOutPythonUDFInJoinCondition}} was implemented in 
> [https://github.com/apache/spark/commit/2a8cbfddba2a59d144b32910c68c22d0199093fe]
>  As far as I understand, this rule was intended to reject Python UDFs in a 
> join condition when they take arguments from both sides of the join, since 
> such a condition cannot be evaluated for certain join types.
> However, the rule {{PullOutPythonUDFInJoinCondition}} seems to assume that 
> if a given UDF only uses columns from a single side of the join, it has 
> already been pushed down below the join before this rule runs.
> This is not always the case. Here's a simple example that fails, even 
> though it looks like it should run just fine (and it does in earlier 
> versions of Spark):
> {code:java}
> from pyspark.sql import Row
> from pyspark.sql.types import StringType
> from pyspark.sql.functions import udf
> cars_list = [Row("NL", "1234AB"), Row("UK", "987654")]
> insurance_list = [Row("NL-1234AB"), Row("BE-112233")]
> spark.createDataFrame(data=cars_list, schema=["country", "plate_nr"]).createOrReplaceTempView("cars")
> spark.createDataFrame(data=insurance_list, schema=["insurance_code"]).createOrReplaceTempView("insurance")
> to_insurance_code = udf(lambda x, y: x + "-" + y, StringType())
> sqlContext.udf.register('to_insurance_code', to_insurance_code)
> spark.conf.set("spark.sql.crossJoin.enabled", "true")
> # This query runs just fine.
> sql("""
>   SELECT country, plate_nr, insurance_code
>   FROM cars LEFT OUTER JOIN insurance
>   ON CONCAT(country, '-', plate_nr) = insurance_code
> """).show()
> # This equivalent query fails with:
> # pyspark.sql.utils.AnalysisException: u'Using PythonUDF in join condition of join type LeftOuter is not supported.;'
> sql("""
>   SELECT country, plate_nr, insurance_code
>   FROM cars LEFT OUTER JOIN insurance
>   ON to_insurance_code(country, plate_nr) = insurance_code
> """).show()
> {code}
> [~cloud_fan] [~XuanYuan] fyi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
