[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rinaz Belhaj updated SPARK-27213:
---------------------------------
Description:

The following code gives unexpected output because the filter is not pushed down by the Catalyst optimizer.

{code:python}
df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.show(5)
df.filter("y_n='y'").select('x','y','z').distinct().show()
df.select('x','y','z').distinct().filter("y_n='y'").show()
{code}
{panel:title=Output}
|x|y|z|y_n|
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

|x|y|z|
|a|123|12.3|
|a|123|12.4|

|x|y|z|
|a|123|12.4|
{panel}
Ideally, the second statement should result in an error, since the column used in the filter is not present in the preceding select. Instead, the Catalyst optimizer carries y_n through the aggregate using first() and only then applies the filter. Had the filter been pushed down below the distinct, the result would at least have been accurate.
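To make the first() behaviour concrete, the analyzed and optimized plans shown below are roughly equivalent to the following hand-written query (a sketch for illustration only; it reuses the df defined above and is not the exact rewrite Spark performs internally):

{code:python}
from pyspark.sql import functions as F

# Roughly what the analyzer turns the second query into: carry y_n through the
# aggregate via first(), apply the filter, then project y_n away again.
equivalent = (df.groupBy('x', 'y', 'z')
                .agg(F.first('y_n').alias('y_n'))   # first() over an unordered group
                .filter("y_n = 'y'")
                .select('x', 'y', 'z'))
equivalent.show()
{code}

Because first() returns whichever row it happens to see first in each group, the (a, 123, 12.3) group may or may not survive the filter; this is why the reported output is missing 12.3 and can even vary between runs.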
{code:python}
df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
{code}
{panel:title=Output}
== Parsed Logical Plan ==
Deduplicate [x#74, y#75, z#76]
+- AnalysisBarrier
      +- Project [x#74, y#75, z#76]
         +- Filter (y_n#77 = y)
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Analyzed Logical Plan ==
x: string, y: string, z: string
Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
   +- Filter (y_n#77 = y)
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Optimized Logical Plan ==
Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
   +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Physical Plan ==
*(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
   +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
      +- *(1) Project [x#74, y#75, z#76]
         +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
            +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

-------------------------------------------------------------------------------------------------------------------

== Parsed Logical Plan ==
'Filter ('y_n = y)
+- AnalysisBarrier
      +- Deduplicate [x#74, y#75, z#76]
         +- Project [x#74, y#75, z#76]
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Analyzed Logical Plan ==
x: string, y: string, z: string
Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
   +- Deduplicate [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76, y_n#77]
         +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Optimized Logical Plan ==
Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
   +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Physical Plan ==
*(3) Project [x#74, y#75, z#76]
+- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
   +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
      +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
            +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
               +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
                  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
{panel}
The second query, i.e. *df.select('x','y','z').distinct().filter("y_n='y'")*, should result in an error rather than a wrong output.
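As a stop-gap on the user side, one possible guard (a sketch only; safe_filter is a hypothetical helper, not a Spark API) is to verify that the filter column still exists before calling filter(), so the mistake surfaces as an explicit error instead of a silently wrong result:

{code:python}
def safe_filter(frame, column, condition):
    """Hypothetical helper: refuse to filter on a column that was projected away."""
    if column not in frame.columns:
        raise ValueError("Column '{}' not present in {}".format(column, frame.columns))
    return frame.filter(condition)

deduped = df.select('x', 'y', 'z').distinct()
safe_filter(deduped, 'y_n', "y_n='y'")   # raises ValueError instead of returning a misleading result
{code}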
> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Rinaz Belhaj
> Priority: Minor
> Labels: distinct, filter
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org