[ 
https://issues.apache.org/jira/browse/SPARK-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104708#comment-16104708
 ] 

Abhijit Bhole commented on SPARK-21479:
---------------------------------------

So here is the actual use case - 

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([{"x": "c1", "a": 1, "b": 2},
                             {"x": "c2", "a": 3, "b": 4}])
df2 = spark.createDataFrame([{"x": "c1", "a": 1, "c": 5},
                             {"x": "c1", "a": 3, "c": 6},
                             {"x": "c2", "a": 5, "c": 8}])

# Case 1: filter on 'b', a column that exists only in df1.
df1.join(df2, ['x', 'a'], 'right_outer').where("b = 2").explain()
df1.join(df2, ['x', 'a'], 'right_outer').where("b = 2").show()

print()

# Recreate the inputs so the second plan gets fresh attribute ids.
df1 = spark.createDataFrame([{"x": "c1", "a": 1, "b": 2},
                             {"x": "c2", "a": 3, "b": 4}])
df2 = spark.createDataFrame([{"x": "c1", "a": 1, "c": 5},
                             {"x": "c1", "a": 3, "c": 6},
                             {"x": "c2", "a": 5, "c": 8}])

# Case 2: filter on 'x', one of the join columns.
df1.join(df2, ['x', 'a'], 'right_outer').where("x = 'c1'").explain()
df1.join(df2, ['x', 'a'], 'right_outer').where("x = 'c1'").show()
{code}

Output - 

{code:java}
== Physical Plan ==
*Project [x#458, a#456L, b#450L, c#457L]
+- *SortMergeJoin [x#451, a#449L], [x#458, a#456L], Inner
   :- *Sort [x#451 ASC NULLS FIRST, a#449L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#451, a#449L, 4)
   :     +- *Filter (((isnotnull(b#450L) && (b#450L = 2)) && isnotnull(x#451)) && isnotnull(a#449L))
   :        +- Scan ExistingRDD[a#449L,b#450L,x#451]
   +- *Sort [x#458 ASC NULLS FIRST, a#456L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#458, a#456L, 4)
         +- *Filter (isnotnull(x#458) && isnotnull(a#456L))
            +- Scan ExistingRDD[a#456L,c#457L,x#458]
+---+---+---+---+
|  x|  a|  b|  c|
+---+---+---+---+
| c1|  1|  2|  5|
+---+---+---+---+


== Physical Plan ==
*Project [x#490, a#488L, b#482L, c#489L]
+- SortMergeJoin [x#483, a#481L], [x#490, a#488L], RightOuter
   :- *Sort [x#483 ASC NULLS FIRST, a#481L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#483, a#481L, 4)
   :     +- Scan ExistingRDD[a#481L,b#482L,x#483]
   +- *Sort [x#490 ASC NULLS FIRST, a#488L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#490, a#488L, 4)
         +- *Filter (isnotnull(x#490) && (x#490 = c1))
            +- Scan ExistingRDD[a#488L,c#489L,x#490]
+---+---+----+---+
|  x|  a|   b|  c|
+---+---+----+---+
| c1|  1|   2|  5|
| c1|  3|null|  6|
+---+---+----+---+
{code}

As you can see, in the second plan the filter on the 'x' column is applied only
on the df2 side; it does not get pushed down to df1. In our case, 'x' is a
company id in a multi-tenant system, and it is extremely important to push this
filter to both dataframes; otherwise the entire data is fetched for both
tables.
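
Until the optimizer handles this, one possible workaround is to apply the predicate to both inputs by hand before joining. The following is a minimal sketch, not a Spark API: the helper name `right_outer_join_with_key_filter` and its parameters are made up for illustration. It assumes the condition is deterministic and references only join columns. Under that assumption, a df1 row that fails the condition can only match df2 rows that fail it too, so dropping it early should not change the result.

```python
def right_outer_join_with_key_filter(left, right, keys, key_condition):
    """Right outer join with a join-key predicate applied to both inputs.

    Hypothetical helper, not part of Spark. `left` and `right` are expected
    to be DataFrames. `key_condition` must reference only columns listed in
    `keys`; a predicate on any other column of the null-supplying (left)
    side would change the result if applied before the join.
    """
    # Filter each input before the join so neither side is read in full.
    filtered_left = left.where(key_condition)
    filtered_right = right.where(key_condition)
    return filtered_left.join(filtered_right, keys, "right_outer")
```

With the dataframes above, this would be called as `right_outer_join_with_key_filter(df1, df2, ['x', 'a'], "x = 'c1'")`, and `explain()` on the result can be used to check that a filter on 'x' now appears on both sides of the join.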


> Outer join filter pushdown in null supplying table when condition is on one 
> of the joined columns
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21479
>                 URL: https://issues.apache.org/jira/browse/SPARK-21479
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.0, 2.1.1, 2.2.0
>            Reporter: Abhijit Bhole
>
> Here are two different query plans - 
> {code:java}
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 
> 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()
> == Physical Plan ==
> *Project [a#16299L, b#16295L, c#16300L]
> +- *SortMergeJoin [a#16294L], [a#16299L], Inner
>    :- *Sort [a#16294L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#16294L, 4)
>    :     +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && 
> isnotnull(a#16294L))
>    :        +- Scan ExistingRDD[a#16294L,b#16295L]
>    +- *Sort [a#16299L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#16299L, 4)
>          +- *Filter isnotnull(a#16299L)
>             +- Scan ExistingRDD[a#16299L,c#16300L]
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 
> 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()
> == Physical Plan ==
> *Project [a#16314L, b#16310L, c#16315L]
> +- SortMergeJoin [a#16309L], [a#16314L], RightOuter
>    :- *Sort [a#16309L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#16309L, 4)
>    :     +- Scan ExistingRDD[a#16309L,b#16310L]
>    +- *Sort [a#16314L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#16314L, 4)
>          +- *Filter (isnotnull(a#16314L) && (a#16314L = 1))
>             +- Scan ExistingRDD[a#16314L,c#16315L]
> {code}
> If condition on b can be pushed down on df1 then why not condition on a?



