Ala Luszczak created SPARK-26147:
------------------------------------

             Summary: Python UDFs in join condition fail even when using columns from only one side of join
                 Key: SPARK-26147
                 URL: https://issues.apache.org/jira/browse/SPARK-26147
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.0
            Reporter: Ala Luszczak


The rule {{PullOutPythonUDFInJoinCondition}} was introduced in 
[https://github.com/apache/spark/commit/2a8cbfddba2a59d144b32910c68c22d0199093fe]. 
As far as I understand, this rule is meant to prevent the use of Python UDFs in a join 
condition when the UDF takes arguments from both sides of the join, since that cannot 
be supported for certain join types.
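
For contrast, here is the kind of query the rule presumably is meant to reject: a Python UDF whose arguments come from both sides of an outer join. This is only an illustration; {{matches}} is a hypothetical UDF, and the snippet assumes the {{cars}}/{{insurance}} views and the pyspark shell session set up in the reproduction below.
{code:python}
# Illustrative only: a hypothetical Python UDF that needs columns from both
# sides of the join. Rejecting this for a LEFT OUTER JOIN looks like the
# behaviour the rule was intended to enforce.
from pyspark.sql.types import BooleanType

spark.udf.register('matches', lambda code, plate: code.endswith(plate), BooleanType())

spark.sql("""
  SELECT country, plate_nr, insurance_code
  FROM cars LEFT OUTER JOIN insurance
  ON matches(insurance_code, plate_nr)
""").show()
# Expected to fail with:
# pyspark.sql.utils.AnalysisException: Using PythonUDF in join condition of
# join type LeftOuter is not supported.
{code}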

The rule {{PullOutPythonUDFInJoinCondition}} seems to assume that a UDF which uses 
columns from only one side of the join has already been pushed down below the join 
by the time the rule runs.
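
That assumption does hold when the UDF is applied before the join, e.g. via the DataFrame API: the UDF is then evaluated in a projection below the join and never appears in the join condition itself. A minimal sketch, assuming the same tables and UDF as in the reproduction below (the DataFrame variable names are illustrative):
{code:python}
# Sketch of the case the rule assumes: the UDF result is computed below the
# join, so the join condition only compares plain columns and the rule never
# sees a PythonUDF in it. Variable names are illustrative.
cars_df = spark.table("cars").withColumn(
    "code", to_insurance_code("country", "plate_nr"))
insurance_df = spark.table("insurance")

cars_df.join(insurance_df,
             cars_df["code"] == insurance_df["insurance_code"],
             "left_outer").show()
{code}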

However, this is not always the case. Here's a simple example that fails, even 
though it looks like it should run just fine (and it does in earlier versions 
of Spark):
{code:python}
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

cars_list = [ Row("NL", "1234AB"), Row("UK", "987654") ]
insurance_list = [ Row("NL-1234AB"), Row("BE-112233") ]

spark.createDataFrame(data=cars_list, schema=["country", "plate_nr"]).createOrReplaceTempView("cars")
spark.createDataFrame(data=insurance_list, schema=["insurance_code"]).createOrReplaceTempView("insurance")

to_insurance_code = udf(lambda x, y: x + "-" + y, StringType())
spark.udf.register('to_insurance_code', to_insurance_code)

spark.conf.set("spark.sql.crossJoin.enabled", "true")

# This query runs just fine.
sql("""
  SELECT country, plate_nr, insurance_code
  FROM cars LEFT OUTER JOIN insurance
  ON CONCAT(country, '-', plate_nr) = insurance_code
""").show()

# This equivalent query fails with:
# pyspark.sql.utils.AnalysisException: u'Using PythonUDF in join condition of
# join type LeftOuter is not supported.;'
spark.sql("""
  SELECT country, plate_nr, insurance_code
  FROM cars LEFT OUTER JOIN insurance
  ON to_insurance_code(country, plate_nr) = insurance_code
""").show()
{code}
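
As a side note, rewriting the failing query so that the UDF is evaluated in a subquery under the join appears to avoid the error, since the join condition then only references the pre-computed column. A minimal workaround sketch against the same tables (not a fix for the rule itself):
{code:python}
# Workaround sketch: evaluate the Python UDF in a subquery below the join, so
# the join condition only references the resulting column.
spark.sql("""
  SELECT country, plate_nr, insurance_code
  FROM (
    SELECT country, plate_nr, to_insurance_code(country, plate_nr) AS code
    FROM cars
  ) c
  LEFT OUTER JOIN insurance
  ON c.code = insurance_code
""").show()
{code}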
[~cloud_fan] [~XuanYuan] fyi


