[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinaz Belhaj updated SPARK-27213:
---------------------------------
    Summary: Unexpected results when filter is used after distinct  (was: Unexpected results due when filter is used after distinct)

> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
>                 Key: SPARK-27213
>                 URL: https://issues.apache.org/jira/browse/SPARK-27213
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Rinaz Belhaj
>            Priority: Minor
>              Labels: distinct, filter
>
> The following code gives unexpected output because the filter is not pushed 
> down by the Catalyst optimizer, even though it references a column that the 
> preceding select dropped.
>  
> {code:python}
> df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
>  
>  
> {panel:title=Output}
> df.show(5):
> ||x||y||z||y_n||
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>
> df.filter("y_n='y'").select('x','y','z').distinct().show():
> ||x||y||z||
> |a|123|12.3|
> |a|123|12.4|
>
> df.select('x','y','z').distinct().filter("y_n='y'").show():
> ||x||y||z||
> |a|123|12.4|
> {panel}
>  
> Ideally, the second statement should result in an error, since the column 
> used in the filter ("y_n") is not present in the preceding select. Instead, 
> the Catalyst optimizer rewrites distinct() into an aggregate that carries 
> y_n along via first() and then applies the filter, so whether a row survives 
> depends on which y_n value first() happens to pick within each group. Had 
> the filter been pushed down below the aggregate, the result would at least 
> have been accurate.
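>
> To see why, the second query is effectively rewritten into the aggregation 
> below. This is an illustrative sketch, not code from this report; it mirrors 
> the first(y_n#77, false) step visible in the optimized plan further down 
> (F.first is the pyspark.sql.functions API).
>
> {code:python}
> from pyspark.sql import functions as F
>
> # distinct() becomes an Aggregate over (x, y, z) that silently keeps
> # y_n by taking first() within each group; the filter then runs on
> # that arbitrary value, so rows may survive or vanish unpredictably.
> effective = (df.groupBy('x', 'y', 'z')
>                .agg(F.first('y_n').alias('y_n'))
>                .filter("y_n = 'y'")
>                .select('x', 'y', 'z'))
> effective.show()
> {code}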
>  
> {code:python}
> df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
> {code}
>
> {panel:title=Output}
> {noformat}
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
>    +- Project [x#74, y#75, z#76]
>       +- Filter (y_n#77 = y)
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (y_n#77 = y)
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>    +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>       +- *(1) Project [x#74, y#75, z#76]
>          +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>             +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
> {noformat}
>
> ----
>
> {noformat}
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76, y_n#77]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>       +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>             +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
>                +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>                   +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
> {noformat}
> {panel}
>  
>  
> The second query, i.e. *df.select('x','y','z').distinct().filter("y_n='y'")*, 
> should result in an error rather than return wrong output.
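>
> Until that happens, one way to make the mistake fail fast (an illustrative 
> workaround, assuming the same df as above) is to filter with an explicit 
> Column reference instead of a string expression:
>
> {code:python}
> deduped = df.select('x', 'y', 'z').distinct()
>
> # deduped['y_n'] is resolved against deduped's own schema, so this
> # raises AnalysisException ("Cannot resolve column name ...") instead
> # of silently resurrecting the dropped column.
> deduped.filter(deduped['y_n'] == 'y').show()
> {code}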


