[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-27213:
---------------------------------
    Target Version/s:   (was: 3.0.0)

> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
>                 Key: SPARK-27213
>                 URL: https://issues.apache.org/jira/browse/SPARK-27213
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Rinaz Belhaj
>            Priority: Major
>              Labels: distinct, filter
>
> The following code gives unexpected output because the Catalyst optimizer
> does not push the filter down.
> {code:java}
> df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second query (filter applied after distinct) should result in an
> error, since the column used in the filter is not present in the preceding
> select. Instead, the Catalyst optimizer applies first() to column y_n and then
> applies the filter.
> Even if the filter had been pushed down, the result would have been accurate.
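The difference between the two queries can be reproduced without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: the function names `filter_then_distinct` and `distinct_then_filter` are hypothetical, and the second one models the reported plan (aggregate on x, y, z while carrying first(y_n), then filter) under the assumption that first() returns the first value in input order. Spark itself makes no such ordering guarantee for first().

```python
# Rows from the report; columns are (x, y, z, y_n).
rows = [
    ("a", "123", "12.3", "n"),
    ("a", "123", "12.3", "y"),
    ("a", "123", "12.4", "y"),
]


def filter_then_distinct(rows):
    """Model of df.filter("y_n='y'").select('x','y','z').distinct()."""
    kept = [r[:3] for r in rows if r[3] == "y"]
    # dict.fromkeys drops duplicates while preserving first-seen order.
    return list(dict.fromkeys(kept))


def distinct_then_filter(rows):
    """Model of the reported plan for select(...).distinct().filter("y_n='y'"):
    group by (x, y, z) carrying first(y_n), then filter on that value."""
    first_y_n = {}
    for x, y, z, y_n in rows:
        # setdefault keeps only the first y_n seen for each key.
        # Assumption: input order stands in for Spark's first().
        first_y_n.setdefault((x, y, z), y_n)
    return [key for key, y_n in first_y_n.items() if y_n == "y"]


print(filter_then_distinct(rows))
print(distinct_then_filter(rows))
```

In this sketch the group ('a', '123', '12.3') keeps y_n = 'n' as its first value, so the later filter discards the whole group, which matches the single-row output shown in the report.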
> {code:java}
> df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
> {code}
> {panel:title=Output}
>
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
>    +- Project [x#74, y#75, z#76]
>       +- Filter (y_n#77 = y)
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (y_n#77 = y)
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>    +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>       +- *(1) Project [x#74, y#75, z#76]
>          +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>             +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> -------------------------------------------------------------------------------------------------------------------
>
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76, y_n#77]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>       +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>             +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
>                +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>                   +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> {panel}
> The second query, i.e.
> *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should
> result in an error rather than giving wrong output.
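Until the query itself raises an error, a caller could fail fast by checking column names before filtering. The following is a minimal sketch of such a guard, assuming a hypothetical helper named `require_columns`; it is not part of Spark or PySpark and only compares plain lists of names.

```python
def require_columns(available, required):
    """Raise ValueError if any required column is absent from `available`.

    Hypothetical helper for illustration; not a Spark or PySpark API.
    """
    missing = [c for c in required if c not in available]
    if missing:
        raise ValueError("columns not in projection: " + ", ".join(missing))


# Column names exposed by select('x','y','z') in the report.
selected = ["x", "y", "z"]

require_columns(selected, ["x"])  # passes silently

try:
    require_columns(selected, ["y_n"])  # y_n was not selected
except ValueError as err:
    print(err)
```

With PySpark, the `available` list could be taken from the `columns` attribute of the DataFrame returned by `select(...).distinct()`, so the check runs before `filter` is called.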