[ https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abhijit Bhole updated SPARK-21034:
----------------------------------
    Comment: was deleted

(was: In FilterPushdownSuite.scala, it seems this case should already be handled. Am I misunderstanding something?

{code:java}
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a)
    .where('a > 5 && Rand(10) > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery.analyze)

  val correctAnswer = testRelation
    .where('a > 5)
    .groupBy('a)('a)
    .where(Rand(10) > 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}
{code}
)

> Allow filter pushdown filters through non deterministic functions for columns
> involved in groupby / join
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21034
>                 URL: https://issues.apache.org/jira/browse/SPARK-21034
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Abhijit Bhole
>
> If the column is involved in the aggregation / join, then pushing down the filter
> should not change the results.
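The claim that pushing down a filter on a grouping column does not change the results can be checked with a small pure-Python model. This is a toy, not Spark's optimizer: `group_agg`, `filter_before`, `filter_after`, `plans_agree`, and the made-up `VARIED` rows are hypothetical; `ROWS` is the data from the PySpark sample in this issue. The model groups by `a`, computes `sum(b)` and an order-dependent `first(c)`, and compares "aggregate, then filter on `a`" with "filter on `a`, then aggregate" for every ordering of the input. A filter on the grouping key keeps or drops whole groups and preserves the relative order of the surviving rows, so both plans see the same rows, in the same order, for each remaining group; even `first(c)` agrees.

```python
from itertools import permutations

# Rows from the PySpark sample in this issue, as plain dicts.
ROWS = [
    {"a": 1, "b": 2, "c": 7},
    {"a": 3, "b": 4, "c": 8},
    {"a": 1, "b": 5, "c": 7},
    {"a": 1, "b": 6, "c": 7},
]


def group_agg(rows):
    """Toy GROUP BY a computing sum(b) and first(c).

    first(c) is the c of the first row seen for each group, so it depends on
    input order (the reason first() is treated as non-deterministic).
    """
    groups = {}
    for row in rows:
        g = groups.setdefault(row["a"], {"a": row["a"], "sum_b": 0, "first_c": row["c"]})
        g["sum_b"] += row["b"]
    return list(groups.values())


def filter_after(rows, pred):
    # Plan as written: aggregate, then filter on the grouping key.
    return [g for g in group_agg(rows) if pred(g["a"])]


def filter_before(rows, pred):
    # Plan after pushdown: filter rows on the grouping key, then aggregate.
    return group_agg([r for r in rows if pred(r["a"])])


def plans_agree(rows, pred):
    # For every input ordering, both plans must produce identical output.
    return all(
        filter_after(list(p), pred) == filter_before(list(p), pred)
        for p in permutations(rows)
    )


print(plans_agree(ROWS, lambda a: a == 1))     # True
print(filter_before(ROWS, lambda a: a == 1))   # [{'a': 1, 'sum_b': 13, 'first_c': 7}]

# Made-up rows where first(c) for a = 1 really does vary with input order.
VARIED = [{"a": 1, "b": 1, "c": 10}, {"a": 1, "b": 2, "c": 20}, {"a": 2, "b": 3, "c": 30}]
print(plans_agree(VARIED, lambda a: a == 1))   # True
```

In the issue's data `c` is constant within group `a = 1`, which is why the `VARIED` rows are included: there `first(c)` is 10 or 20 depending on order, yet both plans still agree for each fixed order.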
> Here is some sample code:
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> spark = SparkSession.builder.getOrCreate()  # predefined in the pyspark shell
> df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" : 8},
>                             { "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, "c":7} ])
> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
> +- Exchange hashpartitioning(a#15L, 4)
>    +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
>       +- *Project [a#15L, b#16L]
>          +- *Filter (isnotnull(a#15L) && (a#15L = 1))
>             +- Scan ExistingRDD[a#15L,b#16L,c#17L]
>
> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
>    +- Exchange hashpartitioning(a#15L, 4)
>       +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
>          +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> {code}
> As you can see, the filter is not pushed down when the F.first aggregate function
> is used.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
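Why the second plan keeps the filter above the aggregate can be sketched as a decision function. As we understand Spark 2.x's `PushDownPredicate` rule, it only pushes a filter through an `Aggregate` when every aggregate expression is deterministic, and among the filter's conjuncts it considers only the leading deterministic ones, pushing those whose columns resolve to grouping columns. `first` is marked non-deterministic, so that guard fails for the second query. The Python model below is a simplified sketch under those assumptions: `Pred`, `AggExpr`, `split_for_pushdown`, and the `require_det_aggs` flag are hypothetical names, and `require_det_aggs=False` models the relaxation this issue asks for, not an existing Spark option.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pred:
    """One conjunct of a Filter condition (hypothetical model)."""
    text: str
    references: frozenset
    deterministic: bool = True


@dataclass(frozen=True)
class AggExpr:
    """One expression in the Aggregate's output list (hypothetical model)."""
    text: str
    deterministic: bool = True


def split_for_pushdown(conjuncts, grouping_cols, agg_exprs, require_det_aggs=True):
    """Return (pushed, kept): conjuncts moved below the Aggregate vs. left above it.

    require_det_aggs=True models the assumed Spark 2.x guard; False models the
    relaxation proposed in SPARK-21034.
    """
    if require_det_aggs and not all(e.deterministic for e in agg_exprs):
        return [], list(conjuncts)  # guard fails: the whole filter stays above

    # Like Scala's Seq.span(_.deterministic): only the leading run of
    # deterministic conjuncts is considered for pushdown.
    n = 0
    while n < len(conjuncts) and conjuncts[n].deterministic:
        n += 1
    candidates, tail = list(conjuncts[:n]), list(conjuncts[n:])

    def pushable(c):
        # Simplification: the conjunct must reference only grouping columns.
        return bool(c.references) and c.references <= grouping_cols

    pushed = [c for c in candidates if pushable(c)]
    kept = [c for c in candidates if not pushable(c)] + tail
    return pushed, kept


group_by_a = frozenset({"a"})
a_eq_1 = Pred("a = 1", frozenset({"a"}))
aggs_with_first = [AggExpr("sum(b)"), AggExpr("first(c)", deterministic=False)]

# Second plan in the issue: first(c) trips the guard, so nothing is pushed.
print(split_for_pushdown([a_eq_1], group_by_a, aggs_with_first))
# Proposed behaviour: the predicate on the grouping key is pushed anyway.
print(split_for_pushdown([a_eq_1], group_by_a, aggs_with_first, require_det_aggs=False))
```

With the conjuncts `'a > 5` and `Rand(10) > 5` over an aggregate whose only expression is `'a`, the same sketch pushes `'a > 5` and keeps `Rand(10) > 5`, matching the `correctAnswer` plan in the deleted FilterPushdownSuite comment. That test exercises a non-deterministic filter condition, not a non-deterministic aggregate function such as `first`.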