[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-27213:
-------------------------------
    Labels: correctness distinct filter  (was: distinct filter)

> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
>                 Key: SPARK-27213
>                 URL: https://issues.apache.org/jira/browse/SPARK-27213
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Rinaz Belhaj
>            Priority: Major
>              Labels: correctness, distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down in catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. But the 
> catalyst optimizer is using first() on column y_n and then applying the 
> filter.
> Even if the filter was pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> -------------------------------------------------------------------------------------------------------------------
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, valueSet#96|#74, y#75, z#76, first#95, valueSet#96])
>  +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>    
> {panel}
> The second query. ie 
> *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"* should 
> result in error rather than giving wrong output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to