[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625876#comment-16625876 ] Yuming Wang commented on SPARK-23985:

Thanks [~uzadude], I will take a deep dive into it.

> predicate push down doesn't work with simple compound partition spec
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Ohad Raviv
> Priority: Minor
>
> While predicate push down works with this query:
> {code:sql}
> select * from (
>   select *, row_number() over (partition by a order by b) from t1
> ) z
> where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
>   select *, row_number() over (partition by concat(a,'lit') order by b) from t1
> ) z
> where a>1
> {code}
>
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
> test("Window: predicate push down -- ohad") {
>   val winExpr = windowExpr(count('b),
>     windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
>   val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
>   val correctAnswer = testRelation
>     .where('a > 1).select('a, 'b, 'c)
>     .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>     .select('a, 'b, 'c, 'window).analyze
>   comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
> }
> {code}
> Will try to create a PR with a correction.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625448#comment-16625448 ] Ohad Raviv commented on SPARK-23985:

{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened: the Optimizer should understand this on its own.
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625436#comment-16625436 ] Ohad Raviv commented on SPARK-23985:

The same is true for Spark 2.4:
{code:scala}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", "id").write.saveAsTable("t1")

val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 1)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct

== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], [_w0#29], [b#12L ASC NULLS FIRST]
      +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#29, 1)
            +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625432#comment-16625432 ] Yuming Wang commented on SPARK-23985:

This works:
{code:scala}
import org.apache.spark.sql.functions._

spark.range(10).selectExpr("cast(id as string) a", "id as b").write.saveAsTable("t1")
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
spark.table("t1").where("a>'1'").withColumn("d", row_number() over windowSpec).explain
{code}
{noformat}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- Window [row_number() windowspecdefinition(_w0#19, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#19], [b#9L ASC NULLS FIRST]
   +- *(2) Sort [_w0#19 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#19, 5)
         +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#19]
            +- *(1) Filter (isnotnull(a#8) && (a#8 > 1))
               +- *(1) FileScan parquet default.t1[a#8,b#9L] Batched: true, DataFilters: [isnotnull(a#8), (a#8 > 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/core/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct
{noformat}
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625422#comment-16625422 ] Ohad Raviv commented on SPARK-23985:

You're right, that's very strange; it looks like something got lost in translation. When I run your example (which is actually mine...) I indeed get the right plan. However, if I try my original code I still get the un-optimized plan (with Spark 2.3):
{code:scala}
import org.apache.spark.sql.functions._

spark.range(10).selectExpr("cast(id as string) a", "id as b").write.saveAsTable("t1")
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
spark.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
{noformat}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- *(3) Filter (isnotnull(a#8) && (a#8 > 1))
   +- Window [row_number() windowspecdefinition(_w0#14, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#14], [b#9L ASC NULLS FIRST]
      +- *(2) Sort [_w0#14 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#14, 2)
            +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#14]
               +- *(1) FileScan parquet unitest.t1[a#8,b#9L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[../t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}
Can you understand the diff?
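The diff between the two snippets can be modeled with a toy plan structure (a sketch only; `Scan`, `Filter`, and `Window` below are hypothetical stand-ins, not Catalyst classes). Calling `.where` before `withColumn` builds the Filter below the Window, so the scan-level pushdown sees it without any optimizer help, while calling it after leaves the Filter above the Window, where the optimizer must decide whether to move it:

```scala
// Toy model of the two logical-plan shapes (hypothetical types for illustration).
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Window(partitionSpec: String, child: Plan) extends Plan

// where(...) before withColumn(...): the Filter is already below the Window.
val whereFirst: Plan = Window("concat(a,'lit')", Filter("a > '1'", Scan("t1")))

// withColumn(...) before where(...): the Filter starts above the Window and
// only reaches the scan if the optimizer agrees to push it down -- which it
// declines to do when the partition spec is a compound expression.
val whereLast: Plan = Filter("a > '1'", Window("concat(a,'lit')", Scan("t1")))
```

In the first shape the filter needs no optimizer rule at all, which would explain why Yuming's snippet shows `PushedFilters: [IsNotNull(a), GreaterThan(a,1)]` while the second shape does not.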
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625305#comment-16625305 ] Yuming Wang commented on SPARK-23985:

[~uzadude] It seems to already work:
{code:scala}
withTable("t1") {
  withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "warn") {
    spark.range(10).selectExpr("cast(id as string) as a", "id as b", "id").write.saveAsTable("t1")
    val w = spark.sql(
      "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
    w.explain()
  }
}
{code}
{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 5)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, DataFilters: [isnotnull(a#11), (a#11 > 1)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/core/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct

17:58:56.582 WARN org.apache.spark.sql.DataFrameSuite:
{noformat}
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439114#comment-16439114 ] Ohad Raviv commented on SPARK-23985:

I see in the Optimizer that filters are getting pushed only if they appear in the partitionSpec as-is. It looks like we need to add to Expression some kind of property that indicates whether we can push through it. An even more trivial example than Concat could be Struct. [~cloud_fan] - I see you have dealt with this code about a year ago, could you please take a look?
Ohad.
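The check described above can be sketched in isolation (a toy model under assumptions; `Attr` and `Compound` are hypothetical stand-ins, not Catalyst's `AttributeReference`/`Expression` API). Today the filter attribute must literally be one of the partition expressions; the proposed relaxation would also accept an attribute that only feeds a deterministic compound partition expression such as `concat(a, 'lit')` or `struct(a)`:

```scala
// Toy sketch of the pushdown test (hypothetical types, not Catalyst's API).
sealed trait Expr { def references: Set[String]; def deterministic: Boolean }
case class Attr(name: String) extends Expr {
  val references: Set[String] = Set(name)
  val deterministic: Boolean = true
}
// Stand-in for a compound expression such as concat(a, 'lit') or struct(a).
case class Compound(name: String, children: Seq[Expr], deterministic: Boolean = true) extends Expr {
  val references: Set[String] = children.flatMap(_.references).toSet
}

// Current behavior: push a filter on `attr` only if `attr` itself
// appears in the partition spec.
def canPushToday(attr: String, partitionSpec: Seq[Expr]): Boolean =
  partitionSpec.contains(Attr(attr))

// Sketched relaxation: also push when `attr` is referenced by a
// deterministic compound partition expression.
def canPushRelaxed(attr: String, partitionSpec: Seq[Expr]): Boolean =
  partitionSpec.exists(e => e.deterministic && e.references.contains(attr))

// partition by concat(a,'lit')
val spec: Seq[Expr] = Seq(Compound("concat", Seq(Attr("a"))))
```

With `spec` above, `canPushToday("a", spec)` is false, matching the un-optimized plans in this thread, while `canPushRelaxed("a", spec)` is true. A real fix would also need to argue the rewrite is semantically safe, not just that the attribute is referenced.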