[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238281#comment-16238281
 ] 

Henry Robinson commented on SPARK-22211:
----------------------------------------

Sounds good, thanks both.

> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
>                 Key: SPARK-22211
>                 URL: https://issues.apache.org/jira/browse/SPARK-22211
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>            Reporter: Benyi Wang
>            Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/100000th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to