dilipbiswal opened a new pull request #23769: [SPARK-26864][SQL] Query may return incorrect result when a Python UDF is used as a join condition and the UDF uses attributes from both legs of a left semi join.
URL: https://github.com/apache/spark/pull/23769
 
 
   ## What changes were proposed in this pull request?
   In SPARK-25314, we added support for a Python UDF that refers to attributes from both legs of a join condition, by rewriting the plan to convert an inner join or left semi join into a filter over a cross join. For a left semi join, this transformation can produce incorrect results when the right leg contains duplicate rows matching the join condition: a semi join must emit each left row at most once, but the filtered cross join emits it once per matching right row. This fix disallows the rewrite for left semi join and raises an error in that case, as we already do for other unsupported join types. In the future, we should add a separate optimizer rule to convert a left semi join to an inner join (I am aware of one case where we could do this by leveraging an informational constraint, i.e. when we know the right side does not produce duplicates).
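
   To illustrate the intended post-fix behavior, here is a sketch using the `left`, `right`, and `func` defined in the Python session below; the error handling shown is illustrative, not quoted from the patch:

   ```python
   from pyspark.sql.utils import AnalysisException

   # After this patch the rewrite is disallowed for left semi join, so the
   # query fails analysis instead of silently returning duplicate rows.
   try:
       left.join(right, func("lc1", "rc1"), "leftsemi").show()
   except AnalysisException as err:
       print("rejected as expected:", err)
   ```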
   
   Python (behavior before this patch)
   ```python
   >>> from pyspark import SparkContext
   >>> from pyspark.sql import SparkSession, Column, Row
   >>> from pyspark.sql.functions import UserDefinedFunction, udf
   >>> from pyspark.sql.types import *
   >>> from pyspark.sql.utils import AnalysisException
   >>>
   >>> spark.conf.set("spark.sql.crossJoin.enabled", "True")
   >>> left = spark.createDataFrame([Row(lc1=1, lc2=1), Row(lc1=2, lc2=2)])
   >>> right = spark.createDataFrame([Row(rc1=1, rc2=1), Row(rc1=1, rc2=2)])
   >>> func = udf(lambda a, b: a == b, BooleanType())
   >>> df = left.join(right, func("lc1", "rc1"), "leftsemi").show()
   19/02/12 16:07:10 WARN PullOutPythonUDFInJoinCondition: The join condition:<lambda>(lc1#0L, rc1#4L) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
   +---+---+
   |lc1|lc2|
   +---+---+
   |  1|  1|
   |  1|  1|
   +---+---+
   ```
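
   The duplicate row above can be reproduced by writing out the plan the rewrite effectively produces, i.e. a cross join filtered by the UDF and projected back to the left side's columns (a sketch reusing the session above):

   ```python
   # Hand-written equivalent of the rewritten plan: the Python UDF becomes
   # a filter over a cross join, and only the left side's columns are kept.
   rewritten = (left.crossJoin(right)
                    .where(func("lc1", "rc1"))
                    .select("lc1", "lc2"))
   rewritten.show()
   # The right side has two rows with rc1 == 1, so the left row (1, 1)
   # passes the filter twice -- the same duplicate output as the semi join.
   ```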
   
   Scala
   For comparison, a Scala UDF can be evaluated directly inside the join condition, so no rewrite is needed and the semi join returns the expected result: each matching left row appears exactly once.
   ```scala
   scala> val left = Seq((1, 1), (2, 2)).toDF("lc1", "lc2")
   left: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]
   
   scala> val right = Seq((1, 1), (1, 1)).toDF("rc1", "rc2")
   right: org.apache.spark.sql.DataFrame = [rc1: int, rc2: int]
   
   scala> val equal = udf((p1: Integer, p2: Integer) => {
        |   p1 == p2
        | })
   equal: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2141/1101629239@4666f1b5,BooleanType,List(Some(Schema(IntegerType,true)), Some(Schema(IntegerType,true))),None,false,true)
   
   scala> val df = left.join(right, equal(col("lc1"), col("rc1")), "leftsemi")
   df: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]
   
   scala> df.show()
   +---+---+
   |lc1|lc2|
   +---+---+
   |  1|  1|
   +---+---+
   
   ```
   
   ## How was this patch tested?
   Modified existing tests.
