[jira] [Commented] (SPARK-19335) Spark should support doing an efficient DataFrame Upsert via JDBC

2019-12-10 Thread Rinaz Belhaj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-19335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993231#comment-16993231
 ] 

Rinaz Belhaj commented on SPARK-19335:
--

+1 This feature would be very useful. Any updates on this?

> Spark should support doing an efficient DataFrame Upsert via JDBC
> -
>
> Key: SPARK-19335
> URL: https://issues.apache.org/jira/browse/SPARK-19335
> Project: Spark
>  Issue Type: Improvement
>Reporter: Ilya Ganelin
>Priority: Minor
>
> Doing a database update, as opposed to an insert, is useful, particularly 
> when working with streaming applications that may require revisions to 
> previously stored data. 
> Spark DataFrames/Datasets do not currently support an update feature via the 
> JDBC writer, allowing only Overwrite or Append.
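
A common interim workaround is to push each partition through plain JDBC with an upsert statement. A minimal PySpark sketch, assuming a PostgreSQL target with a primary key on id (the table name, columns, and connection string are hypothetical):

{code:python}
def upsert_partition(rows):
    # One connection per partition; psycopg2 here, but any DB-API driver works.
    import psycopg2
    conn = psycopg2.connect("dbname=test user=spark")  # hypothetical DSN
    cur = conn.cursor()
    for row in rows:
        # INSERT ... ON CONFLICT is PostgreSQL's upsert; other databases need
        # MERGE or INSERT ... ON DUPLICATE KEY UPDATE instead.
        cur.execute(
            "INSERT INTO target (id, value) VALUES (%s, %s) "
            "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
            (row["id"], row["value"]),
        )
    conn.commit()
    conn.close()

df.foreachPartition(upsert_partition)
{code}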



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-04-06 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Shepherd: Holden Karau

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output because the filter is not pushed 
> down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error, since the column 
> used in the filter is not present in the preceding select. Instead, the 
> Catalyst optimizer computes first() on column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> ---
>
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Priority: Major  (was: Minor)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output because the filter is not pushed 
> down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error, since the column 
> used in the filter is not present in the preceding select. Instead, the 
> Catalyst optimizer computes first() on column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> ---
>
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
Ideally, the second statement should result in an error, since the column used 
in the filter is not present in the preceding select. Instead, the Catalyst 
optimizer computes first() on column y_n and then applies the filter.

Had the filter been pushed down, the result would have been accurate.
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

 ---

 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
   
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than return wrong output.
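
In other words, the optimized plan above makes the second query behave like the following explicit aggregation. This is a sketch of the observed semantics, not of Spark's internal rewrite:

{code:python}
from pyspark.sql import functions as F

# Per distinct (x, y, z) group, an arbitrary surviving y_n is kept via first(),
# and the filter is then applied to that value.
equivalent = (
    df.groupBy('x', 'y', 'z')
      .agg(F.first('y_n').alias('y_n'))
      .filter("y_n = 'y'")
      .select('x', 'y', 'z')
)
equivalent.show()
# (a, 123, 12.3) is dropped whenever its first() y_n happens to be 'n'.
{code}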

  was:
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error, since the column used 
in the filter is not present in the preceding select. Instead, the Catalyst 
optimizer computes first() on column y_n and then applies the filter.

Had the filter been pushed down, the result would have been accurate.

 

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

 ---

 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
  
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than return wrong output.

  was:
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error, since the column used 
in the filter is not present in the preceding select. Instead, the Catalyst 
optimizer computes first() on column y_n and then applies the filter.

Had the filter been pushed down, the result would have been accurate.

 

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
{code}
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

 ---

 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
   
{panel}
The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than return wrong output.

  was:
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Description: 
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

 
|x|y|z|
|a|123|12.3|
|a|123|12.4|

 
|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error, since the column used 
in the filter is not present in the preceding select. Instead, the Catalyst 
optimizer computes first() on column y_n and then applies the filter.

Had the filter been pushed down, the result would have been accurate.

 

{code:java}
df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
{code}

 

 
  
{panel:title=Output}
 
 == Parsed Logical Plan ==
 Deduplicate [x#74, y#75, z#76]
 +- AnalysisBarrier
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
 +- *(1) Project [x#74, y#75, z#76]
 +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

 ---

 == Parsed Logical Plan ==
 'Filter ('y_n = y)
 +- AnalysisBarrier
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Analyzed Logical Plan ==
 x: string, y: string, z: string
 Project [x#74, y#75, z#76]
 +- Filter (y_n#77 = y)
 +- Deduplicate [x#74, y#75, z#76]
 +- Project [x#74, y#75, z#76, y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Optimized Logical Plan ==
 Project [x#74, y#75, z#76]
 +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
 +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

 == Physical Plan ==
 *(3) Project [x#74, y#75, z#76]
 +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
 +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
 +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
 +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
  
{panel}
 
  

The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than return wrong output.

  was:
The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])

[jira] [Created] (SPARK-27213) Unexpected results due when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)
Rinaz Belhaj created SPARK-27213:


 Summary: Unexpected results due when filter is used after distinct
 Key: SPARK-27213
 URL: https://issues.apache.org/jira/browse/SPARK-27213
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.0, 2.3.2
Reporter: Rinaz Belhaj


The following code gives unexpected output because the filter is not pushed 
down by the Catalyst optimizer.

 
{code:java}
df = 
spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
 

 
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

|x|y|z|
|a|123|12.3|
|a|123|12.4|

|x|y|z|
|a|123|12.4|
{panel}
 

Ideally, the second statement should result in an error, since the column used 
in the filter is not present in the preceding select. Instead, the Catalyst 
optimizer computes first() on column y_n and then applies the filter.

Had the filter been pushed down, the result would have been accurate.

 

{code:java}
df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
{code}

 

 
 
{panel:title=Output}
 
== Parsed Logical Plan ==
Deduplicate [x#74, y#75, z#76]
+- AnalysisBarrier
+- Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Analyzed Logical Plan ==
x: string, y: string, z: string
Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Optimized Logical Plan ==
Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Physical Plan ==
*(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, 
z#76])
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
+- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, 
y#75, z#76])
+- *(1) Project [x#74, y#75, z#76]
+- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
 
---
 
 
== Parsed Logical Plan ==
'Filter ('y_n = y)
+- AnalysisBarrier
+- Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Analyzed Logical Plan ==
x: string, y: string, z: string
Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
+- Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76, y_n#77]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Optimized Logical Plan ==
Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS 
y_n#77]
+- LogicalRDD [x#74, y#75, z#76, y_n#77], false
 
== Physical Plan ==
*(3) Project [x#74, y#75, z#76]
+- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
+- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], 
output=[x#74, y#75, z#76, y_n#77])
+- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST], false, 0
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
+- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, 
false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
+- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
FIRST], false, 0
+- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
 
{panel}
 
 

The second query, i.e. 
*"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
result in an error rather than return wrong output.
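
Until the analyzer rejects this pattern, the unambiguous formulations below avoid the wrong results. A small sketch using only the DataFrame defined above:

{code:python}
# Option 1: filter while y_n is still in scope, then project and de-duplicate.
df.filter("y_n = 'y'").select('x', 'y', 'z').distinct().show()

# Option 2: keep y_n through the distinct, filter on it, then drop it.
df.select('x', 'y', 'z', 'y_n').distinct() \
  .filter("y_n = 'y'") \
  .select('x', 'y', 'z').distinct().show()
{code}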



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2019-03-20 Thread Rinaz Belhaj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
-
Summary: Unexpected results when filter is used after distinct  (was: 
Unexpected results due when filter is used after distinct)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Minor
>  Labels: distinct, filter
>
> The following code gives unexpected output because the filter is not pushed 
> down by the Catalyst optimizer.
>  
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
>  
>  
> {panel:title=Output}
>  
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
> |x|y|z|
> |a|123|12.4|
>  
>  
> {panel}
>  
> Ideally, the second statement should result in an error, since the column 
> used in the filter is not present in the preceding select. Instead, the 
> Catalyst optimizer computes first() on column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would have been accurate.
>  
> {code:java}
> df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
> {code}
>  
>  
>  
> {panel:title=Output}
>  
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
> +- Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, 
> z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
> +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, 
> y#75, z#76])
> +- *(1) Project [x#74, y#75, z#76]
> +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>  
> ---
>  
>  
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
> +- Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
> +- Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76, y_n#77]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
> +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>  
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
> +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], 
> output=[x#74, y#75, z#76, y_n#77])
> +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST], false, 0
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
> +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, 
> false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
> +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST], false, 0
> +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>  
> {panel}
>  
>  
> The second query, i.e. 
> *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should 
> result in an error rather than return wrong output.


